# troubleshooting
Hi, I’m trying out different ways to use Flink as a Kafka topic replicator instead of Kafka MirrorMaker. The Flink version is 1.13.2. The following code snippet successfully replicates data from every source topic on Kafka cluster A to Kafka cluster B:
    val kafkaSource: DataStream[ObjectNode] = getKafkaSource(kafkaInputTopics, sourceProperties, env, getClass)
    val topics = kafkaInputTopics.split(",")
    val outputTags: Map[String, OutputTag[ObjectNode]] = topics.map { topic =>
      topic -> new OutputTag[ObjectNode](topic) {}
    }.toMap

    val mainStream: SingleOutputStreamOperator[ObjectNode] = kafkaSource
      .process(new ProcessFunction[ObjectNode, ObjectNode] {
        override def processElement(value: ObjectNode, ctx: ProcessFunction[ObjectNode, ObjectNode]#Context, out: Collector[ObjectNode]): Unit = {
          val topicName = value.at("/metadata/topic").asText()
          outputTags.get(topicName) match {
            case Some(tag) =>
              ctx.output(tag, value)
            case None =>
              out.collect(value)
          }
        }
      })
    val sinkProperties: Properties = getKafkaProperties(kafkaOutputPropertiesPath)
    outputTags.foreach { case (topicName, tag) =>
      val sideOutput: DataStream[ObjectNode] = mainStream.uid("SourceUid").name("SourceName").getSideOutput(tag)
      val kafkaSink = getKafkaSinkV2(s"$topicName-indrek", sinkProperties)
      sideOutput
        .addSink(kafkaSink)
        .uid(s"Sink.$topicName")
        .name(s"Sink.$topicName")
    }
The issue I’m facing is that once I add a filter, the data doesn’t seem to stay in the Kafka sink permanently. By filtering on a primary key field, I confirmed that the message was delivered to the Kafka sink, but it disappeared after some time. With the following filter applied inside `processElement`, the message doesn’t persist in the Kafka topic:
          val keep = value.at("/key").asText() == awsRegion
          if (keep) {
            outputTags.get(topicName) match {
              case Some(tag) =>
                ctx.output(tag, value)
              case None =>
                out.collect(value)
            }
          }
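
To make the filter's effect easier to reason about, here is a minimal standalone sketch of the routing decision that the filtered `processElement` appears to implement. It does not depend on Flink, and everything in it is an assumption made for illustration: the `Route` type, the `Routing.route` helper, the topic names `orders` and `payments`, and the region `eu-west-1` are all hypothetical. It only models which branch a record takes; it does not reproduce or explain the disappearing-message behaviour.

```scala
// Standalone model of the routing decision; no Flink dependency.
// All names below are hypothetical and exist only for illustration.
sealed trait Route
final case class SideOutput(topic: String) extends Route // models ctx.output(tag, value)
case object MainOutput extends Route                     // models out.collect(value)
case object Dropped extends Route                        // record is emitted nowhere

object Routing {
  // Mirrors the filtered snippet: a record is kept only if its key equals
  // awsRegion; kept records go to the side output of a known topic,
  // otherwise to the main output.
  def route(topic: String, key: String, awsRegion: String, knownTopics: Set[String]): Route =
    if (key != awsRegion) Dropped
    else if (knownTopics.contains(topic)) SideOutput(topic)
    else MainOutput
}

object RoutingDemo {
  def main(args: Array[String]): Unit = {
    // Same comma-separated format as kafkaInputTopics in the original code.
    val topics = "orders,payments".split(",").toSet

    println(Routing.route("orders", "eu-west-1", "eu-west-1", topics)) // SideOutput(orders)
    println(Routing.route("audit", "eu-west-1", "eu-west-1", topics))  // MainOutput
    println(Routing.route("orders", "us-east-1", "eu-west-1", topics)) // Dropped
  }
}
```

Under this model, a record whose `/key` differs from `awsRegion` reaches neither a side output nor the main stream, so the filter itself never writes it to any sink. A record that passes the filter takes exactly the same path as in the unfiltered version.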