# troubleshooting
  • Slackbot

    02/21/2023, 5:12 AM
    This message was deleted.
  • kingsathurthi

    02/21/2023, 6:13 AM
    Hi all, a Flink operator 1.3.1 question: how can I add a readiness probe (TCP socket) only for the TaskManager pod?
  • Ari Huttunen

    02/21/2023, 7:49 AM
    Which jars does Flink need when reading from / writing to Parquet files on S3 using SQL / the Table API? The code is written in PyFlink.
  • Jirawech Siwawut

    02/21/2023, 8:39 AM
    Hi all. A quick question about Flink SQL: is it possible to use StatementSet to run two insert queries within one execution? I am trying to sink data to Kafka and MySQL at the same time using one job.
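    A minimal sketch of the API in question (table names and queries below are made up for illustration): StatementSet collects several INSERT statements and submits them together, so one execution writes to both sinks.

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.StatementSet;
    import org.apache.flink.table.api.TableEnvironment;

    public class MultiSinkExample {
        public static void main(String[] args) {
            TableEnvironment tEnv =
                    TableEnvironment.create(EnvironmentSettings.inStreamingMode());

            // kafka_sink, mysql_sink and source_table are hypothetical tables,
            // assumed to be registered already, e.g. via CREATE TABLE DDL.
            StatementSet set = tEnv.createStatementSet();
            set.addInsertSql("INSERT INTO kafka_sink SELECT id, payload FROM source_table");
            set.addInsertSql("INSERT INTO mysql_sink SELECT id, payload FROM source_table");

            // All added inserts are optimized and submitted as a single Flink job.
            set.execute();
        }
    }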
  • sairam yeturi

    02/21/2023, 8:54 AM
    Hi all, is it possible to submit a job change / upgrade a Kafka job (when submitting to Flink) with zero downtime in some form? Any thoughts on how people handle this today?
  • Yang LI

    02/21/2023, 9:18 AM
    Hello guys, has anyone experienced the _metadata size increase issue after a Flink migration? Is there a solution or a configuration option to fix it? 🙏
  • Siddhesh Kalgaonkar

    02/21/2023, 10:57 AM
    Hello #C03G7LJTS2G, I am going through the 1.16 documentation where I saw the screenshots below. What does it mean? Windows are not supported by the Python API, yet I can see an example in the windows section, or am I missing something here? Also, I have seen that lots of operators are not yet supported in the Python API according to the documentation, such as Window Joins, Connect, Interval Joins, Iterate, CoGroup and so on. Is that true, or is the documentation just not up to date? TIA
  • Maringlen Kovaci

    02/21/2023, 4:46 PM
    Hi all, is it possible to use a different metrics.reporter.* configuration for TaskManagers in FlinkKubernetesOperator? I know you can configure it for both the JobManager and the TaskManager in spec.flinkConfiguration:
    spec:
    ...
      flinkConfiguration:
        metrics.reporter.prom.factory.class: org.apache.flink.metrics.prometheus.PrometheusReporterFactory
        metrics.reporter.prom.factory.port: "9249"
    but I need a separate configuration only for the TaskManager. I need this because the TaskManagers disappear after completing batch jobs and I won't be able to scrape the metrics with Prometheus, so I thought of using the PrometheusPushGateway reporter instead. Any help would be greatly appreciated.
  • Lily Liu

    02/21/2023, 4:59 PM
    Hello. I am facing the error below when I try to sink to a GCS bucket. I have increased fs.gs.http.read-timeout following the docs. Has anyone faced this error before?
    Caused by: org.apache.flink.util.SerializedThrowable: com.google.cloud.storage.StorageException: Read timed out
    	at com.google.cloud.storage.spi.v1.HttpStorageRpc.translate(HttpStorageRpc.java:233) ~[?:?]
    	at com.google.cloud.storage.spi.v1.HttpStorageRpc.writeWithResponse(HttpStorageRpc.java:901) ~[?:?]
    	at com.google.cloud.storage.BlobWriteChannel.transmitChunk(BlobWriteChannel.java:67) ~[?:?]
    	at com.google.cloud.storage.BlobWriteChannel.access$1400(BlobWriteChannel.java:34) ~[?:?]
    	at com.google.cloud.storage.BlobWriteChannel$1.run(BlobWriteChannel.java:252) ~[?:?]
  • Adrian Chang

    02/21/2023, 5:31 PM
    Hello, I have a dummy question. I have a Job which sinks to Kafka and Redis. I am using the Table API for the Kafka connector because I need format = "avro-confluent":
    // Sink Motion Detection to Kafka
    Table motionDetectionTable = tableEnv.fromDataStream(motionDetectionStream);
    motionDetectionTable.executeInsert(Util.MOTION_DETECTION_SINK_TABLE_NAME);
    
    // Sink stream.motion messages to Redis
    motionStream.addSink(new RedisSink<>(Util.getRedisConfig(), new RedisMotionMapper()));
    
    env.execute("Motion");
    When the Job is deployed, two Task Managers start (I have parallelism set to 1). One execution graph is completely contained in the other, causing calculations to be duplicated and producing duplicates to Kafka. I suspect it's because I am calling both executeInsert and env.execute, but if I don't call them both the Job doesn't produce to both sink connectors. Am I missing something? Thanks
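    For this particular mix of Table API and DataStream sinks, one commonly suggested variant (a sketch, assuming Flink 1.14+ where StreamStatementSet.attachAsDataStream() is available) is to register the Table API insert in a statement set and attach it to the DataStream program, so a single env.execute() submits one job that serves both sinks:

    // Variant of the snippet above; motionDetectionStream, motionStream, Util,
    // RedisSink and RedisMotionMapper are the classes from the original message.
    // Requires org.apache.flink.table.api.bridge.java.StreamStatementSet (Flink 1.14+).
    Table motionDetectionTable = tableEnv.fromDataStream(motionDetectionStream);

    // Instead of executeInsert(), which submits its own job right away,
    // register the insert and attach it to the DataStream topology.
    StreamStatementSet stmtSet = tableEnv.createStatementSet();
    stmtSet.addInsert(Util.MOTION_DETECTION_SINK_TABLE_NAME, motionDetectionTable);
    stmtSet.attachAsDataStream();

    // DataStream sink as before.
    motionStream.addSink(new RedisSink<>(Util.getRedisConfig(), new RedisMotionMapper()));

    // One job, both sinks.
    env.execute("Motion");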
  • Pralay Ghosh

    02/21/2023, 8:33 PM
    Hello, I am facing an issue with the Elasticsearch connector in Flink 1.15. I am using the Table API for this connector. The job restarts for each index creation and gives the error below.
    java.lang.Exception: Could not perform checkpoint 79 for operator Source: test_input[1] -> Calc[2] -> ConstraintEnforcer[3] -> Sink: test_output[3] (2/12)#0.
    	at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointAsyncInMailbox(StreamTask.java:1138)
    	at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$triggerCheckpointAsync$11(StreamTask.java:1085)
    	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
    	at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
    	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:338)
    	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:324)
    	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201)
    	at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
    	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753)
    	at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:953)
    	at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:932)
    	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:746)
    	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:568)
    	at java.base/java.lang.Thread.run(Thread.java:829)
    	Suppressed: java.lang.RuntimeException: An error occurred in ElasticsearchSink.
    		at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.checkErrorAndRethrow(ElasticsearchSinkBase.java:426)
    		at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.close(ElasticsearchSinkBase.java:365)
    		at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:41)
    		at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.close(AbstractUdfStreamOperator.java:114)
    		at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:163)
    		at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.closeAllOperators(RegularOperatorChain.java:125)
    		at org.apache.flink.streaming.runtime.tasks.StreamTask.closeAllOperators(StreamTask.java:997)
    		at org.apache.flink.util.IOUtils.closeAll(IOUtils.java:254)
    		at org.apache.flink.core.fs.AutoCloseableRegistry.doClose(AutoCloseableRegistry.java:72)
    		at org.apache.flink.util.AbstractAutoCloseableRegistry.close(AbstractAutoCloseableRegistry.java:127)
    		at org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUp(StreamTask.java:916)
    		at org.apache.flink.runtime.taskmanager.Task.lambda$restoreAndInvoke$0(Task.java:935)
    		at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:953)
    		at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:935)
    		... 3 more
    	Caused by: java.io.IOException: Unable to parse response body for Response{requestLine=POST /_bulk?timeout=1m HTTP/1.1, host=https://vpc-test-testCluster.us-west-1.es.amazonaws.com:443, response=HTTP/1.1 200 OK}
    		at org.elasticsearch.client.RestHighLevelClient$1.onSuccess(RestHighLevelClient.java:1783)
    		at org.elasticsearch.client.RestClient$FailureTrackingResponseListener.onSuccess(RestClient.java:636)
    		at org.elasticsearch.client.RestClient$1.completed(RestClient.java:376)
    		at org.elasticsearch.client.RestClient$1.completed(RestClient.java:370)
    		at org.apache.http.concurrent.BasicFuture.completed(BasicFuture.java:122)
    		at org.apache.http.impl.nio.client.DefaultClientExchangeHandlerImpl.responseCompleted(DefaultClientExchangeHandlerImpl.java:181)
    		at org.apache.http.nio.protocol.HttpAsyncRequestExecutor.processResponse(HttpAsyncRequestExecutor.java:448)
    		at org.apache.http.nio.protocol.HttpAsyncRequestExecutor.inputReady(HttpAsyncRequestExecutor.java:338)
    		at org.apache.http.impl.nio.DefaultNHttpClientConnection.consumeInput(DefaultNHttpClientConnection.java:265)
    		at org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:81)
    		at org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:39)
    		at org.apache.http.impl.nio.reactor.AbstractIODispatch.inputReady(AbstractIODispatch.java:121)
    		at org.apache.http.impl.nio.reactor.BaseIOReactor.readable(BaseIOReactor.java:162)
    		at org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvent(AbstractIOReactor.java:337)
    		at org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvents(AbstractIOReactor.java:315)
    		at org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:276)
    		at org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104)
    		at org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:591)
    		... 1 more
    	Caused by: java.lang.NullPointerException
    		at java.base/java.util.Objects.requireNonNull(Objects.java:221)
    		at org.elasticsearch.action.DocWriteResponse.<init>(DocWriteResponse.java:127)
    		at org.elasticsearch.action.update.UpdateResponse.<init>(UpdateResponse.java:65)
    		at org.elasticsearch.action.update.UpdateResponse$Builder.build(UpdateResponse.java:172)
    		at org.elasticsearch.action.update.UpdateResponse$Builder.build(UpdateResponse.java:160)
    		at org.elasticsearch.action.bulk.BulkItemResponse.fromXContent(BulkItemResponse.java:159)
    		at org.elasticsearch.action.bulk.BulkResponse.fromXContent(BulkResponse.java:188)
    		at org.elasticsearch.client.RestHighLevelClient.parseEntity(RestHighLevelClient.java:1911)
    		at org.elasticsearch.client.RestHighLevelClient.lambda$performRequestAsyncAndParseEntity$10(RestHighLevelClient.java:1699)
    		at org.elasticsearch.client.RestHighLevelClient$1.onSuccess(RestHighLevelClient.java:1781)
    		... 18 more
    Caused by: org.apache.flink.runtime.checkpoint.CheckpointException: Could not complete snapshot 79 for operator Source: test_input[1] -> Calc[2] -> ConstraintEnforcer[3] -> Sink: test_output[3] (2/12)#0. Failure reason: Checkpoint was declined.
    	at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:269)
    	at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:173)
    	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:348)
    	at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.checkpointStreamOperator(RegularOperatorChain.java:227)
    	at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.buildOperatorSnapshotFutures(RegularOperatorChain.java:212)
    	at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.snapshotState(RegularOperatorChain.java:192)
    	at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.takeSnapshotSync(SubtaskCheckpointCoordinatorImpl.java:647)
    	at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:320)
    	at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$12(StreamTask.java:1253)
    	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
    	at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:1241)
    	at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointAsyncInMailbox(StreamTask.java:1126)
    	... 13 more
    Caused by: java.lang.RuntimeException: An error occurred in ElasticsearchSink.
    	at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.checkErrorAndRethrow(ElasticsearchSinkBase.java:426)
    	at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.checkAsyncErrorsAndRequests(ElasticsearchSinkBase.java:431)
    	at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.snapshotState(ElasticsearchSinkBase.java:339)
    	at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118)
    	at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99)
    	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:87)
    	at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:222)
    	... 24 more
    Caused by: [CIRCULAR REFERENCE: java.io.IOException: Unable to parse response body for Response{requestLine=POST /_bulk?timeout=1m HTTP/1.1, host=https://vpc-test-testCluster.us-west-1.es.amazonaws.com:443, response=HTTP/1.1 200 OK}]
    Weirdly, the index is also getting created in the Elastic cluster, even though Flink restarts for every record with one checkpoint failure each time. Another important aspect is that the underlying engine is AWS OpenSearch 2.x. I know that Flink 1.16 has an explicit connector for OpenSearch. Does that mean that in Flink 1.15 the Elasticsearch connector won't be compatible with an OpenSearch engine? Any pointers would be very helpful. Many thanks.
  • Eric Xiao

    02/21/2023, 8:39 PM
    2023-02-18 18:16:46
    java.lang.Exception: Cannot deploy task Source: Customer Events-gke-central (46/140) (ee5b2cf7f7ff8c9c26828452de61722f_09e059c57f58864b464ee71c9fa4f9f7_45_1) - TaskManager (10.7.140.227:6122-482ecb @ 10.7.140.227 (dataPort=37725)) not responding after a rpcTimeout of 10000 ms
    	at org.apache.flink.runtime.executiongraph.Execution.lambda$deploy$5(Execution.java:612)
    	at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
    	at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
    	at java.base/java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478)
    	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRunAsync$4(AkkaRpcActor.java:453)
    	at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
    	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:453)
    	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:218)
    	at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:84)
    	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:168)
    	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)
    	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)
    	at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
    	at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
    	at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)
    	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
    	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
    	at akka.actor.Actor.aroundReceive(Actor.scala:537)
    	at akka.actor.Actor.aroundReceive$(Actor.scala:535)
    	at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220)
    	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580)
    	at akka.actor.ActorCell.invoke(ActorCell.scala:548)
    	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
    	at akka.dispatch.Mailbox.run(Mailbox.scala:231)
    	at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
    	at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
    	at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020)
    	at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656)
    	at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594)
    	at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183)
    Caused by: java.util.concurrent.TimeoutException: Invocation of [RemoteRpcInvocation(TaskExecutorGateway.submitTask(TaskDeploymentDescriptor, JobMasterId, Time))] at recipient [<akka.tcp://flink@10.7.140.227:6122/user/rpc/taskmanager_0>] timed out. This is usually caused by: 1) Akka failed sending the message silently, due to problems like oversized payload or serialization failures. In that case, you should find detailed error information in the logs. 2) The recipient needs more time for responding, due to problems like slow machines or network jitters. In that case, you can try to increase akka.ask.timeout.
    	at org.apache.flink.runtime.jobmaster.RpcTaskManagerGateway.submitTask(RpcTaskManagerGateway.java:59)
    	at org.apache.flink.runtime.executiongraph.Execution.lambda$deploy$4(Execution.java:590)
    	at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700)
    	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    	at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
    	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    	at java.base/java.lang.Thread.run(Thread.java:829)
    Caused by: akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka.tcp://flink@10.7.140.227:6122/user/rpc/taskmanager_0#-244650484]] after [10000 ms]. Message of type [org.apache.flink.runtime.rpc.messages.RemoteRpcInvocation]. A typical reason for `AskTimeoutException` is that the recipient actor didn't send a reply.
    We get this error in our Flink pipeline and our JM gets into a bad state; it only returns to a healthy state when we redeploy our pipelines. Besides bumping up akka.ask.timeout, is there a way we can automate the redeployment of the pipeline if the timeout is hit again?
  • Tony Wang

    02/21/2023, 10:10 PM
    Is it possible to specify an order for Flink to read through a directory of Parquet files?
  • Jirawech Siwawut

    02/22/2023, 3:02 AM
    Hi. Could you point me to the proper channel or website to report an issue I found? I have a Java script to reproduce the behavior and I do not want to post in the wrong channel. It is related to a previous issue I asked about in this channel. Sorry if I am in the wrong channel. https://apache-flink.slack.com/archives/C03G7LJTS2G/p1676968781999069
  • Emily Li

    02/22/2023, 4:37 AM
    Hey, we recently noticed that a Flink application (1.15.0) that had been running successfully for a few months started failing with:
    Transient association error (association remains live)
    akka.remote.OversizedPayloadException: Discarding oversized payload sent to Actor[<akka.tcp://flink@10.137.128.14:50100/user/rpc/taskmanager_0#-1857979235>]: max allowed size 125829120 bytes, actual size of encoded class org.apache.flink.runtime.rpc.messages.RemoteRpcInvocation was 218273805 bytes.
    We tried increasing the “frame.size” Flink configuration, but every time it failed with a larger size of RemoteRpcInvocation. The failure is in the SubmitTask call from the JobManager to the TaskManager. We also tried to revert to a version that used to work, which now fails with the same error. Could you please provide some guidance on how to solve this issue? All our efforts searching for an answer have not highlighted the potential causes.
  • Aditya

    02/22/2023, 7:27 AM
    👋 Hello, team!
  • Aditya

    02/22/2023, 7:29 AM
    I've started looking into Flink for a couple of our use cases. I'm particularly interested in dynamic tables. I was wondering if we can set a retention policy (TTL) at the table level and not at the table environment level.
  • Aditya

    02/22/2023, 7:31 AM
    I wanted to set different TTLs for different tables in the same table environment. Is there any way to achieve this? If not, any suggestions for a similar use case?
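    For reference, a minimal sketch of the environment-level knob being discussed here (table.exec.state.ttl); configured this way it applies to every query compiled from that TableEnvironment, not to an individual table:

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class StateTtlExample {
        public static void main(String[] args) {
            TableEnvironment tEnv =
                    TableEnvironment.create(EnvironmentSettings.inStreamingMode());

            // Idle state retention, scoped to the whole TableEnvironment.
            tEnv.getConfig().getConfiguration().setString("table.exec.state.ttl", "1 h");
        }
    }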
  • Ari Huttunen

    02/22/2023, 9:24 AM
    Are there some instructions on how to use PyFlink but hit a breakpoint in Java code? I probably have a configuration problem that prevents creating `FileSystemFactory`s, and if I could debug what goes on in FileSystem.init(), I'm pretty sure I could easily see it. Alternatively, if someone could tell me how to configure the S3 filesystem using flink-s3-fs-hadoop, that would also solve the issue.
  • Slackbot

    02/22/2023, 9:34 AM
    This message was deleted.
  • Ari Huttunen

    02/22/2023, 10:39 AM
    I tried building Flink on my Mac, and I got this. Does this mean I am using the wrong version of Java to build it, or is it something else? It's the tag for 1.16.0.
    [INFO] ------------------------------------------------------------------------
    [INFO] BUILD FAILURE
    [INFO] ------------------------------------------------------------------------
    [INFO] Total time:  01:18 min
    [INFO] Finished at: 2023-02-22T12:27:41+02:00
    [INFO] ------------------------------------------------------------------------
    [ERROR] Failed to execute goal net.alchim31.maven:scala-maven-plugin:3.2.2:compile (scala-compile-first) on project flink-table-api-scala-bridge_2.12: wrap: org.apache.commons.exec.ExecuteException: Process exited with an error: 240 (Exit value: 240) -> [Help 1]
    [ERROR] 
    [ERROR] To see the full stack trace of the errors, re-run Maven with the -e switch.
    [ERROR] Re-run Maven using the -X switch to enable full debug logging.
    [ERROR] 
    [ERROR] For more information about the errors and possible solutions, please read the following articles:
    [ERROR] [Help 1] <http://cwiki.apache.org/confluence/display/MAVEN/MojoExecutionException>
    [ERROR] 
    [ERROR] After correcting the problems, you can resume the build with the command
    [ERROR]   mvn <args> -rf :flink-table-api-scala-bridge_2.12
  • Kwangin Jung

    02/22/2023, 10:44 AM
    Hello. Has anyone tried to use FileSink to write data in ORC format? https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/connectors/datastream/filesystem/#orc-format I'm following the process there, but it fails with the error below:
    ...java.lang.NoSuchMethodError: 'void org.apache.orc.impl.OutStream.<init>(java.lang.String, int, org.apache.orc.CompressionCodec, org.apache.orc.PhysicalWriter$OutputReceiver)'
    	at org.apache.flink.orc.writer.PhysicalWriterImpl.<init>(PhysicalWriterImpl.java:101)
    	at org.apache.flink.orc.writer.OrcBulkWriterFactory.create(OrcBulkWriterFactory.java:99)
    	at org.apache.flink.streaming.api.functions.sink.filesystem.BulkBucketWriter.openNew(BulkBucketWriter.java:76)
    	at org.apache.flink.streaming.api.functions.sink.filesystem.OutputStreamBasedPartFileWriter$OutputStreamBasedBucketWriter.openNewInProgressFile(OutputStreamBasedPartFileWriter.java:124)
    	at org.apache.flink.streaming.api.functions.sink.filesystem.BulkBucketWriter.openNewInProgressFile(BulkBucketWriter.java:36)
    	at org.apache.flink.connector.file.sink.writer.FileWriterBucket.rollPartFile(FileWriterBucket.java:261)
    	at org.apache.flink.connector.file.sink.writer.FileWriterBucket.write(FileWriterBucket.java:188)
    	at org.apache.flink.connector.file.sink.writer.FileWriter.write(FileWriter.java:198)
  • Dima Sheludko

    02/22/2023, 2:00 PM
    Hello, has anyone faced this issue?
    Cannot create recoverable writer due to This s3 file system implementation does not support recoverable writers., will use the ordinary writer.
    As far as I can see, Flink has successfully created a checkpoint, but these logs still appear. I am using Presto and the s3p:// scheme, and I have copied the Presto JAR to the plugins and lib directories of my Flink deployment.
  • Jagan Nalla

    02/22/2023, 5:31 PM
    I am not sure how to resolve this error; I hope someone can help me fix it. I just ran the code below, which I found in the PyFlink documentation.
    from pyflink.table import EnvironmentSettings, TableEnvironment
    from pyflink.table import DataTypes
    from pyflink.table.udf import udf
    import pandas as pd
    
    # using batch table environment to execute the queries
    env_settings = EnvironmentSettings.in_batch_mode()
    table_env = TableEnvironment.create(env_settings)
    
    orders = table_env.from_elements(
        [('Jack', 'FRANCE', 10), ('Rose', 'ENGLAND', 30), ('Jack', 'FRANCE', 20)],
        ['name', 'country', 'revenue'])
    
    map_function = udf(
        lambda x: pd.concat([x.name, x.revenue * 10], axis=1),
        result_type=DataTypes.ROW(
            [DataTypes.FIELD("name", DataTypes.STRING()),
             DataTypes.FIELD("revenue", DataTypes.BIGINT())]),
        func_type="pandas")
    
    orders.map(map_function).execute().print()
    error_for_dev_json_format.txt
  • Pradeep Ramachandra

    02/22/2023, 5:51 PM
    Hi everyone, I have a Flink job that reads data from a CSV file that will be stored in S3. However, for testing purposes I have local copies of the source data. The same program works fine if switched from RuntimeExecutionMode.STREAMING to RuntimeExecutionMode.BATCH and the line //.monitorContinuously(Duration.ofSeconds(1)) in AccountReader is commented out. Any thoughts on what I am doing wrong and what I am missing to get it running in streaming mode? The program looks like this:
    public class JoinJob {
    // create environments of both APIs
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(new Configuration());
    env.setParallelism(4);
    env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
    
    tableEnv.createTemporaryView("user",TargetDataReader.convertSourceToTable(FILE_PATH_USER, env, tableEnv));
    tableEnv.createTemporaryView("account",AccountReader.convertSourceToTable(FILE_PATH_ACCOUNT, env, tableEnv));
    
    Table resultTable = tableEnv
      .sqlQuery("select u.id, u.name, u.email, "
        + "a.account_name as account_name, a.type as account_type "
        + "from user u left join account a on u.id = a.user_id");
    
    DataStream<Row> resultStream = tableEnv.toDataStream(resultTable);
    resultStream.print();
    
    env.execute("Executing flink test job");
    }
    
    - - code to read account information from CSV (also there is a similar class to read user information) - - 
    public class AccountReader {
    
     public static Table convertSourceToTable(String path, StreamExecutionEnvironment env,
       StreamTableEnvironment tableEnv) {
    
      FileSource<String> accountSrc = FileSource.forRecordStreamFormat(
          new TextLineInputFormat("UTF-8"),
          new Path(path))
        //monitors folder to read new files. Every second it checks if folder has more files to read
        .monitorContinuously(Duration.ofSeconds(1))
        .setSplitAssigner(FileSource.DEFAULT_SPLIT_ASSIGNER)
    
        //This is to decide if we need to tell flink to split file and read in chunks. archived files are not splittable.
        .setFileEnumerator((Provider) () -> new NonSplittingRecursiveEnumerator())
        .build();
    
      DataStream< Account2> accountStream = env.fromSource(accountSrc,
          WatermarkStrategy.forMonotonousTimestamps(),
          "Account as source")
        .flatMap(new FlatMapFunction<String, Account2>() {
         @Override
         public void flatMap(String value, Collector<Account2> out) throws Exception {
          String[] splittedString = value.split(",");
          Account2 account = new Account2();
          account.setType(Integer.parseInt(splittedString[0]));
          account.setName(splittedString[1]);
    			 account.setUserId(splittedString[2]);
          out.collect(account);
         }
        });
    
      return tableEnv.fromDataStream(accountStream).as("type", "account_name");
     }
    }
    - - error - -
    Exception in thread "main" org.apache.flink.table.api.TableException: Table sink '*anonymous_datastream_sink$4*' doesn't support consuming update and delete changes which is produced by node Join(joinType=[LeftOuterJoin], where=[(id = user_id)], select=[id, name, email], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
    	at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.createNewNode(FlinkChangelogModeInferenceProgram.scala:406)
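    A side note on the exception itself rather than the file-monitoring question: the left join produces an updating result, and toDataStream() only accepts insert-only tables. A sketch of the usual workaround for printing such results (same query as above, everything else unchanged) is to use toChangelogStream(), which can consume update and delete changes:

    // Variant of the JoinJob snippet above; requires org.apache.flink.types.Row.
    Table resultTable = tableEnv
      .sqlQuery("select u.id, u.name, u.email, "
        + "a.account_name as account_name, a.type as account_type "
        + "from user u left join account a on u.id = a.user_id");

    // Rows carry a RowKind (+I, -U, +U, -D), so the updating join result
    // can be emitted without the "doesn't support consuming update" error.
    DataStream<Row> resultStream = tableEnv.toChangelogStream(resultTable);
    resultStream.print();

    env.execute("Executing flink test job");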
  • sharad mishra

    02/22/2023, 6:41 PM
    Hello team, I have a question about Flink checkpoints. When I downscale my application (e.g. parallelism 4 -> 2) and try to restore the application from the previous checkpoint with the new parallelism, I get the following exception:
    Caused by: java.util.concurrent.CompletionException: java.lang.IllegalStateException: Failed to rollback to checkpoint/savepoint <hdfs://blackhole-auditlog/data/dcn-log-parser/checkpointing/755c50f2ddfbed68607f7c6ad2428c4c/chk-1493>. Cannot map checkpoint/savepoint state for operator cbc357ccb763df2852fee8c4fc7d55f2 to the new program, because the operator is not available in the new program. If you want to allow to skip this, you can set the --allowNonRestoredState option on the CLI.
    	at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
    	at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
    	at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1592)
    	... 3 more
    Caused by: java.lang.IllegalStateException: Failed to rollback to checkpoint/savepoint <hdfs://blackhole-auditlog/data/dcn-log-parser/checkpointing/755c50f2ddfbed68607f7c6ad2428c4c/chk-1493>. Cannot map checkpoint/savepoint state for operator cbc357ccb763df2852fee8c4fc7d55f2 to the new program, because the operator is not available in the new program. If you want to allow to skip this, you can set the --allowNonRestoredState option on the CLI.
    	at org.apache.flink.runtime.checkpoint.Checkpoints.throwNonRestoredStateException(Checkpoints.java:238)
    	at org.apache.flink.runtime.checkpoint.Checkpoints.loadAndValidateCheckpoint(Checkpoints.java:197)
    	at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreSavepoint(CheckpointCoordinator.java:1798)
    	at org.apache.flink.runtime.scheduler.DefaultExecutionGraphFactory.tryRestoreExecutionGraphFromSavepoint(DefaultExecutionGraphFactory.java:214)
    	at org.apache.flink.runtime.scheduler.DefaultExecutionGraphFactory.createAndRestoreExecutionGraph(DefaultExecutionGraphFactory.java:189)
    	at org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:361)
    	at org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:206)
    	at org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:134)
    	at org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:152)
    	at org.apache.flink.runtime.jobmaster.DefaultSlotPoolServiceSchedulerFactory.createScheduler(DefaultSlotPoolServiceSchedulerFactory.java:119)
    	at org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:369)
    	at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:346)
    	at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.internalCreateJobMasterService(DefaultJobMasterServiceFactory.java:123)
    	at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.lambda$createJobMasterService$0(DefaultJobMasterServiceFactory.java:95)
    	at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedSupplier$4(FunctionUtils.java:112)
    	at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
    I know I can restore my application using the option --allowNonRestoredState or -n. My question is: if I use the --allowNonRestoredState option, do I lose the application state (keyed state) for the operators that were non-restorable?
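    Related background rather than a direct answer: with --allowNonRestoredState, state that cannot be mapped to an operator in the new program is simply not restored, so it is effectively absent from the restarted job. The mapping is done by operator id, which is why explicit uids on stateful operators are usually recommended; a generic sketch (not code from this job):

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class UidExample {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            env.fromElements("a", "b", "a")
                    .keyBy(s -> s)
                    .map(String::toUpperCase)
                    // A stable, explicit operator id: checkpointed state is matched
                    // by this id instead of an auto-generated hash of the topology.
                    .uid("uppercase-map")
                    .name("uppercase-map")
                    .print();

            env.execute("uid-example");
        }
    }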
  • Dima Sheludko

    02/22/2023, 7:14 PM
    Hello 👋 Is it possible to enable the creation of savepoints in the Flink web UI?
  • Oren Itamar

    02/22/2023, 9:51 PM
    Hello, We are currently working on migrating a few Flink apps from KDA to Kubernetes. I was wondering what's the way to go with a local dir for rocksdb - Should a Persistent Volume be used (EBS in our case), or will an emptyDir do the trick as well? I'm trying to understand whether in the case of a TM pod loss, a new pod will be able to use the data stored in the local dir of the previous one, assuming it is stored on a persistent volume.
  • Fred Wu

    02/23/2023, 12:04 AM
    Hi guys, is it possible to create a KeyedStream from a JoinedStream?
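    A minimal sketch of the usual pattern (made-up tuple types and window choice): the joined streams only become a plain DataStream again after window(...).apply(...), and that result can then be keyed like any other stream.

    import org.apache.flink.api.common.functions.JoinFunction;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.KeyedStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
    import org.apache.flink.streaming.api.windowing.time.Time;

    public class JoinThenKeyBy {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            DataStream<Tuple2<String, Integer>> left =
                    env.fromElements(Tuple2.of("a", 1), Tuple2.of("b", 2));
            DataStream<Tuple2<String, Integer>> right =
                    env.fromElements(Tuple2.of("a", 10), Tuple2.of("b", 20));

            // The join only turns back into a DataStream after window().apply(...).
            DataStream<Tuple2<String, Integer>> joined = left
                    .join(right)
                    .where(t -> t.f0)
                    .equalTo(t -> t.f0)
                    .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                    .apply(new JoinFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>>() {
                        @Override
                        public Tuple2<String, Integer> join(Tuple2<String, Integer> l, Tuple2<String, Integer> r) {
                            return Tuple2.of(l.f0, l.f1 + r.f1);
                        }
                    });

            // ...and that DataStream can be keyed like any other stream.
            KeyedStream<Tuple2<String, Integer>, String> keyed = joined.keyBy(t -> t.f0);
            keyed.print();

            env.execute("join-then-keyBy");
        }
    }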
  • Matt Fysh

    02/23/2023, 12:43 AM
    Hi all, I'm trying to add flink-connector-kinesis-1.15.2.jar. The job starts successfully but immediately runs into this root exception:
    java.lang.NoClassDefFoundError: org/apache/commons/logging/LogFactory