# troubleshooting
e
I have defined the following sink:
object ParquetSink {

  def parquetFileSink[A <: Message: ClassTag](
      assigner: A => String,
      config: Config
  )(implicit lc: LoggingConfigs): FileSink[A] = {
    val bucketAssigner = new BucketAssigner[A, String] {
      override def getBucketId(element: A, context: BucketAssigner.Context): String = {
        val path = assigner(element)
        logger.info(LogMessage(-1, s"Writing file to ${config.getString(baseDirKey)}/$path", "NA"))
        path
      }

      override def getSerializer: SimpleVersionedSerializer[String] =
        SimpleVersionedStringSerializer.INSTANCE
    }

    def builder(outFile: OutputFile): ParquetWriter[A] =
      new ParquetProtoWriters.ParquetProtoWriterBuilder(
        outFile,
        implicitly[ClassTag[A]].runtimeClass.asInstanceOf[Class[A]]
      ).withCompressionCodec(config.getCompression(compressionKey)).build()

    val parquetBuilder: ParquetBuilder[A] = path => builder(path)

    FileSink
      .forBulkFormat(
        new Path(s"wasbs://${config.getString(baseDirKey)}@${config.getString(accountNameKey)}.blob.core.windows.net"),
        new ParquetWriterFactory[A](parquetBuilder)
      )
      .withBucketAssigner(bucketAssigner)
      .withOutputFileConfig(
        OutputFileConfig
          .builder()
          .withPartSuffix(".parquet")
          .build()
      )
      .build()
  }
}
After deploying the job I get the following exception:

Caused by: java.lang.UnsupportedOperationException: Recoverable writers on AzureBlob are only supported for ABFS
    at org.apache.flink.fs.azurefs.AzureBlobRecoverableWriter.checkSupportedFSSchemes(AzureBlobRecoverableWriter.java:44) ~[?:?]
    at org.apache.flink.fs.azure.common.hadoop.HadoopRecoverableWriter.<init>(HadoopRecoverableWriter.java:57) ~[?:?]
    at org.apache.flink.fs.azurefs.AzureBlobRecoverableWriter.<init>(AzureBlobRecoverableWriter.java:37) ~[?:?]
    at org.apache.flink.fs.azurefs.AzureBlobFileSystem.createRecoverableWriter(AzureBlobFileSystem.java:44) ~[?:?]
    at org.apache.flink.core.fs.PluginFileSystemFactory$ClassLoaderFixingFileSystem.createRecoverableWriter(PluginFileSystemFactory.java:134) ~[flink-dist-1.18-SNAPSHOT.jar:1.18-SNAPSHOT]

The question is whether I can make any changes to this code or its configuration so that it works with the wasbs protocol rather than abfs? Thanks everyone.
m
No, that's because recoverable writers are not implemented for wasbs, only for abfs.
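A minimal sketch of what pointing the sink at the supported abfs/abfss scheme could look like, reusing config, bucketAssigner and parquetBuilder from the snippet above; the dfs.core.windows.net endpoint assumes an ADLS Gen2 (hierarchical-namespace) storage account and the flink-azure-fs-hadoop plugin, so treat it as an illustration rather than a tested change:
```
// Same FileSink as above, but with an abfss base path (ADLS Gen2 endpoint),
// which is the scheme Flink's Azure recoverable writer supports.
val basePath = new Path(
  s"abfss://${config.getString(baseDirKey)}@${config.getString(accountNameKey)}.dfs.core.windows.net"
)

FileSink
  .forBulkFormat(basePath, new ParquetWriterFactory[A](parquetBuilder))
  .withBucketAssigner(bucketAssigner)
  .build()
```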
e
Ok, that part I have figured out, but do you mean there's no way to use FileSink with the wasbs protocol? Can I change anything so that it doesn't use a RecoverableWriter?
m
RecoverableWriter is needed in order to support resuming consistently after a failure and recovery without data loss/duplication.
e
Let's assume I can do without that feature (I can live with data loss or duplication). Is there no way to use some kind of "NonRecoverableWriter" or anything else?
m
The only thing you could try is disabling checkpointing and seeing whether the exception still appears.
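For anyone trying the same experiment: checkpointing in Flink is opt-in, so "disabling" it mostly means not enabling it in the first place. A minimal sketch, assuming the job is built directly on a StreamExecutionEnvironment:
```
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment

val env = StreamExecutionEnvironment.getExecutionEnvironment
// Checkpointing only runs if enabled explicitly here (or via
// execution.checkpointing.interval in the Flink configuration), so leaving
// this call out runs the job without checkpoints.
// env.enableCheckpointing(60000L)
```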
e
Ok, I will try. Thanks.
No, unfortunately it has nothing to do with snapshotting. It actually makes sense, since the exception is thrown from AzureBlobRecoverableWriter inside the sink itself and has nothing to do with a concurrent snapshotting process.
m
Disabling snapshots != concurrent snapshots. I was hoping that disabling snapshots altogether would eliminate the need for recoverable writers, since those are only used for creating and restoring snapshots.
e
Actually, the checkpointing configuration we use in the YAML is set up with wasbs and works fine. It is just the sink that causes issues.
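For context on why those can behave differently, here is a rough sketch of the equivalent in-code checkpoint-storage setup with a placeholder wasbs URI (the thread says this actually lives in the YAML configuration, so this is only illustrative):
```
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment

val env = StreamExecutionEnvironment.getExecutionEnvironment
// Checkpoint files are written with regular output streams rather than the
// RecoverableWriter the FileSink requires, which is presumably why a wasbs
// checkpoint directory works while the wasbs FileSink does not.
env.getCheckpointConfig.setCheckpointStorage(
  "wasbs://my-container@myaccount.blob.core.windows.net/checkpoints"
)
```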