# troubleshooting
  • m

    Martin Bomio

    09/08/2022, 4:30 PM
    Hello! Are lookup joins supported when the left side of the join is an event-time table? I have a LookupTableSource which is backed by a service call (gRPC) and I would like to join it with a source that uses event time, but I get the following error:
    Event-Time Temporal Table Join requires both primary key and row time attribute in versioned table, but no row time attribute can be found
    When I change the table to use processing time, the join works without issues. In our case, the lookup is not dependent on time
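    A minimal sketch of the usual workaround when the lookup itself is not time-dependent: give the probe side a processing-time attribute and join FOR SYSTEM_TIME AS OF that attribute, so the lookup table needs no row time attribute. Table and column names below are made up, and "user_service" stands in for the gRPC-backed LookupTableSource:
    TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

    // Probe side: an event-time source that additionally declares a processing-time column.
    tableEnv.executeSql(
        "CREATE TABLE orders (" +
        "  user_id STRING," +
        "  order_time TIMESTAMP(3)," +
        "  WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND," +
        "  proc_time AS PROCTIME()" +
        ") WITH ('connector' = 'datagen')");

    // Lookup join keyed on the processing-time attribute instead of a row time attribute.
    tableEnv.executeSql(
        "SELECT o.user_id, u.name " +
        "FROM orders AS o " +
        "JOIN user_service FOR SYSTEM_TIME AS OF o.proc_time AS u " +
        "  ON o.user_id = u.user_id").print();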
    d
    • 2
    • 4
  • c

    chunilal kukreja

    09/09/2022, 1:08 PM
    Hello Experts, I have a use case where I want to pass the “Iterable<PojoObject> elements” to an embedded function using async I/O. Going through the documentation and other resources, my understanding is that async I/O is applied per record in a DataStream. But what if I want to pass a list of objects already processed by Flink, say by a keyBy() operator? This is my Flink code:
    DataStream<EventDataMapping> dataMappingDataStream = inputDataStream
                    .process(new DeriveKeyProcessFunction())
                    .keyBy(new KeySelector<EventDataMapping, Tuple2<String, String>>() {
    
                        @Override
                        public Tuple2<String, String> getKey(EventDataMapping eventDataMapping) throws Exception {
                            return Tuple2.of(eventDataMapping.getTenancyId(), eventDataMapping.getEventExecutionType());
                        }
                    })
                    .countWindow(100)
                    .process(new EventProcessWindow()) // <- I want to call async I/O in the windowed process function. How do I do that?
                    .returns(EventDataMapping.class);
    Is this achievable? Thanks in advance !
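    One pattern that fits here (a sketch, not the only option): let the windowed process function emit the collected batch as a single List, then apply async I/O to the resulting stream of lists, since AsyncDataStream operates per stream element. The external service call itself is left as a placeholder:
    // Window output: one List per 100-element count window.
    DataStream<List<EventDataMapping>> batches = inputDataStream
            .process(new DeriveKeyProcessFunction())
            .keyBy(new KeySelector<EventDataMapping, Tuple2<String, String>>() {
                @Override
                public Tuple2<String, String> getKey(EventDataMapping e) {
                    return Tuple2.of(e.getTenancyId(), e.getEventExecutionType());
                }
            })
            .countWindow(100)
            .process(new ProcessWindowFunction<EventDataMapping, List<EventDataMapping>, Tuple2<String, String>, GlobalWindow>() {
                @Override
                public void process(Tuple2<String, String> key, Context ctx,
                                    Iterable<EventDataMapping> elements,
                                    Collector<List<EventDataMapping>> out) {
                    List<EventDataMapping> batch = new ArrayList<>();
                    elements.forEach(batch::add);
                    out.collect(batch);
                }
            });

    // Async I/O is applied per element; here each element is the whole 100-record batch.
    DataStream<List<EventDataMapping>> enriched = AsyncDataStream.unorderedWait(
            batches,
            new RichAsyncFunction<List<EventDataMapping>, List<EventDataMapping>>() {
                @Override
                public void asyncInvoke(List<EventDataMapping> batch,
                                        ResultFuture<List<EventDataMapping>> resultFuture) {
                    // call the external service asynchronously, then complete the future:
                    resultFuture.complete(Collections.singleton(batch));
                }
            },
            30, TimeUnit.SECONDS);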
    c
    • 2
    • 8
  • j

    Jirawech Siwawut

    09/09/2022, 3:17 PM
    Hi. I would like to ask a quick question. I have one Kafka source and I would like to create two sinks, Hive and another Kafka topic. Is it possible to use Flink SQL with multiple sinks? It seems that my job creates two Flink jobs for the two sinks. What I did is something like this:
    tableEnv.createTemporaryView("tempTable", tempTable)
    tableEnv.executeSql("INSERT INTO Kafkatable SELECT * FROM tempTable)
    tableEnv.executeSql("INSERT INTO hiveTable SELECT * FROM tempTable)
    s
    • 2
    • 2
  • s

    Satya

    09/09/2022, 4:57 PM
    Hi All, I am trying to implement a stream pipeline using the FileSystem DataStream connector to continuously read streaming Avro files from S3 and then write the data into a database. I get the point that I need to create a
    StreamFormat
    or
    SimpleStreamFormat
    by extending it like below:
    object AvroStreamFormat extends SimpleStreamFormat[GenericRecord] {
      override def createReader(config: Configuration, stream: FSDataInputStream): StreamFormat.Reader[GenericRecord] = ???
    
      override def getProducedType: TypeInformation[GenericRecord] = TypeInformation.of(classOf[GenericRecord])
    }
    I am struggling to create a
    createReader
    for Avro files. Any pointer to help move forward would be great.
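    A rough sketch of one way to fill in createReader, wrapping Avro's DataFileStream (shown in Java here; the Scala version follows the same shape, and this is untested):
    public class AvroGenericRecordFormat extends SimpleStreamFormat<GenericRecord> {

        @Override
        public StreamFormat.Reader<GenericRecord> createReader(Configuration config, FSDataInputStream stream) throws IOException {
            // DataFileStream reads the writer schema from the file header.
            DataFileStream<GenericRecord> records = new DataFileStream<>(stream, new GenericDatumReader<>());
            return new StreamFormat.Reader<GenericRecord>() {
                @Override
                public GenericRecord read() throws IOException {
                    return records.hasNext() ? records.next() : null; // null signals end of split
                }

                @Override
                public void close() throws IOException {
                    records.close();
                }
            };
        }

        @Override
        public TypeInformation<GenericRecord> getProducedType() {
            return TypeInformation.of(GenericRecord.class);
        }
    }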
  • c

    Chris F

    09/09/2022, 10:02 PM
    Trying to use kafka ingress with flink stateful functions and getting
    java.lang.IllegalStateException: unable to find a router for ingress IngressIdentifier(showcase, names, interface com.google.protobuf.Message)
    	at org.apache.flink.util.Preconditions.checkState(Preconditions.java:193)
    	at org.apache.flink.statefun.flink.core.translation.IngressRouterOperator.loadRoutersAttachedToIngress(IngressRouterOperator.java:96)
    	at org.apache.flink.statefun.flink.core.translation.IngressRouterOperator.open(IngressRouterOperator.java:78)
    	at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:110)
    	at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:711)
    	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.call(StreamTaskActionExecutor.java:100)
    	at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:687)
    	at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:654)
    	at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
    	at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
    	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
    	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
    	at java.base/java.lang.Thread.run(Unknown Source)
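    This error usually means the ingress is bound but nothing was bound to route its messages to functions. With the embedded (JVM) SDK that routing is a Router registered in the module; a rough sketch, with the identifier values taken from the error message and a made-up target function type:
    public final class ShowcaseModule implements StatefulFunctionModule {

        static final IngressIdentifier<Message> NAMES_INGRESS =
                new IngressIdentifier<>(Message.class, "showcase", "names");

        static final FunctionType GREETER = new FunctionType("showcase", "greeter");

        @Override
        public void configure(Map<String, String> globalConfiguration, Binder binder) {
            // the existing binder.bindIngress(...) for the Kafka ingress spec stays as it is
            binder.bindIngressRouter(NAMES_INGRESS, (message, downstream) ->
                    downstream.forward(GREETER, extractKey(message), message));
        }

        private static String extractKey(Message message) {
            return message.toString(); // placeholder key extraction
        }
    }
    With the YAML module format, the routable Kafka ingress kind (where targets are declared per topic) plays the same role.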
    • 1
    • 1
  • g

    Gaurav Miglani

    09/09/2022, 11:44 PM
    Getting frequent errors in the Flink Kubernetes operator when the whole kube cluster is on spot instances; even the Kubernetes Flink operator pod is getting restarted again and again. Should I change my complete cluster to 100% on-demand 🤔? The error: 2022-09-09 233636,978 o.a.f.s.n.i.n.u.c.D.rejectedExecution [ERROR] Failed to submit a listener notification task. Event loop shut down?
    • 1
    • 2
  • k

    Krish Narukulla

    09/10/2022, 3:58 AM
    How can I register my new factory class which extends:
    org.apache.flink.table.factories.DeserializationFormatFactory
    ? I am running into a validation error thrown from this code path in Flink:
    try {
                final DynamicTableSourceFactory factory =
                        preferredFactory != null
                                ? preferredFactory
                                : discoverTableFactory(DynamicTableSourceFactory.class, context);
                return factory.createDynamicTableSource(context);
            } catch (Throwable t) {
                throw new ValidationException(
                        String.format(
                                "Unable to create a source for reading table '%s'.\n\n"
                                        + "Table options are:\n\n"
                                        + "%s",
    • 1
    • 1
  • j

    Jaromir Hamala

    09/10/2022, 2:28 PM
    Hello, is there any guide on which JARs I need to download when I want to use the Parquet format from the PyFlink Table API? I managed to get it working by trial and error and ended up with:
    flink-hadoop-compatibility_2.12-1.15.2.jar  hadoop-common-3.3.4.jar  hadoop-shaded-guava-1.1.1.jar           stax2-api-4.2.1.jar
    flink-sql-parquet-1.15.2.jar                hadoop-client-3.3.4.jar  hadoop-mapreduce-client-core-3.3.4.jar  woodstox-core-5.3.0.jar
    it works for my Hello World, but that means little. Is there any guide describing what JARs are needed for the connector to work reliably? This page lists just a single dependency: https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/connectors/table/formats/parquet/ but it's obviously missing the whole Hadoop dependency tree that flink-sql-parquet.jar depends on.
  • h

    Henrik Feldt

    09/11/2022, 4:02 PM
    I'm looking for a Java code sample that uses primarily Java (not SQL) to implement a lookup table from Postgres; the lookup table should map a field moniker to a field id. I don't care about event-time replayability. I found https://programmer.group/six-methods-to-completely-solve-flink-table-sql-dimension-table-join.html but it seems to be outdated. And https://www.youtube.com/watch?v=UnCkwIp_614 is with a SQL console (and I don't know what the CI/CD story for that looks like). https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/table/sql/queries/joins/#regular-joins only lists SQL code and https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/table/sourcessinks/#lookup-table-source doesn't have any code examples. https://github.com/ververica/flink-sql-cookbook/blob/main/joins/04_lookup_joins/04_lookup_joins.md is again only SQL and https://docs.immerok.cloud/docs/cookbook/joining-and-deduplicating-data/ says "this will blow out your memory". https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/table/data_stream_api/ only shows mapping between the Stream/Table APIs but not how to actually key on the id... Would love a very short snippet of how to join...
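    Under the stated constraints (no event-time replayability, a small moniker -> id dimension table), a minimal mostly-Java sketch is simply a RichMapFunction that loads the mapping once in open() via plain JDBC and enriches each record; class and field names here are made up:
    public class MonikerToIdEnricher extends RichMapFunction<Event, EnrichedEvent> {

        private transient Map<String, Long> monikerToId;

        @Override
        public void open(Configuration parameters) throws Exception {
            monikerToId = new HashMap<>();
            // the Postgres JDBC driver has to be on the job's classpath
            try (Connection conn = DriverManager.getConnection(
                         "jdbc:postgresql://db-host:5432/mydb", "user", "password");
                 Statement stmt = conn.createStatement();
                 ResultSet rs = stmt.executeQuery("SELECT moniker, id FROM field_mapping")) {
                while (rs.next()) {
                    monikerToId.put(rs.getString("moniker"), rs.getLong("id"));
                }
            }
        }

        @Override
        public EnrichedEvent map(Event event) {
            Long fieldId = monikerToId.get(event.getFieldMoniker());
            return new EnrichedEvent(event, fieldId);
        }
    }
    If the mapping changes while the job runs, the next step up would be a periodic reload in the same function, or the JDBC connector's lookup join via the Table API.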
    t
    • 2
    • 45
  • s

    Sumit Nekar

    09/12/2022, 7:03 AM
    Hi Team, When we deploy a Flink job in application mode using the Flink operator, every time we need to upgrade the job there is a complete restart of the Flink cluster, so the task managers can take a considerable amount of time to come up. For jobs with higher parallelism (say 80 or 100) and more TMs, there is a considerable amount of downtime for the job. How has this been working for you in production?
    s
    • 2
    • 2
  • j

    Jirawech Siwawut

    09/12/2022, 3:33 PM
    Hi Team. I am using the Hive sink on Flink and found that checkpointing is quite slow compared to the Kafka sink. The data size is less than 100 KB, but it takes more than 5 to 10 seconds to finish each checkpoint during business peak time. Is there any way to optimize it?
  • m

    Mathi

    09/13/2022, 11:41 AM
    Can we use Flink batch mode to push data from one Kafka topic to another? I used a custom deserializer but nothing is working. Any ideas or suggestions, please? Planning to use a wait topic and later push to the main topic.
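    For reference, a minimal sketch of a bounded Kafka-to-Kafka copy running in batch mode, assuming the new KafkaSource/KafkaSink (Flink 1.14+); topic names, brokers, group id and the String schema are placeholders:
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setRuntimeMode(RuntimeExecutionMode.BATCH); // batch mode needs bounded sources

    KafkaSource<String> source = KafkaSource.<String>builder()
            .setBootstrapServers("broker:9092")
            .setTopics("wait-topic")
            .setGroupId("replay-job")
            .setStartingOffsets(OffsetsInitializer.earliest())
            .setBounded(OffsetsInitializer.latest()) // makes the source bounded
            .setValueOnlyDeserializer(new SimpleStringSchema())
            .build();

    KafkaSink<String> sink = KafkaSink.<String>builder()
            .setBootstrapServers("broker:9092")
            .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                    .setTopic("main-topic")
                    .setValueSerializationSchema(new SimpleStringSchema())
                    .build())
            .build();

    env.fromSource(source, WatermarkStrategy.noWatermarks(), "wait-topic")
       .sinkTo(sink);
    env.execute("kafka-batch-copy");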
    🧵 3
    m
    l
    • 3
    • 2
  • h

    Henrik Feldt

    09/13/2022, 6:48 PM
    Is it normal not to see docs in IntelliJ IDEA for Flink methods?
    ✅ 1
    m
    • 2
    • 4
  • h

    Henrik Feldt

    09/13/2022, 6:54 PM
    Does anybody here test Flink code with marble diagrams? https://rxjs.dev/guide/testing/marble-testing Follow up question: how do you test time-centric code?
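    For time-centric code, one common approach is Flink's operator test harnesses (from the flink-streaming-java test-jar), where the test drives event time and processing time explicitly and asserts on the emitted elements. A rough sketch with placeholder types (Event, Alert, MyKeyedProcessFunction), inside a test method that throws Exception:
    KeyedOneInputStreamOperatorTestHarness<String, Event, Alert> harness =
            new KeyedOneInputStreamOperatorTestHarness<>(
                    new KeyedProcessOperator<>(new MyKeyedProcessFunction()),
                    Event::getKey,
                    Types.STRING);
    harness.open();

    harness.processElement(new Event("k", 1L), 1_000L); // (value, event timestamp)
    harness.processWatermark(10_000L);                   // advance event time, firing event-time timers
    harness.setProcessingTime(20_000L);                  // advance processing time

    List<Alert> output = harness.extractOutputValues();
    // assert on `output` with the test framework of choice
    harness.close();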
    d
    • 2
    • 4
  • h

    Henrik Feldt

    09/13/2022, 8:39 PM
    What's the pattern you use for connecting graphs / jobs in Flink? Writing outputs to Kafka with the message timestamp = event time / low water mark as per the output? And then consuming them back with the same serialisers you used to output them? Or do you define lots and lots of side-outputs in a huge job?
    • 1
    • 1
  • s

    sap1ens

    09/13/2022, 9:02 PM
    Hey folks, does anyone run the Flink operator with the job jar file located in the system classpath? It looks like the operator forces you to specify the
    jarURI
    parameter, which is passed to the Flink job as
    pipeline.jars
    . However, if you place your jar in the
    /opt/flink/lib
    location, it’ll be loaded twice: once by the system classloader and another time by the user classloader… Which leads to all kinds of exceptions. Is there any way to prevent the operator from passing
    pipeline.jars
    ?
    g
    j
    +3
    • 6
    • 37
  • s

    Sylvia Lin

    09/14/2022, 1:24 AM
    Hey folks, we're using the KafkaSink interface to sink data to Kafka, but we're constantly getting the Kafka error below
    Caused by: org.apache.kafka.common.KafkaException: Cannot perform send because at least one previous transactional or idempotent request has failed with errors.
    	at org.apache.kafka.clients.producer.internals.TransactionManager.failIfNotReadyForSend(TransactionManager.java:356)
    	at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:926)
    	at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:865)
    	at org.apache.flink.connector.kafka.sink.KafkaWriter.write(KafkaWriter.java:183)
    	at org.apache.flink.streaming.runtime.operators.sink.SinkOperator.processElement(SinkOperator.java:163)
    	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82)
    	... 33 more
    Caused by: org.apache.kafka.common.errors.OutOfOrderSequenceException: The broker received an out of order sequence number.
    Below is our Kafka sink configuration; any suggestions?
    kafkaSinkProperties.get.setProperty(
          "client.id",
          kafkaSinkProperties.get.getProperty("client.id") + JOB_NAME
        )
        kafkaSinkProperties.get.setProperty(
          "<http://request.timeout.ms|request.timeout.ms>",
          "60000"
        )
        kafkaSinkProperties.get.setProperty(
          "<http://linger.ms|linger.ms>",
          "10"
        )
        kafkaSinkProperties.get.setProperty(
          "batch.size",
          "32768"
        )
    
        val kafkaSink = KafkaSink.builder()
          .setBootstrapServers(kafkaSinkProperties.get.getProperty("bootstrap.servers"))
          .setKafkaProducerConfig(kafkaSinkProperties.get)
          .setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
          .setTransactionalIdPrefix(TXN_PREFIX)
          .setRecordSerializer(new CustomTopicSchema)
          .build
    y
    m
    • 3
    • 9
  • z

    Zhihao Chen

    09/14/2022, 5:24 AM
    👋 , I encountered a weird behaviour of Flink in Kubernetes standalone mode with HA. The issue is that the ConfigMap
    ${cluster}-00000000000000000000000000000000-config-map
    is growing in an unbounded way. Inside the ConfigMap I can see metadata from every checkpoint being added to it, e.g.
    ...
      checkpointID-0000000000000000069: rO0ABXNyAGNvcmcuYXBhY2hlLmZsaW5rLmt1YmVybmV0ZXMuaGlnaGF2YWlsYWJpbGl0eS5LdWJlcm5ldGVzU3RhdGVIYW5kbGVTdG9yZSRTdGF0ZUhhbmRsZVdpdGhEZWxldGVNYXJrZXKfgTvzmWVDfwIAAloAEW1hcmtlZEZvckRlbGV0aW9uTAAFaW5uZXJ0ADdMb3JnL2FwYWNoZS9mbGluay9ydW50aW1lL3N0YXRlL1JldHJpZXZhYmxlU3RhdGVIYW5kbGU7eHAAc3IAO29yZy5hcGFjaGUuZmxpbmsucnVudGltZS5zdGF0ZS5SZXRyaWV2YWJsZVN0cmVhbVN0YXRlSGFuZGxlAAEeGPFVlysCAAFMABh3cmFwcGVkU3RyZWFtU3RhdGVIYW5kbGV0ADJMb3JnL2FwYWNoZS9mbGluay9ydW50aW1lL3N0YXRlL1N0cmVhbVN0YXRlSGFuZGxlO3hwc3IAOW9yZy5hcGFjaGUuZmxpbmsucnVudGltZS5zdGF0ZS5maWxlc3lzdGVtLkZpbGVTdGF0ZUhhbmRsZQTcddhivRuzAgACSgAJc3RhdGVTaXplTAAIZmlsZVBhdGh0AB9Mb3JnL2FwYWNoZS9mbGluay9jb3JlL2ZzL1BhdGg7eHAAAAAAAADLhXNyAB1vcmcuYXBhY2hlLmZsaW5rLmNvcmUuZnMuUGF0aAAAAAAAAAABAgABTAADdXJpdAAOTGphdmEvbmV0L1VSSTt4cHNyAAxqYXZhLm5ldC5VUkmsAXguQ55JqwMAAUwABnN0cmluZ3QAEkxqYXZhL2xhbmcvU3RyaW5nO3hwdABxczM6Ly9ldXJla2EtZmxpbmstZGF0YS1pbnRlcm5hbC1kZXYvemhpaGFvLXRlc3Q2LWluc3RhbmNlaWQvaGEvc3RhbmRhbG9uZS9kZWZhdWx0L2NvbXBsZXRlZENoZWNrcG9pbnQ1ZjFhMDllMzA4YmV4
      checkpointID-0000000000000000070: rO0ABXNyAGNvcmcuYXBhY2hlLmZsaW5rLmt1YmVybmV0ZXMuaGlnaGF2YWlsYWJpbGl0eS5LdWJlcm5ldGVzU3RhdGVIYW5kbGVTdG9yZSRTdGF0ZUhhbmRsZVdpdGhEZWxldGVNYXJrZXKfgTvzmWVDfwIAAloAEW1hcmtlZEZvckRlbGV0aW9uTAAFaW5uZXJ0ADdMb3JnL2FwYWNoZS9mbGluay9ydW50aW1lL3N0YXRlL1JldHJpZXZhYmxlU3RhdGVIYW5kbGU7eHAAc3IAO29yZy5hcGFjaGUuZmxpbmsucnVudGltZS5zdGF0ZS5SZXRyaWV2YWJsZVN0cmVhbVN0YXRlSGFuZGxlAAEeGPFVlysCAAFMABh3cmFwcGVkU3RyZWFtU3RhdGVIYW5kbGV0ADJMb3JnL2FwYWNoZS9mbGluay9ydW50aW1lL3N0YXRlL1N0cmVhbVN0YXRlSGFuZGxlO3hwc3IAOW9yZy5hcGFjaGUuZmxpbmsucnVudGltZS5zdGF0ZS5maWxlc3lzdGVtLkZpbGVTdGF0ZUhhbmRsZQTcddhivRuzAgACSgAJc3RhdGVTaXplTAAIZmlsZVBhdGh0AB9Mb3JnL2FwYWNoZS9mbGluay9jb3JlL2ZzL1BhdGg7eHAAAAAAAADLhXNyAB1vcmcuYXBhY2hlLmZsaW5rLmNvcmUuZnMuUGF0aAAAAAAAAAABAgABTAADdXJpdAAOTGphdmEvbmV0L1VSSTt4cHNyAAxqYXZhLm5ldC5VUkmsAXguQ55JqwMAAUwABnN0cmluZ3QAEkxqYXZhL2xhbmcvU3RyaW5nO3hwdABxczM6Ly9ldXJla2EtZmxpbmstZGF0YS1pbnRlcm5hbC1kZXYvemhpaGFvLXRlc3Q2LWluc3RhbmNlaWQvaGEvc3RhbmRhbG9uZS9kZWZhdWx0L2NvbXBsZXRlZENoZWNrcG9pbnQ0ZDQ2MTAwYTk2MGN4
      checkpointID-0000000000000000071: ...
    I feel this is unexpected, as a ConfigMap has a size limit of 1 MB, but I'm not sure what I missed. Could this be due to some configuration I've missed? Thanks!
    c
    • 2
    • 3
  • k

    Kwangin Jung

    09/14/2022, 6:35 AM
    Hello team. I want to gzip the elements in a window and sink them to S3 using StreamingFileSink. Should I use a GZIP stream with some writer for the sink, or create a .gz file first and write it as byte data, or something else? Sorry, I couldn't find any reference about a gzip/zip file sink...
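    Rather than gzipping inside the window function, one option (a sketch, untested) is to let the sink compress while it writes: a BulkWriter that wraps a GZIPOutputStream, plugged into a FileSink. How the elements are turned into bytes is a placeholder, and "windowedOutput" stands in for the windowed stream; flink-compress's CompressWriterFactory may also be worth a look if Hadoop compression codecs are acceptable:
    public class GzipBytesWriterFactory implements BulkWriter.Factory<byte[]> {

        @Override
        public BulkWriter<byte[]> create(FSDataOutputStream out) throws IOException {
            GZIPOutputStream gzip = new GZIPOutputStream(out);
            return new BulkWriter<byte[]>() {
                @Override
                public void addElement(byte[] element) throws IOException {
                    gzip.write(element);
                    gzip.write('\n');
                }

                @Override
                public void flush() throws IOException {
                    gzip.flush();
                }

                @Override
                public void finish() throws IOException {
                    gzip.finish(); // finish the gzip stream; Flink closes the underlying file
                }
            };
        }
    }

    // usage (FileSink shown; StreamingFileSink.forBulkFormat works the same way):
    FileSink<byte[]> sink = FileSink
            .forBulkFormat(new Path("s3://bucket/output"), new GzipBytesWriterFactory())
            .build();
    windowedOutput.map(e -> e.toString().getBytes(StandardCharsets.UTF_8)).sinkTo(sink);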
  • b

    Bilna P

    09/14/2022, 7:17 AM
    Hi All, I have a question related to the Python version here. I read that Flink supports Python 3.8. Does it support Python 3.10 as well? I ask because I see so many CVEs in Python 3.8.
    c
    x
    • 3
    • 2
  • m

    Martin Egri

    09/14/2022, 7:47 AM
    Heya! I’m trying to read a bunch of Avro-files on S3 using a
    FileSource
    (so the DataStream API). The documentation (https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/connectors/datastream/filesystem/#file-source) mentions Avro several times and lets me know I should use a format, but from what I can see there is nothing prepackaged I can use here?
    c
    • 2
    • 2
  • i

    Ivan M

    09/14/2022, 8:03 AM
    Hi all! We are trying to set up Google Cloud Storage as checkpoint storage. It works fine until we get this exception; after that the job stops producing new events. Has anybody run into something like this?
    2022-09-11 05:41:12	Caused by: java.io.IOException: Could not flush and close the file system output stream to gs://[GS path]/flink-jobs/namespaces/default/jobs/a25e17dc-bcaa-4ca4-895c-560c6b411ee1/checkpoints/a25e17dcbcaa4ca4895c560c6b411ee1/chk-7115/_metadata in order to obtain the stream state handle
    2022-09-11 05:41:12		at java.lang.Thread.run(Unknown Source) [?:?]
    2022-09-11 05:41:12		at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) [?:?]
    2022-09-11 05:41:12		at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) [?:?]
    2022-09-11 05:41:12		at org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$processCheckpointCoordinatorMessage$3(ExecutionGraphHandler.java:119) ~[flink-dist-1.15.1-stream1.jar:1.15.1-stream1]
    2022-09-11 05:41:12		at org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$acknowledgeCheckpoint$1(ExecutionGraphHandler.java:89) ~[flink-dist-1.15.1-stream1.jar:1.15.1-stream1]
    2022-09-11 05:41:12		at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveAcknowledgeMessage(CheckpointCoordinator.java:1133) ~[flink-dist-1.15.1-stream1.jar:1.15.1-stream1]
    2022-09-11 05:41:12		at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:1241) ~[flink-dist-1.15.1-stream1.jar:1.15.1-stream1]
    2022-09-11 05:41:12		at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.finalizeCheckpoint(CheckpointCoordinator.java:1354) ~[flink-dist-1.15.1-stream1.jar:1.15.1-stream1]
    2022-09-11 05:41:12	org.apache.flink.runtime.checkpoint.CheckpointException: Could not finalize the pending checkpoint 7115. Failure reason: Failure to finalize checkpoint.
    2022-09-11 05:41:12	2022-09-11 03:41:12,660 WARN  org.apache.flink.runtime.jobmaster.JobMaster                 [] - Error while processing AcknowledgeCheckpoint message
    2022-09-11 05:41:12		... 7 more
    2022-09-11 05:41:12		at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.finalizeCheckpoint(CheckpointCoordinator.java:1337) ~[flink-dist-1.15.1-stream1.jar:1.15.1-stream1]
    2022-09-11 05:41:12		at org.apache.flink.runtime.checkpoint.PendingCheckpoint.finalizeCheckpoint(PendingCheckpoint.java:329) ~[flink-dist-1.15.1-stream1.jar:1.15.1-stream1]
    2022-09-11 05:41:12		at org.apache.flink.runtime.state.filesystem.FsCheckpointMetadataOutputStream.closeAndFinalizeCheckpoint(FsCheckpointMetadataOutputStream.java:39) ~[flink-dist-1.15.1-stream1.jar:1.15.1-stream1]
    2022-09-11 05:41:12		at org.apache.flink.runtime.state.filesystem.FsCheckpointMetadataOutputStream.closeAndFinalizeCheckpoint(FsCheckpointMetadataOutputStream.java:130) ~[flink-dist-1.15.1-stream1.jar:1.15.1-stream1]
    2022-09-11 05:41:12		at org.apache.flink.runtime.state.filesystem.MetadataOutputStreamWrapper.closeForCommit(MetadataOutputStreamWrapper.java:63) ~[flink-dist-1.15.1-stream1.jar:1.15.1-stream1]
    2022-09-11 05:41:12		at org.apache.flink.runtime.state.filesystem.RecoverableStreamWrapper.closeForCommitAction(RecoverableStreamWrapper.java:46) ~[flink-dist-1.15.1-stream1.jar:1.15.1-stream1]
    2022-09-11 05:41:12		at org.apache.flink.fs.gs.writer.GSRecoverableFsDataOutputStream.closeForCommit(GSRecoverableFsDataOutputStream.java:203) ~[?:?]
    2022-09-11 05:41:12		at org.apache.flink.fs.gs.writer.GSRecoverableFsDataOutputStream.close(GSRecoverableFsDataOutputStream.java:196) ~[?:?]
    2022-09-11 05:41:12		at org.apache.flink.fs.gs.writer.GSRecoverableFsDataOutputStream.closeWriteChannelIfExists(GSRecoverableFsDataOutputStream.java:240) ~[?:?]
    2022-09-11 05:41:12		at org.apache.flink.fs.gs.writer.GSChecksumWriteChannel.close(GSChecksumWriteChannel.java:99) ~[?:?]
    2022-09-11 05:41:12		at org.apache.flink.fs.gs.storage.GSBlobStorageImpl$WriteChannel.close(GSBlobStorageImpl.java:223) ~[?:?]
    2022-09-11 05:41:12		at com.google.cloud.BaseWriteChannel.close(BaseWriteChannel.java:151) ~[?:?]
    2022-09-11 05:41:12		at com.google.cloud.storage.BlobWriteChannel.flushBuffer(BlobWriteChannel.java:183) ~[?:?]
    2022-09-11 05:41:12		at com.google.cloud.RetryHelper.runWithRetries(RetryHelper.java:50) ~[?:?]
    2022-09-11 05:41:12		at com.google.cloud.RetryHelper.run(RetryHelper.java:76) ~[?:?]
    2022-09-11 05:41:12		at com.google.api.gax.retrying.DirectRetryingExecutor.submit(DirectRetryingExecutor.java:105) ~[?:?]
    2022-09-11 05:41:12		at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) ~[?:?]
    2022-09-11 05:41:12		at com.google.cloud.storage.BlobWriteChannel$1.run(BlobWriteChannel.java:252) ~[?:?]
    2022-09-11 05:41:12		at com.google.cloud.storage.BlobWriteChannel.access$1400(BlobWriteChannel.java:34) ~[?:?]
    2022-09-11 05:41:12		at com.google.cloud.storage.BlobWriteChannel.transmitChunk(BlobWriteChannel.java:67) ~[?:?]
    2022-09-11 05:41:12		at com.google.cloud.storage.spi.v1.HttpStorageRpc.writeWithResponse(HttpStorageRpc.java:874) ~[?:?]
    2022-09-11 05:41:12		at com.google.api.client.http.HttpRequest.execute(HttpRequest.java:1012) ~[?:?]
    2022-09-11 05:41:12		at com.google.api.client.http.javanet.NetHttpRequest.execute(NetHttpRequest.java:84) ~[?:?]
    2022-09-11 05:41:12		at com.google.api.client.http.javanet.NetHttpRequest.execute(NetHttpRequest.java:152) ~[?:?]
    2022-09-11 05:41:12		at com.google.api.client.http.javanet.NetHttpResponse.<init>(NetHttpResponse.java:36) ~[?:?]
    2022-09-11 05:41:12		at sun.net.www.protocol.https.HttpsURLConnectionImpl.getResponseCode(Unknown Source) ~[?:?]
    2022-09-11 05:41:12		at java.net.HttpURLConnection.getResponseCode(Unknown Source) ~[?:?]
    2022-09-11 05:41:12		at sun.net.www.protocol.http.HttpURLConnection.getInputStream(Unknown Source) ~[?:?]
    2022-09-11 05:41:12		at sun.net.www.protocol.http.HttpURLConnection.getInputStream0(Unknown Source) ~[?:?]
    2022-09-11 05:41:12		at sun.net.www.http.HttpClient.parseHTTP(Unknown Source) ~[?:?]
    2022-09-11 05:41:12		at sun.net.www.http.HttpClient.parseHTTPHeader(Unknown Source) ~[?:?]
    2022-09-11 05:41:12	Caused by: java.net.SocketException: Unexpected end of file from server
    2022-09-11 05:41:12		... 7 more
    2022-09-11 05:41:12		at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.finalizeCheckpoint(CheckpointCoordinator.java:1337) ~[flink-dist-1.15.1-stream1.jar:1.15.1-stream1]
    2022-09-11 05:41:12		at org.apache.flink.runtime.checkpoint.PendingCheckpoint.finalizeCheckpoint(PendingCheckpoint.java:329) ~[flink-dist-1.15.1-stream1.jar:1.15.1-stream1]
    2022-09-11 05:41:12		at org.apache.flink.runtime.state.filesystem.FsCheckpointMetadataOutputStream.closeAndFinalizeCheckpoint(FsCheckpointMetadataOutputStream.java:39) ~[flink-dist-1.15.1-stream1.jar:1.15.1-stream1]
    2022-09-11 05:41:12		at org.apache.flink.runtime.state.filesystem.FsCheckpointMetadataOutputStream.closeAndFinalizeCheckpoint(FsCheckpointMetadataOutputStream.java:130) ~[flink-dist-1.15.1-stream1.jar:1.15.1-stream1]
    2022-09-11 05:41:12		at org.apache.flink.runtime.state.filesystem.MetadataOutputStreamWrapper.closeForCommit(MetadataOutputStreamWrapper.java:63) ~[flink-dist-1.15.1-stream1.jar:1.15.1-stream1]
    2022-09-11 05:41:12		at org.apache.flink.runtime.state.filesystem.RecoverableStreamWrapper.closeForCommitAction(RecoverableStreamWrapper.java:46) ~[flink-dist-1.15.1-stream1.jar:1.15.1-stream1]
    2022-09-11 05:41:12		at org.apache.flink.fs.gs.writer.GSRecoverableFsDataOutputStream.closeForCommit(GSRecoverableFsDataOutputStream.java:203) ~[?:?]
    2022-09-11 05:41:12		at org.apache.flink.fs.gs.writer.GSRecoverableFsDataOutputStream.close(GSRecoverableFsDataOutputStream.java:196) ~[?:?]
    2022-09-11 05:41:12		at org.apache.flink.fs.gs.writer.GSRecoverableFsDataOutputStream.closeWriteChannelIfExists(GSRecoverableFsDataOutputStream.java:240) ~[?:?]
    2022-09-11 05:41:12		at org.apache.flink.fs.gs.writer.GSChecksumWriteChannel.close(GSChecksumWriteChannel.java:99) ~[?:?]
    2022-09-11 05:41:12		at org.apache.flink.fs.gs.storage.GSBlobStorageImpl$WriteChannel.close(GSBlobStorageImpl.java:223) ~[?:?]
    2022-09-11 05:41:12		at com.google.cloud.BaseWriteChannel.close(BaseWriteChannel.java:151) ~[?:?]
    2022-09-11 05:41:12		at com.google.cloud.storage.BlobWriteChannel.flushBuffer(BlobWriteChannel.java:183) ~[?:?]
    2022-09-11 05:41:12		at com.google.cloud.RetryHelper.runWithRetries(RetryHelper.java:50) ~[?:?]
    2022-09-11 05:41:12		at com.google.cloud.RetryHelper.run(RetryHelper.java:76) ~[?:?]
    2022-09-11 05:41:12		at com.google.api.gax.retrying.DirectRetryingExecutor.submit(DirectRetryingExecutor.java:105) ~[?:?]
    2022-09-11 05:41:12		at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) ~[?:?]
    2022-09-11 05:41:12		at com.google.cloud.storage.BlobWriteChannel$1.run(BlobWriteChannel.java:252) ~[?:?]
    2022-09-11 05:41:12		at com.google.cloud.storage.BlobWriteChannel.access$1400(BlobWriteChannel.java:34) ~[?:?]
    2022-09-11 05:41:12		at com.google.cloud.storage.BlobWriteChannel.transmitChunk(BlobWriteChannel.java:67) ~[?:?]
    2022-09-11 05:41:12		at com.google.cloud.storage.spi.v1.HttpStorageRpc.writeWithResponse(HttpStorageRpc.java:901) ~[?:?]
    2022-09-11 05:41:12		at com.google.cloud.storage.spi.v1.HttpStorageRpc.translate(HttpStorageRpc.java:233) ~[?:?]
    2022-09-11 05:41:12	Caused by: com.google.cloud.storage.StorageException: Unexpected end of file from server
    2022-09-11 05:41:12		... 7 more
    2022-09-11 05:41:12		at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.finalizeCheckpoint(CheckpointCoordinator.java:1337) ~[flink-dist-1.15.1-stream1.jar:1.15.1-stream1]
    2022-09-11 05:41:12		at org.apache.flink.runtime.checkpoint.PendingCheckpoint.finalizeCheckpoint(PendingCheckpoint.java:329) ~[flink-dist-1.15.1-stream1.jar:1.15.1-stream1]
    2022-09-11 05:41:12		at org.apache.flink.runtime.state.filesystem.FsCheckpointMetadataOutputStream.closeAndFinalizeCheckpoint(FsCheckpointMetadataOutputStream.java:39) ~[flink-dist-1.15.1-stream1.jar:1.15.1-stream1]
    2022-09-11 05:41:12		at org.apache.flink.runtime.state.filesystem.FsCheckpointMetadataOutputStream.closeAndFinalizeCheckpoint(FsCheckpointMetadataOutputStream.java:149) ~[flink-dist-1.15.1-stream1.jar:1.15.1-stream1]
    2022-09-11 05:41:12	Caused by: java.io.IOException: Could not flush and close the file system output stream to gs://[GS path]/flink-jobs/namespaces/default/jobs/a25e17dc-bcaa-4ca4-895c-560c6b411ee1/checkpoints/a25e17dcbcaa4ca4895c560c6b411ee1/chk-7115/_metadata in order to obtain the stream state handle
    2022-09-11 05:41:12		at java.lang.Thread.run(Unknown Source) [?:?]
    2022-09-11 05:41:12		at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) [?:?]
    2022-09-11 05:41:12		at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) [?:?]
    2022-09-11 05:41:12		at org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$processCheckpointCoordinatorMessage$3(ExecutionGraphHandler.java:119) ~[flink-dist-1.15.1-stream1.jar:1.15.1-stream1]
    2022-09-11 05:41:12		at org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$acknowledgeCheckpoint$1(ExecutionGraphHandler.java:89) ~[flink-dist-1.15.1-stream1.jar:1.15.1-stream1]
    2022-09-11 05:41:12		at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveAcknowledgeMessage(CheckpointCoordinator.java:1133) ~[flink-dist-1.15.1-stream1.jar:1.15.1-stream1]
    2022-09-11 05:41:12		at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:1241) ~[flink-dist-1.15.1-stream1.jar:1.15.1-stream1]
    2022-09-11 05:41:12		at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.finalizeCheckpoint(CheckpointCoordinator.java:1346) ~[flink-dist-1.15.1-stream1.jar:1.15.1-stream1]
    2022-09-11 05:41:12	org.apache.flink.runtime.checkpoint.CheckpointException: Failure to finalize checkpoint.
    m
    • 2
    • 2
  • r

    Raghunadh Nittala

    09/14/2022, 8:29 AM
    Hi All, We're using an allowedLateness of 10 minutes on top of TumblingEventTimeWindows, so that the window waits 10 minutes for later watermarks to pass. Now we see some late data arriving at the source and are thinking of increasing the lateness to 1 hour. My question is: does the window state get saved to RocksDB (as RocksDB is the state backend), or does Flink internally cache it in the heap until the lateness duration is reached? Can someone please explain this? Thanks in advance.
    c
    • 2
    • 2
  • r

    Rashmin Patel

    09/14/2022, 9:26 AM
    Hi All. What is the standard way to set TaskManager-related configs at the job level for multiple Flink jobs running in YARN application mode? I know of the following options: 1. A separate flink-conf.yaml for each job, but this adds maintainability overhead. 2. https://stackoverflow.com/questions/67401765/custom-configuration-file-flink-conf-yaml => override specific settings from the global configuration in your code, but it looks like env.configure(...) only allows certain configs to be overridden; it does not allow overriding TaskManager slots, memory, etc.
  • s

    Satya

    09/14/2022, 10:19 AM
    Hi All, I am using
    FileSource
    API to read a stream of files on S3 with the build config
    monitorContinuously(Duration.ofSeconds(10))
    . While starting the job I am getting a connection timeout error.
    org.apache.flink.runtime.rpc.akka.exceptions.AkkaRpcException: Could not start RpcEndpoint jobmanager_3.
    	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StoppedState.start(AkkaRpcActor.java:610)
    	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleControlMessage(AkkaRpcActor.java:180)
    	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
    	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
    	at scala.PartialFunction.applyOrElse(PartialFunction.scala:127)
    	at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126)
    	at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
    	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175)
    	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
    	at akka.actor.Actor.aroundReceive(Actor.scala:517)
    	at akka.actor.Actor.aroundReceive$(Actor.scala:515)
    	at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
    	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
    	at akka.actor.ActorCell.invoke(ActorCell.scala:561)
    	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
    	at akka.dispatch.Mailbox.run(Mailbox.scala:225)
    	at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
    	at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    	at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    	at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    	at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
    This eventually leads to a connection timeout error. My guess is that the issue may be caused by
    FileSource.forRecordStreamFormat
    because the application will load files in batch mode to rebuild the historical data, and by doing this it may cause the Akka ask to time out. The questions are: 1. Is this issue because of
    FileSource.forRecordStreamFormat
    ? 2. How can I increase akka.ask.timeout here?
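    For question 2, akka.ask.timeout is a cluster-level option, so it goes into flink-conf.yaml (or is passed with -D at submission time). A small sketch; the value is just an example:
    akka.ask.timeout: 1 min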
    c
    • 2
    • 5
  • s

    Slackbot

    09/14/2022, 12:09 PM
    This message was deleted.
    c
    • 2
    • 6
  • a

    Adesh Dsilva

    09/14/2022, 1:28 PM
    Hi, how do I create a TypeInfoFactory class for a set of long values? It's quite easy to do for a list:
    Types.LIST(Types.LONG);
    but I don’t find
    Types.SET
    ?
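    For what it's worth, there is no Types.SET (Flink has no dedicated Set type information); sets fall back to Flink's generic (Kryo-serialized) type handling. A sketch of a factory that does exactly that, which works but is less efficient than Types.LIST:
    public class LongSetTypeInfoFactory extends TypeInfoFactory<Set<Long>> {
        @Override
        public TypeInformation<Set<Long>> createTypeInfo(
                Type t, Map<String, TypeInformation<?>> genericParameters) {
            return TypeInformation.of(new TypeHint<Set<Long>>() {});
        }
    }
    // used via @TypeInfo(LongSetTypeInfoFactory.class) on the field or class, or by
    // passing TypeInformation.of(new TypeHint<Set<Long>>() {}) directly to returns(...)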
    ✅ 1
    c
    • 2
    • 6
  • p

    Pedro Mázala

    09/14/2022, 1:44 PM
    Hey folks! I’m having issues with enabling JMX exporter on my Flink deployment. When I
    curl
    the expected JMX port I get a
    curl: (52) Empty reply from server
    but if I use
    jconsole
    I'm able to check MBeans. Has anyone else experienced this?
    flink-conf.yaml: |+
        (...)    
        metrics.reporters: my_jmx_reporter
        metrics.reporter.my_jmx_reporter.factory.class: org.apache.flink.metrics.jmx.JMXReporterFactory
        metrics.reporter.my_jmx_reporter.port: 9020
    
        env.java.opts: -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.local.only=false -Dcom.sun.management.jmxremote.port=1099 -Dcom.sun.management.jmxremote.rmi.port=1099 -Djava.rmi.server.hostname=127.0.0.1
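    One thing to note: the JMX reporter speaks JMX/RMI, not HTTP, so an empty reply from curl while jconsole works is expected. If the goal is an HTTP endpoint to scrape, a flink-conf.yaml sketch of the Prometheus reporter instead (the port is an example, and flink-metrics-prometheus has to be available to the cluster):
    metrics.reporters: prom
    metrics.reporter.prom.factory.class: org.apache.flink.metrics.prometheus.PrometheusReporterFactory
    metrics.reporter.prom.port: 9249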
    c
    • 2
    • 7
  • n

    Nick Pocock

    09/14/2022, 4:24 PM
    Hey everyone, I am having issues trying to use a localstack Kinesis stream with Flink Statefun. I am getting the following error
    flink-compose-kinesis-master-1 | org.apache.flink.kinesis.shaded.com.amazonaws.SdkClientException: Unable to execute HTTP request: Certificate for <localstack> doesn't match any of the subject alternative names: [*.amplifyapp.localhost.localstack.cloud, *.cloudfront.localhost.localstack.cloud, *.execute-api.localhost.localstack.cloud, *.localhost.localstack.cloud, *.opensearch.localhost.localstack.cloud, *.s3.localhost.localstack.cloud, *.scm.localhost.localstack.cloud, localhost.localstack.cloud]
    If I try to use HTTP I get
    Caused by: java.lang.IllegalArgumentException: Invalid service endpoint url: http://localstack:4566; Only custom service endpoints using HTTPS are supported
    Is there any way to allow HTTP for testing purposes whilst I develop locally? Has anyone had any issues with this before? Going down a bit of a rabbit hole! If it helps my module config for the ingress looks like:
    kind: io.statefun.kinesis.v1/ingress
    spec:
      id: com/sentiment
      awsRegion:
        type: custom-endpoint
        id: us-east-1
        endpoint: https://localstack:4566
      awsCredentials:
        type: basic
        accessKeyId: key
        secretAccessKey: secret
      startupPosition:
        type: latest
      streams:
        - stream: customer-details
          valueType: SentimentEvent
          targets:
            - sentiment
      clientConfigProperties:
        - SocketTimeout: 9999
        - MaxConnections: 15
    Thanks !
    d
    • 2
    • 6
  • j

    James Timotiwu

    09/14/2022, 4:49 PM
    Hello, is there a way to submit a job without running
    .execute
    , even if it is already present in the code? I want a way to run
    getExecutionPlan
    without actually running the job, but would like to avoid rebuilding the jars even when the code already has
    execute
    ✅ 1
    c
    • 2
    • 2