Boris Zubov
08/29/2024, 7:36 AM

val myConsumer = new FlinkKafkaConsumer[MyData]("myTopic", new MySchema(kafkaConf.schemaRegistry), properties)
myConsumer.setStartFromTimestamp(someTimestamp)

val myStream = env
  .addSource(myConsumer)
  .name(s"${myConsumer.topic}_v1").uid(s"${myConsumer.topic}_v1")
  .assignTimestampsAndWatermarks(
    WatermarkStrategy.forMonotonousTimestamps[MyData]().withIdleness(Duration.ofSeconds(20))
  )
  .name("Data with watermarks").uid("Data with watermarks")
  .map(Entry.fromInput(_))
  .name("Data to entry").uid("Data to entry")
  .unNone // custom syntax
  .name("Entry not empty").uid("Entry not empty")
  .keyBy(_.id)
What I'm doing: I change the UID at the head of the chain to .name(s"${myConsumer.topic}_v2").uid(s"${myConsumer.topic}_v2") and then restore from a savepoint with the --allowNonRestoredState flag.
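Concretely, the restore command I run looks roughly like this (the savepoint path and jar name here are just placeholders, not my real ones):

```bash
# Restore the job from a savepoint; --allowNonRestoredState tells Flink to
# drop savepoint state for operators whose uid no longer exists in the job.
flink run \
  -s /path/to/savepoints/savepoint-abcdef \
  --allowNonRestoredState \
  my-job.jar
```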
What I expect to happen: myConsumer should start reading from the offsets corresponding to the provided timestamp, since its uid changed and its state should be dropped.
What actually happens: myConsumer still tries to restore from state and falls back to the earliest offsets. Why?
Will switching to the new DataSource API help?
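For reference, here is my rough understanding of what the equivalent setup would look like with the new KafkaSource API (untested sketch, reusing the names from my snippet above; I'm assuming MySchema is a plain DeserializationSchema and that kafkaConf exposes bootstrapServers):

```scala
// Untested sketch of the same source with the new KafkaSource API.
// OffsetsInitializer.timestamp makes the "start from timestamp" intent explicit.
val source = KafkaSource.builder[MyData]()
  .setBootstrapServers(kafkaConf.bootstrapServers)
  .setTopics("myTopic")
  .setDeserializer(
    KafkaRecordDeserializationSchema.valueOnly(new MySchema(kafkaConf.schemaRegistry))
  )
  .setStartingOffsets(OffsetsInitializer.timestamp(someTimestamp))
  .build()

val myStream = env
  .fromSource(
    source,
    WatermarkStrategy.forMonotonousTimestamps[MyData]().withIdleness(Duration.ofSeconds(20)),
    "myTopic_v2"
  )
  .uid("myTopic_v2")
```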
I would appreciate any help with this issue.

Arvid Heise
08/29/2024, 9:03 AM

Boris Zubov
08/29/2024, 9:19 AM

Arvid Heise
08/29/2024, 9:40 AM

Arvid Heise
08/29/2024, 9:40 AM

Boris Zubov
08/29/2024, 9:42 AM