# troubleshooting
j
Having some trouble with sinking to Azure OneLake... I can submit my job but it fails right away with java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.azurebfs.SecureAzureBlobFileSystem not found. I copied the flink-azure-fs-hadoop-1.20.0.jar to plugins/azure-fs-hadoop. Isn't that supposed to provide this class? Flink 1.20.0 running in a VM on Java 17...
Same result with Java 11
d
The error you’re encountering, java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.azurebfs.SecureAzureBlobFileSystem not found, indicates Flink can’t locate the classes needed to interact with Azure storage via the ABFS (Azure Blob File System) driver. Placing flink-azure-fs-hadoop-1.20.0.jar into the plugins/azure-fs-hadoop directory is correct. However, there are a few more steps and checks to ensure that Flink recognizes and loads the plugin correctly.

First, ensure that the JAR file you copied does contain the class org.apache.hadoop.fs.azurebfs.SecureAzureBlobFileSystem. You can do this by running

```
jar tf flink-azure-fs-hadoop-1.20.0.jar
```
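For example, to filter straight for the class (assuming a Unix shell is available on the VM):

```
jar tf flink-azure-fs-hadoop-1.20.0.jar | grep SecureAzureBlobFileSystem
```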
and checking that the class is listed. Even though you’ve placed the JAR in the plugins folder, Flink might not automatically pick it up, especially in a custom setup like a VM; check the startup logs to confirm the plugin directory is actually being scanned. Also make sure your Flink job’s configuration points at the ABFS filesystem. In flink-conf.yaml (or passed as command-line arguments), you should have something like:

```
fs.default-scheme: abfs
```
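As a fuller sketch, assuming a regular ADLS Gen2 storage account with key-based auth (OneLake uses different endpoints and authentication, so treat the host and key entries as placeholders to adapt):

```
fs.default-scheme: abfss://<container>@<account>.dfs.core.windows.net/
fs.azure.account.key.<account>.dfs.core.windows.net: <your-account-key>
```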
You might also need to configure Hadoop dependencies if your job indirectly uses Hadoop APIs to interact with storage.
The Azure Blob File System (ABFS) client is part of the Hadoop libraries. Make sure that the version of the hadoop-azure (or hadoop-azure-datalake, if used) library is compatible with both your Flink version and the flink-azure-fs-hadoop plugin. Mismatched versions can also lead to a ClassNotFoundException, not just missing JARs.
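If your job code pulls in Hadoop’s Azure classes directly, one way to keep versions aligned is to pin hadoop-azure in your job’s pom.xml to whatever the plugin bundles; the version below is only an example, so verify it against your setup:

```xml
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-azure</artifactId>
    <version>3.3.4</version>
    <scope>provided</scope>
</dependency>
```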
j
Thanks Draco! I'll start checking!
The class is provided in the plugin, and Flink is reading the plugin according to the logs. The job uses an abfss:// URL, which seems to be working since it tries to find the SecureAzureBlobFileSystem. Changing the URL to abfs:// gives an AzureBlobFileSystem class-not-found error, which seems reasonable. I'll check versions next
d
Alright, might also want to check that the Azure Storage SDK is current as well.
Running

```
mvn dependency:tree
```

against the flink-azure-fs-hadoop module in the Flink source tree (or against your own job's POM) can show you the list of dependencies and their versions that go into flink-azure-fs-hadoop-1.20.0.jar
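To narrow the output to the relevant artifact, the Maven dependency plugin accepts an include filter:

```
mvn dependency:tree -Dincludes=org.apache.hadoop:hadoop-azure
```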
j
Seems like something is going wrong with loading the plugin; moving the JAR to the lib/ folder seems to fix this problem. (But reveals new problems with the configuration, yay!)
d
ok, might want to bump logging up to debug. Flink 1.20 ships with Log4j 2, so in conf/log4j.properties that looks like:

```
logger.flink.name = org.apache.flink
logger.flink.level = DEBUG
logger.hadoop.name = org.apache.hadoop
logger.hadoop.level = DEBUG
```
This might reveal why moving the JAR to the lib/ folder made the error message go away, and help determine what's wrong with the plugin loading process
Since you’re dealing with Azure storage, it’s crucial that the Hadoop configuration is correctly set up within Flink. Settings like fs.abfss.impl (and its fs.AbstractFileSystem.abfss.impl counterpart) may need to point at the ABFS implementation classes; newer versions of the Hadoop libraries ship these defaults themselves, but I'm not sure.
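If you do need them explicitly, Flink forwards any flink-conf.yaml option prefixed with flink.hadoop. into the Hadoop configuration. A sketch, using the class names hadoop-azure registers for the abfss:// scheme (verify they match your Hadoop version):

```
flink.hadoop.fs.abfss.impl: org.apache.hadoop.fs.azurebfs.SecureAzureBlobFileSystem
flink.hadoop.fs.AbstractFileSystem.abfss.impl: org.apache.hadoop.fs.azurebfs.Abfss
```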
Flink has a specific way of loading plugins from the plugins/ directory, which might be misconfigured or not fully compatible with your custom VM setup. There can also be subtle differences in how classpaths are handled between Flink deployment modes (e.g., standalone, YARN, Kubernetes).
While putting the JAR in the lib/ directory is a workaround, it's probably not ideal in terms of a modular setup and should not be necessary. You should try to uncover the root cause of the plugin loading error.
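For reference, the layout Flink expects is one subdirectory per plugin under plugins/, e.g.:

```
flink-1.20.0/
├── lib/
└── plugins/
    └── azure-fs-hadoop/
        └── flink-azure-fs-hadoop-1.20.0.jar
```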
Might need to set

```
logger.cfcl.name = org.apache.flink.util.ChildFirstClassLoader
logger.cfcl.level = DEBUG
```

to check what the loading issue was.
You can run the job once with the JAR in lib/ and once with it only in the plugins/ directory, and compare how the classpath differs between the two runs. Add the following code snippet early in your job's main method or initialization logic:
```java
import java.net.URL;
import java.net.URLClassLoader;

// assuming your main class or somewhere in the initialization
public class YourFlinkJob {
    public static void main(String[] args) throws Exception {
        // get the current thread's context classloader; inside a Flink job this
        // is typically Flink's child-first user-code classloader
        ClassLoader cl = Thread.currentThread().getContextClassLoader();
        System.out.println("Context classloader: " + cl.getClass().getName());

        // Flink's user-code classloaders extend URLClassLoader, so we can list
        // the URLs they load classes from, effectively showing the classpath
        if (cl instanceof URLClassLoader) {
            for (URL url : ((URLClassLoader) cl).getURLs()) {
                System.out.println(url);
            }
        }

        // Rest of your job initialization and execution code...
    }
}
```
This snippet prints the name of the current thread’s context classloader and, since Flink’s user-code classloaders extend URLClassLoader, the URLs it can load classes from, effectively showing the classpath. Note that with a correctly working plugin setup the JAR is not expected to appear there at all: plugins live in their own isolated classloaders, which is exactly why comparing the lib/ and plugins/ runs is informative. Remember to remove this debugging code once you’ve diagnosed the issue.
Classloader issues can be nasty, and they're a common source of errors in Flink. I would re-read the plugin documentation for configuration instructions once more before taking additional steps