Josh Highley

06/07/2021, 3:31 PM
I have a realtime table consuming messages from a 3 partition Kafka topic. Possibly due to some network issues over the weekend, all 3 consumers are repeating the same error messages about a bad offset:
2021/06/07 15:24:27.918 INFO [Fetcher] [agent_daily__2__2__20210605T0819Z] [Consumer clientId=consumer-71, groupId=] Fetch offset 22 is out of range for partition agent_daily-2, resetting offset
2021/06/07 15:24:27.919 INFO [Fetcher] [agent_daily__2__2__20210605T0819Z] [Consumer clientId=consumer-71, groupId=] Resetting offset for partition agent_daily-2 to offset 5.
2021/06/07 15:24:27.938 INFO [Fetcher] [agent_daily__1__2__20210605T0819Z] [Consumer clientId=consumer-73, groupId=] Fetch offset 20 is out of range for partition agent_daily-1, resetting offset
2021/06/07 15:24:27.939 INFO [Fetcher] [agent_daily__1__2__20210605T0819Z] [Consumer clientId=consumer-73, groupId=] Resetting offset for partition agent_daily-1 to offset 0.
2021/06/07 15:24:27.942 INFO [Fetcher] [agent_daily__0__2__20210605T0819Z] [Consumer clientId=consumer-72, groupId=] Fetch offset 24 is out of range for partition agent_daily-0, resetting offset
2021/06/07 15:24:27.943 INFO [Fetcher] [agent_daily__0__2__20210605T0819Z] [Consumer clientId=consumer-72, groupId=] Resetting offset for partition agent_daily-0 to offset 1.

2021/06/07 15:24:33.018 INFO [Fetcher] [agent_daily__2__2__20210605T0819Z] [Consumer clientId=consumer-71, groupId=] Fetch offset 22 is out of range for partition agent_daily-2, resetting offset
2021/06/07 15:24:33.018 INFO [Fetcher] [agent_daily__2__2__20210605T0819Z] [Consumer clientId=consumer-71, groupId=] Resetting offset for partition agent_daily-2 to offset 5.
2021/06/07 15:24:33.038 INFO [Fetcher] [agent_daily__1__2__20210605T0819Z] [Consumer clientId=consumer-73, groupId=] Fetch offset 20 is out of range for partition agent_daily-1, resetting offset
2021/06/07 15:24:33.039 INFO [Fetcher] [agent_daily__1__2__20210605T0819Z] [Consumer clientId=consumer-73, groupId=] Resetting offset for partition agent_daily-1 to offset 0.
2021/06/07 15:24:33.042 INFO [Fetcher] [agent_daily__0__2__20210605T0819Z] [Consumer clientId=consumer-72, groupId=] Fetch offset 24 is out of range for partition agent_daily-0, resetting offset
2021/06/07 15:24:33.043 INFO [Fetcher] [agent_daily__0__2__20210605T0819Z] [Consumer clientId=consumer-72, groupId=] Resetting offset for partition agent_daily-0 to offset 1.
The 'reset' offsets of 5, 0, and 1 are correct: I created a new 'test' table for the same topic and it used those offsets with no issue. I've tried disabling/enabling the table, but it just resumes logging those error messages. Is there some other way to reset the table consumers?
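
For reference, a minimal sketch of how that disable/enable cycle can be driven against the Pinot controller REST API; the controller host/port and the exact query parameters here are assumptions and may differ by Pinot version:

```python
# Sketch: toggle a realtime table's state via the Pinot controller REST API.
# Controller address and the state/type query parameters are assumptions.
import requests

CONTROLLER = "http://localhost:9000"
TABLE = "agent_daily"

for state in ("disable", "enable"):
    resp = requests.get(f"{CONTROLLER}/tables/{TABLE}",
                        params={"state": state, "type": "realtime"})
    resp.raise_for_status()
    print(state, "->", resp.text)
```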

Mayank

06/07/2021, 7:57 PM
Sounds like an issue on the Kafka side might have made Pinot's offset state inconsistent? https://github.com/dpkp/kafka-python/issues/972
If so, then the only way I can think of is to delete and recreate the table in Pinot.

Josh Highley

06/07/2021, 7:58 PM
but I can create a new table against the same topic, and not have the issue. Is there any other way to reset Pinot's offset state? Luckily this is a dev environment -- deleting and re-creating the table isn't really an option in production
...Pinot seems to know what the offset should be, since it logs it. It doesn't seem to use that offset, though

Mayank

06/07/2021, 8:00 PM
If the offsets are corrupted on the Kafka side, then it is a Kafka-side issue that needs to be prevented in production?
Would be good to understand the root cause here first, if we want to avoid it in production.
If it turns out to be a Pinot-side issue, then yes, we should definitely have a way to recover gracefully.
BTW, the log message is from the Kafka consumer.

Josh Highley

06/07/2021, 9:11 PM
I can create a new table subscribed to the same topic and receive messages while the existing table still does not. If there was a Kafka issue at some point then it appears corrected

Mayank

06/07/2021, 9:54 PM
What I am saying is that Pinot stores offsets as it commits segments. If a disruptive change on kafka side makes these offsets inconsistent with what Pinot had seen earlier, then the kafka consumer inside of Pinot will run into this state.
When you start a new table, the new table is seeing all consistent offsets (as it doesn't have anything saved).
Can you try to restart the Pinot servers and see if that helps?
My suspicion is that after Pinot committed segments and saved the offsets for them, there was a disruptive change in kafka side due to which these saved offsets are out-of-sync with Kafka.

Subbu Subramaniam

06/07/2021, 10:23 PM
See what the segment metadata says regarding the offset where the segment is supposed to start consuming. Pinot uses that offset. Also, what is your Kafka retention time? Maybe increasing it a little will solve your problem.
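
If it helps, a rough sketch of pulling that segment metadata from the controller with Python; the endpoint path and the offset field name are assumptions and may vary by Pinot version:

```python
# Sketch: fetch the consuming segment's metadata to check its start offset.
# Endpoint path and the 'segment.realtime.startOffset' key are assumptions.
import requests

CONTROLLER = "http://localhost:9000"
TABLE = "agent_daily"
SEGMENT = "agent_daily__2__2__20210605T0819Z"

meta = requests.get(f"{CONTROLLER}/segments/{TABLE}/{SEGMENT}/metadata").json()
print(meta)  # look for the Kafka start offset, e.g. a key like segment.realtime.startOffset
```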

Josh Highley

06/08/2021, 6:48 PM
The segment metadata has the higher offsets. The consumer is trying to reset the offsets to the lower values, so the segment isn't using those?
I read through the kafka issues and tried restarting the 3 brokers but it made no difference
...I can create a new table with the same topic and it will consume fine. The existing table doesn't seem to want to reset to the correct offset, even though the consumer seems to know what the correct offset is.

Mayank

06/08/2021, 7:03 PM
So say that before the issue occurred, you had offsets 1, 2, 5, 10, 20, ..., 100, ..., 500 (note they are only monotonically increasing, they don't have to be contiguous). And say Pinot consumed until offset 100. Now it wants to consume from the offset after 100. However, something disruptive happened on the Kafka side, and the client is being told that there is no offset > 100.
Pinot cannot simply consume from an arbitrary offset, because it doesn't know what the message at offset 100 (the point up to which it had consumed) corresponds to after the disruptive change.
What if new offsets are 1, 2, 3, 4, 7, 10, 11... there's no 1-1 mapping to ensure what has been consumed and what hasn't been.
Also note that offset checkpointed in Pinot is the offset that is already committed to Pinot. Current consuming offset is >= this checkpoint, because checkpointing happens only at intervals.

Josh Highley

06/08/2021, 7:41 PM
Right, but I believe Kafka knows what the new current offset should be, and the Pinot consumer seems aware of it. Why wouldn't the consumer start at the new offsets: 5, 0, and 1?
Fetch offset 22 is out of range for partition agent_daily-2, resetting offset
Resetting offset for partition agent_daily-2 to offset 5.
Fetch offset 20 is out of range for partition agent_daily-1, resetting offset
Resetting offset for partition agent_daily-1 to offset 0.
Fetch offset 24 is out of range for partition agent_daily-0, resetting offset
Resetting offset for partition agent_daily-0 to offset 1.
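
One way to double-check what the brokers themselves report as the earliest and latest offsets per partition is a quick kafka-python script (the client already referenced above); the bootstrap server address is an assumption:

```python
# Sketch: ask the brokers for the current earliest/latest offsets per partition,
# to confirm that 5, 0, and 1 really are the log-start offsets after the incident.
from kafka import KafkaConsumer, TopicPartition

consumer = KafkaConsumer(bootstrap_servers="localhost:9092")  # assumption
topic = "agent_daily"
partitions = [TopicPartition(topic, p) for p in sorted(consumer.partitions_for_topic(topic))]

earliest = consumer.beginning_offsets(partitions)  # what an out-of-range fetch resets to
latest = consumer.end_offsets(partitions)          # high watermarks
for tp in partitions:
    print(f"partition {tp.partition}: earliest={earliest[tp]} latest={latest[tp]}")
```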

Mayank

06/08/2021, 7:51 PM
I think only Kafka can know about the last consumed offset. But last consumed != last checkpointed. And there will be holes between the last consumed offset and what is checkpointed in Pinot.
For example, I am not sure if Kafka knows or guarantees that offset 22 in the old log == offset 5 in the new one. 5 might just be some default (perhaps stored consumer state, which is optional).
I am curious though, do you know what really happened on the Kafka side?

Josh Highley

06/08/2021, 8:39 PM
No, not for sure. I know the environment had network issues over the weekend, so it's possible the brokers lost touch with each other and possibly their ZooKeeper.

Subbu Subramaniam

06/08/2021, 9:02 PM
I find it odd from the msg that 22 is out of range, so it is resetting to 5. Did you by any chance change the topic of consumption? Have you tried (or can you try) dropping and re-creating the table?

Josh Highley

06/08/2021, 9:07 PM
I made no topic changes. The consumer says it's resetting to 5, but then 30 seconds later it tries 22 again, then says it's resetting to 5... and repeats. 5 is the correct offset, and a new table with the same topic will get messages fine.

Mayank

06/08/2021, 9:08 PM
@Subbu Subramaniam creating a new table works perfectly fine (so dropping and recreating should as well I think)

Josh Highley

06/08/2021, 9:08 PM
I have not dropped the table because I want to simulate recovering a prod environment, in which case deleting the table isn't viable.
If I just delete the consuming segments, is there a way to make Pinot create new consuming segments?

Mayank

06/09/2021, 5:19 PM
Yeah, but it will pick up the offset from the checkpoint (i.e. the previously flushed segment).

Josh Highley

06/09/2021, 5:21 PM
Seems like more manual control is needed in this area. If a Kafka server is catastrophically lost and rebuilt, then that would mean any attached Pinot tables would also need to be rebuilt, no?

Mayank

06/09/2021, 5:23 PM
Yeah, agree it is not a great situation to be in. Which is why I want to understand what happened on the Kafka side. And if there are offset changes, is there a way for Pinot to use some info from Kafka to reliably recover. I am not sure if the 22 -> 5 offset in your example is the right mapping.
If you think it is, I recommend filing an issue on Pinot side, we can explore further.

Josh Highley

06/09/2021, 5:24 PM
if I create a new table then it will use the reset offsets - 5 in this case
...and start processing messages from there
I simply need some way to make Pinot forget what it knows about a topic and just start processing anything new

Mayank

06/09/2021, 5:26 PM
Yeah, then just delete all segments, and restart servers
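
A sketch of what that could look like against the controller API (endpoint and host are assumptions, and this discards all data already ingested into the table):

```python
# Sketch: delete all segments of the realtime table, then restart the servers.
# Endpoint and controller address are assumptions; this drops all ingested data.
import requests

CONTROLLER = "http://localhost:9000"
TABLE = "agent_daily"

resp = requests.delete(f"{CONTROLLER}/segments/{TABLE}", params={"type": "realtime"})
resp.raise_for_status()
print(resp.text)
```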

Josh Highley

06/09/2021, 5:26 PM
that requires reloading all data from the beginning though
I meant to reset topic processing without deleting the table or segments

Mayank

06/09/2021, 5:27 PM
what is reset here?
I think you just want to reset to whatever Kafka told you (eg 5)?
I am trying to say that it is not necessarily correct. Data has been committed into Pinot already. Restarting from 5, what's the guarantee there won't be duplicate ingestion of events?

Josh Highley

06/09/2021, 5:31 PM
I suspect something happened with the topic such that its data was wiped. The other topics are working, so I'm not sure what happened with this particular one. In any case, if there's a catastrophic issue with the topic, I need a way to accept whatever risk and force Pinot to restart from a particular offset. It would be on me to ensure that offset won't result in duplicate events, etc.

Mayank

06/09/2021, 5:32 PM
I see.

Josh Highley

06/09/2021, 5:32 PM
if this was a production table with years of data and millions of events, deleting the table to start over is a huge issue

Mayank

06/09/2021, 5:34 PM
I can see that use case.

Josh Highley

06/09/2021, 5:34 PM
so, sort of a last resort short of deleting a table: "ignore commits, ignore offsets, etc just start consuming at this offset"

Mayank

06/09/2021, 5:34 PM
Yeah, not sure if there's a simple way already (@Subbu Subramaniam?)
But if not, can you file an issue?
Basically, override the offset check and move forward

Subbu Subramaniam

06/09/2021, 5:49 PM
There is currently no push-button way to forget Kafka offsets and restart. Pinot ingestion is based on comparable offsets, with higher ones indicating newer events. All the auto-correction mechanisms also (perhaps implicitly) assume that. I am curious what the behavior of Pinot is if Kafka decides to renege on the offset contract. I am wondering if any of the auto-correction mechanisms are kicking in (the controller logs should tell). Restarting the controller may help. I think in your case the controllers and servers are talking to different Kafka brokers, causing an inconsistency in their view. I am guessing that the auto-correction mechanism will then kick in and set the right offset in the metadata.
To answer Mayank's question, if all of this fails, I don't know of a mechanism to reset the offsets other than hand-editing the segment metadata in ZooKeeper.
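
For completeness, a rough sketch of what that hand-edit could look like with kazoo. The cluster name, znode path, and field name below are assumptions to verify against your own property store, and backing up the znode first is strongly advisable:

```python
# Sketch (last resort): rewrite a consuming segment's start offset in ZooKeeper.
# Cluster name, znode path, and the field name are assumptions; back up first.
import json
from kazoo.client import KazooClient

zk = KazooClient(hosts="localhost:2181")
zk.start()

path = ("/PinotCluster/PROPERTYSTORE/SEGMENTS/"
        "agent_daily_REALTIME/agent_daily__2__2__20210605T0819Z")
raw, _stat = zk.get(path)
record = json.loads(raw)  # Helix ZNRecord serialized as JSON
print("before:", record["simpleFields"].get("segment.realtime.startOffset"))

record["simpleFields"]["segment.realtime.startOffset"] = "5"  # offset Kafka reset to
zk.set(path, json.dumps(record).encode("utf-8"))
zk.stop()
```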

Josh Highley

06/09/2021, 6:36 PM
I opened #7039