Hangyu Wang
05/31/2023, 10:08 AM
CsvReaderFormat<MetricEvent> csvFormat = CsvReaderFormat.forPojo(Metric.class);
FileSource<MetricEvent> source = FileSource.forRecordStreamFormat(csvFormat, new Path("s3://test-dev/test.csv")).build();
DataStream<Metric> stream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Metric");
According to the log, the S3 access key and secret key have been loaded successfully, but I still get a 403 Forbidden error. What am I missing?
org.apache.flink.util.FlinkRuntimeException: Could not enumerate file splits
at org.apache.flink.connector.file.src.AbstractFileSource.createEnumerator(AbstractFileSource.java:143) ~[flink-connector-files-1.17.0.jar:1.17.0]
at org.apache.flink.runtime.source.coordinator.SourceCoordinator.start(SourceCoordinator.java:213) ~[flink-dist-1.17.0.jar:1.17.0]
at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$DeferrableCoordinator.applyCall(RecreateOnResetOperatorCoordinator.java:315) ~[flink-dist-1.17.0.jar:1.17.0]
at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator.start(RecreateOnResetOperatorCoordinator.java:70) ~[flink-dist-1.17.0.jar:1.17.0]
at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.start(OperatorCoordinatorHolder.java:181) ~[flink-dist-1.17.0.jar:1.17.0]
at org.apache.flink.runtime.scheduler.DefaultOperatorCoordinatorHandler.startOperatorCoordinators(DefaultOperatorCoordinatorHandler.java:165) ~[flink-dist-1.17.0.jar:1.17.0]
at org.apache.flink.runtime.scheduler.DefaultOperatorCoordinatorHandler.startAllOperatorCoordinators(DefaultOperatorCoordinatorHandler.java:82) ~[flink-dist-1.17.0.jar:1.17.0]
at org.apache.flink.runtime.scheduler.SchedulerBase.startScheduling(SchedulerBase.java:615) ~[flink-dist-1.17.0.jar:1.17.0]
at org.apache.flink.runtime.jobmaster.JobMaster.startScheduling(JobMaster.java:1044) ~[flink-dist-1.17.0.jar:1.17.0]
at org.apache.flink.runtime.jobmaster.JobMaster.startJobExecution(JobMaster.java:961) ~[flink-dist-1.17.0.jar:1.17.0]
at org.apache.flink.runtime.jobmaster.JobMaster.onStart(JobMaster.java:424) ~[flink-dist-1.17.0.jar:1.17.0]
at org.apache.flink.runtime.rpc.RpcEndpoint.internalCallOnStart(RpcEndpoint.java:198) ~[flink-dist-1.17.0.jar:1.17.0]
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StoppedState.lambda$start$0(AkkaRpcActor.java:622) ~[flink-rpc-akka_3e956fd0-21e9-41aa-b26a-10f8e5bd0ae3.jar:1.17.0]
at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68) ~[flink-rpc-akka_3e956fd0-21e9-41aa-b26a-10f8e5bd0ae3.jar:1.17.0]
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StoppedState.start(AkkaRpcActor.java:621) ~[flink-rpc-akka_3e956fd0-21e9-41aa-b26a-10f8e5bd0ae3.jar:1.17.0]
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleControlMessage(AkkaRpcActor.java:190) ~[flink-rpc-akka_3e956fd0-21e9-41aa-b26a-10f8e5bd0ae3.jar:1.17.0]
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24) [flink-rpc-akka_3e956fd0-21e9-41aa-b26a-10f8e5bd0ae3.jar:1.17.0]
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20) [flink-rpc-akka_3e956fd0-21e9-41aa-b26a-10f8e5bd0ae3.jar:1.17.0]
at scala.PartialFunction.applyOrElse(PartialFunction.scala:127) [flink-rpc-akka_3e956fd0-21e9-41aa-b26a-10f8e5bd0ae3.jar:1.17.0]
at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126) [flink-rpc-akka_3e956fd0-21e9-41aa-b26a-10f8e5bd0ae3.jar:1.17.0]
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20) [flink-rpc-akka_3e956fd0-21e9-41aa-b26a-10f8e5bd0ae3.jar:1.17.0]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175) [flink-rpc-akka_3e956fd0-21e9-41aa-b26a-10f8e5bd0ae3.jar:1.17.0]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176) [flink-rpc-akka_3e956fd0-21e9-41aa-b26a-10f8e5bd0ae3.jar:1.17.0]
at akka.actor.Actor.aroundReceive(Actor.scala:537) [flink-rpc-akka_3e956fd0-21e9-41aa-b26a-10f8e5bd0ae3.jar:1.17.0]
at akka.actor.Actor.aroundReceive$(Actor.scala:535) [flink-rpc-akka_3e956fd0-21e9-41aa-b26a-10f8e5bd0ae3.jar:1.17.0]
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220) [flink-rpc-akka_3e956fd0-21e9-41aa-b26a-10f8e5bd0ae3.jar:1.17.0]
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:579) [flink-rpc-akka_3e956fd0-21e9-41aa-b26a-10f8e5bd0ae3.jar:1.17.0]
at akka.actor.ActorCell.invoke(ActorCell.scala:547) [flink-rpc-akka_3e956fd0-21e9-41aa-b26a-10f8e5bd0ae3.jar:1.17.0]
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270) [flink-rpc-akka_3e956fd0-21e9-41aa-b26a-10f8e5bd0ae3.jar:1.17.0]
at akka.dispatch.Mailbox.run(Mailbox.scala:231) [flink-rpc-akka_3e956fd0-21e9-41aa-b26a-10f8e5bd0ae3.jar:1.17.0]
at akka.dispatch.Mailbox.exec(Mailbox.scala:243) [flink-rpc-akka_3e956fd0-21e9-41aa-b26a-10f8e5bd0ae3.jar:1.17.0]
at java.util.concurrent.ForkJoinTask.doExec(Unknown Source) [?:?]
at java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(Unknown Source) [?:?]
at java.util.concurrent.ForkJoinPool.scan(Unknown Source) [?:?]
at java.util.concurrent.ForkJoinPool.runWorker(Unknown Source) [?:?]
at java.util.concurrent.ForkJoinWorkerThread.run(Unknown Source) [?:?]
Caused by: java.nio.file.AccessDeniedException: <s3://test-dev/test.csv>: getFileStatus on <s3://test-dev/test.csv>: com.amazonaws.services.s3.model.AmazonS3Exception: Forbidden (Service: Amazon S3; Status Code: 403; Error Code: 403 Forbidden; Request ID: WZ0MBF6DFKQ2GT21; S3 Extended Request ID: WbDaX6osQkhvdkcwPQe0FuyXj2f3ESqwerQupUrypbgc/eDpX5fVonNTarRfUm/RPZ/lxnHRpts=; Proxy: null), S3 Extended Request ID: WbDaX6osQkhvdkcwPQe0FuyXj2f3ESqwerQupUrypbgc/eDpX5fVonNTarRfUm/RPZ/lxnHRpts=:403 Forbidden
at org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:255) ~[?:?]
at org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:175) ~[?:?]
Hangyu Wang
05/31/2023, 10:12 AM60
2023-05-31 09:58:55,953 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: s3.access-key-id, xxxxxxxxx
...
2023-05-31 09:58:55,954 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: s3.secret-access-key, ******
2023-05-31 09:58:55,954 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: execution.attached, true
2023-05-31
Gaurav Miglani
05/31/2023, 10:21 AM
s3a.access-key-id and s3.secret-access-key
Gaurav Miglani
05/31/2023, 10:21 AM
Martijn Visser
05/31/2023, 10:25 AM
Hangyu Wang
05/31/2023, 10:50 AM
Hangyu Wang
05/31/2023, 10:56 AM
Martijn Visser
05/31/2023, 12:10 PM
Hangyu Wang
06/01/2023, 1:42 AM
Martijn Visser
06/01/2023, 4:33 AM
Hangyu Wang
06/01/2023, 6:47 AM
Martijn Visser
06/01/2023, 6:50 AM
Hangyu Wang
06/01/2023, 6:54 AM
Martijn Visser
06/01/2023, 5:10 PM