# troubleshooting
l
Hello, I'm setting up a Flink application to incrementally process files saved in S3 using the FileSource API:
```java
FileSource.forRecordStreamFormat(new Format(), new Path(path))
        .monitorContinuously(Duration.ofMinutes(15))
        .build()
```
In production, my job fails to start even after increasing timeouts to 10 minutes. One of the last lines in the logs is:
```
org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Starting split enumerator for source Source: S3.
```
Having debugged a bit locally with a much smaller dataset, I believe the problem is that before the SplitEnumerator can start, it first lists every existing path under the S3 directory (of which there are many: 100k+ subdirectories). Is it possible to configure the enumerator to discover new S3 files incrementally, i.e. to start without fully traversing the entire S3 structure? Alternatively, can anyone share their experience working around this problem?
Bumping this up again; I'm surprised this is not mentioned more frequently. Are folks not processing large S3 directories?
d
I don’t know of an out-of-the-box config option to make the FileSource enumerate splits incrementally, but there are some ways to address this. You could implement a custom SplitEnumerator tailored to your use case: implement the SplitEnumerator interface with logic that fetches and processes file paths incrementally instead of trying to fetch them all at once. This way you control how splits are discovered and handed to Flink, e.g. by fetching a batch of paths, assigning them, then fetching the next batch; see the sketch below.
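Something like this, as a rough, untested sketch: the S3KeySplit type, constructor arguments, and listing parameters are hypothetical, checkpointing and deduplication are omitted, and it calls the AWS SDK v2 directly. The key point is that start() only schedules paginated ListObjectsV2 calls on the coordinator's async worker, so the enumerator never blocks on a full listing:

```java
import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;

import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.ListObjectsV2Request;
import software.amazon.awssdk.services.s3.model.ListObjectsV2Response;
import software.amazon.awssdk.services.s3.model.S3Object;

import javax.annotation.Nullable;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.List;
import java.util.Queue;
import java.util.stream.Collectors;

// Minimal split type for the sketch; a real one would carry offsets and
// lengths like Flink's FileSourceSplit does.
class S3KeySplit implements SourceSplit {
    final String key;
    S3KeySplit(String key) { this.key = key; }
    @Override public String splitId() { return key; }
}

public class IncrementalS3Enumerator implements SplitEnumerator<S3KeySplit, Void> {

    private final SplitEnumeratorContext<S3KeySplit> context;
    private final S3Client s3 = S3Client.create();
    private final String bucket;  // hypothetical: wired in by your custom Source
    private final String prefix;
    private final Queue<S3KeySplit> pending = new ArrayDeque<>();
    private final Queue<Integer> readersAwaitingSplit = new ArrayDeque<>();
    @Nullable private String continuationToken;

    public IncrementalS3Enumerator(
            SplitEnumeratorContext<S3KeySplit> context, String bucket, String prefix) {
        this.context = context;
        this.bucket = bucket;
        this.prefix = prefix;
    }

    @Override
    public void start() {
        // List one ListObjectsV2 page per tick on the coordinator's async
        // worker. start() returns immediately, so the job comes up without
        // waiting for the full directory scan.
        context.callAsync(this::listNextPage, this::onPageListed, 0L, 1_000L);
    }

    private List<S3KeySplit> listNextPage() {
        ListObjectsV2Response page = s3.listObjectsV2(ListObjectsV2Request.builder()
                .bucket(bucket)
                .prefix(prefix)
                .maxKeys(1000)                            // one batch of keys
                .continuationToken(continuationToken)
                .build());
        // When the token comes back null the scan restarts from the top,
        // crudely mimicking monitorContinuously(); a real implementation
        // should remember already-assigned keys and skip them.
        continuationToken = page.nextContinuationToken();
        return page.contents().stream()
                .map(S3Object::key)
                .map(S3KeySplit::new)
                .collect(Collectors.toList());
    }

    private void onPageListed(List<S3KeySplit> batch, Throwable error) {
        if (error != null) {
            return; // a real implementation would retry or fail the job here
        }
        pending.addAll(batch);
        // Serve any readers that asked while the queue was still empty.
        while (!readersAwaitingSplit.isEmpty() && !pending.isEmpty()) {
            context.assignSplit(pending.poll(), readersAwaitingSplit.poll());
        }
    }

    @Override
    public void handleSplitRequest(int subtaskId, @Nullable String hostname) {
        S3KeySplit next = pending.poll();
        if (next != null) {
            context.assignSplit(next, subtaskId);
        } else {
            // Remember the request; readers don't re-ask on their own.
            readersAwaitingSplit.add(subtaskId);
        }
    }

    @Override public void addSplitsBack(List<S3KeySplit> splits, int subtaskId) { pending.addAll(splits); }
    @Override public void addReader(int subtaskId) {}
    @Override public Void snapshotState(long checkpointId) { return null; }
    @Override public void close() throws IOException { s3.close(); }
}
```

One caveat: as far as I know, FileSource doesn't let you swap the SplitEnumerator itself, so a fully custom enumerator means writing your own Source. The closest built-in hook is FileSourceBuilder.setFileEnumerator, but the FileEnumerator it installs is still asked for a complete listing on each discovery cycle.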
You might also preprocess your S3 directory structure into a more manageable number of top-level directories, or use some form of partitioning logic that lets you point Flink at only the relevant subset of directories at any given time.
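For instance, with a hypothetical date-partitioned layout like s3://my-bucket/data/dt=YYYY-MM-DD/, you could build the source against just the current partition instead of the whole tree (Format and MyRecord stand in for your actual format and record types, as in your snippet above):

```java
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.core.fs.Path;

import java.time.Duration;
import java.time.LocalDate;

// Hypothetical layout: s3://my-bucket/data/dt=YYYY-MM-DD/...
// Enumeration now only ever walks one day's worth of keys.
String partition = "s3://my-bucket/data/dt=" + LocalDate.now() + "/";

FileSource<MyRecord> source =
        FileSource.forRecordStreamFormat(new Format(), new Path(partition))
                .monitorContinuously(Duration.ofMinutes(15))
                .build();
```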
Optimizing your S3 bucket layout can help. S3 scales request rates per prefix, so spreading keys across well-chosen prefixes reduces the chance of listing requests being throttled; and if the bucket is owned by a different AWS account, check whether Requester Pays applies, since that determines who is billed for all those list requests.
You might consider pre-processing outside of Flink. Generate a manifest or index file listing all the files you want to process, and then configure your Flink job to read from this manifest. This way, the heavy lifting of listing files is done once and upfront, and Flink simply reads from the manifest.
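A sketch of that, assuming a newline-delimited manifest of fully qualified s3:// URIs at a placeholder local path. FileSource accepts multiple paths, so the job reads exactly the files the manifest names and never lists the bucket itself:

```java
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.core.fs.Path;

import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.List;
import java.util.stream.Collectors;

// Manifest: one s3://bucket/key URI per line, produced by an external job.
List<Path> paths = Files.readAllLines(Paths.get("/jobs/input-manifest.txt")).stream()
        .map(Path::new)
        .collect(Collectors.toList());

// A static file set, so no monitorContinuously(): the source is bounded.
FileSource<MyRecord> source =
        FileSource.forRecordStreamFormat(new Format(), paths.toArray(new Path[0]))
                .build();
```

To pick up new data, regenerate the manifest and run the job again, or have the manifest generator write dated manifests that your scheduler iterates over.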
It can also help to implement a compaction strategy that merges smaller files into larger ones.
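As a toy illustration of compaction with the AWS SDK v2 (the bucket, prefixes, and the assumption of line-oriented records that can simply be concatenated are all hypothetical, and everything is buffered in memory):

```java
import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.S3Object;

import java.io.ByteArrayOutputStream;
import java.io.IOException;

public class NaiveCompactor {
    public static void main(String[] args) throws IOException {
        S3Client s3 = S3Client.create();
        ByteArrayOutputStream merged = new ByteArrayOutputStream();

        // Concatenate every small object under small/ into one buffer.
        for (S3Object obj : s3.listObjectsV2Paginator(
                b -> b.bucket("my-bucket").prefix("small/")).contents()) {
            merged.write(s3.getObjectAsBytes(
                    g -> g.bucket("my-bucket").key(obj.key())).asByteArray());
        }

        // Write a single compacted object; delete the originals only after
        // verifying it, so the enumerator sees one key instead of thousands.
        s3.putObject(b -> b.bucket("my-bucket").key("compacted/part-00000"),
                RequestBody.fromBytes(merged.toByteArray()));
    }
}
```

Fewer, larger files help every listing-based consumer, not just Flink.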
In summary, I don’t think what you’re looking for is available out of the box with FileSource (maybe it should be…), but you can try these methods to improve performance and get closer to incremental file discovery.