Magdalena Kobusch
09/19/2024, 2:55 PM
FileSink<KafkaMessage> sink = FileSink
// Builder chain for the `sink` declared on the preceding line: a bulk-format FileSink of
// KafkaMessage records with a custom rolling policy, bucket assigner and file compaction.
// Output location comes from buildPath(context); records go through ZstdWriterFactory
// (presumably zstd-compressed output — confirm in that class).
.forBulkFormat(buildPath(context), new ZstdWriterFactory<KafkaMessage>())
.withOutputFileConfig(outputFileConfig())
// Rolling policy receives the context's target file size (MB, passed through mbToBytes)
// and max inactivity (minutes, passed through minToMillis).
.withRollingPolicy(new CustomCheckpointRollingPolicy<>(
mbToBytes(context.getTargetFileSizeMB()),
minToMillis(context.getMaxInactivityMinutes())))
.withBucketAssigner(new CustomBucketAssigner())
// Enables the sink's compaction stage with the strategy and compactor given below.
.enableCompact(
FileCompactStrategy.Builder.newBuilder()
// NOTE(review): Flink documents this threshold as a total size in bytes, so 100000 is
// roughly 100 KB — confirm this is intended relative to getTargetFileSizeMB().
.setSizeThreshold(100000)
// NOTE(review): per Flink docs the argument is a count of checkpoints between
// compactions (here 100), not a duration — confirm against the checkpoint interval.
.enableCompactionOnCheckpoint(100)
.build(),
// NOTE(review): ConcatFileCompactor presumably joins the input files byte-for-byte;
// verify that concatenated ZstdWriterFactory output is still readable downstream.
new ConcatFileCompactor())
.build();