# troubleshooting
  • e

    Erik Wickstrom

    07/14/2022, 11:05 PM
    I’m trying to use Flink Python with Docker (following https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/resource-providers/standalone/docker/#using-flink-python-on-docker) and getting errors about JDK paths missing. I’m following the docs to the letter. Anyone else run into this?
    Collecting pemja==0.1.4
      Downloading pemja-0.1.4.tar.gz (32 kB)
        ERROR: Command errored out with exit status 255:
         command: /usr/local/bin/python -c 'import sys, setuptools, tokenize; sys.argv[0] = '"'"'/tmp/pip-install-6banf8rj/pemja/setup.py'"'"'; __file__='"'"'/tmp/pip-install-6banf8rj/pemja/setup.py'"'"';f=getattr(tokenize, '"'"'open'"'"', open)(__file__);code=f.read().replace('"'"'\r\n'"'"', '"'"'\n'"'"');f.close();exec(compile(code, __file__, '"'"'exec'"'"'))' egg_info --egg-base /tmp/pip-pip-egg-info-0n4kuxu1
             cwd: /tmp/pip-install-6banf8rj/pemja/
        Complete output (1 lines):
        Include folder should be at '/usr/local/openjdk-11/include' but doesn't exist. Please check you've installed the JDK properly.
        ----------------------------------------
    ERROR: Command errored out with exit status 255: python setup.py egg_info Check the logs for full command output.
  • h

    Hilmi Al Fatih

    07/15/2022, 2:27 AM
    Kafka producer record send rate is half the consumed rate when deliveryGuarantee is EXACTLY_ONCE
    Hi everyone. I am currently developing a very simple Kafka-to-Kafka pipeline. It basically just takes input and pipes it to output topics.
    Setup:
    • Flink version: 1.14.4
    • TM: 1
    • slots per TM: 1
    • checkpoint-ms: 5000
    On the input topic, I use a data generator at 2 rps. I compare the following metrics (PromQL):
    • flink_taskmanager_job_task_operator_KafkaSourceReader_KafkaConsumer_records_consumed_rate (Flink metrics)
    • flink_taskmanager_job_task_operator_KafkaProducer_record_send_rate (Flink metrics)
    • irate(kafka_server_brokertopicmetrics_messagesin_topic_total{topic=~"source-topic"}[5m]) (Kafka broker metrics)
    • irate(kafka_server_brokertopicmetrics_messagesin_topic_total{topic=~"sink-topic"}[5m]) (Kafka broker metrics)
    With AT_LEAST_ONCE, the metrics look correct:
    • pic 1 (Flink metrics)
    • pic 2 (Kafka broker metrics - left: total, right: rate)
    With EXACTLY_ONCE, the producer metric is halved:
    • pic 3 (Flink metrics)
    • pic 4 (Kafka broker metrics - left: total, right: rate)
    With exactly-once, I was afraid there might be data loss, but none seems to happen (pic 5, left: source topic, right: sink topic). So I am wondering what I did wrong.
  • f

    Fred Wu

    07/15/2022, 3:09 AM
    Hi guys, I'm trying to run an aggregate after a sliding window join. The goal is to get an Iterable over all elements within each window after applying the JoinFunction. Can anyone suggest how to achieve that? Thanks!
  • t

    tega

    07/15/2022, 7:57 AM
    Hi everyone. I’m trying to understand how subtasks work. I’m currently running Flink in BATCH mode with a Kafka source and a JDBC sink. I have 8 partitions on the Kafka side and set a parallelism of 8 in Flink. When the job starts, all 8 subtasks are created and go to the SCHEDULED state. My problem is this: only one subtask is in the RUNNING state at a time; the rest stay in SCHEDULED. Why can’t they all be RUNNING so they can process the data in parallel? Is this specific to the BATCH execution mode? Thanks!
  • r

    Roman Bohdan

    07/15/2022, 9:56 AM
    Hello guys, can you please answer my question 🥺 Can we keep state after redeploying the TaskManager? We are losing it after deploying a new version of the project and would like to preserve it somehow. Thanks in advance for any answer.
  • p

    Prasanth Kothuri

    07/15/2022, 10:03 AM
    Hello there, we currently have the following in our Scala Flink jobs:
    import org.apache.logging.log4j.scala.Logging
    logger.debug("aggregations - start")
    However, when I run this on a Kubernetes Flink deployment, the TaskManager pod logs don't show these messages. Any ideas?
  • r

    Roman Bohdan

    07/15/2022, 10:24 AM
    One small question: what would be the best algorithm to implement a graph that sends generic records between states? We're using iterations now, but they need a lot of memory and are expensive. How can we improve this process, and what algorithm would you suggest to make our program fast with less memory usage? I mean finding the best correlation between those two values.
  • l

    Liang

    07/15/2022, 5:17 PM
    One quick question regarding checkpoints vs savepoints on rescaling. In the doc for Flink 1.14 regarding "Difference to Savepoints", it says
    Checkpoints have a few differences from savepoints. They
    • use a state backend specific (low-level) data format, may be incremental.
    • do not support Flink specific features like rescaling.
    However, in 1.15 and later docs Checkpoints vs. Savepoints, in the table, rescaling was supported by both checkpoints and savepoints. Can someone please clarify which statement is correct? Or does that mean in version 1.15 and beyond, the checkpoint can start to support rescaling? Thanks!
    👀 1
  • m

    Mehul Batra

    07/16/2022, 6:31 PM
    One quick question: I tried using the Flink 1.13 Elasticsearch SQL connector to send data to Elasticsearch indexes, but my Flink job cannot connect to the cluster using a username and password. Kafka Connect, using the same credentials, is able to send data to Elasticsearch. Any input on that?
  • e

    Echo Lee

    07/18/2022, 12:27 AM
    Hello guys, I recently found out that the new Kafka source API is 10 times more performant than the old one, but I don't know what's causing it. Both variants consume the same data from the same topic. Flink version: 1.14.x. Size of a single record: 1k
    // new source api
    KafkaSource<String> source = KafkaSource.<String>builder()
            .setBootstrapServers(brokers)
            .setTopics("benchmark")
            .setGroupId("my-group")
            .setStartingOffsets(OffsetsInitializer.earliest())
            .setBounded(OffsetsInitializer.latest())
            .setValueOnlyDeserializer(new SimpleStringSchema())
            .build();

    env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source")
            .map(new MapFunction<String, Object>() {
                private int count = 0;
                private long lastTime = System.currentTimeMillis();

                @Override
                public Object map(String value) throws Exception {
                    count++;
                    if (count % 100000 == 0) {
                        long currentTime = System.currentTimeMillis();
                        System.out.println(currentTime - lastTime);
                        lastTime = currentTime;
                    }
                    return null;
                }
            });
    // old source api
    FlinkKafkaConsumer<String> flinkKafkaConsumer = new FlinkKafkaConsumer<String>("benchmark",
            new SimpleStringSchema(), properties);
    flinkKafkaConsumer.setStartFromEarliest();
    env.addSource(flinkKafkaConsumer)
            .map(new MapFunction<String, Object>() {
                private int count = 0;
                private long lastTime = System.currentTimeMillis();

                @Override
                public Object map(String value) throws Exception {
                    count++;
                    if (count % 100000 == 0) {
                        long currentTime = System.currentTimeMillis();
                        System.out.println(currentTime - lastTime);
                        lastTime = currentTime;
                    }
                    return null;
                }
            });
  • h

    haim ari

    07/18/2022, 9:23 AM
    Hello, I’m using Operator 1.0.1. The Ingress does not seem to work (it is not created). It worked before, and I could see that it was created, but for some reason it stopped working. I followed the documentation and set up:
    spec:
      image: flink:1.14.3
      flinkVersion: v1_14
      ingress:
        template: "flink.k8s.io/{{namespace}}/{{name}}(/|$)(.*)"
        className: "nginx"
        annotations:
          nginx.ingress.kubernetes.io/rewrite-target: "/$2"
    However, no ingress is created, and there are no events in the namespace or errors in the operator logs. I tried removing both the operator and the Flink app and recreating them, but the ingress is still not created. Can someone advise on this?
  • f

    Felix Angell

    07/18/2022, 9:31 AM
    Hi all, can anyone please help me out with a Flink/PyFlink question? I'm curious why the docs for PyFlink (1.13) specify that Kafka client 0.10.x is used here (I can't link to the line, so cmd+f for FlinkKafkaConsumer). This is odd to me since, afaik, we cannot use SASL_SCRAM authentication to connect to our Kafka cluster with such an old version of the Kafka client API. Raw link: https://nightlies.apache.org/flink/flink-docs-release-1.13/api/python/_modules/pyflink/datastream/connectors.html
  • s

    salvalcantara

    07/18/2022, 11:12 AM
    Going through the docs for defining custom sources/sinks here:
    If you want to develop a connector that needs to bridge with DataStream APIs (i.e. if you want to adapt a DataStream connector to the Table API), you need to add this dependency:
    
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java-bridge</artifactId>
        <version>1.16-SNAPSHOT</version>
        <scope>provided</scope>
    </dependency>
    Then the docs don't mention this library again or provide any further examples. Can someone point me to actual examples using flink-table-api-java-bridge for bridging an existing DataStream API based connector for SQL purposes?
    ✅ 1
  • s

    salvalcantara

    07/18/2022, 11:15 AM
    On a separate note, if I have a Source<A> (Sink<A>), what would be the simplest way of obtaining a Source<B> (Sink<B>) based on a mapping/conversion function f:A->B (f:B->A)?
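Editor's sketch for the question above: it only illustrates why the source side needs f:A->B applied after reading while the sink side needs f:B->A applied before writing. `Supplier` and `Consumer` are stand-ins chosen for illustration, not Flink's `Source`/`Sink` interfaces, and `MappingAdapters`, `mapSource`, and `mapSink` are hypothetical names. In a DataStream job the same effect is usually obtained with a `map` operator placed after the source or before the sink.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

// Stand-in types only: Supplier<T> plays the role of a source and
// Consumer<T> the role of a sink. These are NOT Flink's interfaces.
class MappingAdapters {

    // A source of A plus f: A -> B yields a source of B (convert after reading).
    static <A, B> Supplier<B> mapSource(Supplier<A> source, Function<A, B> f) {
        return () -> f.apply(source.get());
    }

    // A sink of A plus f: B -> A yields a sink of B (convert before writing).
    static <A, B> Consumer<B> mapSink(Consumer<A> sink, Function<B, A> f) {
        return b -> sink.accept(f.apply(b));
    }

    public static void main(String[] args) {
        Supplier<Integer> intSource = () -> 42;
        Supplier<String> stringSource = mapSource(intSource, (Integer i) -> "value-" + i);
        System.out.println(stringSource.get());

        List<Integer> written = new ArrayList<>();
        Consumer<Integer> intSink = written::add;
        Consumer<String> stringSink = mapSink(intSink, (String s) -> Integer.parseInt(s));
        stringSink.accept("7");
        System.out.println(written);
    }
}
```

The direction of the function flips between the two cases because a source produces values and a sink consumes them.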
  • b

    Bastien DINE

    07/18/2022, 1:48 PM
    Hi, we have some trouble with the new KafkaSink API: our jobs restart every ~10 min because of this exception:
    TASK switched from RUNNING to FAILED with failure cause: org.apache.flink.util.FlinkRuntimeException: Failed to send data to Kafka test-1@-1 with FlinkKafkaInternalProducer{transactionalId='test-=false} because of a bug in the Kafka broker (KAFKA-9310). Please upgrade to Kafka 2.5+. If you are running with concurrent checkpoints, you also may want to try without them.
    To avoid data loss, the application will restart.
    	at org.apache.flink.connector.kafka.sink.KafkaWriter$WriterCallback.throwException(KafkaWriter.java:405)
    	at org.apache.flink.connector.kafka.sink.KafkaWriter$WriterCallback.lambda$onCompletion$0(KafkaWriter.java:391)
    	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
    	at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
    	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsNonBlocking(MailboxProcessor.java:353)
    	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:317)
    	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201)
    	at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809)
    	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761)
    	at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
    	at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
    	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
    	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
    	at java.base/java.lang.Thread.run(Unknown Source)
    Caused by: org.apache.kafka.common.errors.UnknownProducerIdException: This exception is raised by the broker if it could not locate the producer metadata associated with the producerId in question. This could happen if, for instance, the producer's records were deleted because their retention time had elapsed. Once the last records of the producerId are removed, the producer's metadata is removed from the broker, and future appends by the producer will return this exception.
    Our broker version is pretty old (1.1.1), yet KAFKA-9310 has "affects version" = 2.4.0 and resolved = 2.4.1. This was working well with the old producer API. Is there a workaround for this bug? Sadly, I cannot upgrade my Kafka broker right now.
  • j

    jiangchao qian

    07/19/2022, 4:12 AM
    Hello everyone! How can I set the number of TaskManagers and slots? If the parallelism is 20, how should I set them, e.g. 5 TaskManagers (4 slots each) or 4 TaskManagers (5 slots each)? I look forward to your ideas!
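Editor's sketch of the arithmetic behind the question above. It assumes default slot sharing, where a job needs as many slots as its highest operator parallelism; `SlotPlanner` and `taskManagersNeeded` are hypothetical names. Slots per TaskManager are set with `taskmanager.numberOfTaskSlots`; whether 5×4 or 4×5 is better depends on memory and CPU per TaskManager, which this sketch does not model.

```java
// Hypothetical helper: how many TaskManagers are needed for a given
// parallelism and slot count per TaskManager (ceiling division).
class SlotPlanner {

    static int taskManagersNeeded(int parallelism, int slotsPerTaskManager) {
        if (parallelism <= 0 || slotsPerTaskManager <= 0) {
            throw new IllegalArgumentException("parallelism and slots must be positive");
        }
        // Round up so that taskManagers * slotsPerTaskManager >= parallelism.
        return (parallelism + slotsPerTaskManager - 1) / slotsPerTaskManager;
    }

    public static void main(String[] args) {
        int parallelism = 20;
        for (int slots : new int[] {4, 5, 8}) {
            int taskManagers = taskManagersNeeded(parallelism, slots);
            int spareSlots = taskManagers * slots - parallelism;
            System.out.println(slots + " slots per TM -> " + taskManagers
                    + " TMs, " + spareSlots + " spare slots");
        }
    }
}
```

Both layouts from the question provide exactly 20 slots; 8 slots per TaskManager would need 3 TaskManagers and leave 4 slots idle.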
  • h

    Haim Ari

    07/19/2022, 7:00 AM
    Hello, I’m running a Flink app with Operator 1.0.1. In the Web UI I don’t see an option to kill the app. Another related Web UI question: am I right that each app has its own UI with the operator, meaning there can’t be a single UI showing multiple apps?
  • m

    Márk Bartos

    07/19/2022, 8:30 AM
    Hi. Any idea why my flinkShadowJar deps are not found (only implementation)? https://pastebin.com/Ycm2iKss | gradle 8
  • p

    Prasanth Kothuri

    07/19/2022, 10:30 AM
    We are running Flink in standalone k8s session mode. Currently all the jobs use the same log4j.properties. How do I specify/inject/configure a different log4j.properties file for each job?
  • z

    Zhiqiang Du

    07/19/2022, 11:50 AM
    Hi guys. I have a simple job like this:
    Input source: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
    CustomProcessAllWindowFunction: extracts the first and last element into a Tuple2.
    CountEvictor: retains the last element of the current window after the function runs.
    Expected result: (1, 4)(4, 8)(8,10)
    Actual result: (1, 4)(5, 8)(9,10)
    env.addSource(new MockSource())
            .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(3)))
            .evictor(CountEvictor.of(1, true))
            .process(new CustomProcessAllWindowFunction())
            .addSink(new PrintSinkFunction<Tuple2<Long, Long>>())
    
    env.execute()
    So did I misunderstand how the evictor works? How can I get the expected output? If I override the default trigger of the tumbling window, I do get the expected output:
    env.addSource(new MockSource())
            .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(3)))
            .trigger(CountTrigger.of(5))
            .evictor(CountEvictor.of(1, true))
            .process(new CustomProcessAllWindowFunction())
            .addSink(new PrintSinkFunction<Tuple2<Long, Long>>())
    
    env.execute()
    //output 
    Emit: 1
    Emit: 2
    Emit: 3
    Emit: 4
    Emit: 5
    (1,5)
    Emit: 6
    Emit: 7
    Emit: 8
    Emit: 9
    Emit: 10
    (5,10)
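Editor's sketch: a plain-Java simulation (no Flink classes) of the second job's behavior, assuming all ten elements land in one window buffer, the trigger fires after every 5 new elements, and eviction after the function keeps only the last element. `EvictAfterSimulation` and `run` are hypothetical names. One possible reading of the first job, not confirmed in this thread, is that each tumbling window has its own buffer that is discarded when the window ends, so a retained element cannot carry over into the next window.

```java
import java.util.ArrayList;
import java.util.List;

// Simulation only: one long-lived buffer, a count-based trigger, and
// eviction AFTER the window function that keeps the last `keep` elements.
class EvictAfterSimulation {

    static List<String> run(long[] input, int triggerCount, int keep) {
        List<Long> buffer = new ArrayList<>();
        List<String> emitted = new ArrayList<>();
        int sinceLastFire = 0;
        for (long value : input) {
            buffer.add(value);
            sinceLastFire++;
            if (sinceLastFire == triggerCount) {
                // "Window function": emit (first, last) of the current buffer.
                emitted.add("(" + buffer.get(0) + "," + buffer.get(buffer.size() - 1) + ")");
                // "Evictor": drop elements from the front until `keep` remain.
                while (buffer.size() > keep) {
                    buffer.remove(0);
                }
                sinceLastFire = 0;
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        long[] input = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
        System.out.println(run(input, 5, 1)); // prints [(1,5), (5,10)]
    }
}
```

The simulated output matches the (1,5) and (5,10) tuples in the log above, which is consistent with the retained element surviving only while the same buffer stays alive.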
  • f

    Felix Angell

    07/19/2022, 2:44 PM
    What exactly does the pyModule flag mean? The docs say: "Python module with the program entry point. This option must be used in conjunction with --pyFiles." Is this related to loading a Python module directly as a program vs. just specifying the files of a PyFlink program (using Python modules) via pyFiles?
  • b

    Beny Chernyak

    07/19/2022, 3:04 PM
    Hi guys, I'm upgrading from Flink 1.11 to 1.15.1, writing .gz files to S3. Since connectors.fs is now obsolete, I have to replace the sink I used before, EventuallyConsistentBucketingSink:
    https://github.com/sjwiesman/flink/blob/83a6400e2587b067d08a64bc7e10edd4b57e71b4/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/consistent/EventuallyConsistentBucketingSink.java
    to something new, so I use the FileSink:
    return FileSink.forRowFormat(
                    new Path(outputBasePath), new Encoder<T>() {
                        @Override
                        public void encode(T record, OutputStream stream)
                            throws IOException {
                            GzipParameters params = new GzipParameters();
                            params.setCompressionLevel(Deflater.BEST_COMPRESSION);
                            GzipCompressorOutputStream out = new GzipCompressorOutputStream(stream, params);
                            OBJECT_MAPPER.writeValue(out, record);
                            out.finish();
                        }
    
                    })
                .withBucketAssigner(new BasePathBucketAssigner<>())
                .withRollingPolicy(OnCheckpointRollingPolicy.build())
                .build();
    where outputBasePath is some s3:// URI. I got this error:
    Exception in thread "main" java.lang.UnsupportedOperationException: Recoverable writers on Hadoop are only supported for HDFS
    	at org.apache.flink.runtime.fs.hdfs.HadoopRecoverableWriter.<init>(HadoopRecoverableWriter.java:60)
    	at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.createRecoverableWriter(HadoopFileSystem.java:215)
    	at org.apache.flink.connector.file.sink.FileSink$RowFormatBuilder.createBucketWriter(FileSink.java:475)
    	at org.apache.flink.connector.file.sink.FileSink$RowFormatBuilder.getCommittableSerializer(FileSink.java:466)
    	at org.apache.flink.connector.file.sink.FileSink.getCommittableSerializer(FileSink.java:175)
    	at org.apache.flink.streaming.api.connector.sink2.CommittableMessageTypeInfo.toString(CommittableMessageTypeInfo.java:120)
    Could someone please explain what I'm doing wrong?
  • j

    Jérôme Viveret

    07/19/2022, 4:51 PM
    Hello everyone, would someone know what the equivalent of the Table API's OVER window aggregations is in the DataStream API (if there is one)? I understand I would need to write my own window assigner, but I would like to make sure I am not missing anything!
  • x

    Xinbin Huang

    07/19/2022, 6:06 PM
    I have a question about state recovery for the new source API (FLIP-27). If a split is discovered as "removed" by the enumerator:
    • would the source reader recognize the removal automatically, because splits are sent from the enumerator to the reader?
    • or would the reader still see these removed splits, because it restores its splits from state?
    For example, if a Kafka source subscribes to a list of topics (via regex) and one topic is later removed from the subscribed list, would the source reader still read from the removed topic?
  • b

    Bhupendra Yadav

    07/20/2022, 8:01 AM
    Hello everyone, has anyone integrated Flink with OpenTelemetry? I couldn't find any articles on Google either. I tried integrating OTel using the guide https://github.com/open-telemetry/opentelemetry-java-instrumentation#getting-started but traces are not being generated. Does Flink support OTel?
  • g

    George Chen

    07/21/2022, 12:00 AM
    QUES: What is the best practice of using ObjectMapper in AbstractDeserializationSchema?
    public class KinesisEMFJsonDeserializationSchema extends AbstractDeserializationSchema<...> {
    
        private final ObjectMapper objectMapper;
    
        ....
    
        @Override
        public RootNode deserialize(final byte[] input) {
            try {
                return objectMapper.readValue(input, RootNode.class);
            } catch (final IOException e) {
                ...
            }
        }
    }
    Flink complains that this DeserializationSchema is not serializable, I guess because ObjectMapper is not serializable?
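Editor's sketch of one common pattern for this kind of error: keep the helper out of the serialized object by marking the field `transient` and creating it lazily on first use. `HeavyParser` is a hypothetical stand-in for any non-serializable helper and `LazySchema` a hypothetical stand-in for the schema class; nothing here uses Flink or Jackson. In a real schema the helper could instead be created in `DeserializationSchema#open`, assuming the Flink version in use provides that hook. Whether the ObjectMapper is actually the non-serializable member in the original class is not confirmed in this thread.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;

// Hypothetical helper that does NOT implement Serializable.
class HeavyParser {
    String parse(byte[] input) {
        return new String(input, StandardCharsets.UTF_8).trim();
    }
}

// Hypothetical schema-like class: serializable because the helper is transient.
class LazySchema implements Serializable {
    private static final long serialVersionUID = 1L;

    // transient: skipped by Java serialization, so it is null after a round trip.
    private transient HeavyParser parser;

    String deserialize(byte[] input) {
        if (parser == null) {
            parser = new HeavyParser(); // created lazily where the object is used
        }
        return parser.parse(input);
    }

    // Serializes and deserializes the schema, as shipping it to a worker would.
    static LazySchema roundTrip(LazySchema schema) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(schema);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (LazySchema) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("round trip failed", e);
        }
    }

    public static void main(String[] args) {
        LazySchema copy = roundTrip(new LazySchema());
        System.out.println(copy.deserialize(" {\"a\":1} ".getBytes(StandardCharsets.UTF_8)));
    }
}
```

Removing the `transient` keyword makes `roundTrip` fail with a `NotSerializableException` once the parser has been created, which is the class of error described above.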
    ✅ 2
  • g

    George Chen

    07/21/2022, 12:16 AM
    Another qq - in running my shadowJar for Flink I got this error:
    Caused by: java.lang.NoClassDefFoundError: org/apache/avro/generic/GenericData$Array
    at org.apache.flink.formats.avro.utils.AvroKryoSerializerUtils.addAvroGenericDataArrayRegistration(AvroKryoSerializerUtils.java:68)
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.buildKryoRegistrations(KryoSerializer.java:591)
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.<init>(KryoSerializer.java:138)
    at org.apache.flink.api.java.typeutils.GenericTypeInfo.createSerializer(GenericTypeInfo.java:91)
    Is there anything I might be missing in the flinkShadowJar dependencies?
  • j

    jiangchao qian

    07/21/2022, 2:02 AM
    Hello everyone, I am having trouble with GroupWindowAggregate. The SQL looks like: "GROUP BY TUMBLE(rowtime, INTERVAL '1' MINUTE(4)),user". I have two questions: (1) Is "GROUP BY user" the same as "DataStream keyBy user"? (2) The rowtime is event time, and I construct different event times per user. With three users, "user_a, user_b, user_c", I expected three watermarks, "watermark_a, watermark_b, watermark_c", but I only get one watermark (max(eventtime_a,eventtime_b,eventtime_c)) in the Flink UI. I'm confused and hope to get some useful advice.
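Editor's sketch for the second question above: watermarks are tracked per parallel task, not per key, so a single generator sees the events of every user. The simulation below follows the bounded-out-of-orderness rule (watermark = highest timestamp seen, minus the allowed delay, minus 1); `SharedWatermark` and its methods are hypothetical names, not Flink classes, and the timestamps are made up for illustration.

```java
// Simulation only: one watermark generator shared by all keys of a task.
class SharedWatermark {
    private final long maxOutOfOrdernessMs;
    private long maxTimestamp;

    SharedWatermark(long maxOutOfOrdernessMs) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
        // Start low enough that the first watermark computation cannot underflow.
        this.maxTimestamp = Long.MIN_VALUE + maxOutOfOrdernessMs + 1;
    }

    // The user (key) is accepted but plays no role in the watermark.
    void onEvent(String user, long timestampMs) {
        maxTimestamp = Math.max(maxTimestamp, timestampMs);
    }

    long currentWatermark() {
        return maxTimestamp - maxOutOfOrdernessMs - 1;
    }

    public static void main(String[] args) {
        SharedWatermark watermark = new SharedWatermark(200);
        watermark.onEvent("user_a", 1000);
        watermark.onEvent("user_b", 5000);
        watermark.onEvent("user_c", 3000);
        System.out.println(watermark.currentWatermark()); // prints 4799
    }
}
```

Under these assumptions the events of user_a and user_c are already behind the watermark driven by user_b, which would explain seeing one value derived from the maximum event time.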
  • j

    Jérôme Viveret

    07/21/2022, 3:04 PM
    I would like to draw attention to https://issues.apache.org/jira/browse/FLINK-24907 (it is quite annoying to have late data discarded without being able to monitor it). Is there a process for raising attention to it?