# troubleshooting
l
Hi everyone! Our team is experiencing some sort of memory leak when a streaming job is running idle on an EMR cluster. The jobs are quite simple: a Kafka Source -> Process Function -> Kafka Sink. The jobs are stateless and only keep track of the Kafka offsets in order to make progress in processing the stream. Everything seems to run fine when the job has something to process (from a few thousand to tens of millions of events per day). However, if there are no messages for a few days, the TaskManager heap usage increases until the job crashes. There don't seem to be any logs that indicate what the problem is. Once the TaskManager heap usage gets too high, a SIGTERM signal is received and the shutdown is triggered. Any idea what the problem might be here? Any clues or resources about memory leak issues with Flink, or with Flink running on EMR, would be really appreciated!
I’ve managed to take heap dumps from the TaskManager container, and they have provided me with some more insight. Comparing a heap dump taken at job start with one taken when the heap usage gets close to 100%, we can see that most of the heap is occupied by the SourceReader, which in this case is the KafkaSourceReader. The SynchronizedSortedMap offsetsToCommit might be the root of the problem. When does offsetsToCommit get emptied if there is nothing to commit? As described earlier, this problem occurs when there is no incoming data. However, our jobs checkpoint quite often (every 5 seconds), so I’m wondering if some sort of empty object is added to offsetsToCommit during every checkpoint but never removed, because we don’t receive any messages and therefore never commit and empty this collection. To test this theory I produced one message to the Kafka topic that the KafkaSourceReader is consuming, and as a result the heap memory was cleared.
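In case it helps anyone reproduce that verification step, producing the single test message can be done with a plain Kafka producer. A minimal sketch, where the broker address and topic name are placeholders:

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class ProduceSingleTestMessage {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // One record is enough: the next successful offset commit should
            // clear the accumulated offsetsToCommit entries on the reader side.
            producer.send(new ProducerRecord<>("input-topic", "key", "test")).get();
        }
    }
}
```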
Running a component report on the KafkaSourceReader with Eclipse Memory Analyzer seems to confirm this theory.
Looking at the source code for the KafkaSourceReader, we can see that a new HashMap collection is created and added to offsetsToCommit on every checkpoint. However, offsetsToCommit should be cleared out later in notifyCheckpointComplete, and I'm not sure why that isn't happening.
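To make the suspected pattern concrete, here is a heavily simplified sketch. It is not the actual connector code and the names are only indicative; it just illustrates how an entry per checkpoint could pile up when there is never anything to commit:

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;

// Heavily simplified illustration of the suspected leak, not the real KafkaSourceReader.
class LeakySourceReaderSketch {
    // Mirrors the SynchronizedSortedMap seen in the heap dump: checkpointId -> offsets to commit.
    private final SortedMap<Long, Map<Integer, Long>> offsetsToCommit =
            Collections.synchronizedSortedMap(new TreeMap<>());

    // Called on every checkpoint (every 5 seconds in the job above).
    Map<Integer, Long> snapshotState(long checkpointId) {
        // A new (possibly empty) map is registered for this checkpoint.
        return offsetsToCommit.computeIfAbsent(checkpointId, id -> new HashMap<>());
    }

    // Called when the checkpoint completes.
    void notifyCheckpointComplete(long checkpointId) {
        Map<Integer, Long> offsets = offsetsToCommit.get(checkpointId);
        if (offsets == null || offsets.isEmpty()) {
            // Nothing to commit, so we return early and the empty entry is never removed.
            // With an idle source this branch is taken on every checkpoint,
            // so offsetsToCommit grows without bound.
            return;
        }
        // Commit offsets to Kafka, then prune this entry and all older ones.
        offsetsToCommit.headMap(checkpointId + 1).clear();
    }
}
```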
m
What are the Flink versions that you're using?
l
had this issue with 1.14.2 and 1.17.0 so far
m
What version of the Flink Kafka connector were you using?
l
org.apache.flink:flink-connector-kafka_2.12:1.14.2
and
org.apache.flink:flink-connector-kafka:1.17.0
m
The latest version is 3.0.0-1.17.0, but looking at the fixed issues, I'm not sure this is one of them.
Can you file a Jira for this?
l
I’m waiting on approval to create my Jira account and I will then submit the ticket.
m
I see it's just been approved 🙂
l
I’ve created ticket FLINK-33231. Please let me know if there’s anything that should be improved or clarified, or what should be done next.
t
Thanks @Lauri Suurväli for debugging this, great detective work! I left a comment on the ticket describing the fix for this; was wondering if you want to take a stab at fixing it? This is a rather bad one, and I’d hope to include it in the next hotfix release, which I want to kick off before the end of this week.
I’ve come up with a fix and I'm opening a PR now; would appreciate a review.
Sorry @Lauri Suurväli, I didn’t realize you left a new comment on the JIRA 😅 I’ve already finished a fix and am opening a PR; would be great if you could review that.
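For anyone reading along later, one plausible shape for this kind of fix (purely an illustrative sketch, not the actual PR) is to drop the bookkeeping entry for a completed checkpoint even when there were no offsets to commit:

```java
// Illustrative sketch only; not the real KafkaSourceReader and not the actual PR.
// The idea: when a checkpoint completes and there is nothing to commit,
// still remove the (empty) entry so offsetsToCommit cannot grow unbounded.
void notifyCheckpointComplete(long checkpointId) {
    Map<Integer, Long> offsets = offsetsToCommit.get(checkpointId);
    if (offsets == null || offsets.isEmpty()) {
        // Previously this returned without cleanup; now we prune everything
        // up to and including the completed checkpoint before returning.
        offsetsToCommit.headMap(checkpointId + 1).clear();
        return;
    }
    commitToKafka(offsets); // placeholder for the asynchronous offset commit
    offsetsToCommit.headMap(checkpointId + 1).clear();
}
```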
l
Thanks for all the help and feedback @Tzu-Li (Gordon) Tai! The PR looks great. Let me know if there’s anything that I could help with.