# troubleshooting
  • o

    Oscar Perez

    12/20/2022, 10:30 AM
    Hi, I'm implementing a use case with Flink that sends an alert whenever a user logs in with the same device token within the last 3 weeks. Looking at the Flink documentation I see 2 ways of doing this: 1. with windows, 2. with state TTL. I have tried doing it with state TTL but I fail to see how to test it. Using Thread.sleep or setProcessingTime does not seem to clear the state for previous entries. Regarding state TTL: if I have MapState<String, List<String>>, do individual entries of the list get cleared when the TTL expires, or is the TTL bound to just the keys?
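A note on the granularity part of this question: the Flink state TTL documentation describes TTL on collection state as per-entry, so list elements (in ListState) and map entries (in MapState) expire independently. With MapState<String, List<String>> the unit would be the map entry, and the List stored as its value is treated as one opaque value. The following is only a plain-Java stand-in that mimics that behaviour with an explicit clock; `TtlMapSketch` and its methods are hypothetical names for illustration, not Flink API.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java stand-in for a map state with per-entry TTL; this is not Flink code.
class TtlMapSketch {
    private final long ttlMillis;
    private final Map<String, List<String>> values = new HashMap<>();
    private final Map<String, Long> lastWrite = new HashMap<>();

    TtlMapSketch(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    // Writing an entry stores the whole list and refreshes that entry's timestamp.
    void put(String key, List<String> value, long nowMillis) {
        values.put(key, new ArrayList<>(value));
        lastWrite.put(key, nowMillis);
    }

    // Returns null once the entry's own timestamp is at least ttlMillis old.
    List<String> get(String key, long nowMillis) {
        Long written = lastWrite.get(key);
        if (written == null || nowMillis - written >= ttlMillis) {
            values.remove(key);
            lastWrite.remove(key);
            return null;
        }
        return values.get(key);
    }

    public static void main(String[] args) {
        TtlMapSketch state = new TtlMapSketch(1_000L);
        state.put("token-1", List.of("login-a"), 0L);
        // Rewriting the entry at t=900 refreshes the timestamp for the whole list,
        // including the element that was first written at t=0.
        state.put("token-1", List.of("login-a", "login-b"), 900L);
        System.out.println(state.get("token-1", 1_500L)); // entry is still alive
        System.out.println(state.get("token-1", 1_900L)); // entry expired as a whole
    }
}
```

If each login needs its own expiry, one option under this reading is to keep the logins in a ListState<String> or as separate map entries, so that every element carries its own timestamp.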
  • o

    Oscar Perez

    12/20/2022, 1:33 PM
    Hi, it is me again. I'm trying to understand how to test stateful functions in Flink. Looking at the documentation here: https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/testing/ I see that in the method processElement we set a timestamp:
    testHarness.processElement(2L, 100L);
    Does this push the element with processing time 100L? Then I see that the processWatermark method is used, but this seems to refer to event time timers. Is this needed in this case?
    // trigger event time timers by advancing the event time of the operator with a watermark
    testHarness.processWatermark(100L);
    In many examples I see that we exclusively use the processWatermark method and not setProcessingTime, like here: https://github.com/apache/flink/blob/master/flink-streaming-java/src/test/java/org[…]k/streaming/runtime/operators/windowing/WindowOperatorTest.java I am a bit confused about how to glue all these methods together and on what basis to use one or another. Thank you
  • k

    kingsathurthi

    12/20/2022, 2:05 PM
    I'm using the operator for deployment and I want to expose a service for the task manager. How do I achieve it? By default the job manager RPC, blob and UI services are exposed.
  • s

    Sergio Morales

    12/20/2022, 3:09 PM
    Hi, I’m using the FlinkKafkaProducer011 from
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka-0.11_2.12</artifactId>
        <version>1.11.6</version>
    </dependency>
    Now, when executing, it fails at runtime because
    switched from INITIALIZING to FAILED with failure cause: java.lang.NoClassDefFoundError: org/apache/flink/shaded/guava18/com/google/common/collect/Lists
    Checking the code, it still references guava18, but other classes/dependencies seem to refer to guava30 instead. Is it a bug, or do you have any advice for this case?
  • t

    Tsering

    12/20/2022, 3:13 PM
    Hi, I am trying to read a file from S3 as a broadcast stream in my Flink app and it is throwing this kind of error:
    Exception type is USER from filter results [UserClassLoaderExceptionFilter -> NONE, UserAPIExceptionFilter -> NONE, UserSerializationExceptionFilter -> USER, UserFunctionExceptionFilter -> SKIPPED, OutOfMemoryExceptionFilter -> NONE, TooManyOpenFilesExceptionFilter -> NONE, KinesisServiceExceptionFilter -> NONE].
    Can someone please shed light on the reason, or on where things go wrong?
  • j

    Jason Politis

    12/20/2022, 5:46 PM
    gm all, trying to build flink 1.15.1 but i'm getting this error:
    [ERROR] Failed to execute goal on project flink-connector-hive_2.12: Could not resolve dependencies for project org.apache.flink:flink-connector-hive_2.12:jar:1.15.1: Failed to collect dependencies at org.apache.hive:hive-exec:jar:2.3.9 -> org.pentaho:pentaho-aggdesigner-algorithm:jar:5.1.5-jhyde: Failed to read artifact descriptor for org.pentaho:pentaho-aggdesigner-algorithm:jar:5.1.5-jhyde: Could not transfer artifact org.pentaho:pentaho-aggdesigner-algorithm:pom:5.1.5-jhyde from/to maven-default-http-blocker (http://0.0.0.0/): Blocked mirror for repositories: [repository.jboss.org (http://repository.jboss.org/nexus/content/groups/public/, default, disabled), conjars (http://conjars.org/repo, default, releases+snapshots), apache.snapshots (http://repository.apache.org/snapshots, default, snapshots)] -> [Help 1]
    org.apache.maven.lifecycle.LifecycleExecutionException: Failed to execute goal on project flink-connector-hive_2.12: Could not resolve dependencies for project org.apache.flink:flink-connector-hive_2.12:jar:1.15.1:Failed to collect dependencies at org.apache.hive:hive-exec:jar:2.3.9 -> org.pentaho:pentaho-aggdesigner-algorithm:jar:5.1.5-jhyde
    Is anyone else getting this?
  • s

    Sylvia Lin

    12/20/2022, 11:30 PM
    For Flink on k8s, instead of creating a nodePort, can I create an ingress for queryable states? https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/resource-providers/standalone/kubernetes/#enabling-queryable-state
  • r

    raghav tandon

    12/21/2022, 5:24 AM
    Can someone help me solve this exception in FlinkKafkaConsumer? I don't see a way to retry this… Once this error starts, it is never able to commit again. I understand that Flink doesn't rely on Kafka offsets, but I still want to see the lag…
    org.apache.kafka.clients.consumer.RetriableCommitFailedException: Offset commit failed with a retriable exception. You should retry committing the latest consumed offsets.
  • c

    chankyeong won

    12/21/2022, 5:58 AM
    I’m testing the Flink DataStream API with a Kafka input source, using KafkaSource. I want to use io.confluent.kafka.serializers.subject.TopicRecordNameStrategy for the Schema Registry properties because I have Avro messages with multiple schemas in the same topic. But the error java.lang.IllegalStateException: Expecting type to be a PojoTypeInfo occurs. Since I can’t stick to one schema, I used the SpecificRecordBase type. How can I deserialize Avro messages in my use case?
  • u

    习羞羞

    12/21/2022, 6:06 AM
    #C03G7LJTS2G Hi all, I am now using the Flink operator to run batch jobs. I wonder whether there is a mechanism that can inform me automatically when the job is done; a polling interface would do as well.
  • k

    kingsathurthi

    12/21/2022, 7:06 AM
    How are services automatically created by the Flink operator? It seems there is no way to expose other ports. Has anyone come across this?
  • a

    Alexis Josephides (Contractor)

    12/21/2022, 9:58 AM
    👋 I have got myself in a muddle and I wondered if someone could help unravel me. This is in relation to how the RocksDB state works. In-flight state goes through a series of steps. It is first written to memtables in off-heap memory (managed memory). It continues to write to these memtables until they hit a max size, at which point they are made read-only memtables. Once these memtables are all full (and there may be another trigger) they are flushed to local disk, in this case RocksDB. Once in RocksDB they are then compacted. All in-flight reads favour the memtables. If a value can’t be found there, the read falls back to a table cache (also off-heap) that is based on the local disk (RocksDB reads). Have I understood correctly? If the above is true, does this mean that until 100% of managed memory is reached RocksDB itself is unused? Would checkpoint sizes also increase until they matched the size of managed memory?
  • g

    Gerald Schmidt

    12/21/2022, 11:51 AM
    I'm trying to spin up Flink in a constrained Kubernetes environment, which means that I can't use the "native Kubernetes" chart (with CRDs, webhook and all). I'm building a custom chart based on the common resource definitions in the docs. Before I get too deep into any issues - am I overlooking a chart that creates a standalone Flink cluster on top of Kubernetes? Thanks!
  • t

    Tony Yeung

    12/21/2022, 11:57 AM
    👋 Hi all. May I know if application mode with standalone deployment can achieve HA? According to the docs, a JobManager is started for each submission, so there is 1 JobManager per application. I'm afraid this creates a single point of failure, as there is no standby JobManager.
  • o

    Otto Remse

    12/21/2022, 2:16 PM
    Hi! In Flink SQL, given the following example table
    country string,
    address string
    if I do a distinct collect of addresses grouped by country, I get a multiset back. Is there a way without UDFs to get an array back?
    select country, COLLECT(DISTINCT address) AS addresses from myTable group by country
  • e

    Emmanuel Leroy

    12/21/2022, 4:39 PM
    my Flink Session cluster is in a crash loop and I am not even able to delete the deployment. What’s the process to clean this up?
  • r

    Rishabh Kedia

    12/21/2022, 11:49 PM
    @Gyula Fóra @Maximilian Michels thank you for adding autoscaling to the Flink Kubernetes operator. I see that it is not a part of the latest 1.3 version. Are there any plans to include this soon? We have a business need and I would love to continue to use/test this rather than build my own autoscaler. I'm happy to help too.
  • s

    Sai Sharath Dandi

    12/22/2022, 1:14 AM
    Hello, does anyone know if this is supposed to be valid SQL syntax?
    select ROW(1 as a) as b from myTable
  • w

    Walker L

    12/22/2022, 1:46 AM
    Hi all, when I tried to run Flink 1.14.4 with Occlum, the following error occurred:
    Caused by: org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Could not acquire the minimum required resources.
    An example of Occlum running Flink 1.11 (only the TaskManager runs in Occlum) can be found here: https://github.com/occlum/occlum/tree/v0.29.3/demos/cluster_serving; I just changed Flink 1.11 to Flink 1.14. I tried to increase the values of parameters such as jobmanager.memory.process.size and taskmanager.memory.process.size, but I still got the error. Since I am also new to Flink, I would like to ask if there are other settings that can solve the above error?
  • t

    Tsering

    12/22/2022, 2:57 AM
    Hello, my Flink app works locally in IntelliJ, but in the cloud it hits this error:
    java.io.IOException: Could not perform checkpoint 66 for operator Combine Broadcasted Rules with Events -> Timestamps/Watermarks (1/6)#65.
    at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1055)
    at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:135)
    at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.triggerCheckpoint(SingleCheckpointBarrierHandler.java:250)
    at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler$ControllerImpl.triggerGlobalCheckpoint(SingleCheckpointBarrierHandler.java:431)
    at org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlignedBarrierHandlerState.barrierReceived(AbstractAlignedBarrierHandlerState.java:61)
    at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.processBarrier(SingleCheckpointBarrierHandler.java:227)
    at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.handleEvent(CheckpointedInputGate.java:180)
    at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:158)
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66)
    at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:98)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:423)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:681)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:636)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:620)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:784)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:571)
    at java.base/java.lang.Thread.run(Thread.java:829)
    Caused by: org.apache.flink.runtime.checkpoint.CheckpointException: Could not complete snapshot 66 for operator Combine Broadcasted Rules with Events -> Timestamps/Watermarks (1/6)#65. Failure reason: Checkpoint was declined.
    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:264)
    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:169)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:371)
    at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointStreamOperator(SubtaskCheckpointCoordinatorImpl.java:706)
    at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.buildOperatorSnapshotFutures(SubtaskCheckpointCoordinatorImpl.java:627)
    at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.takeSnapshotSync(SubtaskCheckpointCoordinatorImpl.java:590)
    at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:312)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$8(StreamTask.java:1099)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:1083)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1039)
    ... 19 more
    Caused by: java.lang.ClassCastException: class com.xxx.RateRule cannot be cast to class java.lang.String (com.xxx.RateRule is in unnamed module of loader org.apache.flink.util.ChildFirstClassLoader @6165164; java.lang.String is in module java.base of loader 'bootstrap')
    at org.apache.flink.api.common.typeutils.base.StringSerializer.copy(StringSerializer.java:31)
    at org.apache.flink.api.common.typeutils.base.MapSerializer.copy(MapSerializer.java:110)
    at org.apache.flink.runtime.state.HeapBroadcastState.<init>(HeapBroadcastState.java:69)
    at org.apache.flink.runtime.state.HeapBroadcastState.deepCopy(HeapBroadcastState.java:84)
    at org.apache.flink.runtime.state.HeapBroadcastState.deepCopy(HeapBroadcastState.java:40)
    at org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy.syncPrepareResources(DefaultOperatorStateBackendSnapshotStrategy.java:88)
    at org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy.syncPrepareResources(DefaultOperatorStateBackendSnapshotStrategy.java:36)
    at org.apache.flink.runtime.state.SnapshotStrategyRunner.snapshot(SnapshotStrategyRunner.java:77)
    at org.apache.flink.runtime.state.DefaultOperatorStateBackend.snapshot(DefaultOperatorStateBackend.java:230)
    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:226)
    ... 29 more
    To simplify, it was caused by a ClassCastException. How can I resolve this issue? I have been stuck here for 3 days already 🙏
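One possible reading of the trace above: during the snapshot, HeapBroadcastState.deepCopy runs MapSerializer.copy, which hands an entry to StringSerializer.copy, and that entry is actually a RateRule. That pattern would fit a broadcast state descriptor that declares String for its key or value type while RateRule objects are being stored, which Java generics allow to slip through at write time. The sketch below reproduces only that mechanism in plain Java; `BroadcastStateTypeSketch` and its nested `RateRule` are hypothetical stand-ins, and none of this is Flink code.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java illustration of a type mismatch that only surfaces when values are copied.
class BroadcastStateTypeSketch {
    // Hypothetical stand-in for the com.xxx.RateRule class in the stack trace.
    static class RateRule {
        final int limit;

        RateRule(int limit) {
            this.limit = limit;
        }
    }

    // Copies every value as a String, much like a String-typed serializer would.
    static Map<String, String> copyAsStrings(Map<String, String> source) {
        Map<String, String> copy = new HashMap<>();
        for (Map.Entry<String, String> entry : source.entrySet()) {
            String value = entry.getValue(); // the implicit cast to String happens here
            copy.put(entry.getKey(), value);
        }
        return copy;
    }

    // Builds a map declared as String -> String that actually holds a RateRule.
    @SuppressWarnings({"unchecked", "rawtypes"})
    static Map<String, String> stateWithWrongValueType() {
        Map<String, String> declared = new HashMap<>();
        Map raw = declared;                  // the raw view bypasses the compile-time check
        raw.put("rule-1", new RateRule(10)); // succeeds, because generics are erased at runtime
        return declared;
    }

    public static void main(String[] args) {
        Map<String, String> state = stateWithWrongValueType();
        try {
            copyAsStrings(state);
        } catch (ClassCastException e) {
            System.out.println("copy failed: " + e.getMessage());
        }
    }
}
```

If this is what is happening, the put succeeds and the failure only appears once a checkpoint copies the state, which could also explain why a local run without checkpointing looks fine. Checking that the types in the state descriptor match the objects actually written would be the first thing to verify.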
  • a

    Abdelhakim Bendjabeur

    12/22/2022, 11:07 AM
    Hello, I am trying to understand how JOINs work in Flink. I have two Kafka topics that I want to join and compute some metrics on the fly using some window functions. I have the following error:
    [ERROR] Could not execute SQL statement. Reason:
    org.apache.flink.table.api.TableException: StreamPhysicalOverAggregate doesn't support consuming update and delete changes which is produced by node ChangelogNormalize(key=[id, accountId])
    I did a simple join without any window functions, and it works "fine". I checked the changelog of the resulting table (when simply joining), and it does include update and delete operations (-D), even though my topics are append-only by design. Is this normal behaviour in Flink? How can I use window functions such as
    ROW_NUMBER() OVER (PARTITION BY accountId, ticketId ORDER BY ticketMessageCreatedDatetime ASC) AS rownum,
         COUNT(*) OVER (PARTITION BY accountId, ticketId) AS count_messages
    on a joined stream?
  • t

    Tiansu Yu

    12/22/2022, 11:47 AM
    Encountered this problem while using someStream.sinkTo(FileSink.from…):
    java.lang.NoSuchMethodError: 'org.apache.flink.streaming.api.datastream.DataStreamSink org.apache.flink.streaming.api.datastream.DataStream.sinkTo(org.apache.flink.api.connector.sink2.Sink)'
    Could this be some dependency issue?
  • s

    Suriya Krishna Mariappan

    12/22/2022, 11:57 AM
    We are trying out the Kubernetes operator. We came across a configuration where HA can be configured for the operator itself. Before trying that, we deleted the operator pod and, immediately after that, the JobManager pod. Everything seems to come up fine after a few seconds. In what case is HA for the operator useful? Are we missing something? Clarification on this would be very helpful.
  • o

    Oscar Perez

    12/22/2022, 1:10 PM
    Hi, I have created a ProcessWindowFunction and now I would like to test it. I want to test that the sliding window collects all events within the time specified, i.e. 2 seconds. This is the configuration for the sliding window:
    SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(2))
    I want to insert events at times from 0 to 4 seconds and I don't want the ones inserted after 2 seconds to be processed in my first window. For that I do the following:
    testHarness.processElement(event1, 1000);
    testHarness.processElement(event2, 1500);
    testHarness.processElement(event3, 1600);
    testHarness.processElement(event4, 1700);
    testHarness.processElement(event5, 1800);
    testHarness.processElement(event6, 2000);
    testHarness.processElement(event7, 3000);
    testHarness.processElement(event8, 3100);
    testHarness.processElement(event9, 3500);
    testHarness.processWatermark(2001); // set event time in order to trigger the window
    testHarness.setProcessingTime(2001);
    According to my understanding, setting the processing time and the watermark to 2001 will trigger the window, but only 6 events will be processed, right? event7, which has its timestamp at 3 seconds, will not be consumed. The problem I am facing is that all events up to event9 are being processed even though I have set the processing time to just 2.001 seconds. What am I doing wrong? Thanks!
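Two things may be worth checking here. First, a processing-time assigner such as SlidingProcessingTimeWindows places each element by the operator's current processing time at the moment the element is processed; the second argument of processElement is the record's event timestamp, which that assigner does not use. Under that reading, the nine events land wherever the harness clock stood when they were pushed, and calling setProcessingTime before each processElement would be the way to spread them out. Second, with a size of 10 s and a slide of 2 s every element belongs to five overlapping 10-second windows. The sketch below redoes only the window arithmetic in plain Java; `SlidingWindowSketch` is a hypothetical name and the code does not use Flink.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java version of sliding-window assignment arithmetic (no offset); not Flink code.
class SlidingWindowSketch {
    // Returns the [start, end) bounds of every sliding window that contains the timestamp.
    static List<long[]> assignWindows(long timestamp, long size, long slide) {
        List<long[]> windows = new ArrayList<>();
        // Start of the latest window that still contains the timestamp.
        long lastStart = timestamp - Math.floorMod(timestamp, slide);
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            windows.add(new long[] {start, start + size});
        }
        return windows;
    }

    public static void main(String[] args) {
        long size = 10_000L;
        long slide = 2_000L;
        for (long time : new long[] {1_000L, 1_800L, 2_000L, 3_000L}) {
            StringBuilder line = new StringBuilder("t=" + time + " ->");
            for (long[] w : assignWindows(time, size, slide)) {
                line.append(" [").append(w[0]).append(", ").append(w[1]).append(")");
            }
            System.out.println(line);
        }
    }
}
```

With these numbers, a time of 1000 falls into the window [-8000, 2000), which is the first one to close, while a time of 2000 does not, because window ends are exclusive. So even with the clock advanced per element, the first window to fire would hold the five events placed before 2000 rather than six.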
  • s

    Suparn Lele

    12/22/2022, 3:32 PM
    Hi, I have asked this question to multiple people and on multiple forums, but I still didn't get an answer. Could someone please help? Basically I am running a Flink batch job. My requirements are the following:
    1. I have 10 tables holding raw data in PostgreSQL
    2. I want to aggregate that data by creating a tumble window of 10 minutes
    3. I need to store the aggregated data into aggregated PostgreSQL tables
    I have a list of configs, each of which specifies the load query, the aggregation query and the destination table. I loop through every config, load the data using the Table API, aggregate the data using Flink SQL, convert the table into a DataStream and then store the data into the tables. My pseudocode looks somewhat like this:
    initialize flink environment
    load all the configs from file
    configs.foreach(
        load data from table
        aggregate
        store data
        delete temporary views created
    )
    streamExecutionEnvironment.execute()
    Everything works fine. The only problem is that I think with this approach all the load functions would be executed simultaneously. So it would put load on Flink, right, as all the data is getting loaded simultaneously? Or is my understanding wrong? Please guide.
  • j

    Jason Politis

    12/22/2022, 6:15 PM
    With the SQL CLI, how do I remove a property from a table? I can change it with an ALTER ... SET statement, but how about unset?
  • c

    Claudia Kesslau

    12/23/2022, 9:58 AM
    Hi, I'm quite new to flink and I'm trying to use it to write data stored in kafka topics into iceberg tables. I therefore created a job for each combination of kafka topic and iceberg table:
    // Create the environment once and reuse it for both the pipeline and execute()
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
    // Read from the Kafka topic and map each record to a Row
    DataStream<Row> inputStream = env
            .fromSource(
                    getKafkaSource(flinkJob.getKafkaTopic(), environment.getKafkaBootstrapUrl()),
                    WatermarkStrategy.noWatermarks(),
                    "kafka")
            .map(flinkJob.getMapper());
    
    // Append the rows to the Iceberg table
    FlinkSink.forRow(inputStream, FlinkSchemaUtil.toSchema(flinkJob.getSchema()))
            .tableLoader(getIcebergTableLoader(environment.getIcebergMetaStoreUrl(), flinkJob.getIcebergTable()))
            .append();
    
    env.execute(job);
    If those jobs are run individually, they run just fine. If multiple jobs run simultaneously on the cluster (1 jobmanager, 3 taskmanagers, 12 task slots, running on k8s), some of the jobs (most of the time the one with the most columns and the highest data rate) get the following error and fail to create any successful checkpoints:
    com.esotericsoftware.kryo.KryoException: java.lang.ArrayStoreException
    See thread for the full stacktrace. I had no luck searching for this error online. Neither adding more resources to the cluster nor changing the checkpoint interval had any effect on the error. Can you shed some light on the problem? Is this a problem with cluster configuration/resources or the flink job itself?
  • e

    Emmanuel Leroy

    12/23/2022, 5:56 PM
    I note that in 1.14.6 there were stats on the number of events sent to the sink, but in 1.16 it’s always 0 now. Is this intended?
  • u

    饶俊

    12/24/2022, 3:22 PM
    In the DataStream API, how can I write a custom cumulate window assigner?
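As a starting point for the question above, here is a sketch of the assignment arithmetic only, assuming the same semantics as the CUMULATE window function in Flink SQL: all windows inside one maximum-size period share the same start, and their ends grow by one step until the maximum size is reached. `CumulateWindowSketch` is a hypothetical plain-Java class, not Flink code. In a custom WindowAssigner, the assignWindows method would presumably return one TimeWindow per pair computed here; the trigger and serializer that an assigner also has to provide are not covered.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java sketch of cumulate-window assignment arithmetic (no offset); not Flink code.
class CumulateWindowSketch {
    // Returns the [start, end) bounds of every cumulate window that contains the timestamp.
    static List<long[]> assignWindows(long timestamp, long maxSize, long step) {
        if (step <= 0 || maxSize % step != 0) {
            throw new IllegalArgumentException("maxSize must be a positive multiple of step");
        }
        List<long[]> windows = new ArrayList<>();
        // All windows of one period share the start of that period.
        long start = timestamp - Math.floorMod(timestamp, maxSize);
        // Smallest window end that lies strictly after the timestamp.
        long firstEnd = start + ((timestamp - start) / step + 1) * step;
        for (long end = firstEnd; end <= start + maxSize; end += step) {
            windows.add(new long[] {start, end});
        }
        return windows;
    }

    public static void main(String[] args) {
        // Example: 1-second steps that accumulate up to a 3-second maximum window.
        for (long time : new long[] {0L, 1_500L, 2_999L, 3_000L}) {
            StringBuilder line = new StringBuilder("t=" + time + " ->");
            for (long[] w : assignWindows(time, 3_000L, 1_000L)) {
                line.append(" [").append(w[0]).append(", ").append(w[1]).append(")");
            }
            System.out.println(line);
        }
    }
}
```

An element early in the period lands in every window of that period, while an element just before the period ends lands only in the largest one.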