Hello, I’m planning on using S3 as a data source ...
# troubleshooting
a
Hello, I’m planning on using S3 as a data source where the objects are partitioned by hour in the following format: s3://test-data/<yyyy-MM-dd>/topic/<HH>. I discovered I can use FileSource with the monitorContinuously function, but I’m trying to find a way to ensure that at any given time, it’s only checking for new files in the current hour directory. If I use s3://test-data/<yyyy-MM-dd>/topic as the base path to monitor for new files, it will require terminating the job every day to prevent it from monitoring old directories. Is there a more efficient way to achieve this so that I can limit the FileSource to only monitor and read from the current hour’s directory? Any advice or insights would be greatly appreciated! Thanks!
s
I don't think so, AFAIK FileSource creates a collection of "leaf" paths in your directory tree and checks all of them every time. You can't really modify this behaviour unless you implement a custom connector.
a
Thanks for looking into this. It seems the best I can do is terminate the job and create a new one every day.
s
So do you intentionally want to avoid checking older paths? Is it very slow in your case?
a
Yes, in my use case, it’s unnecessary and inefficient to continually check older paths. Once we’ve processed the data from a specific hourly directory, we know that no new data will be written to that directory, because our data pipeline always writes new data to a new directory for the current timestamp. I haven’t done any testing yet, but I thought I could avoid the cost involved in scanning the older directories. Moreover, since our data is partitioned by day as well as hour, it means we need to construct the list of Paths to scan each day dynamically.
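A minimal sketch of what constructing that per-day list of paths could look like, using only the JDK. The class name `DailyTopicPaths`, the method `forDay`, and the topic names are hypothetical, and the use of UTC for "today" is an assumption, since the thread does not say which time zone the directory names follow. The resulting strings would still have to be handed to `FileSource` when the job is built.

```java
import java.time.LocalDate;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;

/** Builds the per-day prefixes for a handful of event types (hypothetical helper). */
final class DailyTopicPaths {
    private static final DateTimeFormatter DAY = DateTimeFormatter.ofPattern("yyyy-MM-dd");

    /** Returns one "<bucket>/<yyyy-MM-dd>/<topic>" prefix per requested topic. */
    static List<String> forDay(String bucket, LocalDate day, List<String> topics) {
        List<String> paths = new ArrayList<>();
        for (String topic : topics) {
            paths.add(bucket + "/" + DAY.format(day) + "/" + topic);
        }
        return paths;
    }

    public static void main(String[] args) {
        // Hypothetical event types; assumes directory dates are in UTC.
        List<String> topics = List.of("orders", "clicks");
        LocalDate today = LocalDate.now(ZoneOffset.UTC);
        forDay("s3://test-data", today, topics).forEach(System.out::println);
    }
}
```

Because the list depends on the date, it is fixed at the moment the source is created, which is why this approach implies restarting the job each day.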
s
It’s unnecessary and inefficient, but in my experience Flink handles it well; we don’t have any issues because of that.
"Moreover, since our data is partitioned by day as well as hour, it means we need to construct the list of Paths to scan each day dynamically."
Not sure I fully understand that… Flink is able to recursively find all “sub folders” as well.
a
Our S3 bucket is structured in the following manner: s3://test-data/<yyyy-MM-dd>/<event>/<HH>. We have data stored in this format dating back to 2019, and for each day we have approximately 50 different event types stored in their respective directories. Out of these, my use case involves processing only around 4-5 specific event types. I need to process current data, so my strategy is to initially provide the path s3://test-data/<current date>/<topic> to Flink’s FileSource. This allows the FileSource to continuously monitor this directory for new files corresponding to the current date. Once a day’s data has been processed and the day has ended, I would need to update the FileSource to process data for the new date. Is there any way to configure FileSource to do this efficiently? Does creating a custom FileEnumerator make sense?
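One way to think about the custom enumerator idea is as a path filter that prunes everything except the current hour's directory for the topics of interest. The sketch below shows only that decision logic in plain Java; it is not Flink code, and how it would be wired into a custom `FileEnumerator` is left open. The class name `CurrentHourFilter`, the grace period, the topic names, and the choice of UTC are all assumptions made for illustration.

```java
import java.time.Duration;
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.HashSet;
import java.util.Set;

/** Decides whether a path under <base>/<yyyy-MM-dd>/<topic>/<HH> is worth scanning now. */
final class CurrentHourFilter {
    // Assumption: directory names follow UTC.
    private static final DateTimeFormatter DAY_HOUR =
            DateTimeFormatter.ofPattern("yyyy-MM-dd/HH").withZone(ZoneOffset.UTC);

    private final String base;
    private final Set<String> topics;
    private final Duration grace; // keep the previous hour visible for this long

    CurrentHourFilter(String base, Set<String> topics, Duration grace) {
        this.base = base.endsWith("/") ? base.substring(0, base.length() - 1) : base;
        this.topics = Set.copyOf(topics);
        this.grace = grace;
    }

    boolean accepts(String path, Instant now) {
        String p = path.endsWith("/") ? path.substring(0, path.length() - 1) : path;
        if (p.equals(base)) return true;              // the root must stay traversable
        if (!p.startsWith(base + "/")) return false;  // outside the monitored tree
        String[] seg = p.substring(base.length() + 1).split("/");

        // "day/hour" pairs that are still live: now, and now minus the grace period.
        Set<String> live = new HashSet<>();
        live.add(DAY_HOUR.format(now));
        live.add(DAY_HOUR.format(now.minus(grace)));

        boolean dayLive = false;
        for (String dayHour : live) {
            if (dayHour.startsWith(seg[0] + "/")) dayLive = true;
        }
        if (!dayLive) return false;                   // an old day directory
        if (seg.length == 1) return true;             // a live day directory
        if (!topics.contains(seg[1])) return false;   // an event type we do not read
        if (seg.length == 2) return true;             // a topic directory of a live day
        return live.contains(seg[0] + "/" + seg[2]);  // the hour directory and below
    }

    public static void main(String[] args) {
        CurrentHourFilter filter = new CurrentHourFilter(
                "s3://test-data", Set.of("orders", "clicks"), Duration.ofMinutes(5));
        Instant now = Instant.parse("2024-01-05T13:20:00Z");
        System.out.println(filter.accepts("s3://test-data/2024-01-05/orders/13/part-0.json", now));
        System.out.println(filter.accepts("s3://test-data/2024-01-05/orders/12/part-0.json", now));
    }
}
```

The grace period is there because a filter keyed strictly to the current hour would skip any file that had not yet been discovered when the hour rolled over. Parent directories are accepted so that a recursive listing can still descend to the hour level.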