# troubleshooting
s
👋 Hello, team! I'm an engineer at Yelp; we have a database connector based on Flink, and I'm having issues with the window operator. One of the 40 sub-tasks of the TumblingProcessingTimeWindows operator accumulated windows for over a day. The next restart of the job caused it to process the accumulated windows, which caused the checkpointing to time out. Once the sub-task has processed the old windows (which might take several hours) it works normally again. Could you please come up with ideas about what might cause a window operator sub-task to accumulate old windows for days until the next restart? I created a Jira ticket with all the details and investigation results: https://issues.apache.org/jira/browse/FLINK-35899
d
Looking at the JIRA ticket and the size of the 17th subtask, it looks like there might be some state imbalance
this could be due to uneven distribution of keys (doc ids) across parallel tasks
the task manager failure and restart seem to coincide with the start of the issue; maybe the task was left in an inconsistent state …
Did you perform a key distribution analysis? You might need to implement a custom partitioner to ensure a more even distribution across tasks
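a quick sketch (hypothetical class and sample data, assuming your key is the doc id and Flink's default max parallelism) that reuses Flink's own key-group assignment to see which of the 40 subtasks each key lands on:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.flink.runtime.state.KeyGroupRangeAssignment;

public class KeyDistributionCheck {
    public static void main(String[] args) {
        // Hypothetical sample of doc ids pulled from your input.
        List<String> docIds = List.of("doc-1", "doc-2", "doc-3" /* , ... */);

        int parallelism = 40;      // parallelism of the window operator
        int maxParallelism = 128;  // Flink's default for this parallelism unless set explicitly

        // Count how many keys Flink would route to each subtask index.
        Map<Integer, Integer> keysPerSubtask = new HashMap<>();
        for (String docId : docIds) {
            int subtask = KeyGroupRangeAssignment.assignKeyToParallelOperator(
                    docId, maxParallelism, parallelism);
            keysPerSubtask.merge(subtask, 1, Integer::sum);
        }

        keysPerSubtask.forEach((subtask, count) ->
                System.out.printf("subtask %d -> %d keys%n", subtask, count));
    }
}
```
the key objects have to be exactly what your keyBy emits, otherwise the hash (and therefore the subtask index) won't match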
You might also look at RocksDB tuning, i.e. buffer sizes, cache sizes, etc., to make sure they are optimized
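if you set the backend in code, a minimal sketch (assuming the EmbeddedRocksDBStateBackend; the predefined profile is just a starting point, and most knobs live in flink-conf.yaml):

```java
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.contrib.streaming.state.PredefinedOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RocksDbBackendSetup {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Incremental checkpoints keep checkpoint sizes manageable with large window state.
        EmbeddedRocksDBStateBackend backend = new EmbeddedRocksDBStateBackend(true);

        // Predefined profile with larger write buffers / block cache than the defaults.
        backend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED_HIGH_MEM);

        env.setStateBackend(backend);
    }
}
```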
also review your shutdown strategy a bit, i.e. how the task managers get shut down or terminated during restarts
s
I thought of uneven distribution, but why was the job accumulating windows? If one sub-task got more messages to process, that should slow it down and cause a checkpointing failure earlier
d
that's a good point. I think you can also look at processing vs. event time
it's crucial to check that the system clock on the machine running the affected subtask remains accurate through the window processing
a drift or inconsistency in processing time can disrupt the window boundaries and result in misalignment
These are somewhat rare but could cause this
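one way to make that visible (a sketch with hypothetical types, assuming you can drop a ProcessWindowFunction onto the affected operator) is to log how far behind the window end each firing actually happens, per subtask:

```java
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// IN is your element type; the String key type here is hypothetical.
public class FiringLagLogger<IN> extends ProcessWindowFunction<IN, IN, String, TimeWindow> {

    private static final Logger LOG = LoggerFactory.getLogger(FiringLagLogger.class);

    @Override
    public void process(String key, Context ctx, Iterable<IN> elements, Collector<IN> out) {
        // How long after the window's end did this firing actually happen?
        long lagMillis = ctx.currentProcessingTime() - ctx.window().getEnd();
        LOG.info("subtask {} fired window {} with lag {} ms",
                getRuntimeContext().getIndexOfThisSubtask(), ctx.window(), lagMillis);
        for (IN element : elements) {
            out.collect(element);
        }
    }
}
```
for processing-time windows the lag should stay close to zero on a healthy subtask; on the stuck one you'd expect it to keep growing until the backlog drains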
Another thing you could look at is resource starvation: whether the task manager failure and restart left the subtask in a state where it could not access sufficient resources to catch up effectively. I think in a way this is more likely, and you might look at the resource settings.
I think it's either that or something related to custom timer firings, due to something like an incorrect timer setup, missed signals, or interference with a custom trigger.
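if you want to rule out the trigger/timer path, a small logging trigger (a sketch that mirrors Flink's built-in ProcessingTimeTrigger, the name is made up) shows whether timers actually get registered and fired per window:

```java
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Same behaviour as ProcessingTimeTrigger, plus logging of timer registration and firing.
public class LoggingProcessingTimeTrigger extends Trigger<Object, TimeWindow> {

    private static final Logger LOG = LoggerFactory.getLogger(LoggingProcessingTimeTrigger.class);

    @Override
    public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) {
        LOG.debug("registering processing-time timer for window {}", window);
        ctx.registerProcessingTimeTimer(window.maxTimestamp());
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
        LOG.info("firing window {} at processing time {}", window, time);
        return TriggerResult.FIRE;
    }

    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(TimeWindow window, TriggerContext ctx) {
        ctx.deleteProcessingTimeTimer(window.maxTimestamp());
    }
}
```
wire it in with .trigger(new LoggingProcessingTimeTrigger()) on the windowed stream; registrations without matching firings on the affected subtask would point at the timer service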
Other than that, all I can think of is checkpointing kicking in while the operator is overwhelmed. We may need to look closer at how Flink checkpointing interacts with this stateful operator; if the system tries to checkpoint while the operator is overwhelmed, it could cause something like this.
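that won't explain the accumulation, but relaxing the checkpoint settings (a sketch; the values are placeholders to tune for your job) can at least keep the job alive while the backlog drains instead of dying on the checkpoint timeout:

```java
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointTuning {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Checkpoint every minute.
        env.enableCheckpointing(60_000);

        CheckpointConfig checkpointConfig = env.getCheckpointConfig();

        // Give slow subtasks more time before the checkpoint is declared failed.
        checkpointConfig.setCheckpointTimeout(30 * 60 * 1000);

        // Don't start a new checkpoint immediately after the previous one finishes.
        checkpointConfig.setMinPauseBetweenCheckpoints(60_000);

        // Tolerate a few failed checkpoints instead of failing the whole job.
        checkpointConfig.setTolerableCheckpointFailureNumber(3);

        // Unaligned checkpoints let barriers overtake buffered records under backpressure.
        checkpointConfig.enableUnalignedCheckpoints();
    }
}
```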
Let me take a look at the logs …
Ok, so you faced a TaskManager disconnection. Looks like it crashed or was terminated. This in turn caused a failure of the sink task. The checkpoint was completed before the failure.
The sub-task might resume processing from the last successful checkpoint but fail to recognize or properly process the accumulated windows that were supposed to be handled between the last checkpoint and the failure. This could stem from incomplete state restoration, misalignment of timers, or issues in the state backend. Given the scale of the problem, it's possible that during recovery the system did not properly manage the resumption of processing-time timers or the cleanup of outdated state, leading to the backlog of unprocessed windows.
let me check logs for the recovery …
also check that the task managers and the job manager have their clocks synced
clock skew can cause this by making tasks misinterpret when windows should close
what is allowed lateness on the windows? does it seem reasonable or quite long?
s
Once the incident happened, I tried to restore the job from earlier checkpoints and savepoints. I confirmed that the issue has an accumulating effect: the savepoints from 6 hours and 1 day ago were affected, while the savepoint from 2 days ago was clean
> what is allowed lateness on the windows? does it seem reasonable or quite long?
where could I check it?
d
also I would carefully check the job's operational timeline between the “clean” savepoint and the first affected one. could be a maintenance update etc
look for any changes in input data patterns, config updates etc
check for late events, out-of-order events, or a surge in events
the allowed lateness should be in the window definitions somewhere for example:
```java
stream
    .keyBy(<key selector>)
    .window(TumblingEventTimeWindows.of(Time.seconds(30))) // window definition
    .allowedLateness(Time.minutes(5)) // allowing events to be late by 5 minutes
    .process(new YourWindowFunction());
```
You might need to try reducing the allowed lateness, as too long an allowed lateness can cause accumulation
search code and config files for `allowedLateness`
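and if any part of the pipeline is on event time, you can make late events visible instead of silently dropping them. a sketch extending the snippet above (Event, getDocId, Result and YourWindowFunction are placeholders):

```java
// extra import needed: org.apache.flink.util.OutputTag
// anonymous subclass so the OutputTag keeps its element type information
OutputTag<Event> lateTag = new OutputTag<Event>("late-events") {};

SingleOutputStreamOperator<Result> results = stream
    .keyBy(event -> event.getDocId())                        // hypothetical key selector
    .window(TumblingEventTimeWindows.of(Time.seconds(30)))
    .allowedLateness(Time.minutes(5))
    .sideOutputLateData(lateTag)                             // capture events arriving after the allowed lateness
    .process(new YourWindowFunction());

// count or log these to see whether late data is actually showing up
DataStream<Event> lateEvents = results.getSideOutput(lateTag);
```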
Another thing you can do is try using Flink’s State Processor API to compare the clean savepoint’s state with the first one that has the issue.
if it's available for your version of Flink
Another little-known tool for examining state (which is a bit of a black box) is Queryable State. It has been deprecated, but you could still use it in testing or dev; sometimes it's needed to debug issues like the one you are describing, to understand what is going on.
It's not well documented, but it can be used.

https://www.youtube.com/watch?v=8qp8BmnMxVk

In summary, I think we are looking at either a clock/timer configuration issue or skew, or there is an unusual network condition or data volume that periodically occurs and that the window settings/timeouts etc. are not adjusted for. I think you need to examine all events and event flows in the timeframe between the last clean savepoint and the incident to look for patterns.