s
Hi I am using Flink 1.14. When setting properties for KafkaSourceBuilder, I came across this.
kafkaSourceBuilder.setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST))
So are these committed offsets taken from the Kafka broker or ZK directly?
m
If you have run your job before and are restarting from a checkpoint/savepoint, the offsets from the snapshot will be used. If it's a new job, then it depends on how you've configured it
s
Ok, alright. Please consider that this is a new job. Then what would happen?
Will it take offsets from the Kafka broker/ZK?
m
OffsetsInitializer.committedOffsets(org.apache.kafka.clients.consumer.OffsetResetStrategy) - starting from the committed offsets of the consumer group. If there is no committed offsets, starting from the offsets specified by the OffsetResetStrategy.
https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilder.html#setStarti[…]itializer-
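For context, a minimal sketch of how such a source is typically assembled in Flink 1.14 (the broker address, topic, and group id below are placeholders):

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;

// Placeholder broker/topic/group; only the starting-offsets line matters here
KafkaSource<String> source = KafkaSource.<String>builder()
        .setBootstrapServers("broker:9092")
        .setTopics("my-topic")
        .setGroupId("my-group")
        .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST))
        .setValueOnlyDeserializer(new SimpleStringSchema())
        .build();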
s
Ok thanks
m
So it would start reading from the latest offset
s
Wait wait I am now confused
m
You mentioned it's a new job, so nothing has been read from it. That also means that there are no committed offsets yet. So since there are no committed offsets, it uses the specified OffsetResetStrategy, which is set to LATEST
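To make that fallback concrete, here is a sketch of how the common initializers differ (not an exhaustive list):

// Always start from the earliest offsets, ignoring any committed offsets
OffsetsInitializer.earliest();
// Always start from the latest offsets, ignoring any committed offsets
OffsetsInitializer.latest();
// Use the group's committed offsets; fails at runtime if none were ever committed
OffsetsInitializer.committedOffsets();
// Use the group's committed offsets; if none exist, fall back to LATEST
OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST);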
s
Ok, it's a new job from Apache Flink's perspective, as in there are no checkpoints. Or let's say there was a job A where I was using group id XYZ. Now that job failed, but while resuming I chose not to resume from checkpoints, and the group id remains the same XYZ. What would happen in this case?
I think it should resume from XYZ's committed offsets, right?
m
If checkpointing is not enabled, there's no offset committing done by Flink towards Kafka. So then it depends on the values of your enable.auto.commit and auto.commit.interval.ms properties - see https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/datastream/kafka/#consumer-offset-committing
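For example, those client properties can be passed straight through the builder (the values here are illustrative):

// Let the Kafka client auto-commit offsets on its own schedule
// (only relevant when Flink checkpointing is disabled)
kafkaSourceBuilder.setProperty("enable.auto.commit", "true");
kafkaSourceBuilder.setProperty("auto.commit.interval.ms", "5000");
// With checkpointing enabled, KafkaSource instead commits offsets back to Kafka
// on checkpoint completion; this Flink-specific switch controls that (on by default)
kafkaSourceBuilder.setProperty("commit.offsets.on.checkpoint", "true");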
s
Ok, I think I have failed to convey my question properly. So basically I am running one job in DolphinScheduler. Now say I have killed the job and decided to start it again. It would ask me whether I want to start fresh or start from a checkpoint (it would show a list of checkpoints). Say I decided that I don't want to continue from the last checkpoint and instead want to start fresh. On both of these occasions, if the group id I provide in KafkaSource is the same, then what would happen when I run the job the second time?
I think that since the group id is the same, it would check the broker/ZK for that group id (since it finds no checkpoint to resume the job from); if it finds an offset it would start from there, otherwise it would start from the latest as per the setting that I have provided
kafkaSourceBuilder.setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST))
Please correct me if I am wrong
m
It would start from the last committed offset. Keep in mind that fault tolerance can fail in those situations
s
Thanks Martijn. But I am wondering how fault tolerance would fail in that case? Could you please give one example?
Actually I am facing a weird issue. Our Kafka topic has around 25 partitions. I am using
kafkaSourceBuilder.setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST))
I kept one job running for some time, then killed it. After 20 minutes I restarted the job with the same group id, but chose not to resume from the last checkpoint. I am expecting that since the group id is the same, it would start exactly where I left off before.

Now 10 out of 25 partitions find the offset and I get this log:
Setting offset for partition ${topicName}-${topicNumber} to the committed offset FetchPosition

And for the remaining 15 partitions I am getting:
No offset found for partition ${topicName}-${topicNumber}
Resetting offset for partition ${topicName}-${topicNumber} to XYZ

which I suppose would be the latest offset. When this happens, I am actually not getting the data for the 20 minutes during which the job was stopped; I am getting only the latest data. Do you have any idea how this could happen?
m
If you don't use Flink checkpoints, fault tolerance won't work. You must use checkpoints for fault tolerance. That's because Flink doesn't rely on the offsets committed to Kafka for fault tolerance; it uses the offsets stored in its own checkpoints
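For completeness, a minimal sketch of turning checkpointing on (the 60-second interval is illustrative):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Take a checkpoint every 60 seconds; the default mode is exactly-once
env.enableCheckpointing(60000L);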
s
But I am enabling Flink checkpoints. You mean to say that if I am setting
kafkaSourceBuilder.setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST))
then implicitly it means that it won't use Flink checkpoints in the Kafka source?
m
I don't understand your situation anymore. You've mentioned going from being a new consumer to an existing consumer, and from using checkpoints to not using checkpoints
s
Ok, let me explain. Checkpoints are always enabled in my program. Always enabled as in: whenever I start the program, if I decide to start it from a previous checkpoint then it starts from there; on the other hand, if I choose to start fresh it creates a fresh checkpoint. Now in addition to this, when I am creating the KafkaSource I am setting the following:
kafkaSourceBuilder.setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST))
Now say at 10:00 I started a job with a fresh setup, and I killed that job at 10:20. Now I am restarting that job and choosing to start with a fresh new checkpoint, so I am not resuming from the old checkpoint. Although I am doing this, the group id of my KafkaSource remains the same.

So when I start the job at 10:20, I should be getting the data between 10:00 and 10:20, right? Because even if the checkpoints are not there, the committed offsets must exist in the broker/ZK.

What I am observing is that out of the 25 partitions we have in Kafka, 10/25 partitions say that they have found the previously committed offset and are setting that offset, while the remaining 15 say that they were not able to find an offset, so they are resetting to the latest because I have configured that. Since 10/25 partitions are finding the offsets properly, I should get the data from 10:00 to 10:20, but I am not getting that data. So I was asking whether you know of any possible reason?
m
I'm not sure what you are expecting. Since you don't start from a checkpoint, there are no fault-tolerance guarantees. It basically depends on how the Kafka client handles this; Flink defers all decision making to the Kafka client in this case
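As an aside, if the goal is to deterministically replay a known time window without a checkpoint, the builder also accepts a timestamp-based initializer; a sketch with a placeholder epoch timestamp:

// Start each partition from the first record whose timestamp is >= the given
// epoch millis, ignoring committed offsets entirely (placeholder value for 10:00)
kafkaSourceBuilder.setStartingOffsets(OffsetsInitializer.timestamp(1650000000000L));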
s
Ok, got it. Nothing, I was just playing around with the options to understand how it works
m
It can be because those partitions don't have data, so there's nothing to read
There are a whole lot of things that can happen. It all depends on how the Kafka client handles those
s
Ok, got it now. Thanks