# troubleshooting
  • a

    Aviv Dozorets

    08/28/2022, 8:48 AM
    Running multiple different services, all on Flink 1.14.3, checkpointing to S3 buckets. Generally the checkpoint size is less than 500 KB, but lately on one of them I started to see errors like this one:
    Copy code
    java.lang.reflect.UndeclaredThrowableException
    	at com.sun.proxy.$Proxy40.submitTask(Unknown Source)
    	at org.apache.flink.runtime.jobmaster.RpcTaskManagerGateway.submitTask(RpcTaskManagerGateway.java:60)
    	at org.apache.flink.runtime.executiongraph.Execution.lambda$deploy$4(Execution.java:589)
    	at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
    	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    	at java.lang.Thread.run(Thread.java:748)
    Caused by: java.io.IOException: The rpc invocation size 39076205 exceeds the maximum akka framesize.
    	at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.createRpcInvocationMessage(AkkaInvocationHandler.java:308)
    	at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.invokeRpc(AkkaInvocationHandler.java:217)
    	at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.invoke(AkkaInvocationHandler.java:138)
    	... 11 more
    What looks like the root cause is that the latest checkpoint is sometimes way over 100 MB, and sometimes even 1 GB. I'd appreciate any ideas on what to look for.
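    A hedged sketch of one possible stopgap, assuming the oversized `submitTask` RPC really comes from the state of those unexpectedly large checkpoints being shipped on deployment: raising the RPC frame size in `flink-conf.yaml` buys headroom while the actual state growth (for example a backed-up source or ever-growing operator/window state) is investigated. The value below is purely illustrative:
    ```yaml
    # Default is 10485760b (~10 MB); the failing invocation above was ~39 MB.
    akka.framesize: 104857600b
    ```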
  • r

    Raghunadh Nittala

    08/29/2022, 12:09 AM
    Hey everyone, I'm using a KafkaSink to sink the results to Kafka via `sinkTo(kafkaSink)`. I'm trying to come up with an end-to-end integration test and want to use a simple sink for it. I came across `CollectSink`, where I can add results to a list and run the matchers. But since `CollectSink` is a `SinkFunction`, I am not able to use it with `sinkTo`; it can only be used with `addSink`. Is there a simple `Sink` interface implementation I can use for tests within a `sinkTo` call? Thank you.
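    A minimal sketch of a test-only sink, assuming Flink 1.15+ where the unified `org.apache.flink.api.connector.sink2.Sink` interface (the one `sinkTo` accepts) is available; the class and field names are made up. It collects records into a static list, so it only works when the test runs the job in the same JVM (e.g. a MiniCluster test), after which you can assert on `CollectingSink.RESULTS`:
    ```java
    import org.apache.flink.api.connector.sink2.Sink;
    import org.apache.flink.api.connector.sink2.SinkWriter;

    import java.io.IOException;
    import java.util.List;
    import java.util.concurrent.CopyOnWriteArrayList;

    // Hypothetical test sink: usable with stream.sinkTo(new CollectingSink<>()).
    public class CollectingSink<T> implements Sink<T> {
        // Static so the test code can read it after env.execute(); same-JVM tests only.
        public static final List<Object> RESULTS = new CopyOnWriteArrayList<>();

        @Override
        public SinkWriter<T> createWriter(InitContext context) throws IOException {
            return new SinkWriter<T>() {
                @Override
                public void write(T element, Context context) {
                    RESULTS.add(element);
                }

                @Override
                public void flush(boolean endOfInput) {
                    // nothing buffered
                }

                @Override
                public void close() {
                    // nothing to release
                }
            };
        }
    }
    ```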
  • a

    Aeden Jameson

    08/29/2022, 3:02 AM
    After upgrading from 1.13.5 to 1.15.2 while keeping the deprecated FlinkKafkaConsumer and FlinkKafkaProducer classes, I observed that the task manager metrics collected by our Prometheus reporter stopped working. From reading other threads in this channel on the same issue, it appears this is a known bug. My questions are:
    • Is there a workaround other than migrating to KafkaSource and KafkaSink?
    • When is this anticipated to be addressed?
  • h

    haim ari

    08/29/2022, 5:56 AM
    Hello, I'm running a k8s session cluster. The Flink app that is running exposes some metrics which I'm able to see in the Flink UI (screenshot). However, it's not clear to me how to expose them for our Prometheus to scrape. Where should this be configured?
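    For reference, a minimal sketch of the cluster side, assuming the Prometheus reporter that ships with Flink (on some versions its jar has to be copied from `opt/` into `lib/` or enabled as a plugin): configure the reporter in `flink-conf.yaml`, then point Prometheus at the JobManager/TaskManager pods on that port (pod annotations or a PodMonitor/ServiceMonitor, depending on how your Prometheus discovers targets):
    ```yaml
    metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter
    # A range, so multiple Flink processes on one host don't collide on the port.
    metrics.reporter.prom.port: 9249-9259
    ```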
  • n

    Nipuna Shantha

    08/29/2022, 10:06 AM
    Hi everyone. I have a small doubt to clear up: is there a way to pass multiple lib locations to the same Flink program?
  • j

    Jirawech Siwawut

    08/29/2022, 11:02 AM
    Hello. Does the Flink Hive connector support reading with this scan option? https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/connectors/table/hive/hive_read_write/#streaming-source-partition-include I got this error:
    Copy code
    The only supported 'streaming-source.partition.include' is 'all' in hive table scan, but is 'latest'
    I checked the code, but it seems the only supported option is `all` at the moment: https://github.com/apache/flink/blob/f494be6956e850d4d1c9fd50b79e5a8dd5b53e47/flin[…]in/java/org/apache/flink/connectors/hive/HiveSourceBuilder.java
  • p

    Pedro Mázala

    08/29/2022, 11:27 AM
    Hello folks. Does anyone here use Flink + the Elasticsearch sink? I'm trying to use Elasticsearch 8, but the main connector is not compatible with it.
  • j

    Juan Carlos Gomez Pomar

    08/29/2022, 12:20 PM
    Hi everyone, I have an architecture question where insight is appreciated. Performance and operability: many small SQL jobs vs. one DataStream job? The SQL jobs would have the following characteristics:
    • In the tens of thousands of jobs.
    • Each with a different load: some mostly idle, others with high throughput requirements.
    • Stateless, no joins.
    We know we can deploy a Flink cluster in session mode and deploy SQL statements there. To deploy many jobs in the same cluster, I understand that we need a high number of task slots. However, resource-wise we do not need that many machines: around tens of machines would suffice, implying around 1000 task slots per machine.
    1. Can we deploy a job to a cluster that does not have available task slots? We have gotten an error trying to deploy new jobs when no task slots are available.
    2. If one job requires more CPU/memory, are resources re-allocated automatically? From the docs it appears they are not.
    3. How feasible is it to maintain and operate a high number of small jobs in the same cluster?
    On the other hand, the other option we have is to reduce the solution scope and implement a custom DataStream job that solves only a subset of what SQL can.
  • b

    Bastien DINE

    08/29/2022, 3:27 PM
    Hello everyone, when will the 1.15.2 Docker image be available on the official Docker Hub /flink repository?
  • a

    Adrian Chang

    08/29/2022, 4:20 PM
    Hello, I am converting from the Table API to the DataStream API using the Python function `to_append_stream`. But it seems Flink does this operation in batches of rows and I am getting some delay, about 2 seconds. I haven't found how to configure that behaviour. Could anyone point me to the right configuration parameters, please?
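    If (and this is an assumption, not a diagnosis) the delay comes from how PyFlink buffers records into execution bundles before handing them across the Java/Python boundary, these two settings control that buffering; the values below are illustrative, the defaults being 100000 rows and 1000 ms:
    ```yaml
    python.fn-execution.bundle.size: 1000
    python.fn-execution.bundle.time: 200
    ```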
  • p

    Prasanth Kothuri

    08/29/2022, 7:54 PM
    Are there any plans to unify the ProcessWindowFunction API between Scala and Java? Currently Java needs `org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction` and Scala needs `org.apache.flink.streaming.api.scala.function.ProcessWindowFunction`. This brings a few problems: e.g. our code is in Scala, and for writing unit tests I was using the Flink test harness (`KeyedOneInputStreamOperatorTestHarness`), which is in Java; to get the test harness to work for a ProcessWindowFunction I had to write the function in Java, otherwise I got type mismatch errors.
  • y

    Yahor Paulikau

    08/29/2022, 9:22 PM
    I'm having a problem reading a Kafka topic with a dynamic Avro schema. The idea is to provide the schema through some API (Cloudera schema registry) and use it dynamically. I understand that the best way of doing this is `AvroDeserializationSchema.forGeneric(schema)`, where `schema` is a string representing the parsed Avro schema. This works fine until I try to convert the stream into a Flink table using:
    Copy code
    val inputTable = tableEnv.fromDataStream(
          kafka_stream,
          Schema.newBuilder()
            .columnByExpression("proctime", "PROCTIME()")
            .build())
        inputTable.printSchema()
        tableEnv.createTemporaryView(table_name, kafka_stream)
    it’s printing this schema
    Copy code
    (
      `f0` RAW('org.apache.avro.generic.GenericRecord', '...'),
      `proctime` TIMESTAMP_LTZ(3) NOT NULL *PROCTIME* AS PROCTIME()
    )
    So this tells me I need to further expand the RAW data, but I have no idea how to do it. It also looks like I need to convert from Avro data types. If I print a Kafka message directly from the stream object, I can see it's being decoded with the correct schema, so the deserialization step into GenericRecord seems to work.
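    One possible way forward, sketched in Java (the Scala equivalent is analogous) with hypothetical field names `user_id` and `amount`: map the `GenericRecord` stream to `Row` with an explicit row type before calling `fromDataStream`, so the planner sees named, typed columns instead of a RAW `GenericRecord`:
    ```java
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.table.api.Schema;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.types.Row;

    // Assumes kafkaStream is the DataStream<GenericRecord> produced by
    // AvroDeserializationSchema.forGeneric(...) (kafka_stream in the snippet above).
    DataStream<Row> rows = kafkaStream
            .map(rec -> Row.of(rec.get("user_id").toString(), (Long) rec.get("amount")))
            .returns(Types.ROW_NAMED(new String[] {"user_id", "amount"}, Types.STRING, Types.LONG));

    Table inputTable = tableEnv.fromDataStream(
            rows,
            Schema.newBuilder()
                    .columnByExpression("proctime", "PROCTIME()")
                    .build());
    tableEnv.createTemporaryView(tableName, inputTable);
    ```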
  • h

    Haim Ari

    08/30/2022, 3:53 AM
    Hello, I'm using ArgoCD to deploy a Flink session cluster + Flink apps. I noticed that there are no readiness/liveness checks for the apps. So if the Flink app status is, for example, `Failed`, k8s will not kill the pods, and in ArgoCD the app is considered synced when in fact it should be considered failed. Is there a native (generic) way to handle this so that each app self-checks?
  • s

    Sylvia Lin

    08/30/2022, 4:24 AM
    Hey folks, I noticed that the Flink job id when using the Flink operator is always `job 00000000000000000000000000000000`. Is there a way to get a unique id every time a new job is started?
  • r

    Rashmin Patel

    08/30/2022, 10:28 AM
    Hello, I want to read data from S3 which is stored in Parquet format, using the DataSet API. Here is my code, but I am unable to figure out what I should pass for the typeTag argument while initializing ParquetInputFormat:
    Copy code
    val bEnv: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
    
    bEnv
      .createInput(
        HadoopInputs
          .readHadoopFile(new ParquetInputFormat[T](), classOf[Void], classOf[T], "s3://<bucket-name>/<some-path>"))
  • p

    Piero Gerardo Torres Robatty

    08/30/2022, 1:23 PM
    Hello! I'm trying to understand how to read from a Kafka topic with an Avro schema using the Confluent Schema Registry. I built this source:
    Copy code
    KafkaSource<GenericRecord> source = KafkaSource.<GenericRecord>builder()
                .setBootstrapServers("localhost:9092")
                .setTopics("test1")
                .setGroupId("test-group1")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(ConfluentRegistryAvroDeserializationSchema.forGeneric(schema, "http://localhost:8081"))
                .build();
    I still don't get what value I should pass as the schema when I want to pull the schema from the Schema Registry and use that information to deserialize the topic.
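    A hedged sketch of one approach: fetch the latest schema for the topic's value subject from the registry at job-construction time and use it as the reader schema. The subject name `test1-value` assumes the default TopicNameStrategy, and exception handling is omitted; records written with older schema versions are then read under Avro's usual schema-evolution rules:
    ```java
    import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
    import io.confluent.kafka.schemaregistry.client.SchemaMetadata;
    import org.apache.avro.Schema;

    // Look up the latest registered schema once, on the client that builds the job graph.
    CachedSchemaRegistryClient registryClient =
            new CachedSchemaRegistryClient("http://localhost:8081", 100);
    SchemaMetadata latest = registryClient.getLatestSchemaMetadata("test1-value");
    Schema readerSchema = new Schema.Parser().parse(latest.getSchema());

    KafkaSource<GenericRecord> source = KafkaSource.<GenericRecord>builder()
            .setBootstrapServers("localhost:9092")
            .setTopics("test1")
            .setGroupId("test-group1")
            .setStartingOffsets(OffsetsInitializer.earliest())
            .setValueOnlyDeserializer(
                    ConfluentRegistryAvroDeserializationSchema.forGeneric(readerSchema, "http://localhost:8081"))
            .build();
    ```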
  • s

    Satya

    08/30/2022, 2:41 PM
    Hello people, I am using `flink-1.14.5` and, based on my use case, I want to read files from S3. For that I am adding `flink-connector-filesystem`, but it seems there is no such connector for `flink-1.14.*`, even though there are docs for it for `flink-1.14.*`. Is my understanding correct?
  • k

    Krish Narukulla

    08/30/2022, 4:08 PM
    Is there any option for a local in-memory database to use as a sink? I have tried this:
    Copy code
    String targetTable = """
            CREATE TEMPORARY TABLE orders_agg(
             order_id STRING,
             total BIGINT
            )
            WITH (
            'connector' = 'jdbc',
            'url' = 'jdbc:derby:memory:myDB;create=false',
            'table-name' = 'orders_agg'
            )
            """;
  • a

    Aeden Jameson

    08/30/2022, 5:11 PM
    I have a Flink job running on event time with a watermark delay of, say, X, and a pipeline that's roughly: Env.addSource(SomeKafkaSource).keyBy(MyId).process(TimeoutReporter).keyBy(SameMyId).window(Tumble(1 min)).process(DoSomeWork), where the event timestamp is taken from the timestamp method on the events/objects. My question is about the behavior of the TimeoutReporter above. Its responsibility is to send a timeout event X minutes after the last seen event. What is the proper way to set the timer so that it is assigned to the correct tumbling window downstream? My initial thought is something like:
    Copy code
    public class TimeoutReporter extends KeyedProcessFunction<KEY, IN, OUT> {
        private transient ValueState<Long> sessionState;

        @Override
        public void open(final Configuration parameters) {
            // Holds the timestamp of the currently registered timer for this key.
            sessionState = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("sessionTimer", Long.class));
        }

        @Override
        public void processElement(final IN event, final Context ctx, final Collector<OUT> out) throws Exception {
            // Replace any previously registered timer with one based on the latest event.
            final Long timeout = sessionState.value();
            if (timeout != null) {
                ctx.timerService().deleteEventTimeTimer(timeout);
            }

            final long newTimer = (event.timestamp() - [WatermarkInMillis]) + sessionTimeoutInMillis;
            ctx.timerService().registerEventTimeTimer(newTimer);
            sessionState.update(newTimer);
            out.collect(event);
        }

        @Override
        public void onTimer(final long timestamp,
                            final OnTimerContext ctx,
                            final Collector<OUT> out) throws Exception {
            super.onTimer(timestamp, ctx, out);

            final Long result = sessionState.value();
            if (result != null) {
                final KEY key = ctx.getCurrentKey();

                // I believe I need to add back the watermark because timestamp is in Flink event time
                out.collect(TimeoutEvent.build(timestamp + [WatermarkInMillis], key));
            } else {
                // ... log something
            }
        }
    }
    With this implementation I'd expect to see the same results regardless of how I set the watermark, but that's not the case. Thanks in advance.
  • k

    Krish Narukulla

    08/30/2022, 5:14 PM
    I am on Flink 1.16. I have run into a few errors with the `datagen` source and JDBC sink. Isn't Flink 1.16 supposed to be free from Scala?
    Copy code
    at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
            at scala.collection.TraversableLike.map(TraversableLike.scala:286)
            at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
            at scala.collection.AbstractTraversable.map(Traversable.scala:108)
            at org.apache.flink.table.planner.delegation.BatchPlanner.translateToPlan(BatchPlanner.scala:92)
            at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:197)
            at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1723)
            at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:811)
            at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:904)
            at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:730)
            at com.roku.common.frameworks.airstream.examples.sql.Datagen.main(Datagen.java:46)
  • k

    Kyle Ahn

    08/30/2022, 5:30 PM
    What would be the best way to handle an optional field in a POJO?
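    A minimal sketch of one common pattern, with a hypothetical `OrderEvent` class: keep the field itself as a nullable boxed type (Flink's PojoSerializer handles null fields) and expose `Optional` only through an accessor, since an `Optional`-typed field is typically not treated as a POJO field and tends to fall back to generic (Kryo) serialization:
    ```java
    import java.util.Optional;

    // Hypothetical POJO: public no-arg constructor and public fields keep it POJO-serializable.
    public class OrderEvent {
        public String orderId;
        public Long discount;   // optional value: null when absent

        public OrderEvent() {}

        public Optional<Long> discountIfPresent() {
            return Optional.ofNullable(discount);
        }
    }
    ```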
  • a

    Adrian Chang

    08/30/2022, 8:39 PM
    Hello, my team is evaluating whether to use Java instead of Python. We noted there is no documentation for some DataStream API connectors in Python, for example Kafka.
    1. Is it because it is not yet supported in the DataStream API for Python?
    2. Are there any other limitations if we decide to continue using Python?
    3. Can I call Python UDFs from a job written in Java?
    I would like to hear your recommendations, please.
  • i

    Ikvir Singh

    08/30/2022, 8:54 PM
    Is there any way in Flink to de/serialize a Java collection such as Set as a POJO field? I only see built-in types for Map and List.
  • s

    Stephan Weinwurm

    08/31/2022, 12:03 AM
    Hi all, we’re in the process of upgrading a Flink Statefun application from 3.0.0 (Flink version 1.12) to 3.2.0 (Flink Version 1.14) and we’re seeing a significant increase in memory consumption to the point where we constantly run into OutOfMemory errors and have to roll back. The application is fairly high traffic with around 15k - 20k messages per second. Is this known?
  • j

    Jirawech Siwawut

    08/31/2022, 1:23 AM
    Hi. I wonder how we could assign a watermark to a complex data format like this:
    Copy code
    {
      "name": "david",
      "events": [
        {
          "timestamp": "2022-08-01 00:00:00",
          "id": "1"
        },
        {
          "timestamp": "2022-08-01 00:00:01",
          "id": "2"
        }
      ]
    }
    I would like to create the watermark on the nested `timestamp` column.
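    As far as I understand, a SQL watermark has to be declared on a top-level TIMESTAMP column, so the array would have to be flattened (or a top-level timestamp derived) first. On the DataStream side, a hedged sketch with hypothetical POJOs `Wrapper { String name; List<Event> events; }` and `Event { long timestamp; String id; }` (timestamps already parsed to epoch millis) could assign the record timestamp from the events inside the array:
    ```java
    import java.time.Duration;
    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.streaming.api.datastream.DataStream;

    // Use the largest event timestamp inside the array as the record's event time;
    // watermarks then trail it by a bounded out-of-orderness of 5 seconds.
    WatermarkStrategy<Wrapper> strategy = WatermarkStrategy
            .<Wrapper>forBoundedOutOfOrderness(Duration.ofSeconds(5))
            .withTimestampAssigner((wrapper, previousTimestamp) ->
                    wrapper.events.stream()
                            .mapToLong(e -> e.timestamp)
                            .max()
                            .orElse(previousTimestamp));

    DataStream<Wrapper> withEventTime = stream.assignTimestampsAndWatermarks(strategy);
    ```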
  • d

    Donatien Schmitz

    08/31/2022, 8:02 AM
    Hi all, is there a way to enable Direct IO for RocksDB without altering the code? I'm running benchmarks and page caches should be disabled.
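    One way that keeps the job code untouched, sketched under the assumption that supplying an options factory via configuration counts as "not altering the code": implement a `RocksDBOptionsFactory`, put the class on the cluster classpath, and reference it with `state.backend.rocksdb.options-factory: com.example.DirectIoOptionsFactory` in `flink-conf.yaml` (the class name is hypothetical). Whether this fully keeps the benchmark out of the OS page cache is worth verifying separately:
    ```java
    import org.apache.flink.contrib.streaming.state.RocksDBOptionsFactory;
    import org.rocksdb.ColumnFamilyOptions;
    import org.rocksdb.DBOptions;

    import java.util.Collection;

    // Hypothetical factory: asks RocksDB to use direct I/O for reads and for flush/compaction.
    public class DirectIoOptionsFactory implements RocksDBOptionsFactory {

        @Override
        public DBOptions createDBOptions(DBOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
            return currentOptions
                    .setUseDirectReads(true)
                    .setUseDirectIoForFlushAndCompaction(true);
        }

        @Override
        public ColumnFamilyOptions createColumnOptions(
                ColumnFamilyOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
            return currentOptions;
        }
    }
    ```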
  • s

    Sylvia Lin

    09/01/2022, 12:43 AM
    Hey folks, I'm running my job on AWS EKS and I noticed that AWS IRSA doesn't work. I followed this ticket and upgraded my job to Flink 1.14. The presto filesystem works fine, but the hadoop one still doesn't. We checked the S3 logs: the S3 requests were made from the node role instead of the service-account-associated role (and we verified the correct service-account role was attached to the pods). I also referred to this thread: https://apache-flink.slack.com/archives/C03G7LJTS2G/p1656680689974479 But I ran a dependency analysis and I don't see any AWS-related dependencies; below are all my dependencies.
    Copy code
    "org.apache.flink" %% "flink-connector-kafka" % flinkVersion,
      "org.apache.flink" % "flink-metrics-datadog" % flinkMetricVersion,
      "org.apache.flink" % "flink-metrics-dropwizard" % flinkMetricVersion
      "org.json4s" %% "json4s-ast" % "4.0.1",
      "org.json4s" %% "json4s-jackson" % "4.0.1",
      "com.fasterxml.jackson.core" % "jackson-core" % "2.12.4"
    Any suggestions here? We'll have to use the hadoop filesystem, since `FileSink` only supports hadoop.
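    One workaround that is commonly reported for `flink-s3-fs-hadoop` with IRSA (hedged: whether it applies here depends on the Hadoop and AWS SDK versions bundled in the plugin) is to point s3a explicitly at the web-identity credentials provider instead of letting it fall through to the instance/node role, via `flink-conf.yaml`:
    ```yaml
    fs.s3a.aws.credentials.provider: com.amazonaws.auth.WebIdentityTokenCredentialsProvider
    ```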
  • k

    Krish Narukulla

    09/01/2022, 6:23 AM
    Is there a test sink I can leverage for integration tests with a Kafka source? Basically I want to produce a message to Kafka and validate that the sink receives the message after transformation.
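    Besides a custom collecting sink (one is sketched earlier on this page), a minimal option for a MiniCluster-based test is to skip the sink entirely and pull the transformed records back into the test JVM; `transformedStream`, `OutputEvent` and `expectedCount` below are assumptions standing in for your own pipeline:
    ```java
    // Produce the test records to Kafka first, then collect the pipeline's output and assert on it.
    List<OutputEvent> results = transformedStream.executeAndCollect(expectedCount);
    assertEquals(expectedCount, results.size());
    ```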
  • j

    Jirawech Siwawut

    09/01/2022, 8:46 AM
    Hello. Does the Flink Hive connector support reading with the `latest` option for Hive streaming reads? https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/connectors/table/hive/hive_read_write/#streaming-source-partition-include I got this error:
    Copy code
    The only supported 'streaming-source.partition.include' is 'all' in hive table scan, but is 'latest'
    I checked the code, but it seems the only supported option is `all` at the moment: https://github.com/apache/flink/blob/f494be6956e850d4d1c9fd50b79e5a8dd5b53e47/flin[…]in/java/org/apache/flink/connectors/hive/HiveSourceBuilder.java
  • r

    Robin Cassan

    09/01/2022, 9:05 AM
    Hello all! Is anyone aware of why the 1.15.2 release doesn't include this commit which upgrades kafka to 3.1.1 and fixes an important issue? Also, do you know if a new release of Flink is already in preparation to include this change? Thanks!