# troubleshooting
a
I have a big problem.. we have a simple app that reads from Kafka and writes to the filesystem. The throughput is 70 MB/s and the message count is 400k per second. I tried to develop a new app that parses each CSV message into a POJO class and reduces the number of columns from 250 to 70 .. this parsing is very processing-intensive (I don't know why). Flink reads from 16 partitions across 5 brokers, and I tried setting the parallelism of the Flink app to 16, but I got backpressure. So I modified the code to do the parsing in a map function rather than in the consumer operator, and set this map operator's parallelism to 108 (I have a machine with 104 cores), but I can't reach the first app's throughput, and the busy metric of the mapping cores is 90%!! 😂 It is really disappointing because it is a simple app .. where does this degraded performance come from?
Also, when I converted the map function to just build a string rather than creating a POJO object, the performance was only slightly better.
k
Since the Kafka source parallelism is set to 16, Flink has to do a shuffle to redistribute data to the 108 map tasks. You don't provide a lot of details, but this serialization/deserialization can be expensive, especially for large records. I assume you're reading strings from Kafka and sending those to the map function, right? And you set the sink parallelism to match the map parallelism, yes? (Otherwise another shuffle will happen.)
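A minimal sketch of the topology being described, assuming Flink's DataStream API and the Kafka connector's KafkaSource; the broker, topic, group id, and the stand-in map function are all placeholders:

```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public final class ShuffleDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("broker1:9092")      // placeholder
                .setTopics("input-topic")                 // placeholder
                .setGroupId("csv-app")                    // placeholder
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka-source")
                .setParallelism(16)                       // one subtask per partition
                // 16 -> 108 is a parallelism change, so Flink inserts a rebalance here:
                // every record is serialized, redistributed, and deserialized again.
                .map(String::trim)                        // stand-in for the CSV parsing map
                .setParallelism(108)
                .print();                                 // stand-in sink

        env.execute("shuffle-demo");
    }
}
```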
a
When I set both the source and map parallelism to 16, the throughput is 2 MB/s! Note that it is all on one task manager with 108 slots and 108 cores. Yes, I read it as CSV and save it to the filesystem as csv.gz .. the length of every message is 250.
k
1. It's likely you're overloading your cores, as Flink needs some for the Job Manager and other tasks. 2. Did you set the sink parallelism to match the map parallelism? 3. How can the message size be 250 bytes if you have 250 columns?
a
Note also that it is a standalone cluster.
1- This machine has only one task manager, running only the one app I mentioned above.
2- I tried setting the sink to match the source parallelism and to match the map parallelism, and there was no difference in performance.
3- I meant the length is 250 columns. For 5 minutes of data, the data saved to the filesystem as csv.gz is 8 GB, with 80M rows.
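(For scale: 8 GB over 5 minutes is about 27 MB/s of compressed output, and 80M rows over 5 minutes is roughly 267k rows/s, still below the original app's 400k messages per second.)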
Is it normal that parsing takes this much processing?
k
Depends on the parsing library you’re using
a
What is the recommended parsing library? And the recommended approach?
k
A quick Google search says: https://github.com/osiegmar/FastCSV
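For reference, a minimal per-record parsing sketch, assuming FastCSV's 2.x API (`CsvReader.builder().build(String)`); the `Trade` POJO and the column indexes are made up for illustration:

```java
import de.siegmar.fastcsv.reader.CsvReader;
import de.siegmar.fastcsv.reader.CsvRow;

public final class CsvLineParser {

    // Hypothetical POJO; the real one would carry ~70 of the 250 columns.
    public static final class Trade {
        public String id;
        public String symbol;
        public double price;
    }

    // Parse one Kafka record (a single CSV line) and keep only the needed columns.
    public Trade parse(String line) throws java.io.IOException {
        try (CsvReader csv = CsvReader.builder().build(line)) {
            CsvRow row = csv.iterator().next();
            Trade t = new Trade();
            t.id = row.getField(0);                       // pick columns by index
            t.symbol = row.getField(3);
            t.price = Double.parseDouble(row.getField(17));
            // ... remaining fields
            return t;
        }
    }
}
```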
Also note that if your source and sink can keep up, then the parsing code will ALWAYS be running at nearly 100% of CPU usage.
So to summarize:
1. Source parallelism == number of partitions (16)
2. Map parallelism set to something reasonable, like 80-90% of available cores
3. Sink parallelism == map parallelism
4. Use a fast CSV parser
5. Flamegraphs are your friend
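Putting those points together, a hedged sketch of the whole job. The broker, topic, output path, and the column-reduction logic are placeholders, and this FileSink writes plain rows rather than .gz:

```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public final class CsvReduceJob {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("broker1:9092")      // placeholder
                .setTopics("input-topic")                 // placeholder
                .setGroupId("csv-reduce")                 // placeholder
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        FileSink<String> sink = FileSink
                .forRowFormat(new Path("/data/out"),      // placeholder path
                        new SimpleStringEncoder<String>("UTF-8"))
                .build();

        env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka-source")
                .setParallelism(16)                       // 1. == number of partitions
                .map(new ReduceColumns())                 // 4. fast CSV parse, 250 -> 70 columns
                .setParallelism(88)                       // 2. ~85% of the 104 cores
                .sinkTo(sink)
                .setParallelism(88);                      // 3. == map parallelism, no extra shuffle

        env.execute("csv-reduce");
    }

    private static final class ReduceColumns implements MapFunction<String, String> {
        @Override
        public String map(String line) {
            // TODO: parse with a fast CSV parser (see the FastCSV sketch above)
            // and re-emit only the 70 needed columns.
            return line;
        }
    }
}
```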
a
And for the CSV parsing, is it better to create a POJO object or just concatenate the string fields to return the desired string?
k
Create a POJO
a
I'm afraid of creating too many POJO objects due to the high message throughput.. should I use an object pool?
k
Try the simple thing first, then measure and optimize.
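Concretely, "the simple thing" here is allocating a fresh POJO per record. Short-lived objects like these usually die in the JVM's young generation, where allocation and collection are cheap, so pooling often hurts more than it helps; only reach for object reuse if profiling shows allocation pressure. A sketch, reusing the hypothetical `CsvLineParser.Trade` from above:

```java
import org.apache.flink.api.common.functions.MapFunction;

// The simple approach: one new POJO per record, no pooling.
// Young-generation allocation is roughly a pointer bump; measure before optimizing.
public final class ToPojo implements MapFunction<String, CsvLineParser.Trade> {

    private final CsvLineParser parser = new CsvLineParser(); // from the sketch above

    @Override
    public CsvLineParser.Trade map(String line) throws Exception {
        return parser.parse(line);                            // allocates a fresh Trade
    }
}
```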