Slackbot
02/21/2023, 5:12 AM
kingsathurthi
02/21/2023, 6:13 AM
Ari Huttunen
02/21/2023, 7:49 AM
Jirawech Siwawut
02/21/2023, 8:39 AM
Is it possible to use StatementSet to run an insert query two times within one execution? I am trying to sink data to Kafka and MySQL at the same time using one job.
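For reference, a minimal sketch of the StatementSet approach (Flink 1.14+ Table API; the table names are hypothetical and assumed to be registered via CREATE TABLE DDL). Both INSERTs are optimized into a single job graph and submitted as one job:
TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
StatementSet statementSet = tableEnv.createStatementSet();
// Add one INSERT per sink; nothing is executed yet.
statementSet.addInsertSql("INSERT INTO kafka_sink SELECT * FROM source_table");
statementSet.addInsertSql("INSERT INTO mysql_sink SELECT * FROM source_table");
// One submission runs both inserts within the same job.
statementSet.execute();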
sairam yeturi
02/21/2023, 8:54 AM
Yang LI
02/21/2023, 9:18 AM
Siddhesh Kalgaonkar
02/21/2023, 10:57 AM
I was going through the 1.16 documentation where I saw the screenshots below. So, what does it mean? Windows are not supported by the Python API, but I can see the example in the windows section; am I missing something here? Also, I have seen that lots of operators are not supported in the Python API yet as per the documentation, like Window Joins, Connect, Interval Joins, Iterate, CoGroup and so on. So is that true, or is the documentation just not updated? TIA
Maringlen Kovaci
02/21/2023, 4:46 PM
spec:
  ...
  flinkConfiguration:
    metrics.reporter.prom.factory.class: org.apache.flink.metrics.prometheus.PrometheusReporterFactory
    metrics.reporter.prom.factory.port: "9249"
but I need a separate configuration for the TaskManager only. I need this because the TaskManager disappears after completing batch jobs, so I won't be able to scrape the metrics with Prometheus; I thought of using the PrometheusPushGateway instead.
Any help would be greatly appreciated.
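For reference, a hedged sketch of what a push-gateway reporter block could look like (the hostUrl value is a placeholder, and the option names should be double-checked against your Flink version; hostUrl is the newer form of the older host/port pair):
flinkConfiguration:
  metrics.reporter.promgateway.factory.class: org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporterFactory
  metrics.reporter.promgateway.hostUrl: http://prometheus-pushgateway:9091   # placeholder address
  metrics.reporter.promgateway.jobName: my-batch-job                         # hypothetical job name
  metrics.reporter.promgateway.deleteOnShutdown: "false"                     # keep last metrics after the TM exits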
Lily Liu
02/21/2023, 4:59 PM
I set fs.gs.http.read-timeout following the doc. Has anyone faced this error before?
Caused by: org.apache.flink.util.SerializedThrowable: com.google.cloud.storage.StorageException: Read timed out
at com.google.cloud.storage.spi.v1.HttpStorageRpc.translate(HttpStorageRpc.java:233) ~[?:?]
at com.google.cloud.storage.spi.v1.HttpStorageRpc.writeWithResponse(HttpStorageRpc.java:901) ~[?:?]
at com.google.cloud.storage.BlobWriteChannel.transmitChunk(BlobWriteChannel.java:67) ~[?:?]
at com.google.cloud.storage.BlobWriteChannel.access$1400(BlobWriteChannel.java:34) ~[?:?]
at com.google.cloud.storage.BlobWriteChannel$1.run(BlobWriteChannel.java:252) ~[?:?]
Adrian Chang
02/21/2023, 5:31 PM
format = "avro-confluent"
// Sink Motion Detection to Kafka
Table motionDetectionTable = tableEnv.fromDataStream(motionDetectionStream);
motionDetectionTable.executeInsert(Util.MOTION_DETECTION_SINK_TABLE_NAME);
// Sink stream.motion messages to Redis
motionStream.addSink(new RedisSink<>(Util.getRedisConfig(), new RedisMotionMapper()));
env.execute("Motion");
When the Job is deployed, two Task Managers start (I have parallelism set to 1).
One execution graph is completely contained in the other, causing calculations to be duplicated and producing duplicates to Kafka.
I suspect it's because I am calling executeInsert and env.execute, but if I don't call them both the Job doesn't produce to both sink connectors.
Am I missing something? Thanks
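A hedged sketch of one way around this, assuming Flink 1.15+: put the Table insert into a StreamStatementSet and attach it to the StreamExecutionEnvironment, so the single env.execute() submits one job graph covering both sinks:
// Register the Table API insert instead of executing it eagerly.
StreamStatementSet statementSet = tableEnv.createStatementSet();
statementSet.addInsert(Util.MOTION_DETECTION_SINK_TABLE_NAME, motionDetectionTable);
// Fold the insert into the DataStream pipeline rather than submitting a second job.
statementSet.attachAsDataStream();
motionStream.addSink(new RedisSink<>(Util.getRedisConfig(), new RedisMotionMapper()));
env.execute("Motion"); // single submission, no duplicated execution graph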
Pralay Ghosh
02/21/2023, 8:33 PM
java.lang.Exception: Could not perform checkpoint 79 for operator Source: test_input[1] -> Calc[2] -> ConstraintEnforcer[3] -> Sink: test_output[3] (2/12)#0.
at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointAsyncInMailbox(StreamTask.java:1138)
at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$triggerCheckpointAsync$11(StreamTask.java:1085)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:338)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:324)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:953)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:932)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:746)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:568)
at java.base/java.lang.Thread.run(Thread.java:829)
Suppressed: java.lang.RuntimeException: An error occurred in ElasticsearchSink.
at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.checkErrorAndRethrow(ElasticsearchSinkBase.java:426)
at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.close(ElasticsearchSinkBase.java:365)
at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:41)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.close(AbstractUdfStreamOperator.java:114)
at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:163)
at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.closeAllOperators(RegularOperatorChain.java:125)
at org.apache.flink.streaming.runtime.tasks.StreamTask.closeAllOperators(StreamTask.java:997)
at org.apache.flink.util.IOUtils.closeAll(IOUtils.java:254)
at org.apache.flink.core.fs.AutoCloseableRegistry.doClose(AutoCloseableRegistry.java:72)
at org.apache.flink.util.AbstractAutoCloseableRegistry.close(AbstractAutoCloseableRegistry.java:127)
at org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUp(StreamTask.java:916)
at org.apache.flink.runtime.taskmanager.Task.lambda$restoreAndInvoke$0(Task.java:935)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:953)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:935)
... 3 more
Caused by: java.io.IOException: Unable to parse response body for Response{requestLine=POST /_bulk?timeout=1m HTTP/1.1, host=https://vpc-test-testCluster.us-west-1.es.amazonaws.com:443, response=HTTP/1.1 200 OK}
at org.elasticsearch.client.RestHighLevelClient$1.onSuccess(RestHighLevelClient.java:1783)
at org.elasticsearch.client.RestClient$FailureTrackingResponseListener.onSuccess(RestClient.java:636)
at org.elasticsearch.client.RestClient$1.completed(RestClient.java:376)
at org.elasticsearch.client.RestClient$1.completed(RestClient.java:370)
at org.apache.http.concurrent.BasicFuture.completed(BasicFuture.java:122)
at org.apache.http.impl.nio.client.DefaultClientExchangeHandlerImpl.responseCompleted(DefaultClientExchangeHandlerImpl.java:181)
at org.apache.http.nio.protocol.HttpAsyncRequestExecutor.processResponse(HttpAsyncRequestExecutor.java:448)
at org.apache.http.nio.protocol.HttpAsyncRequestExecutor.inputReady(HttpAsyncRequestExecutor.java:338)
at org.apache.http.impl.nio.DefaultNHttpClientConnection.consumeInput(DefaultNHttpClientConnection.java:265)
at org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:81)
at org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:39)
at org.apache.http.impl.nio.reactor.AbstractIODispatch.inputReady(AbstractIODispatch.java:121)
at org.apache.http.impl.nio.reactor.BaseIOReactor.readable(BaseIOReactor.java:162)
at org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvent(AbstractIOReactor.java:337)
at org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvents(AbstractIOReactor.java:315)
at org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:276)
at org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104)
at org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:591)
... 1 more
Caused by: java.lang.NullPointerException
at java.base/java.util.Objects.requireNonNull(Objects.java:221)
at org.elasticsearch.action.DocWriteResponse.<init>(DocWriteResponse.java:127)
at org.elasticsearch.action.update.UpdateResponse.<init>(UpdateResponse.java:65)
at org.elasticsearch.action.update.UpdateResponse$Builder.build(UpdateResponse.java:172)
at org.elasticsearch.action.update.UpdateResponse$Builder.build(UpdateResponse.java:160)
at org.elasticsearch.action.bulk.BulkItemResponse.fromXContent(BulkItemResponse.java:159)
at org.elasticsearch.action.bulk.BulkResponse.fromXContent(BulkResponse.java:188)
at org.elasticsearch.client.RestHighLevelClient.parseEntity(RestHighLevelClient.java:1911)
at org.elasticsearch.client.RestHighLevelClient.lambda$performRequestAsyncAndParseEntity$10(RestHighLevelClient.java:1699)
at org.elasticsearch.client.RestHighLevelClient$1.onSuccess(RestHighLevelClient.java:1781)
... 18 more
Caused by: org.apache.flink.runtime.checkpoint.CheckpointException: Could not complete snapshot 79 for operator Source: test_input[1] -> Calc[2] -> ConstraintEnforcer[3] -> Sink: test_output[3] (2/12)#0. Failure reason: Checkpoint was declined.
at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:269)
at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:173)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:348)
at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.checkpointStreamOperator(RegularOperatorChain.java:227)
at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.buildOperatorSnapshotFutures(RegularOperatorChain.java:212)
at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.snapshotState(RegularOperatorChain.java:192)
at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.takeSnapshotSync(SubtaskCheckpointCoordinatorImpl.java:647)
at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:320)
at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$12(StreamTask.java:1253)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:1241)
at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointAsyncInMailbox(StreamTask.java:1126)
... 13 more
Caused by: java.lang.RuntimeException: An error occurred in ElasticsearchSink.
at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.checkErrorAndRethrow(ElasticsearchSinkBase.java:426)
at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.checkAsyncErrorsAndRequests(ElasticsearchSinkBase.java:431)
at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.snapshotState(ElasticsearchSinkBase.java:339)
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118)
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:87)
at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:222)
... 24 more
Caused by: [CIRCULAR REFERENCE: java.io.IOException: Unable to parse response body for Response{requestLine=POST /_bulk?timeout=1m HTTP/1.1, host=https://vpc-test-testCluster.us-west-1.es.amazonaws.com:443, response=HTTP/1.1 200 OK}]
Weirdly, the index is also getting created in the Elasticsearch cluster, though Flink restarts for every record, each time with one checkpoint failure. Another important aspect is that the underlying engine is AWS OpenSearch 2.x.
I know that Flink 1.16 has an explicit connector for OpenSearch. Does that mean the Flink 1.15 Elasticsearch connector is not compatible with an underlying OpenSearch engine? Any pointers would be much appreciated.
Many thanks.
Eric Xiao
02/21/2023, 8:39 PM
2023-02-18 18:16:46
java.lang.Exception: Cannot deploy task Source: Customer Events-gke-central (46/140) (ee5b2cf7f7ff8c9c26828452de61722f_09e059c57f58864b464ee71c9fa4f9f7_45_1) - TaskManager (10.7.140.227:6122-482ecb @ 10.7.140.227 (dataPort=37725)) not responding after a rpcTimeout of 10000 ms
at org.apache.flink.runtime.executiongraph.Execution.lambda$deploy$5(Execution.java:612)
at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
at java.base/java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRunAsync$4(AkkaRpcActor.java:453)
at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:453)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:218)
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:84)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:168)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)
at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
at akka.actor.Actor.aroundReceive(Actor.scala:537)
at akka.actor.Actor.aroundReceive$(Actor.scala:535)
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580)
at akka.actor.ActorCell.invoke(ActorCell.scala:548)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
at akka.dispatch.Mailbox.run(Mailbox.scala:231)
at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020)
at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656)
at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594)
at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183)
Caused by: java.util.concurrent.TimeoutException: Invocation of [RemoteRpcInvocation(TaskExecutorGateway.submitTask(TaskDeploymentDescriptor, JobMasterId, Time))] at recipient [akka.tcp://flink@10.7.140.227:6122/user/rpc/taskmanager_0] timed out. This is usually caused by: 1) Akka failed sending the message silently, due to problems like oversized payload or serialization failures. In that case, you should find detailed error information in the logs. 2) The recipient needs more time for responding, due to problems like slow machines or network jitters. In that case, you can try to increase akka.ask.timeout.
at org.apache.flink.runtime.jobmaster.RpcTaskManagerGateway.submitTask(RpcTaskManagerGateway.java:59)
at org.apache.flink.runtime.executiongraph.Execution.lambda$deploy$4(Execution.java:590)
at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka.tcp://flink@10.7.140.227:6122/user/rpc/taskmanager_0#-244650484]] after [10000 ms]. Message of type [org.apache.flink.runtime.rpc.messages.RemoteRpcInvocation]. A typical reason for `AskTimeoutException` is that the recipient actor didn't send a reply.
We get this error in our Flink pipeline and our JM gets into a bad state; it only returns to a healthy state when we redeploy our pipelines. Besides bumping up the akka.ask.timeout, is there a way we can automate the redeployment of the pipeline if the timeout is hit again?
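For reference, a sketch of the knob mentioned above (the value is illustrative; raising it masks slow RPCs but does not by itself recover a wedged JM):
akka.ask.timeout: 60 s   # default is 10 s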
Tony Wang
02/21/2023, 10:10 PM
Jirawech Siwawut
02/22/2023, 3:02 AM
Emily Li
02/22/2023, 4:37 AM
Transient association error (association remains live)
akka.remote.OversizedPayloadException: Discarding oversized payload sent to Actor[akka.tcp://flink@10.137.128.14:50100/user/rpc/taskmanager_0#-1857979235]: max allowed size 125829120 bytes, actual size of encoded class org.apache.flink.runtime.rpc.messages.RemoteRpcInvocation was 218273805 bytes.
We tried increasing the frame-size Flink configuration (akka.framesize), but every time it failed with a larger size of RemoteRpcInvocation. The failure is in the SubmitTask call from the JobManager to the TaskManager.
We also tried to revert to a version that used to work, which now fails with the same error.
Could you please provide some guidance on how to solve this issue? All our efforts searching for an answer have not been successful in highlighting the potential causes.
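For reference, a hedged sketch of the relevant option; the key is akka.framesize and the value takes an explicit byte suffix (the size below is only an illustration):
akka.framesize: 268435456b   # ~256 MiB; the default is 10485760b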
Aditya
02/22/2023, 7:27 AM
Aditya
02/22/2023, 7:29 AM
Aditya
02/22/2023, 7:31 AM
Ari Huttunen
02/22/2023, 9:24 AM
… FileSystem.init(), I'm pretty sure I could easily see it.
Alternatively, if someone could tell me how to configure the s3 filesystem using flink-s3-fs-hadoop, it would also solve the issue.
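A hedged sketch of configuring flink-s3-fs-hadoop programmatically for a local run (normally these keys belong in flink-conf.yaml; the values are placeholders):
// Pass S3 options to Flink's filesystem layer before any s3:// path is used.
Configuration conf = new Configuration();
conf.setString("s3.access-key", "<access-key>");   // placeholder
conf.setString("s3.secret-key", "<secret-key>");   // placeholder
conf.setString("s3.endpoint", "<endpoint>");       // only needed for non-AWS S3
FileSystem.initialize(conf, null);                 // null: no plugin manager in this sketch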
Slackbot
02/22/2023, 9:34 AM
Ari Huttunen
02/22/2023, 10:39 AM
[INFO] ------------------------------------------------------------------------
[INFO] BUILD FAILURE
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 01:18 min
[INFO] Finished at: 2023-02-22T12:27:41+02:00
[INFO] ------------------------------------------------------------------------
[ERROR] Failed to execute goal net.alchim31.maven:scala-maven-plugin:3.2.2:compile (scala-compile-first) on project flink-table-api-scala-bridge_2.12: wrap: org.apache.commons.exec.ExecuteException: Process exited with an error: 240 (Exit value: 240) -> [Help 1]
[ERROR]
[ERROR] To see the full stack trace of the errors, re-run Maven with the -e switch.
[ERROR] Re-run Maven using the -X switch to enable full debug logging.
[ERROR]
[ERROR] For more information about the errors and possible solutions, please read the following articles:
[ERROR] [Help 1] http://cwiki.apache.org/confluence/display/MAVEN/MojoExecutionException
[ERROR]
[ERROR] After correcting the problems, you can resume the build with the command
[ERROR] mvn <args> -rf :flink-table-api-scala-bridge_2.12
Kwangin Jung
02/22/2023, 10:44 AM
...java.lang.NoSuchMethodError: 'void org.apache.orc.impl.OutStream.<init>(java.lang.String, int, org.apache.orc.CompressionCodec, org.apache.orc.PhysicalWriter$OutputReceiver)'
at org.apache.flink.orc.writer.PhysicalWriterImpl.<init>(PhysicalWriterImpl.java:101)
at org.apache.flink.orc.writer.OrcBulkWriterFactory.create(OrcBulkWriterFactory.java:99)
at org.apache.flink.streaming.api.functions.sink.filesystem.BulkBucketWriter.openNew(BulkBucketWriter.java:76)
at org.apache.flink.streaming.api.functions.sink.filesystem.OutputStreamBasedPartFileWriter$OutputStreamBasedBucketWriter.openNewInProgressFile(OutputStreamBasedPartFileWriter.java:124)
at org.apache.flink.streaming.api.functions.sink.filesystem.BulkBucketWriter.openNewInProgressFile(BulkBucketWriter.java:36)
at org.apache.flink.connector.file.sink.writer.FileWriterBucket.rollPartFile(FileWriterBucket.java:261)
at org.apache.flink.connector.file.sink.writer.FileWriterBucket.write(FileWriterBucket.java:188)
at org.apache.flink.connector.file.sink.writer.FileWriter.write(FileWriter.java:198)
Dima Sheludko
02/22/2023, 2:00 PM
Cannot create recoverable writer due to This s3 file system implementation does not support recoverable writers., will use the ordinary writer.
As far as I can see, Flink has successfully created a checkpoint, but these logs still appear.
I am using Presto and the s3p:// scheme, and have copied the Presto JAR to the plugins and lib directories of my Flink deployment.
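For context, a hedged sketch: the Presto filesystem (s3p://) has no RecoverableWriter, so it is fine for checkpoints, while streaming file sinks need the Hadoop one (flink-s3-fs-hadoop, s3a://). Bucket names are placeholders:
state.checkpoints.dir: s3p://my-bucket/checkpoints   # presto: fine for checkpoints
# FileSink/StreamingFileSink output paths should use s3a:// (flink-s3-fs-hadoop),
# e.g. s3a://my-bucket/output, since those sinks require a recoverable writer.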
Jagan Nalla
02/22/2023, 5:31 PM
Pradeep Ramachandra
02/22/2023, 5:51 PM
public class JoinJob {
    public static void main(String[] args) throws Exception {
        // create environments of both APIs
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(new Configuration());
        env.setParallelism(4);
        env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        tableEnv.createTemporaryView("user", TargetDataReader.convertSourceToTable(FILE_PATH_USER, env, tableEnv));
        tableEnv.createTemporaryView("account", AccountReader.convertSourceToTable(FILE_PATH_ACCOUNT, env, tableEnv));

        Table resultTable = tableEnv
                .sqlQuery("select u.id, u.name, u.email, "
                        + "a.account_name as account_name, a.type as account_type "
                        + "from user u left join account a on u.id = a.user_id");

        DataStream<Row> resultStream = tableEnv.toDataStream(resultTable);
        resultStream.print();
        env.execute("Executing flink test job");
    }
}
-- code to read account information from CSV (there is also a similar class to read user information) --
public class AccountReader {
    public static Table convertSourceToTable(String path, StreamExecutionEnvironment env,
            StreamTableEnvironment tableEnv) {
        FileSource<String> accountSrc = FileSource.forRecordStreamFormat(
                new TextLineInputFormat("UTF-8"),
                new Path(path))
                // monitors the folder for new files; every second it checks whether there are more files to read
                .monitorContinuously(Duration.ofSeconds(1))
                .setSplitAssigner(FileSource.DEFAULT_SPLIT_ASSIGNER)
                // decides whether Flink should split files and read them in chunks; archived files are not splittable
                .setFileEnumerator((Provider) () -> new NonSplittingRecursiveEnumerator())
                .build();

        DataStream<Account2> accountStream = env.fromSource(accountSrc,
                WatermarkStrategy.forMonotonousTimestamps(),
                "Account as source")
                .flatMap(new FlatMapFunction<String, Account2>() {
                    @Override
                    public void flatMap(String value, Collector<Account2> out) throws Exception {
                        String[] fields = value.split(",");
                        Account2 account = new Account2();
                        account.setType(Integer.parseInt(fields[0]));
                        account.setName(fields[1]);
                        account.setUserId(fields[2]);
                        out.collect(account);
                    }
                });

        return tableEnv.fromDataStream(accountStream).as("type", "account_name");
    }
}
-- error --
Exception in thread "main" org.apache.flink.table.api.TableException: Table sink '*anonymous_datastream_sink$4*' doesn't support consuming update and delete changes which is produced by node Join(joinType=[LeftOuterJoin], where=[(id = user_id)], select=[id, name, email], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.createNewNode(FlinkChangelogModeInferenceProgram.scala:406)
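A hedged sketch of one workaround: a non-windowed left join produces an updating table, which toDataStream() cannot consume, while toChangelogStream() can. Only the conversion call changes:
// Consume the join result as a changelog stream of insert/update/delete rows.
DataStream<Row> resultStream = tableEnv.toChangelogStream(resultTable);
resultStream.print(); // each Row carries a RowKind (+I, -U, +U, -D)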
sharad mishra
02/22/2023, 6:41 PM
Caused by: java.util.concurrent.CompletionException: java.lang.IllegalStateException: Failed to rollback to checkpoint/savepoint hdfs://blackhole-auditlog/data/dcn-log-parser/checkpointing/755c50f2ddfbed68607f7c6ad2428c4c/chk-1493. Cannot map checkpoint/savepoint state for operator cbc357ccb763df2852fee8c4fc7d55f2 to the new program, because the operator is not available in the new program. If you want to allow to skip this, you can set the --allowNonRestoredState option on the CLI.
at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1592)
... 3 more
Caused by: java.lang.IllegalStateException: Failed to rollback to checkpoint/savepoint hdfs://blackhole-auditlog/data/dcn-log-parser/checkpointing/755c50f2ddfbed68607f7c6ad2428c4c/chk-1493. Cannot map checkpoint/savepoint state for operator cbc357ccb763df2852fee8c4fc7d55f2 to the new program, because the operator is not available in the new program. If you want to allow to skip this, you can set the --allowNonRestoredState option on the CLI.
at org.apache.flink.runtime.checkpoint.Checkpoints.throwNonRestoredStateException(Checkpoints.java:238)
at org.apache.flink.runtime.checkpoint.Checkpoints.loadAndValidateCheckpoint(Checkpoints.java:197)
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreSavepoint(CheckpointCoordinator.java:1798)
at org.apache.flink.runtime.scheduler.DefaultExecutionGraphFactory.tryRestoreExecutionGraphFromSavepoint(DefaultExecutionGraphFactory.java:214)
at org.apache.flink.runtime.scheduler.DefaultExecutionGraphFactory.createAndRestoreExecutionGraph(DefaultExecutionGraphFactory.java:189)
at org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:361)
at org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:206)
at org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:134)
at org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:152)
at org.apache.flink.runtime.jobmaster.DefaultSlotPoolServiceSchedulerFactory.createScheduler(DefaultSlotPoolServiceSchedulerFactory.java:119)
at org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:369)
at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:346)
at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.internalCreateJobMasterService(DefaultJobMasterServiceFactory.java:123)
at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.lambda$createJobMasterService$0(DefaultJobMasterServiceFactory.java:95)
at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedSupplier$4(FunctionUtils.java:112)
at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
I know I can restore my application using the --allowNonRestoredState (or -n) option.
My question is: if I use the --allowNonRestoredState option, do I lose the state (application state, i.e. keyed state) of my application for the operators that were non-restorable?
Dima Sheludko
02/22/2023, 7:14 PM
Oren Itamar
02/22/2023, 9:51 PM
Fred Wu
02/23/2023, 12:04 AM
Matt Fysh
02/23/2023, 12:43 AM
flink-connector-kinesis-1.15.2.jar
The job starts successfully but immediately runs into this root exception:
java.lang.NoClassDefFoundError: org/apache/commons/logging/LogFactory