# troubleshooting
Hi team, I'm implementing a Flink job that reads Parquet files from an S3 path and writes them to a Kafka topic, and I've run into the error below. Can anyone help with this?
org.apache.flink.util.FlinkException: Global failure triggered by OperatorCoordinator for 'Source: file-source -> Sink: Writer -> Sink: Committer' (operator cbc357ccb763df2852fee8c4fc7d55f2).
        at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder$LazyInitializedCoordinatorContext.failJob(OperatorCoordinatorHolder.java:600)
        at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$QuiesceableContext.failJob(RecreateOnResetOperatorCoordinator.java:237)
        at org.apache.flink.runtime.source.coordinator.SourceCoordinatorContext.failJob(SourceCoordinatorContext.java:374)
        at org.apache.flink.runtime.source.coordinator.SourceCoordinator.start(SourceCoordinator.java:226)
        at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$DeferrableCoordinator.resetAndStart(RecreateOnResetOperatorCoordinator.java:405)
        at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator.lambda$resetToCheckpoint$6(RecreateOnResetOperatorCoordinator.java:150)
        at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
        at java.base/java.util.concurrent.CompletableFuture.uniWhenCompleteStage(CompletableFuture.java:883)
        at java.base/java.util.concurrent.CompletableFuture.whenComplete(CompletableFuture.java:2251)
        at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator.resetToCheckpoint(RecreateOnResetOperatorCoordinator.java:137)
        at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.resetToCheckpoint(OperatorCoordinatorHolder.java:280)
        at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreStateToCoordinators(CheckpointCoordinator.java:2045)
        at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreLatestCheckpointedStateInternal(CheckpointCoordinator.java:1720)
        at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreLatestCheckpointedStateToAll(CheckpointCoordinator.java:1648)
        at org.apache.flink.runtime.scheduler.SchedulerBase.restoreState(SchedulerBase.java:426)
        at org.apache.flink.runtime.scheduler.DefaultScheduler.restartTasks(DefaultScheduler.java:389)
        at org.apache.flink.runtime.scheduler.DefaultScheduler.lambda$null$2(DefaultScheduler.java:349)
        at java.base/java.util.concurrent.CompletableFuture$UniRun.tryFire(CompletableFuture.java:783)
        at java.base/java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRunAsync$4(AkkaRpcActor.java:453)
        at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:453)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:218)
        at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:84)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:168)
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)
        at scala.PartialFunction.applyOrElse(PartialFunction.scala:127)
        at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126)
        at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175)
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
        at akka.actor.Actor.aroundReceive(Actor.scala:537)
        at akka.actor.Actor.aroundReceive$(Actor.scala:535)
        at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:579)
        at akka.actor.ActorCell.invoke(ActorCell.scala:547)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
        at akka.dispatch.Mailbox.run(Mailbox.scala:231)
        at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
        at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
        at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020)
        at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656)
        at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594)
        at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183)
Caused by: org.apache.flink.util.FlinkRuntimeException: Could not enumerate file splits
        at org.apache.flink.connector.file.src.AbstractFileSource.createEnumerator(AbstractFileSource.java:143)
        at org.apache.flink.runtime.source.coordinator.SourceCoordinator.start(SourceCoordinator.java:222)
        ... 42 more
Caused by: org.apache.hadoop.fs.s3a.AWSBadRequestException: getFileStatus on <s3://nonprod-sessionizer-connector-storage/staging/customer-session-analytical-staging-avro/year=2023/month=08/day=31/hour=22>: com.amazonaws.services.s3.model.AmazonS3Exception: Bad Request (Service: Amazon S3; Status Code: 400; Error Code: 400 Bad Request; Request ID: RM0PDKFZ48TT1QGP; S3 Extended Request ID: EzbywJvkFBbmfnn167B/Hr8LxZ8FqpbiHpkOpfXnn51TJHPEg9I5tio6xalW9QYUan80u4HG67k=; Proxy: null), S3 Extended Request ID: EzbywJvkFBbmfnn167B/Hr8LxZ8FqpbiHpkOpfXnn51TJHPEg9I5tio6xalW9QYUan80u4HG67k=:400 Bad Request: Bad Request (Service: Amazon S3; Status Code: 400; Error Code: 400 Bad Request; Request ID: RM0PDKFZ48TT1QGP; S3 Extended Request ID: EzbywJvkFBbmfnn167B/Hr8LxZ8FqpbiHpkOpfXnn51TJHPEg9I5tio6xalW9QYUan80u4HG67k=; Proxy: null)
        at org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:249)
        at org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:175)
        at org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:3796)
        at org.apache.hadoop.fs.s3a.S3AFileSystem.innerGetFileStatus(S3AFileSystem.java:3688)
        at org.apache.hadoop.fs.s3a.S3AFileSystem.lambda$getFileStatus$24(S3AFileSystem.java:3556)
        at org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.lambda$trackDurationOfOperation$5(IOStatisticsBinding.java:499)
        at org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDuration(IOStatisticsBinding.java:444)
        at org.apache.hadoop.fs.s3a.S3AFileSystem.trackDurationAndSpan(S3AFileSystem.java:2337)
        at org.apache.hadoop.fs.s3a.S3AFileSystem.trackDurationAndSpan(S3AFileSystem.java:2356)
        at org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:3554)
        at org.apache.flink.fs.s3hadoop.common.HadoopFileSystem.getFileStatus(HadoopFileSystem.java:85)
        at org.apache.flink.connector.file.src.enumerate.NonSplittingRecursiveEnumerator.enumerateSplits(NonSplittingRecursiveEnumerator.java:83)
        at org.apache.flink.connector.file.src.AbstractFileSource.createEnumerator(AbstractFileSource.java:141)
        ... 43 more
Caused by: com.amazonaws.services.s3.model.AmazonS3Exception: Bad Request (Service: Amazon S3; Status Code: 400; Error Code: 400 Bad Request; Request ID: RM0PDKFZ48TT1QGP; S3 Extended Request ID: EzbywJvkFBbmfnn167B/Hr8LxZ8FqpbiHpkOpfXnn51TJHPEg9I5tio6xalW9QYUan80u4HG67k=; Proxy: null), S3 Extended Request ID: EzbywJvkFBbmfnn167B/Hr8LxZ8FqpbiHpkOpfXnn51TJHPEg9I5tio6xalW9QYUan80u4HG67k=
        at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1879)
        at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleServiceErrorResponse(AmazonHttpClient.java:1418)
        at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1387)
        at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1157)
        at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:814)
        at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:781)
        at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:755)
        at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:715)
        at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:697)
        at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:561)
        at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:541)
        at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5520)
        at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5467)
        at com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1402)
        at org.apache.hadoop.fs.s3a.S3AFileSystem.lambda$getObjectMetadata$10(S3AFileSystem.java:2545)
        at org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:414)
        at org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:377)
        at org.apache.hadoop.fs.s3a.S3AFileSystem.getObjectMetadata(S3AFileSystem.java:2533)
        at org.apache.hadoop.fs.s3a.S3AFileSystem.getObjectMetadata(S3AFileSystem.java:2513)
        at org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:3776)
        ... 53 more
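The innermost cause is `AmazonS3Client.getObjectMetadata`, which is the HEAD request S3A's `getFileStatus` sends for the partition directory (`.../hour=22`) while Flink's `NonSplittingRecursiveEnumerator` is enumerating splits. When reproducing that request outside Flink, it helps to know exactly which bucket and object key it targeted. Below is a minimal sketch that splits the `s3://` URI reported in the error into those two parts. The class name `S3PathSplit` is hypothetical, and the code assumes a plain URI with no query string or percent-escaping. For directory paths, S3A may also issue further probes (for example a HEAD on the key with a trailing `/`, or a LIST), and this sketch does not model them.

```java
import java.net.URI;

// Hypothetical helper: splits an s3:// or s3a:// URI into {bucket, key}.
// Assumes a simple URI (no query string, no percent-escaped characters).
public class S3PathSplit {

    public static String[] split(String s3Uri) {
        URI uri = URI.create(s3Uri);
        // getAuthority() is used instead of getHost(): getHost() returns null
        // for bucket names that are not valid hostnames (e.g. with underscores).
        String bucket = uri.getAuthority();
        String path = uri.getPath();
        // S3 object keys are written without a leading slash.
        String key = path.startsWith("/") ? path.substring(1) : path;
        return new String[] {bucket, key};
    }

    public static void main(String[] args) {
        String[] parts = split("s3://nonprod-sessionizer-connector-storage/staging/"
                + "customer-session-analytical-staging-avro/year=2023/month=08/day=31/hour=22");
        System.out.println("bucket = " + parts[0]);
        System.out.println("key    = " + parts[1]);
    }
}
```

The printed bucket and key can then be passed to a standalone HEAD call, such as `aws s3api head-object --bucket <bucket> --key <key>`, run with the same credentials and region the Flink cluster uses.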