# troubleshooting
  • r

    raghav tandon

    11/24/2022, 10:22 AM
    "In order to recover submitted jobs, Flink persists metadata and the job artifacts. The HA data will be kept until the respective job either succeeds, is cancelled or fails terminally. Once this happens, all the HA data, including the metadata stored in the HA services, will be deleted."
    Does this mean that if the job keeps restarting (due to a user-code exception) and is eventually stopped because restart-strategy: fixed-delay is exhausted, the HA state in ZooKeeper is gone? Is there a way to change this behaviour?
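    As an illustration of the restart-strategy knob in question, here is a minimal sketch (DataStream API, placeholder values) of switching to an exponential-delay strategy, which has no fixed attempt budget and therefore keeps restarting instead of failing terminally (terminal failure is the point at which the HA data is cleaned up):
    import org.apache.flink.api.common.restartstrategy.RestartStrategies;
    import org.apache.flink.api.common.time.Time;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // Unlike fixed-delay, exponential-delay does not exhaust a fixed number of attempts,
    // so repeated user-code exceptions do not drive the job into a terminal failure state.
    env.setRestartStrategy(RestartStrategies.exponentialDelayRestart(
            Time.seconds(1),     // initial backoff
            Time.minutes(5),     // maximum backoff
            2.0,                 // backoff multiplier
            Time.hours(1),       // reset backoff after this long without failures
            0.1));               // jitter factor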
  • a

    Alexis Josephides (Contractor)

    11/24/2022, 2:47 PM
    👋 does anyone have any optimisation tips and tricks for the Streaming File Sink to S3? tl;dr: the operator consistently shows the most backpressure and resource usage and is causing the Flink app to fully restart.
  • t

    Thiruvenkadesh Someswaran

    11/24/2022, 11:03 PM
    👋 I am unable to bring up the AWS EKS Flink operator 1.15 HA example. It worked fine in the default namespace in minikube, but not on EKS (flink namespace). Logs from the pod:
    ERROR org.apache.flink.runtime.entrypoint.ClusterEntrypoint        [] - Fatal error occurred in the cluster entrypoint.
    java.util.concurrent.CompletionException: java.lang.IllegalStateException: The base directory of the JobResultStore isn't accessible. No dirty JobResults can be restored.
    Here is my Flink config:
    image: flink:1.15
    flinkVersion: v1_15
    flinkConfiguration:
      taskmanager.numberOfTaskSlots: "2"
      state.savepoints.dir: file:///flink-data/savepoints
      state.checkpoints.dir: file:///flink-data/checkpoints
      high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
      high-availability.storageDir: file:///flink-data/ha
      rest.port: "9091"
  • l

    Lee xu

    11/25/2022, 3:45 AM
    Hello, is the cause of this problem that the file was deleted by another program during the run? 2022-11-24 151224,072 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Job Job_product_1cc48045ef8d439096a113e4b5c0372c_2 (26777f11b9ac55c1cebb0c6c3dd201d6) switched from state RUNNING to FAILING. org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138) ~[flink-dist_2.11-1.14.3.jar:1.14.3] at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getGlobalFailureHandlingResult(ExecutionFailureHandler.java:101) ~[flink-dist_2.11-1.14.3.jar:1.14.3] at org.apache.flink.runtime.scheduler.DefaultScheduler.handleGlobalFailure(DefaultScheduler.java:273) ~[flink-dist_2.11-1.14.3.jar:1.14.3] at org.apache.flink.runtime.scheduler.UpdateSchedulerNgOnInternalFailuresListener.notifyGlobalFailure(UpdateSchedulerNgOnInternalFailuresListener.java:57) ~[flink-dist_2.11-1.14.3.jar:1.14.3] at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.failGlobal(DefaultExecutionGraph.java:971) ~[flink-dist_2.11-1.14.3.jar:1.14.3] at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.vertexFinished(DefaultExecutionGraph.java:1091) ~[flink-dist_2.11-1.14.3.jar:1.14.3] at org.apache.flink.runtime.executiongraph.ExecutionVertex.executionFinished(ExecutionVertex.java:525) ~[flink-dist_2.11-1.14.3.jar:1.14.3] at org.apache.flink.runtime.executiongraph.Execution.markFinished(Execution.java:948) ~[flink-dist_2.11-1.14.3.jar:1.14.3] at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.updateStateInternal(DefaultExecutionGraph.java:1236) ~[flink-dist_2.11-1.14.3.jar:1.14.3] at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.updateState(DefaultExecutionGraph.java:1207) ~[flink-dist_2.11-1.14.3.jar:1.14.3] at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:678) ~[flink-dist_2.11-1.14.3.jar:1.14.3] at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:79) ~[flink-dist_2.11-1.14.3.jar:1.14.3] at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:444) ~[flink-dist_2.11-1.14.3.jar:1.14.3] at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_181] at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_181] at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_181] at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_181] at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRpcInvocation$1(AkkaRpcActor.java:316) ~[flink-rpc-akka_96b66466-ecb9-4230-b01f-89e669d9a8a4.jar:1.14.3] at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83) ~[flink-rpc-akka_96b66466-ecb9-4230-b01f-89e669d9a8a4.jar:1.14.3] at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:314) ~[flink-rpc-akka_96b66466-ecb9-4230-b01f-89e669d9a8a4.jar:1.14.3] at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:217) ~[flink-rpc-akka_96b66466-ecb9-4230-b01f-89e669d9a8a4.jar:1.14.3] at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:78) ~[flink-rpc-akka_96b66466-ecb9-4230-b01f-89e669d9a8a4.jar:1.14.3] at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:163) ~[flink-rpc-akka_96b66466-ecb9-4230-b01f-89e669d9a8a4.jar:1.14.3] at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24) [flink-rpc-akka_96b66466-ecb9-4230-b01f-89e669d9a8a4.jar:1.14.3] at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20) [flink-rpc-akka_96b66466-ecb9-4230-b01f-89e669d9a8a4.jar:1.14.3] at scala.PartialFunction.applyOrElse(PartialFunction.scala:123) [flink-rpc-akka_96b66466-ecb9-4230-b01f-89e669d9a8a4.jar:1.14.3] at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) [flink-rpc-akka_96b66466-ecb9-4230-b01f-89e669d9a8a4.jar:1.14.3] at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20) [flink-rpc-akka_96b66466-ecb9-4230-b01f-89e669d9a8a4.jar:1.14.3] at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [flink-rpc-akka_96b66466-ecb9-4230-b01f-89e669d9a8a4.jar:1.14.3] at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) [flink-rpc-akka_96b66466-ecb9-4230-b01f-89e669d9a8a4.jar:1.14.3] at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) [flink-rpc-akka_96b66466-ecb9-4230-b01f-89e669d9a8a4.jar:1.14.3] at akka.actor.Actor.aroundReceive(Actor.scala:537) [flink-rpc-akka_96b66466-ecb9-4230-b01f-89e669d9a8a4.jar:1.14.3] at akka.actor.Actor.aroundReceive$(Actor.scala:535) [flink-rpc-akka_96b66466-ecb9-4230-b01f-89e669d9a8a4.jar:1.14.3] at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220) [flink-rpc-akka_96b66466-ecb9-4230-b01f-89e669d9a8a4.jar:1.14.3] at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580) [flink-rpc-akka_96b66466-ecb9-4230-b01f-89e669d9a8a4.jar:1.14.3] at akka.actor.ActorCell.invoke(ActorCell.scala:548) [flink-rpc-akka_96b66466-ecb9-4230-b01f-89e669d9a8a4.jar:1.14.3] at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270) [flink-rpc-akka_96b66466-ecb9-4230-b01f-89e669d9a8a4.jar:1.14.3] at akka.dispatch.Mailbox.run(Mailbox.scala:231) [flink-rpc-akka_96b66466-ecb9-4230-b01f-89e669d9a8a4.jar:1.14.3] at akka.dispatch.Mailbox.exec(Mailbox.scala:243) [flink-rpc-akka_96b66466-ecb9-4230-b01f-89e669d9a8a4.jar:1.14.3] at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) [?:1.8.0_181] at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056) [?:1.8.0_181] at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692) [?:1.8.0_181] at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157) [?:1.8.0_181] Caused by: java.lang.Exception: Failed to finalize execution on master ... 38 more Caused by: org.apache.flink.table.api.TableException: Exception in finalizeGlobal at org.apache.flink.table.filesystem.FileSystemOutputFormat.finalizeGlobal(FileSystemOutputFormat.java:91) ~[flink-table_2.11-1.14.3.jar:1.14.3] at org.apache.flink.runtime.jobgraph.InputOutputFormatVertex.finalizeOnMaster(InputOutputFormatVertex.java:148) ~[flink-dist_2.11-1.14.3.jar:1.14.3] at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.vertexFinished(DefaultExecutionGraph.java:1086) ~[flink-dist_2.11-1.14.3.jar:1.14.3] ... 37 more Caused by: java.io.FileNotFoundException: File hdfs://nameservice1/user/hive/warehouse/dsp_ods.db/ods_ote_wms_shipping_container_header/.staging_1669270271637/task-0/created_pc=2021-03-29 does not exist. at org.apache.hadoop.hdfs.DistributedFileSystem.listStatusInternal(DistributedFileSystem.java:986) ~[hadoop-hdfs-client-3.0.0-cdh6.3.2.jar:?] 
at org.apache.hadoop.hdfs.DistributedFileSystem.access$1000(DistributedFileSystem.java:122) ~[hadoop-hdfs-client-3.0.0-cdh6.3.2.jar:?] at org.apache.hadoop.hdfs.DistributedFileSystem$24.doCall(DistributedFileSystem.java:1046) ~[hadoop-hdfs-client-3.0.0-cdh6.3.2.jar:?] at org.apache.hadoop.hdfs.DistributedFileSystem$24.doCall(DistributedFileSystem.java:1043) ~[hadoop-hdfs-client-3.0.0-cdh6.3.2.jar:?] at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81) ~[hadoop-common-3.0.0-cdh6.3.2.jar:?] at org.apache.hadoop.hdfs.DistributedFileSystem.listStatus(DistributedFileSystem.java:1053) ~[hadoop-hdfs-client-3.0.0-cdh6.3.2.jar:?] at org.apache.flink.hive.shaded.fs.hdfs.HadoopFileSystem.listStatus(HadoopFileSystem.java:170) ~[flink-sql-connector-hive-2.2.0_2.11-1.14.3.jar:1.14.3] at org.apache.flink.table.utils.PartitionPathUtils.listStatusWithoutHidden(PartitionPathUtils.java:193) ~[flink-table_2.11-1.14.3.jar:1.14.3] at org.apache.flink.table.filesystem.PartitionLoader.renameFiles(PartitionLoader.java:106) ~[flink-table_2.11-1.14.3.jar:1.14.3] at org.apache.flink.table.filesystem.PartitionLoader.overwriteAndRenameFiles(PartitionLoader.java:86) ~[flink-table_2.11-1.14.3.jar:1.14.3] at org.apache.flink.table.filesystem.PartitionLoader.loadPartition(PartitionLoader.java:72) ~[flink-table_2.11-1.14.3.jar:1.14.3] at org.apache.flink.table.filesystem.FileSystemCommitter.commitPartitions(FileSystemCommitter.java:83) ~[flink-table_2.11-1.14.3.jar:1.14.3] at org.apache.flink.table.filesystem.FileSystemOutputFormat.finalizeGlobal(FileSystemOutputFormat.java:89) ~[flink-table_2.11-1.14.3.jar:1.14.3] at org.apache.flink.runtime.jobgraph.InputOutputFormatVertex.finalizeOnMaster(InputOutputFormatVertex.java:148) ~[flink-dist_2.11-1.14.3.jar:1.14.3] at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.vertexFinished(DefaultExecutionGraph.java:1086) ~[flink-dist_2.11-1.14.3.jar:1.14.3] ... 37 more
  • r

    Raghunadh Nittala

    11/25/2022, 4:54 AM
    Hello Team, we are working on saving a DataStream of protobuf-format records to S3 as Parquet files, using the FileSink implementation. A few dependencies such as org.apache.parquet:parquet-protobuf were required for this, and we've added them to our Gradle build (under flinkShadowJar). When I try to run the job using the fat jar, it throws:
    Caused by: java.lang.NoClassDefFoundError: org/apache/parquet/proto/ProtoWriteSupport
    at org.apache.flink.formats.parquet.protobuf.ParquetProtoWriters.lambda$forType$84425dfe$1(ParquetProtoWriters.java:41)
    at org.apache.flink.formats.parquet.ParquetWriterFactory.create(ParquetWriterFactory.java:56)
    at org.apache.flink.streaming.api.functions.sink.filesystem.BulkBucketWriter.openNew(BulkBucketWriter.java:76)
    ....
    ....
    Caused by: java.lang.ClassNotFoundException: org.apache.parquet.proto.ProtoWriteSupport
    at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:581)
    at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178)
    at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:521)
    The job only succeeds when we copy the org.apache.parquet:parquet-protobuf jar into the Flink lib directory. Why is the class not being picked up when it is available in the fat jar? (I extracted the fat jar and could see the class inside.) I tried compileOnly/runtimeOnly scopes as well, but no luck.
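    For reference, the FileSink wiring that pulls in this class looks roughly like the sketch below (MyRecord is a hypothetical generated protobuf class and the S3 path is a placeholder). For the class to resolve at runtime it has to travel with the job, i.e. org.apache.parquet:parquet-protobuf needs to be bundled into the fat jar with a normal implementation/flinkShadowJar scope; compileOnly dependencies are not packaged.
    import org.apache.flink.connector.file.sink.FileSink;
    import org.apache.flink.core.fs.Path;
    import org.apache.flink.formats.parquet.protobuf.ParquetProtoWriters;
    import org.apache.flink.streaming.api.datastream.DataStream;

    DataStream<MyRecord> records = ...;  // hypothetical stream of generated protobuf messages

    FileSink<MyRecord> sink = FileSink
            // ParquetProtoWriters.forType needs org.apache.parquet:parquet-protobuf at runtime.
            .forBulkFormat(new Path("s3://my-bucket/protobuf-parquet/"),
                    ParquetProtoWriters.forType(MyRecord.class))
            .build();

    records.sinkTo(sink);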
  • a

    Abhinav sharma

    11/25/2022, 11:06 AM
    Hi, I am using a KafkaSource to fetch data from Kafka and process it in Flink. Once the data is received, I want to collect all the events that arrive within a 15-minute window. I am aware of the windowing part, but I am unable to get the watermark based on the timestamp of each event to work. Roughly: KafkaSource kafkaAllEvents = ...; DataStream source = env.fromSource(kafkaAllEvents, WatermarkStrategy.noWatermarks(), "data"); then source.map(<json of data>).assignTimestampsAndWatermarks(<watermark strategy>). The watermark strategy implements createWatermarkGenerator and createTimestampAssigner, which generate watermarks from the "inputDate" field of the incoming events. However, this doesn't work.
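    For illustration of the API being discussed, a minimal sketch of assigning timestamps and bounded-out-of-orderness watermarks on the parsed stream and then applying a 15-minute event-time window. It reuses env and kafkaAllEvents from the question; MyEvent, parseJson, getInputDate, getKey and MyWindowFunction are hypothetical placeholders, and the out-of-orderness bound is arbitrary.
    import java.time.Duration;
    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
    import org.apache.flink.streaming.api.windowing.time.Time;

    // Read raw records without watermarks at the source, as in the question.
    DataStream<String> raw = env.fromSource(kafkaAllEvents, WatermarkStrategy.noWatermarks(), "data");

    // Assign event timestamps (from the "inputDate" field) and watermarks on the parsed stream.
    DataStream<MyEvent> events = raw
            .map(json -> parseJson(json))            // hypothetical JSON -> MyEvent mapper
            .returns(MyEvent.class)
            .assignTimestampsAndWatermarks(
                    WatermarkStrategy.<MyEvent>forBoundedOutOfOrderness(Duration.ofSeconds(30))
                            .withTimestampAssigner((event, recordTs) -> event.getInputDate()));

    // 15-minute event-time windows only fire once the watermark passes the window end.
    events.keyBy(event -> event.getKey())            // hypothetical key selector
            .window(TumblingEventTimeWindows.of(Time.minutes(15)))
            .process(new MyWindowFunction());        // hypothetical ProcessWindowFunction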
  • r

    Raghunadh Nittala

    11/25/2022, 2:11 PM
    Hi Team, a little extension to the question I asked above. We are working on saving a DataStream of protobuf-format records to S3 and Azure. We are able to sink the data to S3; however, the sink to Azure (FileSink) is failing with the exception:
    Caused by: java.lang.ClassNotFoundException: org.apache.flink.fs.azure.common.hadoop.HadoopRecoverableWriter
    at <http://java.net|java.net>.URLClassLoader.findClass(Unknown Source) ~[?:?]
    at java.lang.ClassLoader.loadClass(Unknown Source) ~[?:?]
    at org.apache.flink.core.classloading.ComponentClassLoader.loadClassFromComponentOnly(ComponentClassLoader.java:149) ~[flink-dist-1.16.0.jar:1.16.0]
    at org.apache.flink.core.classloading.ComponentClassLoader.loadClassFromOwnerFirst(ComponentClassLoader.java:171) ~[flink-dist-1.16.0.jar:1.16.0]
    at org.apache.flink.core.classloading.ComponentClassLoader.loadClass(ComponentClassLoader.java:106) ~[flink-dist-1.16.0.jar:1.16.0]
    at java.lang.ClassLoader.loadClass(Unknown Source) ~[?:?]
    I have copied flink-azure-fs-hadoop.jar to the azure-fs-hadoop plugins folder as described in https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/deployment/filesystems/azure/. While exploring this, I came across the JIRAs https://issues.apache.org/jira/browse/FLINK-17444 and https://issues.apache.org/jira/browse/FLINK-18568. Now my question is: does FileSink work for Azure? Why am I seeing this exception even though I've provided the jar under plugins?
  • s

    sharad mishra

    11/25/2022, 2:36 PM
    Hi Team, I'm using Flink 1.16.0 (with Scala 2.12) and trying to write unit tests for one of the ProcessFunctions in my application, as per the docs. However, I can't seem to resolve ProcessFunctionTestHarnesses even after adding the following dependencies:
    "org.apache.flink" % "flink-test-utils" % flinkVersion % Test,
      "org.apache.flink" % "flink-test-utils-junit" % flinkVersion % Test,
      "org.apache.flink" % "flink-runtime" % flinkVersion % Test
    Am I missing something ?
  • m

    Matt Butler

    11/25/2022, 10:49 PM
    Hello everyone - I'm trying to measure the throughput of a job (with a UDF call vs. without) and surface that metric on the Flink dashboard. I'm not sure what I'm overlooking; I'm using the documentation and the Flink code itself to piece together the necessary steps. When I run code similar to the snippet below, the metric doesn't show up in the Metrics dropdown list. Can someone shed some light on this? Thank you!
    public class TestClass {
            public static class MyTestFunction extends ScalarFunction {
                private static transient Meter meter;
                
                @Override
                public void open(FunctionContext context) throws Exception {
                    com.codahale.metrics.Meter dropwizardMeter = new com.codahale.metrics.Meter();
                    this.meter = context.getMetricGroup()
                            .meter("myMeter", new DropwizardMeterWrapper(dropwizardMeter));
                }
    
                public String eval(String s) {
                    this.meter.markEvent();
                    
                    return "Test";
                }
            }
    
            public static void main(String[] args) throws Exception {
    
                EnvironmentSettings settings = EnvironmentSettings
                        .newInstance()
                        .inStreamingMode()
                        .build();
    
                TableEnvironment tableEnvironment = TableEnvironment.create(settings);
    
                tableEnvironment.createTemporarySystemFunction("MyTestFunction", MyTestFunction.class);
                ....
            }
    }
  • v

    Vishal bharatbhai Vanpariya

    11/26/2022, 10:42 AM
    Hi Team, I am pushing processed data to Kafka, but I want to push the data as key-value pairs. I tried using a keyedBy stream and pushing it to Kafka, but that is not working. Below is an example of pushing keyed data from the terminal; how can I do the same through Flink?
    kafka-console-producer.sh --broker-list localhost:9092 --topic topic-name --property "parse.key=true" --property "key.separator=:"
    
    key1:value1
    key2:value2
    key3:value3
    Flink code:
    keyedstream.sinkTo(KafkaSink.<String>builder()
            .setBootstrapServers("localhost:9092")
            .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                    .setTopic("flink")
                    .setValueSerializationSchema(new SimpleStringSchema())
                    .build())
            .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
            .build());
    Thanks
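    The record key written to Kafka comes from the record serializer, not from keyBy() (which only controls partitioning inside Flink). A minimal sketch, assuming a hypothetical stream of (key, value) Tuple2 pairs:
    import java.nio.charset.StandardCharsets;
    import org.apache.flink.api.common.serialization.SerializationSchema;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.connector.base.DeliveryGuarantee;
    import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
    import org.apache.flink.connector.kafka.sink.KafkaSink;
    import org.apache.flink.streaming.api.datastream.DataStream;

    DataStream<Tuple2<String, String>> pairs = ...;  // hypothetical (key, value) stream

    SerializationSchema<Tuple2<String, String>> keySchema =
            pair -> pair.f0.getBytes(StandardCharsets.UTF_8);
    SerializationSchema<Tuple2<String, String>> valueSchema =
            pair -> pair.f1.getBytes(StandardCharsets.UTF_8);

    KafkaSink<Tuple2<String, String>> sink = KafkaSink.<Tuple2<String, String>>builder()
            .setBootstrapServers("localhost:9092")
            .setRecordSerializer(KafkaRecordSerializationSchema.<Tuple2<String, String>>builder()
                    .setTopic("flink")
                    .setKeySerializationSchema(keySchema)      // becomes the Kafka record key
                    .setValueSerializationSchema(valueSchema)  // becomes the Kafka record value
                    .build())
            .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
            .build();

    pairs.sinkTo(sink);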
  • v

    Vishal bharatbhai Vanpariya

    11/28/2022, 7:27 AM
    Can somebody help me with how to push Parquet files to an S3 bucket? I've tried https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/filesystems/s3/, but it only works for txt files.
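    A minimal sketch of one common way to do this, assuming a hypothetical POJO SensorReading and a placeholder bucket path: a FileSink in bulk (Parquet) format using the Avro reflection writer. This also assumes flink-parquet and its Parquet/Avro dependencies are bundled with the job, the flink-s3-fs-hadoop (or presto) filesystem jar is installed under plugins/, and checkpointing is enabled.
    import org.apache.flink.connector.file.sink.FileSink;
    import org.apache.flink.core.fs.Path;
    import org.apache.flink.formats.parquet.avro.ParquetAvroWriters;
    import org.apache.flink.streaming.api.datastream.DataStream;

    DataStream<SensorReading> readings = ...;  // hypothetical POJO stream

    FileSink<SensorReading> parquetSink = FileSink
            .forBulkFormat(new Path("s3://my-bucket/parquet-output/"),
                    ParquetAvroWriters.forReflectRecord(SensorReading.class))
            .build();

    // Bulk formats roll part files on checkpoints, so checkpointing must be enabled
    // for the Parquet files to be finalized and become visible in S3.
    readings.sinkTo(parquetSink);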
  • j

    Jackwangcs

    11/28/2022, 8:45 AM
    Hi Team, we are upgrading from Flink 1.13.2 to Flink 1.16, but we observe a performance degradation (about 15% for jobs with 10 joins, processing records from Upsert Kafka to Upsert Kafka). Can someone shed some light on this?
  • d

    ding bei

    11/28/2022, 2:09 PM
    hi Team, I am using the DataStream FileSystem connector to write Parquet files to S3, and I want to split files by size. With a bulk format it only supports OnCheckpointRollingPolicy, which comes with the caveat described in the picture below. I tried extending CheckpointRollingPolicy and overriding shouldRollOnEvent, but it did not work. I also found a Stack Overflow question that is exactly the same as mine, but it has no answers: https://stackoverflow.com/questions/70288006/flink-filesink-with-bulk-format-to-s3-rolling-policy-how-to-specify-size-time. So how can I roll files by size using the DataStream API?
  • d

    Donatien Schmitz

    11/28/2022, 2:32 PM
    Hi devs, what is the easiest way to fetch TaskManager metrics programmatically from within the JobMaster? I see that there is a TaskManagerTracker used by SlotManagers, but I don't see any access available through the JobMaster. Right now I am using the ResourceManagerGateway, but the gateway is generated after the creation of the Scheduler, so I'm facing a NullPointerException when I try to call the requestTaskManagerInfo method.
  • d

    Dylan Fontana

    11/28/2022, 3:45 PM
    👋 Hi folks, question on Kafka partitions and source readers. The docs say a source won't go idle if "parallelism is higher than the number of partitions." I'm assuming the relationship between parallelism and partitions exists because each split is a single partition, based on the examples section here. In my application, however, I'm not noticing an issue with parallelism (6) set higher than the number of partitions (5). I do see that one of the task instances isn't doing anything (bytes sent/received from that instance is 0), but I am not seeing watermarks being held back by the non-idle source. Should I be?
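    For reference, the idleness handling being alluded to is configured on the watermark strategy; a minimal sketch, where MyEvent, getEventTime, kafkaSource and the durations are placeholders:
    import java.time.Duration;
    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.streaming.api.datastream.DataStream;

    WatermarkStrategy<MyEvent> strategy = WatermarkStrategy
            .<MyEvent>forBoundedOutOfOrderness(Duration.ofSeconds(30))
            .withTimestampAssigner((event, recordTs) -> event.getEventTime())
            // Readers that own no partition are marked idle after this gap and stop
            // holding back the combined watermark.
            .withIdleness(Duration.ofMinutes(1));

    DataStream<MyEvent> events = env.fromSource(kafkaSource, strategy, "kafka");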
  • f

    Felix Angell

    11/28/2022, 4:37 PM
    does anyone know what version of the JDK is recommended for PyFlink on KDA 1.15.2?
  • a

    Aeden Jameson

    11/28/2022, 6:03 PM
    Is there a way to use the sql-client or some part of it in a build pipeline to produce the packaged jar it would otherwise submit to a flink cluster?
  • k

    Karthi Thyagarajan

    11/29/2022, 1:12 AM
    Hello folks, I’m trying to get the FileSink working in PyFlink (1.15) with the Table API using Parquet format on my local machine. I’ve included the following jar dependencies: • hadoop-common-2.10.1.jar • hadoop-mapreduce-client-core-2.10.1.jar as described here: https://github.com/immerok/recipes/blob/main/write-parquet-files/pom.xml And I’m getting this class not found error:
    java.lang.NoClassDefFoundError: com/ctc/wstx/io/InputBootstrapper
    at org.apache.flink.formats.parquet.ParquetFileFormatFactory.getParquetConfiguration(ParquetFileFormatFactory.java:92)
    at org.apache.flink.formats.parquet.ParquetFileFormatFactory.access$000(ParquetFileFormatFactory.java:52)
    at org.apache.flink.formats.parquet.ParquetFileFormatFactory$1.createRuntimeEncoder(ParquetFileFormatFactory.java:80)
    at org.apache.flink.formats.parquet.ParquetFileFormatFactory$1.createRuntimeEncoder(ParquetFileFormatFactory.java:74)
    at org.apache.flink.connector.file.table.FileSystemTableSink.createWriter(FileSystemTableSink.java:376)
    at org.apache.flink.connector.file.table.FileSystemTableSink.createStreamingSink(FileSystemTableSink.java:197)
    at org.apache.flink.connector.file.table.FileSystemTableSink.consume(FileSystemTableSink.java:155)
    at org.apache.flink.connector.file.table.FileSystemTableSink.lambda$getSinkRuntimeProvider$0(FileSystemTableSink.java:140)
    Any suggestions on what I could be missing? Thanks in advance.
  • s

    Shira Bodenstein

    11/29/2022, 8:20 AM
    Hi everyone, I have a streaming application that works with event time, where the incoming events carry a timestamp and can arrive out of order. I also allow a lateness of X hours. The system performs windowing on the data. Up until now, for the watermark I used:
    assignTimestampsAndWatermarks(AssignerWithPeriodicWatermarks<T> timestampAndWatermarkAssigner)
    The periodic assigner simply extracted the timestamp from the event. I have upgraded Flink from 1.6.3 to 1.11.2. This API is deprecated, and I'm trying to understand what I should use to replace it. I tried:
    assignTimestampsAndWatermarks(WatermarkStrategy.<T>
                                    forBoundedOutOfOrderness(Duration.ofHours(valueLatenessEntryHours))
                            .withTimestampAssigner(SerializableTimestampAssigner<T> timestampAssigner))
    where the timestampAssigner extracts the timestamp from the event. But something doesn't seem to work: I don't see any data fired from the windows or written to the sink. Any ideas on what could be missing? Thank you!
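    Not a definitive diagnosis, but one detail that often matters in this migration: the duration passed to forBoundedOutOfOrderness delays the watermark itself, which is separate from allowedLateness on the window; with an X-hour bound, windows only fire X hours after their end time in event time. A minimal sketch of how the two pieces are usually wired (hypothetical types and placeholder values):
    import java.time.Duration;
    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
    import org.apache.flink.streaming.api.windowing.time.Time;

    stream
            // Keep the watermark bound small; it controls how far the watermark lags behind.
            .assignTimestampsAndWatermarks(
                    WatermarkStrategy.<MyEvent>forBoundedOutOfOrderness(Duration.ofMinutes(5))
                            .withTimestampAssigner((event, recordTs) -> event.getTimestamp()))
            .keyBy(event -> event.getKey())
            .window(TumblingEventTimeWindows.of(Time.hours(1)))
            // Late events within X hours are handled here rather than via a huge watermark delay.
            .allowedLateness(Time.hours(valueLatenessEntryHours))
            .process(new MyWindowFunction());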
  • m

    monaawang

    11/29/2022, 8:59 AM
    Hello everyone - I used toRowType in the Table API in Flink 1.13.6, and a type-mapping exception occurred when converting a Table to a DataStream. Does anyone know what happened? Is the deprecated API causing the bug, and is there an alternative API?
  • k

    Kosta Sovaridis

    11/29/2022, 1:21 PM
    Hi everyone, is there an example of deploying Statefun using the Flink Operator? I have noticed that kubernetes-taskmanager is defined in the original flink-console.sh, but it is not redefined in the flink-statefun-docker flink-console.sh. Is it intended that all Kubernetes-related tasks are missing there?
  • s

    Sumit Nekar

    11/29/2022, 2:02 PM
    Hello Team, I need some input to debug this issue. I am trying to upgrade an existing FlinkDeployment using the flink-operator: I added a new operator to the existing job and am trying to deploy the new jar, but I'm getting the following issue.
    INFO ][flink-flink-test-application-preprod/flink-test-application] >>> Event  | Warning | CLUSTERDEPLOYMENTEXCEPTION | The Flink cluster flink-test-application already exists.
    2022-11-29 13:55:05,166 o.a.f.k.o.r.ReconciliationUtils [WARN ][flink-flink-test-application-preprod/flink-test-application] Attempt count: 10, last attempt: true
    2022-11-29 13:55:05,193 o.a.f.k.o.l.AuditUtils         [INFO ][flink-flink-test-application-preprod/flink-test-application] >>> Status | Error   | UPGRADING       | org.apache.flink.client.deployment.ClusterDeploymentException: The Flink cluster flink-test-application already exists.
    2022-11-29 13:55:05,193 i.j.o.p.e.ReconciliationDispatcher [ERROR][flink-flink-test-application-preprod/flink-test-application] Error during event processing ExecutionScope{ resource id: ResourceID{name='flink-test-application', namespace='flink-test-application-ns'}, version: 186818983} failed.
    org.apache.flink.kubernetes.operator.exception.ReconciliationException: org.apache.flink.client.deployment.ClusterDeploymentException: The Flink cluster flink-test-application already exists.
    at org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController.reconcile(FlinkDeploymentController.java:133)
    at org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController.reconcile(FlinkDeploymentController.java:54)
    at io.javaoperatorsdk.operator.processing.Controller$1.execute(Controller.java:130)
    at io.javaoperatorsdk.operator.processing.Controller$1.execute(Controller.java:88)
    at org.apache.flink.kubernetes.operator.metrics.OperatorJosdkMetrics.timeControllerExecution(OperatorJosdkMetrics.java:81)
    at io.javaoperatorsdk.operator.processing.Controller.reconcile(Controller.java:87)
    at io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.reconcileExecution(ReconciliationDispatcher.java:135)
    at io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleReconcile(ReconciliationDispatcher.java:115)
    at io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleDispatch(ReconciliationDispatcher.java:86)
    at io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleExecution(ReconciliationDispatcher.java:59)
    at io.javaoperatorsdk.operator.processing.event.EventProcessor$ReconcilerExecutor.run(EventProcessor.java:395)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.base/java.lang.Thread.run(Unknown Source)
    I have not assigned any kubernetes.cluster-id explicitly. Can this happen when the updated job graph is not compatible with the previous one?
    Caused by: org.apache.flink.client.deployment.ClusterDeploymentException: The Flink cluster flink-test-application already exists.
    at org.apache.flink.kubernetes.KubernetesClusterDescriptor.deployApplicationCluster(KubernetesClusterDescriptor.java:181)
    at org.apache.flink.client.deployment.application.cli.ApplicationClusterDeployer.run(ApplicationClusterDeployer.java:67)
    at org.apache.flink.kubernetes.operator.service.NativeFlinkService.deployApplicationCluster(NativeFlinkService.java:70)
    at org.apache.flink.kubernetes.operator.service.AbstractFlinkService.submitApplicationCluster(AbstractFlinkService.java:177)
    at org.apache.flink.kubernetes.operator.reconciler.deployment.ApplicationReconciler.deploy(ApplicationReconciler.java:187)
    at org.apache.flink.kubernetes.operator.reconciler.deployment.ApplicationReconciler.deploy(ApplicationReconciler.java:55)
    at org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractJobReconciler.restoreJob(AbstractJobReconciler.java:208)
    at org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractJobReconciler.reconcileSpecChange(AbstractJobReconciler.java:140)
    at org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractFlinkResourceReconciler.reconcile(AbstractFlinkResourceReconciler.java:153)
    at org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractFlinkResourceReconciler.reconcile(AbstractFlinkResourceReconciler.java:56)
    at org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController.reconcile(FlinkDeploymentController.java:123)
    ... 13 more
    2022-11-29 13:55:05,194 i.j.o.p.e.EventProcessor       [ERROR][flink-test-application-ns/flink-test-application] Exhausted retries for ExecutionScope{ resource id: ResourceID{name='flink-test-application', namespace='flink-flink-test-application-ns'}, version: 186818983}
  • a

    Ali AIT-BACHIR

    11/29/2022, 2:15 PM
    Hi everyone, I'm facing an issue using Numba in Flink Python UDFs. Can you please help me fix this or suggest a workaround? Here is my log trace: https://apache-flink.slack.com/files/T03GEQSEJAC/F04D87W2ZUZ?origin_team=T03GEQSEJAC. I used stumpy, which uses numba.
  • c

    Chase Diem

    11/29/2022, 5:13 PM
    Hey all -- Looking for some advice regarding the flink-metrics-datadog plugin. The DatadogHttpClient hard-codes the URL to <https://app.datadoghq>, however that's an issue because we would like to use a custom endpoint set up by VictoriaMetrics, as noted here: https://docs.victoriametrics.com/#how-to-send-data-from-datadog-agent. Is there any easy way to override the URL?
  • m

    Momir Beljic

    11/29/2022, 7:25 PM
    Hello, I am using pyflink with the DataStream API to run a Kinesis Data Analytics application that reads from a Kinesis data stream. I generate some random data locally into the Kinesis data stream process_message_stream, and now I want to read that stream with the PyFlink DataStream API. However, I get this message:
    py4j.protocol.Py4JJavaError: An error occurred while calling o1.toAppendStream.
    : org.apache.flink.table.api.ValidationException: Unable to create a source for reading table 'default_catalog.default_database.process_message_stream'.
    when I run this PyFlink code:
    env = StreamExecutionEnvironment.get_execution_environment()
        t_env = StreamTableEnvironment.create(stream_execution_environment=env)
    
        CURRENT_DIR = os.path.dirname(os.path.realpath(__file__))
    
        t_env.get_config().get_configuration().set_string(
            "pipeline.jars",
            "file:///" + CURRENT_DIR + "/lib/flink-sql-connector-kinesis_2.12-1.13.2.jar" #"/lib/flink-sql-connector-kinesis-1.15.2.jar",
        ) 
    
        t_env.execute_sql(""" CREATE TABLE process_message_stream (
                    EVENT_DATE_TIME TIMESTAMP(3),
     
                    DEV_ID VARCHAR(6),
                    T_ID VARCHAR(6),
        
                ) PARTITIONED By (DEV_ID)
                WITH (
                    'connector' = 'kinesis',
                    'aws.region' = 'eu-west-1',
                    'stream' = 'process_message_stream'
                )""")
        ds = t_env.to_append_stream(
        t_env.from_path("process_message_stream"),
        Types.ROW([Types.SQL_TIMESTAMP(), Types.STRING(), Types.STRING()]))
        ds.add_sink(StreamingFileSink
                    .for_row_format('/tmp/output', Encoder.simple_string_encoder())
                    .build())
        t_env.execute("tutorial_job")
  • s

    Steven Zhang

    11/29/2022, 7:37 PM
    How does a change the Flink operator makes to a CRD object get synced to the CRD in the Kubernetes cluster itself? I was looking through what happens when a manual savepoint is triggered via the savepoint nonce, and I see that in SavepointObserver the new savepoint gets added to the savepoint history list of the SavepointInfo, but how does changing the object in the operator get reflected in the k8s resource?
  • r

    RICHARD JOY

    11/29/2022, 7:55 PM
    Good day, everyone! I set podSecurityContext.fsGroup to 9999 for the flink-operator pod in order to change the ownership of the PVC operatorVolumeMounts.data.MountPath /opt/flink/artifacts to the 'flink' service account. But then the operator pod does not start at all. Does anyone know how to change the ownership of the mounts in the operator pod? This is needed because session job deployments are failing: the operator writes metadata to this location during the request and gets a permission-denied error, since the volume is mounted as root. Thanks for any help.
  • e

    Emmanuel Leroy

    11/29/2022, 9:11 PM
    Hi, I have a journaling job that stores raw events (change logs) to object storage. I want to implement a HybridSource that reads the files up to the latest checkpoint of the journaling job and then switches to Kafka. So the question is: how can I access the offset of the journaling job from another job? Is there any way to do that?
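    For reference, the switch-over wiring of a HybridSource looks roughly like the sketch below. fileSource and kafkaSource are hypothetical, and this does not answer how to discover the other job's checkpointed offsets, which is the actual question here.
    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.connector.base.source.hybrid.HybridSource;
    import org.apache.flink.streaming.api.datastream.DataStream;

    // fileSource: a bounded FileSource over the journaled change logs (hypothetical).
    // kafkaSource: a KafkaSource whose starting offsets correspond to where the files end (hypothetical).
    HybridSource<String> source = HybridSource.builder(fileSource)
            .addSource(kafkaSource)   // switches over once the bounded file source finishes
            .build();

    DataStream<String> events = env.fromSource(source, WatermarkStrategy.noWatermarks(), "journal-then-kafka");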
  • c

    Connie Yang

    11/30/2022, 12:37 AM
    hi there, has anyone been able to submit a Flink job to an existing Flink cluster using Apache Flink's CRD job spec?