# troubleshooting
p
Hi! Can two state descriptors be accessed at the same time from `applyToKeyedState`, e.g. to copy items from one state to another? I'm writing a Flink job that uses Broadcast State to conditionally buffer data from the main keyed stream into state, and then trigger processing that data via events on the broadcast stream. I see that in `processBroadcastElement` I can process all keys of a state descriptor using `applyToKeyedState(descriptor, (key, state) -> { … })`, but I need access to two descriptors at once to move items between two MapStates. Is this possible?
If there's no other good way, perhaps I can rework the two MapStates into a single MapState with composite keys, which feels unfortunate but would mean only needing to access a single state descriptor at a time from `processBroadcastElement`.
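To make the composite-key idea concrete, here is a minimal plain-Java sketch of it. It is a model only, not Flink code: the `HashMap` stands in for a single `MapState` scoped to one stream key, and `CompositeKeyBuffer`, `Slot`, `SlotKey`, and `moveAll` are hypothetical names made up for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

/** Plain-Java model: one map holds two logical maps by tagging each key with a slot. */
final class CompositeKeyBuffer {

    /** Which logical map an entry belongs to (hypothetical names). */
    enum Slot { BUFFERED, READY }

    /** Composite key: (slot, original map key). */
    static final class SlotKey {
        final Slot slot;
        final String itemKey;

        SlotKey(Slot slot, String itemKey) {
            this.slot = slot;
            this.itemKey = itemKey;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof SlotKey)) return false;
            SlotKey other = (SlotKey) o;
            return slot == other.slot && itemKey.equals(other.itemKey);
        }

        @Override
        public int hashCode() {
            return Objects.hash(slot, itemKey);
        }
    }

    // Assumption: stands in for a single MapState<SlotKey, String> for one stream key.
    private final Map<SlotKey, String> state = new HashMap<>();

    void put(Slot slot, String itemKey, String value) {
        state.put(new SlotKey(slot, itemKey), value);
    }

    String get(Slot slot, String itemKey) {
        return state.get(new SlotKey(slot, itemKey));
    }

    /** Moves every entry from one slot to the other and returns how many were moved. */
    int moveAll(Slot from, Slot to) {
        // Collect matching keys first so the map is not modified while iterating it.
        List<SlotKey> keys = new ArrayList<>();
        for (SlotKey k : state.keySet()) {
            if (k.slot == from) {
                keys.add(k);
            }
        }
        for (SlotKey k : keys) {
            String value = state.remove(k);
            state.put(new SlotKey(to, k.itemKey), value);
        }
        return keys.size();
    }
}
```

With this layout, "moving items between two MapStates" becomes re-keying entries inside one map, so a callback that only sees a single state object would be enough. In a real job the composite key type would also need a serializer Flink can handle, which this sketch does not cover.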
It looks like maybe I can nest two layers of `ctx.applyToKeyedState` for the two descriptors, but I'm not sure if that's intended usage.
d
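The lambda passed to `applyToKeyedState` is a `KeyedStateFunction`, whose `process(key, state)` callback receives the key and the state object for the one descriptor you passed in. It does not receive a `KeyedStateBackend`, and the public `KeyedBroadcastProcessFunction.Context` does not expose the backend, so there is no direct way to get multiple state handles in one go. If you can get hold of the operator's `KeyedStateBackend` some other way (this is internal API, so treat it as an assumption rather than a supported path), you can use it inside the lambda to retrieve the second state for the current key manually. Here's a simplified example of how that might look:

```java
import java.util.Map;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;

// Assumption: keyedStateBackend is this operator's KeyedStateBackend, obtained
// outside the public Context API. It is NOT a parameter of the lambda.
ctx.applyToKeyedState(descriptor1, (key, mapState1) -> {
    // mapState1 is already the state for descriptor1, scoped to `key`

    // get the second state; this relies on the backend's current key
    // being set to `key` while this callback runs
    MapState<String, String> mapState2 = keyedStateBackend.getPartitionedState(
            VoidNamespace.INSTANCE,
            VoidNamespaceSerializer.INSTANCE,
            descriptor2);

    // perform the copying or operations between the two states
    for (Map.Entry<String, String> entry : mapState1.entries()) {
        mapState2.put(entry.getKey(), entry.getValue());
    }

    // optionally clear the first state after copying if needed
    mapState1.clear();
});
```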
In this snippet, descriptor1 and descriptor2 are the state descriptors for the two MapState instances you want to interact with.