Suparn Lele
04/05/2023, 8:12 AM
kafkaSourceBuilder.setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST))
So are these committed offsets taken directly from the Kafka broker or from ZK?
Martijn Visser
04/05/2023, 8:13 AM
Suparn Lele
04/05/2023, 8:15 AM
Suparn Lele
04/05/2023, 8:15 AM
Martijn Visser
04/05/2023, 8:16 AM
OffsetsInitializer.committedOffsets(org.apache.kafka.clients.consumer.OffsetResetStrategy) - starting from the committed offsets of the consumer group. If there is no committed offsets, starting from the offsets specified by the OffsetResetStrategy.
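A minimal, self-contained Java sketch of the rule quoted above, assuming a simplified model in which each partition either has a committed offset for the consumer group or not, and the fallback picks the partition's earliest or latest offset. `CommittedOffsetsModel`, `PartitionState`, and `ResetStrategy` are hypothetical names for illustration only, not Flink or Kafka classes.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.OptionalLong;

// Hypothetical stand-in for Kafka's OffsetResetStrategy (only the two values used here).
enum ResetStrategy { EARLIEST, LATEST }

// Hypothetical per-partition view: the group's committed offset (if any) and the log bounds.
final class PartitionState {
    final OptionalLong committed;
    final long earliest;
    final long latest;

    PartitionState(OptionalLong committed, long earliest, long latest) {
        this.committed = committed;
        this.earliest = earliest;
        this.latest = latest;
    }
}

final class CommittedOffsetsModel {
    // The quoted rule: use the committed offset if present, otherwise the reset strategy's offset.
    static long startingOffset(PartitionState p, ResetStrategy fallback) {
        if (p.committed.isPresent()) {
            return p.committed.getAsLong();
        }
        return fallback == ResetStrategy.EARLIEST ? p.earliest : p.latest;
    }

    // Applies the rule independently to every partition, keeping insertion order.
    static Map<String, Long> resolveAll(Map<String, PartitionState> partitions, ResetStrategy fallback) {
        Map<String, Long> starts = new LinkedHashMap<>();
        for (Map.Entry<String, PartitionState> e : partitions.entrySet()) {
            starts.put(e.getKey(), startingOffset(e.getValue(), fallback));
        }
        return starts;
    }

    public static void main(String[] args) {
        Map<String, PartitionState> partitions = new LinkedHashMap<>();
        // Partition 0 has a committed offset; partition 1 has none.
        partitions.put("topic-0", new PartitionState(OptionalLong.of(120L), 0L, 500L));
        partitions.put("topic-1", new PartitionState(OptionalLong.empty(), 0L, 800L));
        System.out.println(resolveAll(partitions, ResetStrategy.LATEST)); // prints {topic-0=120, topic-1=800}
    }
}
```

In this model the decision is made per partition: with `LATEST` as the fallback, a partition without a committed offset jumps to the end of its log, while partitions that do have one resume from it.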
https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilder.html#setStarti[…]itializer-
Suparn Lele
04/05/2023, 8:17 AM
Martijn Visser
04/05/2023, 8:17 AM
Suparn Lele
04/05/2023, 8:17 AM
Martijn Visser
04/05/2023, 8:18 AM
Suparn Lele
04/05/2023, 8:20 AM
Suparn Lele
04/05/2023, 8:21 AM
Martijn Visser
04/05/2023, 8:23 AM
enable.auto.commit and auto.commit.interval.ms - See https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/datastream/kafka/#consumer-offset-committing
Suparn Lele
04/05/2023, 8:27 AM
Suparn Lele
04/05/2023, 8:29 AM
kafkaSourceBuilder.setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST))
Please correct me if I am wrong.
Martijn Visser
04/05/2023, 8:50 AM
Suparn Lele
04/05/2023, 8:59 AM
Suparn Lele
04/05/2023, 10:00 AM
kafkaSourceBuilder.setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST))
I kept a job running for some time and then killed it. After 20 minutes I restarted the job with the same group id, but I don't want to resume from the last checkpoint. Since the group id is the same, I expect it to start exactly where it left off.
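As I understand the Flink Kafka source, when a job is restored from a checkpoint or savepoint, each partition resumes from the offset stored in that state, and the `OffsetsInitializer` passed to `setStartingOffsets` only decides the start position when there is no restored state (for example, a fresh start like the one described here). The sketch below models only that precedence; `StartupPrecedenceModel` and its parameters are hypothetical, and the initializer's answer is passed in as a plain number rather than computed.

```java
import java.util.OptionalLong;

final class StartupPrecedenceModel {
    // restoredFromState: offset stored in the checkpoint/savepoint for this partition, if the job was restored.
    // initializerOffset: what the configured OffsetsInitializer would pick (e.g. committed offset or latest).
    static long effectiveStart(OptionalLong restoredFromState, long initializerOffset) {
        // Restored state wins; the initializer is only consulted when nothing was restored.
        return restoredFromState.isPresent() ? restoredFromState.getAsLong() : initializerOffset;
    }

    public static void main(String[] args) {
        // Restart from a checkpoint: the checkpointed offset is used.
        System.out.println(effectiveStart(OptionalLong.of(300L), 950L)); // prints 300
        // Fresh start without restored state: the initializer's offset is used.
        System.out.println(effectiveStart(OptionalLong.empty(), 950L)); // prints 950
    }
}
```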
Now 10 of the 25 partitions find a committed offset, and for those I get this log:
Setting offset for partition ${topicName}-${topicNumber} to the committed offset FetchPosition
For the remaining 15 partitions I get:
No offset found for partition ${topicName}-${topicNumber}
Resetting offset for partition ${topicName}-${topicNumber} to XYZ
(I suppose XYZ is the latest offset.)
When this happens, I don't get the data from the 20 minutes during which the job was stopped; I only get the latest data. Do you have any idea how this could happen?
Martijn Visser
04/05/2023, 10:15 AM
Suparn Lele
04/05/2023, 11:10 AM
kafkaSourceBuilder.setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST))
Then does that implicitly mean it won't use Flink checkpoints in the Kafka source?
Martijn Visser
04/05/2023, 11:12 AM
Suparn Lele
04/05/2023, 11:27 AM
kafkaSourceBuilder.setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST))
Say that at 10:00 I start a job with a fresh setup and kill it at 10:20. Then I restart the job and choose to start with a fresh checkpoint, so I am not resuming from the old one. Even so, the group id of my KafkaSource stays the same. So when I start the job at 10:20, I should be getting the data from 10:00 to 10:20, right? Even if the checkpoints are not there, the committed offsets must exist in the broker/ZK.
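The Flink connector docs on consumer offset committing (linked earlier in this thread) describe the Kafka source committing its consuming offsets back to Kafka when a checkpoint completes, and relying on the Kafka consumer's own periodic auto-commit (`enable.auto.commit`, `auto.commit.interval.ms`) only when checkpointing is disabled; the same docs note that the source does not rely on committed offsets for fault tolerance. Below is a toy single-partition model of the checkpointing case, assuming one checkpoint at a time and ignoring failures and asynchronous commit errors. `CommitOnCheckpointModel` and its methods are hypothetical names, not Flink APIs. It shows that the committed offset trails the processed position until the next checkpoint completes, and stays absent if no checkpoint has completed.

```java
import java.util.OptionalLong;

final class CommitOnCheckpointModel {
    private long position;                                  // next offset the reader will fetch
    private long snapshotted = -1L;                         // position captured by the pending checkpoint, -1 if none
    private OptionalLong committed = OptionalLong.empty();  // offset visible to the consumer group in Kafka

    CommitOnCheckpointModel(long startPosition) {
        this.position = startPosition;
    }

    void process(int records) {
        position += records;
    }

    // Checkpoint barrier reaches the source: the current position goes into checkpoint state.
    void snapshot() {
        snapshotted = position;
    }

    // Only when the checkpoint completes is the snapshotted position committed to Kafka.
    void checkpointComplete() {
        if (snapshotted >= 0) {
            committed = OptionalLong.of(snapshotted);
        }
    }

    OptionalLong committed() { return committed; }
    long position() { return position; }

    public static void main(String[] args) {
        CommitOnCheckpointModel reader = new CommitOnCheckpointModel(0L);
        reader.process(100);
        System.out.println(reader.committed()); // nothing committed yet: prints OptionalLong.empty
        reader.snapshot();
        reader.checkpointComplete();
        reader.process(40); // processed after the last completed checkpoint, not yet committed
        System.out.println(reader.committed().getAsLong() + " committed, " + reader.position() + " processed");
        // prints 100 committed, 140 processed
    }
}
```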
What I observe is that of the 25 partitions in our Kafka topic, 10 report that they found the previously committed offset and start from it, while the remaining 15 report that no offset was found, so they reset to the latest offset because that is the strategy I configured. Since 10 of the 25 partitions find their offsets correctly, I should get the data from 10:00 to 10:20 at least for those partitions, but I am not getting it. Do you know of any possible reason?
Martijn Visser
04/05/2023, 11:35 AM
Suparn Lele
04/05/2023, 11:36 AM
Martijn Visser
04/05/2023, 11:36 AM
Martijn Visser
04/05/2023, 11:36 AM
Suparn Lele
04/05/2023, 11:37 AM