https://flink.apache.org/ logo
Join Slack
Powered by
# troubleshooting
  • a

    Ans Fida

    02/07/2023, 7:48 PM
    Does Flink provide any config option/params to treat Flink SQL as case insensitive? I found this JIRA issue but looks like it’s a WIP and deprioritized
    m
    • 2
    • 1
  • e

    Eugene Kozlov

    02/07/2023, 8:22 PM
    Hello! I am a new in Flink and I need help of community. Introductory: - We have a topic with 12 partitions - Producer writes 50k events per second to it Partition key - userID, we guarantee that user events are processed in the order they were created Events - JSONs of 2 types, 1. JSONs which need to be enriched with a state of a user which send it (every event had user_id field) 2. JSON with rules how to change the state of a user Enriched events are placed in another topic and go further along the pipeline State of one user - dictionary, stored as N rows in Cassandra. Unique users - 30 000 000 Average count of properties in user - 300 We have microservices are written in Golang for all steps of pipeline Running in k8s ------------ Questions: • How configure flink for enriching events with state from cassandra? Should I use statefun for this case? • How many messages per second is one flink instance can handle? How to calculate desired amount of hardware? • Do I need to change the number of partitions? • How to achieve low latency with Apache Flink? Should I rewrite microservice to Java/Scala or not? • Things I should think/avoid before integrate Flink in Production? Any help or ideas appreciated 🙂
  • j

    Jason Politis

    02/07/2023, 8:29 PM
    Hello everyone. I'm trying to test flink-sql via the sql-client, loading a csv file with literally 1 column and 1 row. No matter what i do i keep getting the simplestreamformat is not splittable error 😕 Any ideas?
    • 1
    • 1
  • m

    Matyas Orhidi

    02/07/2023, 9:43 PM
    🧵 How to write the Kafka header from a Kafka Sink?
    s
    • 2
    • 4
  • j

    Jorge Iván Tordecilla Ruíz - Ceiba Software

    02/07/2023, 9:52 PM
    I am trying to perform a data processing in flink with table API, taking data from kafka topics fed from debezium, the pipeline works correctly but after 3 days of processing it throws the error that the heartbeat is lost with the taskmanger.
  • a

    Amir Hossein Sharifzadeh

    02/07/2023, 11:08 PM
    Hello everybody. I am new in (Py)Flink. In order to understand it better, I was trying to run an existing project from: https://apache.googlesource.com/flink-playgrounds/. I could run it on my docker successfully. But still, I have questions: 1) First: In payment_msg_proccessing.py code, I want to run a simple query on Kafka stream (payment_msg table) without insertion data into the sink table (es_sink here), and do some data processing. (In my project, I won’t insert any data). So, is it possible to run the query (queries) on sources (streams) without insertion data into other tables? 2) Second: How can I iterate over results, and print data in the output? For example, I wrote this simple query: table_result = t_env.execute_sql(“select provinceId, payAmount from payment_msg”) then after:
    Copy code
    with table_result.collect() as results:
        for result in results:
            print(result)
    but this code does not work. How can I iterate over table_result and extract all columns? Thank you very much for your assistance. Best. Amir
    d
    • 2
    • 6
  • a

    Amir Hossein Sharifzadeh

    02/08/2023, 2:15 AM
    I forgot to mention that print function did not work, as well. I tried to run a simple query like t_env.execute_sql(“select payAmount from payment_msg”).print() and I got this error messages:
    d
    • 2
    • 13
  • k

    Kyle Ahn

    02/08/2023, 2:38 AM
    [Flink Operator v 1.1.0] Has anyone seen this error? Perhaps related to race condition when deploying multiple flink applications.
    Copy code
    Message: Forbidden!Configured service account doesn't have access. Service account may have been revoked. deployments.extensions "eventlogger-pipeline-v0" is forbidden: User "system:serviceaccount:eventlogger-pipeline-staging:flink" cannot get resource "deployments" in API group "extensions" in the namespace "eventlogger-pipeline-staging"
    [Context] Two flinkdeployments,
    eventlogger-pipeline-v0
    and
    eventlogger-pipeline-v0-1
    attempt to be deployed, but one fails with this exception, and the other goes through. More logs in the thread ->
    g
    g
    • 3
    • 22
  • t

    Tony Yeung

    02/08/2023, 2:39 AM
    Hi all. Having an error about jdbc driver not found. The driver is included in the user application jar. Running Flink in session mode and the Flink cluster have no jdbc driver. Wondering why the driver cannot be found. Thanks in advance.🙏 Error log, code & pom in thread ->
    • 1
    • 3
  • k

    kingsathurthi

    02/08/2023, 5:37 AM
    Hi All, below attached image if our existing pipeline, where there four operator 1. Custom TCP source 2. Process 3. Map 4. sink when ever I increase the parallelism of process,map,sink, job is running fine except the Custom TCP source. what is the approach to have parallelism in the Custom TCP source operator. In the mean while, when I increase the parallelism for other operator new task manager is getting spawned and taskmanager is processing the data, however the custom TCP source operator is listening in only one taskmanager
    d
    • 2
    • 3
  • s

    Sudhan Madhavan

    02/08/2023, 6:25 AM
    Hi All, My flink app (k8s) receives ~1000TPS from kafka source and it just map and sink into another kafka topic. Its capacity is 1 taskmanager, 2cpu and 2G memory for both JM and TM. When I see its memory usage, it gradually gets increased over time but the cpu load is in idle state after some time of job startup. I am wondering why the memory usage keeps on growing when there is no special operations. Any input about this? Thanks in advance.
    x
    • 2
    • 3
  • y

    Yang LI

    02/08/2023, 10:31 AM
    Hello guys, one question about processing time of kafka source , I have seen in doc https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/kafka/#usage, does it means we are on processing time when we write noWatermarks like this?
    Copy code
    env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
  • y

    Yang LI

    02/08/2023, 10:53 AM
    Hello, just a small details about flink ui in flink 13 UI we have on top of web page with commit right displayed we just upgraded to 16.1 UI and we have on top of web page with "DeadD0d0" and invalid timestamp "1970-01-01T010000+01:00" ... BTW UI for flink 16.0 works also 🫠
  • y

    Yang LI

    02/08/2023, 12:35 PM
  • s

    Shamit Jain

    02/08/2023, 4:58 PM
    Hi All, We have a data loss issue occurred in prod. We are using Flink 1.13.2 with Kafka. I am using Flink table APIs to join the streams from Kafka and AWS KDA to run our jobs. I saw the below error in logs. Can someone please help me to understand the below error? I am trying to simulate the error on my local but no luck. Please note while creating the FLink tables in our jobs we are not using Kafka consumer group.id.
    Copy code
    2023-01-06T21:10:11.885Z {"applicationARN":"arn:aws:kinesisanalytics:us-east-1:542230711021:application/laap-kda-ue1-con-sr-sr-per-dlvry-metrc-imprsn-prod","applicationVersionId":"15","locationInformation":"org.apache.flink.runtime.io.network.netty.PartitionRequestQueue.handleException(PartitionRequestQueue.java:287)","logger":"org.apache.flink.runtime.io.network.netty.PartitionRequestQueue","message":"Encountered error while consuming partitions","messageSchemaVersion":"1","messageType":"ERROR","threadName":"Flink Netty Server (6121) Thread 0","throwableInformation":"org.apache.flink.shaded.netty4.io.netty.channel.unix.Errors$NativeIoException: readAddress(..) failed: Connection reset by peer\n"}
    j
    a
    • 3
    • 19
  • d

    Drake Esdon

    02/08/2023, 7:58 PM
    Hi everyone 👋 I'm working on a bit of network buffer tuning. Is there is a way to get Flink to log the number of gates and channels?
    ✅ 1
  • a

    Andrei Leibovski

    02/08/2023, 9:05 PM
    Hello everyone, does anyone know what the minimal set of permissions is for S3-based
    high-availability.storageDir
    ? I can't find it in the docs anywhere.
  • r

    Reme Ajayi

    02/08/2023, 9:53 PM
    Hello. how can I use S3 as a streaming file sink. I have put fs-s3-hadoop jar in my plugins folder and added access keys to
    flink-conf.yaml.
    However I am still unable to write files to S3. Error below:
    Copy code
    Could not find a file system implementation for scheme 's3'. The scheme is directly supported by Flink through the following plugin(s): flink-s3-fs-hadoop, flink-s3-fs-presto. Please ensure that each plugin resides within its own subfolder within the plugins directory. See <https://nightlies.apache.org/flink/flink-docs-stable/docs/deployment/filesystems/plugins/> for more information. If you want to use a Hadoop file system for that scheme, please add the scheme to the configuration fs.allowed-fallback-filesystems. For a full list of supported file systems, please see <https://nightlies.apache.org/flink/flink-docs-stable/ops/filesystems/>.
    What am I missing?
    m
    g
    h
    • 4
    • 16
  • a

    Andrew Otto

    02/08/2023, 10:17 PM
    Question about side outputs and OutputTags in pyflink. The docs say we are supposed to
    yield output_tag, value
    . But, when I do that, I’m just getting a Tuple(output_tag, value) in the main datastream, not the side output one. What am I doing wrong?
    d
    • 2
    • 26
  • a

    Amir Hossein Sharifzadeh

    02/09/2023, 2:00 AM
    Hello folks. It’s about almost two weeks since I am struggling to run a simple program and I explained enough about the issues. It sounds like that code is fine but I can not get results. Anyway, my main goal is to get data through a Kafka connector with Flink Table API in JSON format and run a query and print all data (i.e. select * from my_table). I have already studied the Table API mechanism but still cannot run the query and see the results. I want to send a very sample data through a Kafka producer and after running query through Table API, print the results in the terminal/console. So, can you please show me a sample project containing a docker file and source codes (whatever Java/Python/Scala) with a good documentation that I could run it easily on my laptop? (I forgot to introduce myself: I am a software developer with more than 10 years experiences of Java EE system development. My expertise is ORM and Object-Oriented systems). I do appreciate your assistance. Best, Amir.
    d
    • 2
    • 15
  • k

    Krish Narukulla

    02/09/2023, 2:58 AM
    Copy code
    WARNING: An illegal reflective access operation has occurred
    WARNING: Illegal reflective access by org.apache.flink.api.java.ClosureCleaner (file:/opt/flink/lib/flink-dist-1.16.1.jar) to field java.lang.String.value
    WARNING: Please consider reporting this to the maintainers of org.apache.flink.api.java.ClosureCleaner
    WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
    WARNING: All illegal access operations will be denied in a future release
    m
    j
    • 3
    • 4
  • s

    Suparn Lele

    02/09/2023, 4:35 AM
    Hi, about 2-3 weeks back I had started a thread regarding flink batch jobs that use multiple pipelines. We are able to achieve that using the ways suggested by @Weijie Guo. Just a follow up question on that. Just to give a context. I have around 30 tables sitting in postgresql database. I am running a loop. And for every iteration in that loop, I am 1. Fetching data from source postgresql table 2. Doing aggregations 3. Storing results in destination postgresql table 4. Running the execute command. But what we are observing is that there are 30 pipelines and every pipeline runs after one another. Issue is that the first pipeline gets finished in 90 seconds but it takes same amount of time for subsequent pipelines as well. So it takes almost around 30 pipelines * 90 seconds = 45 minutes to finish this job. Is this a limitation of Apache Flink? Or I am missing something
    w
    • 2
    • 34
  • w

    Wei Qin Pan, Max

    02/09/2023, 6:19 AM
    Hi All, I have a System Function (IF AND CASE WHEN) issue occurred. I using Flink 1.15.0 + with Flink SQL.
    Copy code
    -- when i execute this sql, i can get the correct result
    SELECT IF(`marketing_flow_id` IS NOT NULL, `marketing_flow_id`, '') FROM source;
    
    -- when i execute this sql, i get the error result(return  blank)
    SELECT IF(`marketing_flow_id` IS  NULL, '', `marketing_flow_id`) FROM source;
    maybe this is a bug ?
    • 1
    • 3
  • x

    xiaohe lan

    02/09/2023, 6:45 AM
    Hi, My input stream is
    Tuple2<String, String>
    , I want to group by the first field and sum the integer in the second field. This is my `ProcessFunction`:
    Copy code
    public static class MyKeyedProcessFunction
          extends KeyedProcessFunction<String, Tuple2<String, Integer>, Tuple2<String, Integer>> {
        private ValueState<Integer> state;
    
        @Override
        public void open(Configuration parameters) throws Exception {
          state = getRuntimeContext().getState(new ValueStateDescriptor<>("sum", Integer.class));
        }
    
        @Override
        public void processElement(
            Tuple2<String, Integer> value,
            Context ctx,
            Collector<Tuple2<String, Integer>> out) throws Exception {
          Integer sum = state.value();
          if (sum == null) {
            sum = 0;
          }
          sum += value.f1;
          state.update(sum);
          ctx.timerService().registerProcessingTimeTimer(ctx.timerService().currentProcessingTime() + 5000);
        }
    
        @Override
        public void onTimer(
            long timestamp,
            OnTimerContext ctx,
            Collector<Tuple2<String, Integer>> out) throws Exception {
          out.collect(Tuple2.of(ctx.getCurrentKey(), state.value()));
          state.clear();
        }
      }
    Now the
    onTimer
    is called for every element. I specified the input as:
    Copy code
    aaa,50
    aaa,40
    aaa,10
    I see the output like:
    Copy code
    (aaa,100)
    (aaa, null)
    (aaa, null)
    How can I get the output as
    (aaa,100)
    ?
  • a

    Amenreet Singh Sodhi

    02/09/2023, 6:56 AM
    Hi Team, When i am deploying my job in session mode, it is using log4j for logging(i have modified the path for storing the logs) but when i deploy the same job in application mode, it is asking for logback-console.xml. Is logback-console.xml required for deployment in application mode? or its just some configuration issue?
    • 1
    • 1
  • e

    Eugenio Marotti

    02/09/2023, 7:19 AM
    Hi everyone. I'm developing a monitoring app and I want to use Flink to process the event stream. I need to start a timer when an event is received in Flink, send the timer value and stop the timer when another event is received. Let me explain better. An event consists of an event name, a source id and other fields. So I have something like this:
    E1("A",1,...) -> E2("B",1,...) -> E3("C",1,...)
    When I receive event "A" I want to start a timer (keyed by the source id) and update a sink with the timer value periodically. When I receive event "C" I want to stop the timer and update the sink with the final timer value. Is there a way to accomplish that in Apache Flink?
  • m

    Marouane Souadi

    02/09/2023, 10:15 AM
    Hi everyone, for sink data to s3 using orc format, there are 2 classes, flink_orc, and flink_orc_nohive, what does mean the flink_orc_nohive one ?
  • n

    Nitin Agrawal

    02/09/2023, 3:05 PM
    Hello All, As part of flink Jobs in streaming mode there is a requirement to publish additional attribute called
    sequence_number
    the sequence_number should be increasing in nature starting from 1 and keep increasing over lifetime skip of sequence is okay .. In the Database world we generally have this sequence called as
    AUTO_INCREMENT
    . Is there a way in flink to achieve the same.
  • s

    Siddhesh Kalgaonkar

    02/09/2023, 6:18 PM
    Hello #C03G7LJTS2G can we do all the things using Python datastream API like Java datastream API Which one is full fledged one?
    m
    d
    d
    • 4
    • 17
  • e

    Erwin Cabral

    02/09/2023, 7:17 PM
    Hi everyone. I am currently running a streaming pipeline using kafka->flink(beam) via flink k8s operator->clickhouse in k8s. I am using hadoop as my storage dir for checkpoint and savepoints. This setup works well. However, I noticed that the checkpoint size slowly increases after running it for day. The messages being processed is around 200 messages/second with about 2k per message. I set
    state.checkpoints.num-retained: "5"
    hoping that the size won't grow any more beyond a certain size but I am still seeing a increasing trend when I view metrics from grafana. Is there a way to cleanup the checkpoint files which are no longer relevant for checkpoint recovery?
    m
    • 2
    • 1
1...545556...98Latest