# troubleshooting
s
We are trying to enable an HDFS-based file system via FileSink on Flink. We are using a 3P cloud provider (like AWS, GCP, Azure) which takes away the overhead of setting up a vanilla Flink cluster running on Kubernetes: we just submit the application jar with the pipeline definition using the vended API and the cloud provider executes it on our behalf. However, FileSystem initialisation works differently from other connectors in Flink. It gets initialised during TaskManager startup, and the filesystem object is fetched at runtime in the sink connector using the scheme defined in the file URI. If all Hadoop dependencies and config files are not present at TaskManager startup, then fetching the filesystem object fails during sink initialisation after the app jar is submitted. Since we have no control over the TaskManager lifecycle (it is controlled by the 3P cloud provider), we are not able to initialise the FileSystem correctly for our use case.

I would like to understand why the file system can't be initialised as part of the operator or connector class. It seems like a conscious design choice that the FileSystem initialisation call was intentionally made only once during TaskManager startup. What is the downside of initialising it again and again in the connector/operator class?
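For context, here is a minimal sketch of the kind of FileSink pipeline described above (hostnames, ports and paths are placeholders, not our real setup). The "hdfs" scheme in the path is what drives the FileSystem lookup at runtime, which is why the Hadoop dependencies and config have to be visible before the sink initialises:

```java
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class HdfsFileSinkJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Placeholder source just to have something to write.
        DataStream<String> events = env.fromElements("a", "b", "c");

        // The "hdfs" scheme below is resolved against the FileSystem factories that were
        // registered when the TaskManager started. If the Hadoop jars / core-site.xml were
        // not available at that point, sink initialisation fails at runtime.
        FileSink<String> sink = FileSink
                .forRowFormat(new Path("hdfs://namenode:8020/data/output"), // placeholder URI
                              new SimpleStringEncoder<String>("UTF-8"))
                .build();

        events.sinkTo(sink);
        env.execute("hdfs-file-sink-example");
    }
}
```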
m
It's because Flink's filesystems are loaded through its plugin system, to avoid conflicting versions of the same library without the need to relocate classes or to converge to common versions.
Flink has had a lot of trouble in the past with the Hadoop dependency footprint, and ultimately the project decided to go this route.
s
Thanks for the quick response. AFAIK, not all of Flink's filesystems are loaded through the plugin system. There is a fallback factory provided to load HDFS-compliant file systems: org.apache.flink.runtime.fs.hdfs.HadoopFsFactory. IMO, shaded jars were the answer to handling conflicting version issues. I might be over-simplifying it, but I can't wrap my head around why file systems are different from any other application dependencies. I understand that the Hadoop dependency tree can be very complex, which is why Flink moved away from copying jars (as in older versions) to specifying a separate Hadoop classpath altogether.
m
All file systems are pluggable, see https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/filesystems/plugins/#file-systems - you should use plugins. The S3 file systems can only be used as plugins. The fact that there might be a fallback method doesn't indicate that this isn't the way to go.
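To make the plugin route concrete: the documented layout is one directory per filesystem under plugins/, e.g. plugins/s3-fs-hadoop/flink-s3-fs-hadoop-<version>.jar, and it is picked up when the TaskManager process starts. Below is a rough sketch (not a definitive recipe) of a probe you could call from an operator's open() method, or anywhere the filesystem is supposed to be available, to check whether a scheme actually resolved; the URI is a placeholder:

```java
import java.net.URI;

import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.UnsupportedFileSystemSchemeException;

public final class SchemeProbe {

    /** Prints whether a FileSystem factory is registered for the given URI's scheme. */
    public static void probe(String uriString) {
        URI uri = URI.create(uriString);
        try {
            FileSystem fs = FileSystem.get(uri);
            System.out.println("Scheme '" + uri.getScheme() + "' resolved to " + fs.getClass().getName());
        } catch (UnsupportedFileSystemSchemeException e) {
            // Usually means the plugin jar was not under plugins/<dir>/ when the process started.
            System.err.println("No FileSystem registered for scheme '" + uri.getScheme() + "': " + e.getMessage());
        } catch (Exception e) {
            System.err.println("FileSystem lookup failed for " + uriString + ": " + e);
        }
    }

    public static void main(String[] args) {
        probe("s3://my-bucket/some/prefix"); // placeholder URI
    }
}
```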
> Shaded jars were the answer to handling conflicting version issues.
Given that Flink internally, connectors, and user applications can all introduce conflicting versions, I don't think that's the answer. The answer is to isolate things. We're already seeing how insanely hard it is to update to a newer version of Guava internally, because connectors that have been externalized relied on that shaded version. It takes multiple releases before these things can happen.
> why file systems are different from any other application dependencies
It's because the file system implementations are ahead of the curve and were done first, to actually break the deadlock we were running into with shading.
s
Understood. Isolation seems like the correct answer here. The plugin structure is one way of isolating different file systems; specifying a separate Hadoop classpath is another way of isolating the fallback/default Hadoop-based file system. I can totally relate to such Guava issues. On my local Flink cluster, we are using the older way of copying the flink-shaded-hadoop-2-uber jar into the lib directory and are running into NoClassDefFoundError for Google Guava classes even though the classes are present in the application jar. Are you aware of any way to troubleshoot such issues?
m
flink-shaded-hadoop-2-uber is not maintained and shouldn't be used 🙂
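One thing that often helps with that kind of NoClassDefFoundError is checking which jar actually served the conflicting class at runtime; with parent-first classloading, a copy inside a jar in lib/ (such as the uber jar) can shadow the one in your application jar. A small sketch using plain JDK calls, with com.google.common.collect.ImmutableList used only as an example class name:

```java
import java.security.CodeSource;

public class ClassOriginCheck {
    public static void main(String[] args) throws Exception {
        // Replace with whatever class the NoClassDefFoundError mentions.
        String className = "com.google.common.collect.ImmutableList";

        Class<?> clazz = Class.forName(className);

        // Shows which jar on the classpath actually provided the class at runtime.
        CodeSource src = clazz.getProtectionDomain().getCodeSource();
        System.out.println(className + " loaded from: "
                + (src != null ? src.getLocation() : "<bootstrap/unknown>"));
        System.out.println("Classloader: " + clazz.getClassLoader());
    }
}
```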