Eli Golin
05/30/2023, 8:23 AM
object ParquetSink {
/**
 * Builds a bulk-format [[FileSink]] that writes protobuf messages of type `A` as Parquet
 * files (part suffix `.parquet`) to Azure Blob Storage.
 *
 * The base path is `wasbs://<baseDir>@<accountName>.blob.core.windows.net`, where both parts
 * are read from `config`; each element is routed to the bucket returned by `assigner`.
 *
 * NOTE(review): when deployed, this sink asks the Azure file system for a recoverable writer,
 * and Flink rejects that for this URI with
 * `UnsupportedOperationException: Recoverable writers on AzureBlob are only supported for ABFS`.
 * The `wasbs://` scheme is kept here as written; presumably an `abfs(s)://` URI (with matching
 * storage-account configuration) is required for the sink to run -- confirm against the
 * deployment before changing it.
 *
 * @param assigner maps an element to its bucket id, i.e. the sub-path the element is written under
 * @param config   supplies the values stored under `baseDirKey`, `accountNameKey` and `compressionKey`
 * @param lc       implicit logging configuration (presumably consumed by `logger` -- confirm)
 * @tparam A       protobuf message type; its runtime class is taken from the `ClassTag`
 * @return a `FileSink` configured with the bucket assigner and Parquet writer factory built here
 */
def parquetFileSink[A <: Message: ClassTag](
  assigner: A => String,
  config: Config
)(implicit lc: LoggingConfigs): FileSink[A] = {
  // Read once up front: both values are needed for the base path anyway, and the per-element
  // log line below no longer has to hit the config for every record.
  val baseDir     = config.getString(baseDirKey)
  val accountName = config.getString(accountNameKey)

  val bucketAssigner = new BucketAssigner[A, String] {
    override def getBucketId(element: A, context: BucketAssigner.Context): String = {
      val path = assigner(element)
      // Emitted for every element that passes through the sink.
      logger.info(LogMessage(-1, s"Writing file to $baseDir/$path", "NA"))
      path
    }

    override def getSerializer: SimpleVersionedSerializer[String] =
      SimpleVersionedStringSerializer.INSTANCE
  }

  // Creates a Parquet writer for one output file, using the compression codec from config.
  def builder(outFile: OutputFile): ParquetWriter[A] =
    new ParquetProtoWriters.ParquetProtoWriterBuilder(
      outFile,
      implicitly[ClassTag[A]].runtimeClass.asInstanceOf[Class[A]]
    ).withCompressionCodec(config.getCompression(compressionKey)).build()

  val parquetBuilder: ParquetBuilder[A] = path => builder(path)

  FileSink
    .forBulkFormat(
      new Path(s"wasbs://$baseDir@$accountName.blob.core.windows.net"),
      new ParquetWriterFactory[A](parquetBuilder)
    )
    .withBucketAssigner(bucketAssigner)
    .withOutputFileConfig(
      OutputFileConfig
        .builder()
        .withPartSuffix(".parquet")
        .build()
    )
    .build()
}
}
After deploying the job I get the following exception:
Caused by: java.lang.UnsupportedOperationException: Recoverable writers on AzureBlob are only supported for ABFS
at org.apache.flink.fs.azurefs.AzureBlobRecoverableWriter.checkSupportedFSSchemes(AzureBlobRecoverableWriter.java:44) ~[?:?]
at org.apache.flink.fs.azure.common.hadoop.HadoopRecoverableWriter.<init>(HadoopRecoverableWriter.java:57) ~[?:?]
at org.apache.flink.fs.azurefs.AzureBlobRecoverableWriter.<init>(AzureBlobRecoverableWriter.java:37) ~[?:?]
at org.apache.flink.fs.azurefs.AzureBlobFileSystem.createRecoverableWriter(AzureBlobFileSystem.java:44) ~[?:?]
at org.apache.flink.core.fs.PluginFileSystemFactory$ClassLoaderFixingFileSystem.createRecoverableWriter(PluginFileSystemFactory.java:134) ~[flink-dist-1.18-SNAPSHOT.jar:1.18-SNAPSHOT]
The question is whether I can change this code or its configuration to make it work with the wasbs protocol rather than abfs?
Thanks, everyone.
Martijn Visser
05/30/2023, 8:26 AMEli Golin
05/30/2023, 8:34 AMMartijn Visser
05/30/2023, 8:36 AMEli Golin
05/30/2023, 8:37 AMMartijn Visser
05/30/2023, 8:41 AMEli Golin
05/30/2023, 8:41 AMEli Golin
05/30/2023, 10:26 AMAzureBlobRecoverableWriter
inside the sink itself and has nothing to do with a concurrent snapshotting process.
Martijn Visser
05/30/2023, 11:17 AMEli Golin
05/30/2023, 11:45 AM