# troubleshooting
h
Hi all. My goal is to read a CSV file from S3 and do some aggregation in Flink.
// Derive the CSV schema from the Metric POJO and read the file from S3.
CsvReaderFormat<Metric> csvFormat = CsvReaderFormat.forPojo(Metric.class);
FileSource<Metric> source = FileSource.forRecordStreamFormat(csvFormat, new Path("s3://test-dev/test.csv")).build();
DataStream<Metric> stream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Metric");
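For reference, CsvReaderFormat.forPojo derives the CSV schema from the POJO's public fields via Jackson. A minimal sketch of what Metric might look like; the field names and types are assumptions, since the real class isn't shown:

import com.fasterxml.jackson.annotation.JsonPropertyOrder;

// Hypothetical POJO; @JsonPropertyOrder pins the column order so it matches
// the CSV layout (names and types here are assumptions).
@JsonPropertyOrder({"name", "value", "timestamp"})
public class Metric {
    public String name;
    public double value;
    public long timestamp;
}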
According to the log, the S3 access key and secret key have been loaded successfully, but I still get a 403 Forbidden error. What am I missing?
org.apache.flink.util.FlinkRuntimeException: Could not enumerate file splits
	at org.apache.flink.connector.file.src.AbstractFileSource.createEnumerator(AbstractFileSource.java:143) ~[flink-connector-files-1.17.0.jar:1.17.0]
	at org.apache.flink.runtime.source.coordinator.SourceCoordinator.start(SourceCoordinator.java:213) ~[flink-dist-1.17.0.jar:1.17.0]
	at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$DeferrableCoordinator.applyCall(RecreateOnResetOperatorCoordinator.java:315) ~[flink-dist-1.17.0.jar:1.17.0]
	at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator.start(RecreateOnResetOperatorCoordinator.java:70) ~[flink-dist-1.17.0.jar:1.17.0]
	at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.start(OperatorCoordinatorHolder.java:181) ~[flink-dist-1.17.0.jar:1.17.0]
	at org.apache.flink.runtime.scheduler.DefaultOperatorCoordinatorHandler.startOperatorCoordinators(DefaultOperatorCoordinatorHandler.java:165) ~[flink-dist-1.17.0.jar:1.17.0]
	at org.apache.flink.runtime.scheduler.DefaultOperatorCoordinatorHandler.startAllOperatorCoordinators(DefaultOperatorCoordinatorHandler.java:82) ~[flink-dist-1.17.0.jar:1.17.0]
	at org.apache.flink.runtime.scheduler.SchedulerBase.startScheduling(SchedulerBase.java:615) ~[flink-dist-1.17.0.jar:1.17.0]
	at org.apache.flink.runtime.jobmaster.JobMaster.startScheduling(JobMaster.java:1044) ~[flink-dist-1.17.0.jar:1.17.0]
	at org.apache.flink.runtime.jobmaster.JobMaster.startJobExecution(JobMaster.java:961) ~[flink-dist-1.17.0.jar:1.17.0]
	at org.apache.flink.runtime.jobmaster.JobMaster.onStart(JobMaster.java:424) ~[flink-dist-1.17.0.jar:1.17.0]
	at org.apache.flink.runtime.rpc.RpcEndpoint.internalCallOnStart(RpcEndpoint.java:198) ~[flink-dist-1.17.0.jar:1.17.0]
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StoppedState.lambda$start$0(AkkaRpcActor.java:622) ~[flink-rpc-akka_3e956fd0-21e9-41aa-b26a-10f8e5bd0ae3.jar:1.17.0]
	at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68) ~[flink-rpc-akka_3e956fd0-21e9-41aa-b26a-10f8e5bd0ae3.jar:1.17.0]
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StoppedState.start(AkkaRpcActor.java:621) ~[flink-rpc-akka_3e956fd0-21e9-41aa-b26a-10f8e5bd0ae3.jar:1.17.0]
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleControlMessage(AkkaRpcActor.java:190) ~[flink-rpc-akka_3e956fd0-21e9-41aa-b26a-10f8e5bd0ae3.jar:1.17.0]
	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24) [flink-rpc-akka_3e956fd0-21e9-41aa-b26a-10f8e5bd0ae3.jar:1.17.0]
	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20) [flink-rpc-akka_3e956fd0-21e9-41aa-b26a-10f8e5bd0ae3.jar:1.17.0]
	at scala.PartialFunction.applyOrElse(PartialFunction.scala:127) [flink-rpc-akka_3e956fd0-21e9-41aa-b26a-10f8e5bd0ae3.jar:1.17.0]
	at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126) [flink-rpc-akka_3e956fd0-21e9-41aa-b26a-10f8e5bd0ae3.jar:1.17.0]
	at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20) [flink-rpc-akka_3e956fd0-21e9-41aa-b26a-10f8e5bd0ae3.jar:1.17.0]
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175) [flink-rpc-akka_3e956fd0-21e9-41aa-b26a-10f8e5bd0ae3.jar:1.17.0]
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176) [flink-rpc-akka_3e956fd0-21e9-41aa-b26a-10f8e5bd0ae3.jar:1.17.0]
	at akka.actor.Actor.aroundReceive(Actor.scala:537) [flink-rpc-akka_3e956fd0-21e9-41aa-b26a-10f8e5bd0ae3.jar:1.17.0]
	at akka.actor.Actor.aroundReceive$(Actor.scala:535) [flink-rpc-akka_3e956fd0-21e9-41aa-b26a-10f8e5bd0ae3.jar:1.17.0]
	at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220) [flink-rpc-akka_3e956fd0-21e9-41aa-b26a-10f8e5bd0ae3.jar:1.17.0]
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:579) [flink-rpc-akka_3e956fd0-21e9-41aa-b26a-10f8e5bd0ae3.jar:1.17.0]
	at akka.actor.ActorCell.invoke(ActorCell.scala:547) [flink-rpc-akka_3e956fd0-21e9-41aa-b26a-10f8e5bd0ae3.jar:1.17.0]
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270) [flink-rpc-akka_3e956fd0-21e9-41aa-b26a-10f8e5bd0ae3.jar:1.17.0]
	at akka.dispatch.Mailbox.run(Mailbox.scala:231) [flink-rpc-akka_3e956fd0-21e9-41aa-b26a-10f8e5bd0ae3.jar:1.17.0]
	at akka.dispatch.Mailbox.exec(Mailbox.scala:243) [flink-rpc-akka_3e956fd0-21e9-41aa-b26a-10f8e5bd0ae3.jar:1.17.0]
	at java.util.concurrent.ForkJoinTask.doExec(Unknown Source) [?:?]
	at java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(Unknown Source) [?:?]
	at java.util.concurrent.ForkJoinPool.scan(Unknown Source) [?:?]
	at java.util.concurrent.ForkJoinPool.runWorker(Unknown Source) [?:?]
	at java.util.concurrent.ForkJoinWorkerThread.run(Unknown Source) [?:?]
Caused by: java.nio.file.AccessDeniedException: s3://test-dev/test.csv: getFileStatus on s3://test-dev/test.csv: com.amazonaws.services.s3.model.AmazonS3Exception: Forbidden (Service: Amazon S3; Status Code: 403; Error Code: 403 Forbidden; Request ID: WZ0MBF6DFKQ2GT21; S3 Extended Request ID: WbDaX6osQkhvdkcwPQe0FuyXj2f3ESqwerQupUrypbgc/eDpX5fVonNTarRfUm/RPZ/lxnHRpts=; Proxy: null), S3 Extended Request ID: WbDaX6osQkhvdkcwPQe0FuyXj2f3ESqwerQupUrypbgc/eDpX5fVonNTarRfUm/RPZ/lxnHRpts=:403 Forbidden
	at org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:255) ~[?:?]
	at org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:175) ~[?:?]
2023-05-31 09:58:55,953 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: s3.access-key-id, xxxxxxxxx
...
2023-05-31 09:58:55,954 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: s3.secret-access-key, ******
2023-05-31 09:58:55,954 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: execution.attached, true
...
g
Try setting s3a.access-key-id and s3a.secret-access-key. From the logs, I think it's using S3A.
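For reference, the keys the Flink docs list for the S3 filesystem plugins are s3.access-key and s3.secret-key, which the flink-s3-fs-hadoop plugin maps onto the Hadoop fs.s3a.* properties. A minimal flink-conf.yaml sketch, assuming static credentials:

s3.access-key: <your-access-key>
s3.secret-key: <your-secret-key>

The stack trace goes through org.apache.hadoop.fs.s3a, so the Hadoop S3A implementation is indeed handling the request; the flink-s3-fs-hadoop jar should sit under plugins/ (not lib/) for these options to be picked up.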
m
Out of curiosity, why do you use the DataStream API over SQL/Table API?
h
I have set s3a.access-key-id and s3a.secret-access-key in flink-conf.yaml, but it's still not working @Gaurav Miglani
Is there a better way to load CSV data from S3 into Iceberg exactly once? My thinking is that the DataStream API could achieve exactly-once semantics. @Martijn Visser
m
Table API/SQL also supports exactly once, so that doesn't make a difference
h
Then what about checkpointing? I want to load each record only once and avoid duplicate data.
m
Checkpoints are needed for exactly once, so the Table API uses those too
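For context, a minimal sketch of turning on exactly-once checkpointing in the program; the 60-second interval is an assumption:

import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Checkpoint every 60 seconds with exactly-once guarantees
// (the interval is an assumption; tune it to your latency needs).
env.enableCheckpointing(60_000, CheckpointingMode.EXACTLY_ONCE);

With the SQL Client, the equivalent is setting execution.checkpointing.interval (and, if needed, execution.checkpointing.mode) in the configuration.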
h
Can we use the Table/SQL API to read a CSV file on S3?
m
Yep
h
I couldn't find any related examples on the official website. Could you show me a demo?
m
It’s not possible to give examples for every possible combination, so the docs keep it general; see for example https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/connectors/table/filesystem/
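Putting the pieces from that page together, a minimal sketch for this thread's file, expressed through the Table API; the table name and columns are assumptions:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

// Register test.csv as a table via the filesystem connector
// (the columns here are hypothetical; match them to the real file).
tEnv.executeSql(
    "CREATE TABLE metrics (" +
    "  name STRING," +
    "  `value` DOUBLE" +
    ") WITH (" +
    "  'connector' = 'filesystem'," +
    "  'path' = 's3://test-dev/test.csv'," +
    "  'format' = 'csv'" +
    ")");

// A simple aggregation; for the Iceberg use case this SELECT would feed
// an INSERT INTO a table registered in an Iceberg catalog instead.
tEnv.executeSql("SELECT name, SUM(`value`) AS total FROM metrics GROUP BY name").print();

With checkpointing enabled as above, an INSERT INTO an Iceberg table would keep the whole exactly-once pipeline in SQL.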