// Indrek Tammaru — 08/01/2024, 2:27 PM
//
// Pipeline: consume from a comma-separated list of Kafka topics, route every
// record to a per-topic side output keyed on the record's /metadata/topic
// field, and write each side output to a corresponding "<topic>-indrek" sink.
val kafkaSource: DataStream[ObjectNode] = getKafkaSource(kafkaInputTopics, sourceProperties, env, getClass)

val topics = kafkaInputTopics.split(",")

// One OutputTag per input topic. The anonymous-subclass braces ({}) are
// required so Flink can recover the ObjectNode type argument despite erasure.
val outputTags: Map[String, OutputTag[ObjectNode]] = topics.map { topic =>
  topic -> new OutputTag[ObjectNode](topic) {}
}.toMap

// Router: records whose topic has a registered tag go to that side output;
// anything unrecognised falls through to the main output.
val mainStream: SingleOutputStreamOperator[ObjectNode] = kafkaSource
  .process(new ProcessFunction[ObjectNode, ObjectNode] {
    override def processElement(value: ObjectNode, ctx: ProcessFunction[ObjectNode, ObjectNode]#Context, out: Collector[ObjectNode]): Unit = {
      val topicName = value.at("/metadata/topic").asText()
      outputTags.get(topicName) match {
        case Some(tag) => ctx.output(tag, value)
        case None      => out.collect(value)
      }
    }
  })
  // FIX: assign the router's uid/name once here instead of inside the sink
  // loop below — the original called mainStream.uid(...).name(...) on every
  // loop iteration, re-setting the same operator id per topic and obscuring
  // that it applies to the process operator, not to the side outputs.
  .uid("SourceUid")
  .name("SourceName")

val sinkProperties: Properties = getKafkaProperties(kafkaOutputPropertiesPath)

// One Kafka sink per input topic, each with a stable, topic-derived uid/name
// so operator state can be matched up across job-graph changes.
outputTags.foreach { case (topicName, tag) =>
  val sideOutput: DataStream[ObjectNode] = mainStream.getSideOutput(tag)
  val kafkaSink = getKafkaSinkV2(s"$topicName-indrek", sinkProperties)
  sideOutput
    .addSink(kafkaSink)
    .uid(s"Sink.$topicName")
    .name(s"Sink.$topicName")
}
The issue I’m facing is that messages routed to the Kafka sink don’t seem to persist there permanently. Filtering by a primary-key field, I confirmed that the message was delivered to the sink topic, but it disappeared after some time. With the following filter applied, the message does not persist in the Kafka topic:
// Keep only records whose /key field equals this job's AWS region.
// NOTE(review): there is no else branch, so records failing the check are
// dropped silently — nothing reaches either the side outputs or the main
// output for them. If a matching record reaches the sink topic but later
// "disappears", this filter cannot be the cause (it runs before the sink);
// presumably the sink topic's retention or compaction settings remove it —
// verify the topic's cleanup.policy / retention.ms configuration.
val keep = value.at("/key").asText() == awsRegion
if (keep) {
// Same routing as the unfiltered version: tagged topics go to their side
// output, unknown topics fall through to the main collector.
outputTags.get(topicName) match {
case Some(tag) =>
ctx.output(tag, value)
case None =>
out.collect(value)
}
}