# troubleshooting
m
Hi folks, I'm using a big ValueState in a task. During peaks of messages (and therefore writes to the ValueState), the lastCheckpointDuration metric soars. I'm considering splitting it into multiple small ValueStates. Is that better for the checkpoint process? The state backend is RocksDB.
d
Splitting a large state into multiple smaller states can indeed help the checkpointing process, especially with RocksDB. Each registered state gets its own column family in RocksDB, so if you separate the small, frequently updated fields from the large, rarely updated bulk, frequent updates only churn small values; with incremental checkpoints, the files holding the big, untouched part don't need to be re-uploaded on every checkpoint.
It also reduces how much data is rewritten per update: writing to a ValueState re-serializes the whole stored value, so a big monolithic object makes every small change expensive. Keep in mind, though, that splitting doesn't shrink the total state, so full-snapshot and recovery times won't change much; the win is in the steady-state write and upload volume.
You might also see some benefit from parallel I/O when uploading several smaller files instead of a few large ones.
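To make the cost difference concrete, here's a toy, Flink-free sketch. It uses plain `java.io` serialization as a stand-in for Flink's state serializer, and the object layout and sizes are made up, but it shows why a tiny flag bundled into a big object makes every flip a megabyte-sized rewrite:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Toy illustration (plain Java serialization, not Flink's serializers):
// every update to a ValueState re-serializes the *whole* stored value, so a
// tiny flag bundled into a big object costs a huge rewrite, while a flag kept
// in its own small ValueState costs only a few bytes per update.
public class SplitStateDemo {

    static byte[] serialize(Serializable o) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(o);
        }
        return bos.toByteArray();
    }

    // Bytes rewritten when the flag lives inside one big state object.
    public static int combinedSize() throws IOException {
        byte[] history = new byte[1_000_000];  // large, rarely updated part
        return serialize(new Serializable[] { history, Boolean.TRUE }).length;
    }

    // Bytes rewritten when the flag is split into its own small ValueState.
    public static int flagOnlySize() throws IOException {
        return serialize(Boolean.TRUE).length;
    }

    public static void main(String[] args) throws IOException {
        System.out.println("combined update   = " + combinedSize() + " bytes");
        System.out.println("split-flag update = " + flagOnlySize() + " bytes");
    }
}
```

Same logical change, several orders of magnitude less data rewritten per update when the hot field is split out.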
Before implementing this, I would benchmark it: split vs. not split, and measure what improvement you actually get in lastCheckpointDuration.
Aside from splitting, you can also look at tuning RocksDB (write buffer size, block cache size, etc.) and at your checkpoint interval and timeout settings.
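For reference, these are the kinds of knobs I mean. Key names are from recent Flink releases and the values are illustrative starting points, not recommendations; check the configuration docs for your version:

```yaml
state.backend.incremental: true                 # upload only new SST files per checkpoint
state.backend.rocksdb.writebuffer.size: 128mb   # memtable size before flush
state.backend.rocksdb.block.cache-size: 256mb   # block cache for reads
execution.checkpointing.interval: 60s
execution.checkpointing.timeout: 10min
```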
m
How does incremental checkpointing work in this case? Let's say I have a large ValueState and I change a boolean field within this large object. During the incremental checkpoint, is the entire object written again, or just the boolean?
d
With RocksDB, incremental checkpoints work at the file level, not at the level of fields inside your objects. Writes go into RocksDB memtables, which are flushed into immutable SST files; an incremental checkpoint then uploads only the SST files created since the previous checkpoint, instead of a full snapshot. There is no logical diffing of your objects.
The catch for your case: a ValueState is stored as one serialized blob per key. When you flip a boolean inside a large object and call update(), the entire object is re-serialized and written again, so the whole blob, not just the boolean, ends up in the new SST files of the next incremental checkpoint. Keys you didn't touch stay in older, already-uploaded files and don't need re-uploading. That's exactly why state design matters here: if the frequently changing flag lives in its own small ValueState, only that tiny value gets rewritten, and the large object's data stays cold.
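As a mental model, you can think of keyed state as a key-to-serialized-blob store. This toy Map-based stand-in (NOT the real backend; key names and sizes are made up) shows that flipping one field replaces one key's entire blob, while untouched keys, and the already-checkpointed files holding them, stay as-is:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Toy model of RocksDB-style key/value granularity (NOT the real backend).
// Keyed state is stored as key -> serialized blob; updating one field of one
// key's object replaces that key's entire blob, while other keys' blobs are
// untouched and need no re-upload on an incremental checkpoint.
public class IncrementalModel {

    static byte[] serialize(Serializable o) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(o);
        }
        return bos.toByteArray();
    }

    // Large state object: a big payload plus one boolean flag.
    static byte[] blob(boolean flag) throws IOException {
        return serialize(new Serializable[] { new byte[500_000], flag });
    }

    // Which keys changed since the "last checkpoint"?
    public static Set<String> changedKeys() throws IOException {
        Map<String, byte[]> lastCheckpoint = new HashMap<>();
        lastCheckpoint.put("user-1", blob(false));
        lastCheckpoint.put("user-2", blob(false));

        Map<String, byte[]> now = new HashMap<>(lastCheckpoint);
        // Flip only user-1's flag: its entire ~500 KB blob is rewritten.
        now.put("user-1", blob(true));

        Set<String> changed = new HashSet<>();
        for (String key : now.keySet()) {
            if (!Arrays.equals(lastCheckpoint.get(key), now.get(key))) {
                changed.add(key);
            }
        }
        return changed;
    }

    public static void main(String[] args) throws IOException {
        System.out.println("changed keys: " + changedKeys());
    }
}
```

Only user-1's entry changed, but the unit of change is its whole half-megabyte blob, not the one flipped bit.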
m
I was wondering, when you talk about this, are you speaking from field experience or just from reading the documentation... And what does it mean to do state design? A state is... just a state, no?
d
I do have field experience, but without knowing the specifics of your state structure, Flink version, etc., it's difficult to advise more precisely; and yes, the official Flink documentation is the best source for understanding state in Flink.
I would say there are several factors to state design in Flink. At a minimum you need to consider:
- State size: how to keep each piece small.
- Granularity, which we discussed: if you break the state into smaller pieces, you need to identify logical components within your state that you can update independently.
- Access patterns: e.g. keyed lookups (MapState vs. a single ValueState holding a whole map?), or ListState in the case of ordered lists. (We also don't know, for example, whether you're restricted to RocksDB or simply using it at the moment.)
- Immutability in the update process, which is another option.
All of these factor into proper state design in Flink. Without seeing your state structure and scale, it's difficult for anyone to advise more specifically.
m
I'm restricted to the RocksDB state backend. What is immutability in the update process?
d
Immutability within the update process is about designing your state updates in such a way that once a state object is created, it’s never modified; instead, any “update” operation generates a new version of the object while keeping the old version intact.
Immutability makes updates explicit: since an object is never changed in place, every modification is a new value passed to update(), so it's much easier to reason about what actually changed between checkpoints. With a mutable object it's easy to change a field and forget the update() call (with RocksDB, a change that isn't written back through update() is simply lost), or to be unsure which parts of the state have drifted.
Additionally, immutable objects are effectively thread-safe, since they cannot be changed after creation. This can simplify the design of your application, especially in highly concurrent environments like distributed processing, and you see the same pattern in a lot of other distributed systems. It's also easier to reason about the application flow, since the state at any point in time is a snapshot that won't change unexpectedly; that helps significantly with debugging and with writing reliable tests.
That being said, it's not a panacea, and there are trade-offs. Implementing immutability may require restructuring how you manage your state updates: instead of modifying fields directly within a large object, you create a new instance with the updated field(s), via copy constructors, builder patterns, or libraries that support efficient object copies with minimal overhead. You might also see increased memory allocation and GC pressure because of the extra object creation. Benchmark the approach first and weigh the trade-offs carefully.
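A minimal sketch of what that restructuring can look like, using a hypothetical `SessionState` value (class and field names are illustrative, not from your code): final fields plus "wither" copy methods instead of setters.

```java
// Sketch of an immutable state object: updates return a new instance
// instead of mutating in place. Names are illustrative placeholders.
public final class SessionState {
    private final String userId;
    private final long eventCount;
    private final boolean flagged;

    public SessionState(String userId, long eventCount, boolean flagged) {
        this.userId = userId;
        this.eventCount = eventCount;
        this.flagged = flagged;
    }

    // Copy-on-write update: the original instance is never modified.
    public SessionState withFlagged(boolean newFlagged) {
        return new SessionState(userId, eventCount, newFlagged);
    }

    public SessionState withEventCount(long newEventCount) {
        return new SessionState(userId, newEventCount, flagged);
    }

    public String getUserId() { return userId; }
    public long getEventCount() { return eventCount; }
    public boolean isFlagged() { return flagged; }
}
```

In a keyed function you would then write something like `state.update(state.value().withFlagged(true))`, rather than mutating the object you read; the explicit update() call is the only way a change happens.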
I would be interested to hear what others have to say about immutability for Flink State and what their experience has been with it. The criteria for when or when not to use it seems like a good discussion.