# troubleshooting
**l:**
Hi team, I need some help with Flink. I am implementing a Flink job that reads Parquet files from an S3 path and writes to a Kafka topic. I keep hitting this issue:
```
14:57:29.913 ERROR org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager: Received uncaught exception. java.lang.NoClassDefFoundError: org/apache/hadoop/mapreduce/lib/input/FileInputFormat
        at org.apache.parquet.HadoopReadOptions$Builder.<init>(HadoopReadOptions.java:112)
        at org.apache.parquet.HadoopReadOptions$Builder.<init>(HadoopReadOptions.java:97)
        at org.apache.parquet.HadoopReadOptions.builder(HadoopReadOptions.java:85)
        at org.apache.parquet.hadoop.ParquetReader$Builder.<init>(ParquetReader.java:221)
        at org.apache.parquet.avro.AvroParquetReader$Builder.<init>(AvroParquetReader.java:143)
        at org.apache.parquet.avro.AvroParquetReader$Builder.<init>(AvroParquetReader.java:131)
        at org.apache.parquet.avro.AvroParquetReader.builder(AvroParquetReader.java:53)
        at org.apache.flink.formats.parquet.avro.AvroParquetRecordFormat.createReader(AvroParquetRecordFormat.java:83)
        at org.apache.flink.connector.file.src.impl.StreamFormatAdapter.lambda$createReader$0(StreamFormatAdapter.java:77)
        at org.apache.flink.connector.file.src.util.Utils.doWithCleanupOnException(Utils.java:45)
        at org.apache.flink.connector.file.src.impl.StreamFormatAdapter.createReader(StreamFormatAdapter.java:73)
        at org.apache.flink.connector.file.src.impl.FileSourceSplitReader.checkSplitOrStartNext(FileSourceSplitReader.java:112)
        at org.apache.flink.connector.file.src.impl.FileSourceSplitReader.fetch(FileSourceSplitReader.java:65)
        at org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58)
        at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:162)
        at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:114)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:829)
```
I have added the `parquet-avro` dependency as suggested by the Flink docs:
```kotlin
implementation("org.apache.parquet:parquet-avro:1.12.2") {
    exclude(group = "org.apache.hadoop", module = "hadoop-client")
    exclude(group = "it.unimi.dsi", module = "fastutil")
}
```
I thought the issue was related to the missing `org.apache.hadoop:hadoop-client:3.2.1` dependency, but when I added it, I got a credentials issue like this:
```
Caused by: org.apache.hadoop.fs.s3a.auth.NoAwsCredentialsException: Dynamic session credentials for Flink: No AWS Credentials
```
I have already added these dependencies:
```kotlin
implementation("org.apache.parquet:parquet-hadoop:1.12.2")
implementation("org.apache.hadoop:hadoop-client:3.2.1")
implementation(enforcedPlatform("com.amazonaws:aws-java-sdk-bom:1.12.515"))
implementation("com.amazonaws:aws-java-sdk-s3:1.12.515")
implementation("com.amazonaws:aws-java-sdk-core:1.12.515")
implementation("com.amazonaws:aws-java-sdk-sts:1.12.515")
implementation("com.amazonaws:aws-java-sdk-sqs:1.12.515")
```
I have tried a lot of versions of `aws-java-sdk`, like `1.12.433`, `1.12.438`, and `1.12.99`, but none of them worked. And I am not sure if this error is even related to the AWS SDK. Did I miss some dependency here? Really really need some help, thank you!!!
**Alexey Seleznev:**
It seems you need to add an auth method, something like this in the Flink config:
fs.s3a.aws.credentials.provider: com.amazonaws.auth.WebIdentityTokenCredentialsProvider
https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/index.html#Storing_secrets_with_Hadoop_Credential_Providers
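A minimal sketch of where that setting could go, assuming the flink-s3-fs-hadoop plugin is in use, which forwards `fs.s3a.*` keys from the Flink configuration to the underlying Hadoop S3A filesystem (the provider class is the one suggested above):

```yaml
# flink-conf.yaml (sketch)
# Assumption: the flink-s3-fs-hadoop plugin is installed under plugins/ and
# forwards fs.s3a.* options to Hadoop's S3A filesystem.
fs.s3a.aws.credentials.provider: com.amazonaws.auth.WebIdentityTokenCredentialsProvider
```

Note that `WebIdentityTokenCredentialsProvider` expects the usual web-identity environment (e.g. `AWS_ROLE_ARN` and `AWS_WEB_IDENTITY_TOKEN_FILE`) to be present.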
**Martijn Visser:**
If you want to use Parquet, you need to make sure that you have the necessary Hadoop dependencies added as well. When I wrote a recipe for writing Parquet, I needed to add the following dependencies: https://github.com/immerok/recipes/blob/main/write-parquet-files/pom.xml#L62-L94
So I think there are two separate things:
1. You want to read Parquet files, which requires Hadoop.
2. You want to read the Parquet files from S3, which requires the file system plugins as highlighted on https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/filesystems/s3/#hadooppresto-s3-file-systems-plugins
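Roughly what that could look like in a Gradle Kotlin DSL build, as a sketch only: the artifact names are the common ones but the versions are illustrative and not taken from the recipe, and `flink-s3-fs-hadoop` is mentioned because it is deployed as a Flink plugin rather than bundled into the job jar:

```kotlin
dependencies {
    // 1. Reading Parquet needs the Flink Parquet format plus the Hadoop classes it calls into
    implementation("org.apache.flink:flink-parquet:1.17.1")            // version illustrative
    implementation("org.apache.parquet:parquet-avro:1.12.2")
    implementation("org.apache.hadoop:hadoop-common:3.2.1")
    implementation("org.apache.hadoop:hadoop-mapreduce-client-core:3.2.1")
}

// 2. Reading from S3 is handled by the flink-s3-fs-hadoop plugin, which goes into
//    <flink-dist>/plugins/s3-fs-hadoop/ on the cluster instead of into the fat jar.
```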
**l:**
Hi @Martijn Visser, thanks for the help! I tried removing `hadoop-client` and adding `hadoop-mapreduce-client-core`, and the `FileInputFormat` error disappeared! So I guess it was because `hadoop-mapreduce-client-core` was missing. I should have noticed this from the error message `org/apache/hadoop/mapreduce/lib/input/FileInputFormat` 😂. Thanks!!
Hi @Alexey Seleznev, as for the frequent AWS credentials issue when running locally, I'm not sure of the exact reason lol. For now I'm just using the `aws sts assume-role` CLI to get a temporary `AccessKeyId`, `SecretAccessKey`, and `SessionToken`, which is easy and fixes it for some time...
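In case it helps anyone else, that workaround looks something like this (the role ARN and session name are placeholders, and it assumes your local setup reads credentials from the environment):

```bash
# Ask STS for short-lived credentials; role ARN and session name are placeholders
aws sts assume-role \
  --role-arn arn:aws:iam::123456789012:role/my-flink-dev-role \
  --role-session-name flink-local \
  --query 'Credentials.[AccessKeyId,SecretAccessKey,SessionToken]' \
  --output text

# Export the three values so anything using the AWS SDK's environment
# credentials provider (including S3A's default chain) can pick them up
export AWS_ACCESS_KEY_ID=...
export AWS_SECRET_ACCESS_KEY=...
export AWS_SESSION_TOKEN=...
```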