# troubleshooting
b
Hi, I am using Flink 1.15 with the old Kafka source function. I have a Kafka source chained with two transformations, displayed as a single chained operator in the web UI. Here's my code:
val myConsumer = new FlinkKafkaConsumer[MyData]("myTopic", new MySchema(kafkaConf.schemaRegistry), properties)
myConsumer.setStartFromTimestamp(someTimestamp)

val myStream = env
  .addSource(myConsumer)
  .name(s"${myConsumer.topic}_v1").uid(s"${myConsumer.topic}_v1")
  .assignTimestampsAndWatermarks(
    WatermarkStrategy.forMonotonousTimestamps[MyData]().withIdleness(Duration.ofSeconds(20))
  )
  .name("Data with watermarks").uid("Data with watermarks")
  .map(Entry.fromInput(_))
  .name("Data to entry").uid("Data to entry")
  .unNone // custom syntax
  .name("Entry not empty").uid("Entry not empty")
  .keyBy(_.id)
What I'm doing: I am changing the UID at the head of the chain to
.name(s"${myConsumer.topic}_v2").uid(s"${myConsumer.topic}_v2")
and then restoring from a savepoint with the --allowNonRestoredState flag.
What I expect to happen: myConsumer should read from the offsets starting at the provided timestamp.
What actually happens: myConsumer tries to restore from the state and falls back to the earliest offsets. Why? Will switching to the new KafkaSource API help? I would appreciate any help with this issue.
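For reference, a minimal sketch of the restore invocation described above; the savepoint path and job jar are placeholders, not taken from this thread:

```shell
# Restore from a savepoint while dropping state for operators whose UIDs
# no longer appear in the new job graph (here: the renamed source UID).
# Savepoint path and jar name are placeholders.
flink run \
  -s s3://my-bucket/savepoints/savepoint-abc123 \
  --allowNonRestoredState \
  my-job.jar
```

Without --allowNonRestoredState, the restore would fail outright, because the savepoint contains state for the old _v1 UID that no longer maps to any operator.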
a
Are you changing the UID specifically to reprocess the data? (e.g. do you want to explicitly disregard the stored offsets)
b
Yes, I added this to disregard stored offsets because I was upgrading the Flink version and didn't want to read from the stored offsets of the old connector, and my processing is duplicate-proof anyway.
a
It's a good opportunity to upgrade to the new KafkaSource then. It should disregard the stored offsets and use the timestamp.
If not, that's a bug to report.
b
Yes, I am considering it. I also discovered that if I change the UIDs on all operators in the chain, then consuming from the offsets at the provided timestamp works correctly.
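For anyone landing here later, a hedged sketch of the same pipeline head rewritten on the new KafkaSource (Flink 1.15). It assumes MySchema implements DeserializationSchema[MyData]; the broker address and group id are placeholders, not from the original code:

```scala
import java.time.Duration
import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.connector.kafka.source.KafkaSource
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer

val source = KafkaSource
  .builder[MyData]()
  .setBootstrapServers("broker:9092") // placeholder
  .setTopics("myTopic")
  .setGroupId("my-group")             // placeholder
  // Start from the given timestamp; this initializer only applies when
  // there is no restored state for this operator UID.
  .setStartingOffsets(OffsetsInitializer.timestamp(someTimestamp))
  .setValueOnlyDeserializer(new MySchema(kafkaConf.schemaRegistry))
  .build()

val myStream = env
  .fromSource(
    source,
    WatermarkStrategy.forMonotonousTimestamps[MyData]().withIdleness(Duration.ofSeconds(20)),
    "myTopic_v2"
  )
  .uid("myTopic_v2")
```

Note that with KafkaSource too, offsets restored from a matching UID in a savepoint take precedence over the starting-offsets initializer, so a fresh UID (or a restore without the old state) is still needed to force the timestamp.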