# troubleshooting
  • Sajjad Rizvi
    01/31/2023, 5:20 PM
    Hello, is there a way to process watermarks in a BroadcastConnectedStream? 🧵
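    A minimal sketch of how the current watermark can be read inside the function processing a BroadcastConnectedStream; the Event and Rule types here are illustrative placeholders, not from this thread:
    import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
    import org.apache.flink.util.Collector;

    public class RuleEvaluator
            extends KeyedBroadcastProcessFunction<String, RuleEvaluator.Event, RuleEvaluator.Rule, String> {

        // Illustrative placeholder types, not from this thread.
        public static class Event { public String id; public long timestamp; }
        public static class Rule { public String name; }

        @Override
        public void processElement(Event event, ReadOnlyContext ctx, Collector<String> out) {
            // The combined watermark of both connected inputs is exposed on the context.
            long watermark = ctx.currentWatermark();
            if (event.timestamp > watermark) {
                out.collect("on time: " + event.id);
            }
        }

        @Override
        public void processBroadcastElement(Rule rule, Context ctx, Collector<String> out) {
            long watermark = ctx.currentWatermark(); // also visible on the broadcast side
        }
    }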
  • Reme Ajayi
    01/31/2023, 7:42 PM
    Hi everyone, I'm trying to use ConfluentRegistryAvroDeserializationSchema.forSpecific. I am using the POJO generated by the avro-maven-plugin, but it seems Flink does not understand the POJO.
    org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Expecting type to be a PojoTypeInfo
    	at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372)
    	at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
    	at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:98)
    	at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:846)
    	at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:240)
    	at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1090)
    	at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1168)
    	at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
    	at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1168)
    Caused by: java.lang.IllegalStateException: Expecting type to be a PojoTypeInfo
    	at org.apache.flink.formats.avro.typeutils.AvroTypeInfo.generateFieldsFromAvroSchema(AvroTypeInfo.java:72)
    	at org.apache.flink.formats.avro.typeutils.AvroTypeInfo.<init>(AvroTypeInfo.java:55)
    	at org.apache.flink.formats.avro.AvroDeserializationSchema.getProducedType(AvroDeserializationSchema.java:177)
    	at org.apache.flink.connector.kafka.source.reader.deserializer.KafkaValueOnlyDeserializationSchemaWrapper.getProducedType(KafkaValueOnlyDeserializationSchemaWrapper.java:56)
    	at org.apache.flink.connector.kafka.source.KafkaSource.getProducedType(KafkaSource.java:216)
    	at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getTypeInfo(StreamExecutionEnvironment.java:2634)
    	at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.fromSource(StreamExecutionEnvironment.java:2006)
    	at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.fromSource(StreamExecutionEnvironment.java:1977)
    	at org.flink.test.DataStreamJob.main(DataStreamJob.java:73)
    	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    	at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
    Does anyone have any ideas on how to fix this?
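    For comparison, a minimal sketch of wiring forSpecific into a KafkaSource; the topic, broker, registry URL, and the generated UserRecord class are all placeholders. The "Expecting type to be a PojoTypeInfo" error comes from AvroTypeInfo when Flink's type extraction does not recognize the generated class as an Avro POJO, so the exact generated class and having flink-avro on the classpath usually matter here:
    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.connector.kafka.source.KafkaSource;
    import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class AvroJob {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // UserRecord is a placeholder for the avro-maven-plugin generated class.
            KafkaSource<UserRecord> source = KafkaSource.<UserRecord>builder()
                    .setBootstrapServers("broker:9092")   // placeholder
                    .setTopics("users")                   // placeholder
                    .setValueOnlyDeserializer(
                            ConfluentRegistryAvroDeserializationSchema.forSpecific(
                                    UserRecord.class, "http://schema-registry:8081")) // placeholder URL
                    .build();

            env.fromSource(source, WatermarkStrategy.noWatermarks(), "users-source").print();
            env.execute();
        }
    }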
  • Emmanuel Leroy
    01/31/2023, 7:44 PM
    Throttling a streaming Flink job: how can I do that? I want to limit how fast a job runs so it doesn't starve other consumers on the Kafka streams. I'm thinking of limiting the output so that the job builds back-pressure and slows down its ingest, but I'm not sure how to achieve even that. Any insight from the experts?
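    One way to approximate this (a sketch, not a built-in Flink feature): rate-limit a pass-through operator so back-pressure propagates upstream to the Kafka source. This assumes Guava is on the classpath, and the 1000 permits/sec figure is illustrative:
    import com.google.common.util.concurrent.RateLimiter;
    import org.apache.flink.api.common.functions.RichMapFunction;
    import org.apache.flink.configuration.Configuration;

    public class ThrottlingMap<T> extends RichMapFunction<T, T> {
        private transient RateLimiter rateLimiter;

        @Override
        public void open(Configuration parameters) {
            // Each parallel subtask gets its own limiter, so the effective global
            // rate is roughly permitsPerSecond * parallelism.
            rateLimiter = RateLimiter.create(1000.0);
        }

        @Override
        public T map(T value) {
            rateLimiter.acquire(); // blocks until a permit is available
            return value;
        }
    }
    Placed right after the source, e.g. stream.map(new ThrottlingMap<>()), the blocking acquire() slows consumption without dropping records.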
  • yingzhe dong
    01/31/2023, 10:53 PM
    Hello,
  • yingzhe dong
    01/31/2023, 10:55 PM
    When I downloaded https://dlcdn.apache.org/flink/flink-1.16.0/flink-1.16.0-bin-scala_2.12.tgz and ran it using ./bin/start-cluster.sh, I got the error “Service temporarily unavailable due to an ongoing leader election. Please refresh.” When I checked the log, I found a weird thing:
    2023-01-31 17:52:52,458 ERROR akka.remote.EndpointWriter [] - dropping message [class akka.actor.ActorSelectionMessage] for non-local recipient [Actor[akka.tcp://flink@localhost:6123/]] arriving at [akka.tcp://flink@localhost:6123] inbound addresses are [akka.tcp://flink@x86_64-apple-darwin13.4.0:6123]
  • yingzhe dong
    01/31/2023, 10:59 PM
    This is a local installation, and my job manager and task manager are running on localhost, so [akka.tcp://flink@localhost:6123] and [akka.tcp://flink@x86_64-apple-darwin13.4.0:6123] should be the same, but the cluster seems bound to a weird address, and I don't know what x86_64-apple-darwin13.4.0:6123 is. Now when I go to localhost:8081 I only get the “Service temporarily unavailable due to an ongoing leader election. Please refresh.” message and cannot access the Flink dashboard.
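    One thing worth checking (an assumption, not a confirmed fix): the inbound address was derived from the machine's hostname (x86_64-apple-darwin13.4.0 looks like a platform triple rather than a real hostname), so pinning both the advertised and the bind addresses in conf/flink-conf.yaml may make the two sides agree:
    # Sketch of conf/flink-conf.yaml for a purely local setup:
    jobmanager.rpc.address: localhost
    jobmanager.bind-host: localhost
    taskmanager.bind-host: localhost
    taskmanager.host: localhost
    rest.address: localhost
    rest.bind-address: localhost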
  • Ans Fida
    01/31/2023, 11:07 PM
    Does anyone have experience using the Delta connector with Flink? I'm trying to use the Flink State Processor API to read information about the Delta table snapshot version in the last Flink checkpoint, but I'm not sure how to access the Delta checkpoint object. Any pointers will be appreciated!
  • Nathanael England
    02/01/2023, 2:10 AM
    A common pattern with broadcast process functions is to use the broadcast state to hold some kind of rule configuration, while the non-broadcast side carries the data to be processed against those rules. If you wanted to emit an acknowledgement as a side output of the broadcast-element handling when a rule update succeeded or failed, would you get one success or failure message, or several, depending on the parallelism of that process function?
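    For what it's worth: every parallel subtask processes every broadcast element, so an acknowledgement emitted from processBroadcastElement would appear once per subtask, i.e. parallelism-many acks per rule update, and they would need to be deduplicated or counted downstream. A sketch with illustrative types and tag name:
    import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
    import org.apache.flink.util.Collector;
    import org.apache.flink.util.OutputTag;

    public class RuleUpdater extends BroadcastProcessFunction<String, String, String> {
        // Illustrative tag; the ack payload type is up to you.
        public static final OutputTag<String> ACKS = new OutputTag<String>("rule-acks") {};

        @Override
        public void processElement(String value, ReadOnlyContext ctx, Collector<String> out) {
            out.collect(value); // normal data path
        }

        @Override
        public void processBroadcastElement(String rule, Context ctx, Collector<String> out) {
            try {
                // ... validate the rule and update broadcast state here ...
                ctx.output(ACKS, "ok: " + rule + " @subtask "
                        + getRuntimeContext().getIndexOfThisSubtask());
            } catch (Exception e) {
                ctx.output(ACKS, "failed: " + rule + ": " + e.getMessage());
            }
        }
    }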
  • Guruguha Marur Sreenivasa
    02/01/2023, 4:05 AM
    Hi all, we're seeing errors on the Flink Task Manager when trying to consume from Kafka:
    2023-02-01 03:48:41,522 ERROR org.apache.kafka.common.metrics.Metrics                      [] - Error when removing metric from org.apache.kafka.common.metrics.JmxReporter
    2023-02-01 03:45:28,083 ERROR org.apache.flink.runtime.io.network.netty.PartitionRequestQueue [] - Encountered error while consuming partitions
    Any inputs on why this happens? I believe the JobManager restarts at the same time.
    2023-02-01 03:50:41,399 ERROR org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Registration at JobManager failed due to an error
    Any thoughts are appreciated, thanks!
  • Sumit Aich
    02/01/2023, 5:27 AM
    Hi team, can you please tell me where the Flink K8s Operator stores the spec of the previous deployment when the experimental rollback feature is enabled?
  • Giannis Polyzos
    02/01/2023, 3:28 PM
    Flink Table Store: I'm currently testing out Flink Table Store. When creating a catalog
    CREATE CATALOG tblCatalog WITH (
                    'type'='table-store',
                    'warehouse'='file:/tmp/table_store',
                    'file.format' = 'parquet'
                );
    I'm trying to use parquet files instead of orc, following the configs here, but it seems the output files are still in ORC format. Maybe the project is still quite early in its days and only ORC is supported? A follow-up question: I believe only S3 is supported currently and not GCS, correct?
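    One possibility worth ruling out (an assumption based on the Table Store docs): file.format is a table-level option rather than a catalog option, so it may need to go into the table's WITH clause; the table and column names below are made up:
    CREATE TABLE orders (
        order_id BIGINT,
        amount DOUBLE
    ) WITH (
        'file.format' = 'parquet'
    );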
  • Karane Vieira
    02/01/2023, 4:16 PM
    Hi, I was using flink-operator-1.1.0, but it is not available anymore. I tried installing it from the repo, but that does not work because the corresponding container image seems not to exist either. Is anyone familiar with the flink-operator versions?
  • Yaroslav Bezruchenko
    02/01/2023, 5:38 PM
    Hey, I'm facing a problem with checkpoints. All checkpoints take up to 3-4 s, but sometimes a checkpoint takes much longer and fails. Any ideas how to debug that?
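    Some generic starting points (assumptions, not a diagnosis): the checkpoint tab in the web UI breaks each checkpoint down into sync/async/alignment time per task, and config along these lines can rule out alignment stalls under back-pressure; the values are illustrative:
    execution.checkpointing.timeout: 10min   # observe slow checkpoints instead of failing fast
    execution.checkpointing.unaligned: true  # sidestep barrier alignment under back-pressure
    state.backend.latency-track.keyed-state-enabled: true  # per-state access latency metrics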
  • Eric Xiao
    02/01/2023, 10:21 PM
    Is there a way for us to get more insight into RocksDB / ephemeral memory usage? When a TM starts, we see a bump in resource usage before the TM stabilizes, and we are not really sure what contributes to that bump. Similarly, when a TM gets reused (during reactive mode), we noticed the ephemeral memory does not fully go down to 0. We are wondering if we have misconfigured RocksDB so that it does not flush out the previous key set.
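    For visibility into the RocksDB side, the native metrics can be switched on individually; a sketch with an illustrative subset of the available keys (enabling them adds some overhead):
    state.backend.rocksdb.metrics.block-cache-usage: true
    state.backend.rocksdb.metrics.cur-size-all-mem-tables: true
    state.backend.rocksdb.metrics.estimate-num-keys: true
    state.backend.rocksdb.metrics.num-running-compactions: true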
  • soudipta das
    02/01/2023, 11:20 PM
    Hi team, I am trying to write a custom aggregate function (extending the AggregateFunction class) using the Table API, to group by a set of columns and collect an array of strings in the aggregate. When the group window emitter tries to emit the result of the aggregation, I get a class cast exception.
    java.lang.ClassCastException: class java.lang.String cannot be cast to class [Ljava.lang.Object; (java.lang.String and [Ljava.lang.Object; are in module java.base of loader 'bootstrap')
    	at org.apache.flink.table.data.conversion.ArrayObjectArrayConverter.toInternal(ArrayObjectArrayConverter.java:40)
    	at org.apache.flink.table.data.conversion.DataStructureConverter.toInternalOrNull(DataStructureConverter.java:61)
    	at org.apache.flink.table.data.conversion.RowRowConverter.toInternal(RowRowConverter.java:75)
    	at org.apache.flink.table.data.conversion.RowRowConverter.toInternal(RowRowConverter.java:37)
    	at org.apache.flink.table.data.conversion.DataStructureConverter.toInternalOrNull(DataStructureConverter.java:61)
    	at GroupingWindowAggsHandler$173.getValue(Unknown Source)
    	at org.apache.flink.table.runtime.operators.window.AggregateWindowOperator.emitWindowResult(AggregateWindowOperator.java:164)
    	at org.apache.flink.table.runtime.operators.window.WindowOperator.onEventTime(WindowOperator.java:390)
    	at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.advanceWatermark(InternalTimerServiceImpl.java:302)
    	at org.apache.flink.streaming.api.operators.InternalTimeServiceManagerImpl.advanceWatermark(InternalTimeServiceManagerImpl.java:180)
    	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:599)
    	at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitWatermark(OneInputStreamTask.java:239)
    	at org.apache.flink.streaming.runtime.watermarkstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:200)
    	at org.apache.flink.streaming.runtime.watermarkstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:105)
    	at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:136)
    	at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
    	at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
    	at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:542)
    	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
    	at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:831)
    	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:780)
    	at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
    	at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:914)
    	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
    	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
    	at java.base/java.lang.Thread.run(Unknown Source)
    The output of the aggregate is a Row object with an Array<String> element. Any pointers to what I might be doing wrong? Thanks.
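    One classic cause of exactly this cast (an assumption, since the function code isn't shown): Row.of(Object...) is varargs, so passing a String[] directly spreads its elements into separate fields, and the converter then finds a String where it expects the array. A sketch of the difference:
    import org.apache.flink.types.Row;

    public class RowArrayExample {
        public static void main(String[] args) {
            String[] tags = {"a", "b", "c"};

            // Spreads the array: arity 3, each field a String.
            Row spread = Row.of((Object[]) tags);

            // One field whose value is the whole array: arity 1, field 0 is a String[].
            Row wrapped = Row.of((Object) tags);

            System.out.println(spread.getArity());  // 3
            System.out.println(wrapped.getArity()); // 1
        }
    }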
  • Ammar Master
    02/02/2023, 1:10 AM
    Hi all, is there any way to make FileSource consume its paths from a DataStream? In my use case I have a DataStream containing paths to files; can I connect that to a FileSource so that I can take advantage of features like splitting a single file? Or do I need to read each file manually in a flatMap operator?
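    As far as I know, FileSource takes a fixed set of paths at build time rather than a stream. A fallback sketch for the flatMap route using Flink's FileSystem abstraction (no split support; one whole file is read per incoming path):
    import java.io.BufferedReader;
    import java.io.InputStreamReader;
    import java.nio.charset.StandardCharsets;
    import org.apache.flink.api.common.functions.RichFlatMapFunction;
    import org.apache.flink.core.fs.FSDataInputStream;
    import org.apache.flink.core.fs.FileSystem;
    import org.apache.flink.core.fs.Path;
    import org.apache.flink.util.Collector;

    public class ReadWholeFile extends RichFlatMapFunction<String, String> {
        @Override
        public void flatMap(String pathString, Collector<String> out) throws Exception {
            Path path = new Path(pathString);
            FileSystem fs = path.getFileSystem(); // resolves file://, s3://, hdfs:// ...
            try (FSDataInputStream in = fs.open(path);
                 BufferedReader reader =
                         new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8))) {
                String line;
                while ((line = reader.readLine()) != null) {
                    out.collect(line); // emit one record per line of the file
                }
            }
        }
    }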
  • Jirawech Siwawut
    02/02/2023, 2:04 AM
    Hi all. I just wonder: is there any example of how to create a jar to use with the Flink SQL Client?
    ./bin/sql-client.sh --help        
    -j,--jar <JAR file>                   A JAR file to be imported into the
                                        session. The file might contain
                                        user-defined classes needed for the
                                        execution of statements such as
                                        functions, table sources, or sinks.
                                        Can be used multiple times.
    For example, myudf.jar containing:
    import org.apache.flink.table.api.*;
    import org.apache.flink.table.functions.ScalarFunction;
    import static org.apache.flink.table.api.Expressions.*;
    
    // define function logic
    public static class SubstringFunction extends ScalarFunction {
      public String eval(String s, Integer begin, Integer end) {
        return s.substring(begin, end);
      }
    }
    
    TableEnvironment env = TableEnvironment.create(...);
    
    // register function
    env.createTemporarySystemFunction("SubstringFunction", SubstringFunction.class);
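    A sketch of how that docs snippet becomes a SQL Client jar (the package name and paths are made up): the jar only needs the function class, compiled as a normal top-level class; no main method or TableEnvironment code is required:
    // src/main/java/com/example/udf/SubstringFunction.java, built into myudf.jar
    package com.example.udf;

    import org.apache.flink.table.functions.ScalarFunction;

    public class SubstringFunction extends ScalarFunction {
        public String eval(String s, Integer begin, Integer end) {
            return s.substring(begin, end);
        }
    }
    Then start the client with ./bin/sql-client.sh -j myudf.jar and register the function in SQL with CREATE TEMPORARY FUNCTION SubstringFunction AS 'com.example.udf.SubstringFunction';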
  • Janghwan
    02/02/2023, 2:06 AM
    Hi. I'm new to Flink and need help modeling our data aggregation with the Flink API. I'm not sure if this is the correct place to ask questions; let me know if there is a better place. I'm trying to maintain each gamer's win rate over their last 100 games (by event timestamp). The source event looks like this:
    class GameEvent {
      String userID;
      int eventType; // started, lost, won, etc.
      String gameID;
      String eventID;
      long timestamp;
    }
    More generally, we want to maintain each user's win count over the last 100 games they played. So for the last 100 “started” games, we need to keep the “won” events for the matching gameIDs. It is clear to me that userID will be the key and timestamp will be the event-time field, but I'm not sure how to model this type of window, or which aggregation functions to use to keep the last 100 games and the matching “won” events. Also, there can be duplicate events, so they should be deduplicated by eventID or by a combination of gameID and eventType. It seems I need some sort of custom window function, but I'm not sure what's provided out of the box and what I need to implement myself. I appreciate any help.
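    A sketch of one possible shape (certainly not the only one): a KeyedProcessFunction keyed by userID, holding the last 100 started gameIDs, the won games, and seen eventIDs for dedup. It assumes the GameEvent class above plus invented STARTED/WON constants, and it ignores event-time ordering and late data:
    import java.util.ArrayDeque;
    import org.apache.flink.api.common.state.MapState;
    import org.apache.flink.api.common.state.MapStateDescriptor;
    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.api.common.typeinfo.TypeHint;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
    import org.apache.flink.util.Collector;

    public class WinRateLast100 extends KeyedProcessFunction<String, GameEvent, Double> {

        private ValueState<ArrayDeque<String>> recentGames; // last 100 started gameIDs
        private MapState<String, Boolean> wins;             // gameID -> won
        private MapState<String, Boolean> seen;             // eventID dedup

        @Override
        public void open(Configuration conf) {
            recentGames = getRuntimeContext().getState(new ValueStateDescriptor<>(
                    "recent", TypeInformation.of(new TypeHint<ArrayDeque<String>>() {})));
            wins = getRuntimeContext().getMapState(
                    new MapStateDescriptor<>("wins", Types.STRING, Types.BOOLEAN));
            seen = getRuntimeContext().getMapState(
                    new MapStateDescriptor<>("seen", Types.STRING, Types.BOOLEAN));
        }

        @Override
        public void processElement(GameEvent e, Context ctx, Collector<Double> out)
                throws Exception {
            if (seen.contains(e.eventID)) {
                return; // dedup by eventID
            }
            seen.put(e.eventID, true);

            ArrayDeque<String> games = recentGames.value();
            if (games == null) {
                games = new ArrayDeque<>();
            }
            if (e.eventType == STARTED) {
                games.addLast(e.gameID);
                if (games.size() > 100) {
                    wins.remove(games.removeFirst()); // evict the oldest game
                }
            } else if (e.eventType == WON) {
                wins.put(e.gameID, true);
            }
            recentGames.update(games);

            long won = 0;
            for (String g : games) {
                if (wins.contains(g)) {
                    won++;
                }
            }
            out.collect(games.isEmpty() ? 0.0 : (double) won / games.size());
        }

        private static final int STARTED = 0, WON = 2; // invented encodings
    }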
  • Wai Chee Yau
    02/02/2023, 3:38 AM
    I'm trying the Iceberg integration for Flink. To write to an Iceberg table I need to convert my datastream from DataStream<CustomClass> to DataStream<Row> or DataStream<RowData>. Is there a function in Flink to do this automatically? I'd prefer not to map it manually, because I'll have a lot of different custom classes generated from Protobuf. Thanks.
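    One possible route (a sketch; whether it works for generated Protobuf classes depends on whether Flink's type extraction understands them): round-trip through the Table API, which derives the schema from the class:
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
    import org.apache.flink.types.Row;

    public class ToRowExample {
        // CustomClass is a placeholder for one of the generated types.
        public static DataStream<Row> toRows(
                StreamExecutionEnvironment env, DataStream<CustomClass> input) {
            StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
            Table table = tEnv.fromDataStream(input);   // schema derived from the class
            return tEnv.toDataStream(table, Row.class); // convert back out as rows
        }
    }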
  • Jirawech Siwawut
    02/02/2023, 3:57 AM
    Hi all. I found this error when I tried to use the Flink SQL Client with Kafka:
    Caused by: java.lang.RuntimeException: Failed to get metadata for topics [mytopic-v1].
    	at
    	... 7 more
    Caused by: java.util.concurrent.ExecutionException: org.apache.flink.kafka.shaded.org.apache.kafka.common.errors.TimeoutException: The AdminClient thread has exited. Call: describeTopics
    	at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
    	at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
    Here is my command and the jar layout:
    ./bin/sql-client.sh -l jars/ 
    |-jars
    |---  flink-connector-kafka-1.16.0.jar
    |---  kafka-clients-3.3.0.jar
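    One thing worth trying (an assumption, not a confirmed fix): the stack trace shows shaded Kafka classes, which suggests mixing the thin connector jar with a separately pinned kafka-clients; the self-contained SQL connector jar bundles its own Kafka client:
    wget https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-kafka/1.16.0/flink-sql-connector-kafka-1.16.0.jar -P jars/
    ./bin/sql-client.sh -l jars/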
  • Tawfik Yasser
    02/02/2023, 6:02 AM
    Hi, I'm trying to call keyBy() right after a flatMap(), but I'm getting this exception:
    Caused by: java.util.ConcurrentModificationException
  • Sumit Aich
    02/02/2023, 7:22 AM
    Hi team, how can I access the job state of a Flink deployment using the Flink K8s Operator? Can it be accessed only through the REST API?
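    For reference, the operator also mirrors the job state into the custom resource's status, so it can be read with kubectl instead of the REST API (the resource name is a placeholder):
    kubectl get flinkdeployment my-deployment -o jsonpath='{.status.jobStatus.state}'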
  • Kosta Sovaridis
    02/02/2023, 9:10 AM
    Hi, does anyone know what could cause:
    [ ERROR ][ test/basic-session-job-example ] Error during event processing ExecutionScope{ resource id:
                                                      ResourceID{name='basic-session-job-example', namespace='test'}, version: 133942880} failed.
    org.apache.flink.kubernetes.operator.exception.ReconciliationException:
      org.apache.flink.util.FlinkRuntimeException:
        Failed to create the dir: /opt/flink/artifacts/test/flink-cluster/basic-session-job-example
        at org.apache.flink.kubernetes.operator.controller.FlinkSessionJobController.reconcile(FlinkSessionJobController.java:107)
    Here is the config I am using:
    apiVersion: flink.apache.org/v1beta1
    kind: FlinkSessionJob
    metadata:
      name: basic-session-job-example
    spec:
      deploymentName: flink-cluster
      job:
        jarURI: https://repo1.maven.org/maven2/org/apache/flink/flink-examples-streaming_2.12/1.16.0/flink-examples-streaming_2.12-1.16.0-WindowJoin.jar
        parallelism: 1
        upgradeMode: stateless
    I have tried many setups with chmod/chown to fix permission issues that might occur due to OpenShift, but nothing helped.
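    One avenue to explore (an assumption; the failing directory is where the operator downloads the session-job jar): the base directory is configurable in the operator configuration, so pointing it at a path writable under OpenShift's randomized UID might help:
    # In the operator's configuration (e.g. via the Helm chart's defaultConfiguration):
    kubernetes.operator.user.artifacts.base.dir: /tmp/flink-artifacts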
  • Gaurav Miglani
    02/02/2023, 11:02 AM
    We have moved to the new Flink Kubernetes Operator version 1.3, but somehow the jobId of our existing Flink jobs is not generated as per the logic below.
  • Nick Pocock
    02/02/2023, 1:48 PM
    I am using Flink with Stateful Functions. I'm trying to understand how our jobs would resume if the Flink and StateFun containers went down within my K8s cluster. I'm setting checkpoints and savepoints using the following config:
    state.checkpoints.dir: file:///flink-data/checkpoints
    execution.checkpointing.interval: 10000 #in milliseconds - must be greater than 10 - this is for internal failure recovery
    state.checkpoints.num-retained: 3
    state.backend.local-recovery: true
    # https://kubernetes.io/docs/concepts/workloads/controllers/job/
    state.savepoints.dir: file:///flink-data/savepoints
    I can see checkpoints being saved, but I can't work out how savepoints get saved to my persistent K8s volume. Is this something I need to do manually?
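    Savepoints are indeed triggered manually (or by an orchestrator), unlike the periodic checkpoints; a sketch, with the job id as a placeholder:
    # CLI; the target directory defaults to state.savepoints.dir if omitted.
    ./bin/flink savepoint <jobId> file:///flink-data/savepoints

    # Or via the REST API:
    curl -X POST http://<jobmanager>:8081/jobs/<jobId>/savepoints \
         -d '{"target-directory": "file:///flink-data/savepoints", "cancel-job": false}'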
  • Richard Noble
    02/02/2023, 1:58 PM
    Hi all! I have a question regarding savepoints in Flink SQL. Is there a way to identify a SQL clause as an operator, so that if I recover from a savepoint, that query runs from that point on?
  • Nitin Agrawal
    02/02/2023, 5:01 PM
    Hi all, I am trying to override the behaviour of the DynamoDB sink, i.e., to support updates on DDB. The issue I am observing is that after one or two entries land in DDB, the task becomes busy (busy-time maxes out at 100) and no more data is synced to DDB. The only change is to call the UpdateItem API instead of the BatchWrite API. I am not able to see any errors in the logs. Can you please guide me on where to look for errors, or what else I should check?
  • Gaurav Miglani
    02/02/2023, 6:33 PM
    I'm trying the kube autoscaler and am getting some errors and warning logs; the job is also not able to scale somehow. Am I missing something?
  • chankyeong won
    02/02/2023, 11:53 PM
    In the Flink Operator's FlinkDeployment, can I use the s3:// scheme for spec.job.jarURI? The error message seems to say that only the local:// scheme is available.
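    For reference, the two CRDs differ here (as I understand the operator docs): a FlinkDeployment in application mode expects the jar inside the image, referenced via local://, while a FlinkSessionJob can fetch a remote jarURI if the operator can reach that filesystem. A sketch of the local:// form, with a placeholder path:
    job:
      jarURI: local:///opt/flink/usrlib/my-job.jar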
  • Krish Narukulla
    02/03/2023, 7:09 AM
    Question on the JVM: do two Flink jobs share a single JVM during their life cycle?