Paul Annesley
09/03/2024, 11:21 AM
applyToKeyedState, e.g. to copy items from one state to another?
I'm writing a Flink job that uses Broadcast State to conditionally buffer data from the main keyed stream into state, and then trigger processing that data via events on the broadcast stream. I see that in processBroadcastElement I can process all keys of a state descriptor using applyToKeyedState(descriptor, (key, state) -> { … }), but I need access to two descriptors at once to move items between two MapStates. Is this possible?
Paul Annesley
09/03/2024, 11:25 AM
processBroadcastElement.
Paul Annesley
09/03/2024, 11:49 AM
ctx.applyToKeyedState for the two descriptors, but I'm not sure if that's intended usage.
D. Draco O'Brien
09/04/2024, 6:42 AM
applyToKeyedState(descriptor1, (key, keyedStateBackend) -> {
    // get the first state
    MapState<String, String> mapState1 = keyedStateBackend.getPartitionedState(
            VoidNamespace.INSTANCE,
            new CustomVoidNamespaceSerializer(),
            descriptor1);
    // get the second state
    MapState<String, String> mapState2 = keyedStateBackend.getPartitionedState(
            VoidNamespace.INSTANCE,
            new CustomVoidNamespaceSerializer(),
            descriptor2);
    // perform the copying or operations between the two states
    for (Map.Entry<String, String> entry : mapState1.entries()) {
        mapState2.put(entry.getKey(), entry.getValue());
    }
    // optionally clear the first state after copying if needed
    mapState1.clear();
});
In this snippet, descriptor1 and descriptor2 are the state descriptors for the two MapState instances you want to interact with.
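To make the intended per-key move concrete, here is a minimal plain-Java model of it with no Flink dependency. Everything in it (KeyedStateMoveSketch, KeyedMapState, forKey, moveAll, and the local applyToKeyedState helper) is a hypothetical stand-in, not Flink API. It only illustrates the semantics being discussed: the callback runs once per key, copies that key's entries from the first map into the second, then clears the first. Note that Flink's own callback, KeyedStateFunction.process(key, state), receives the state for the descriptor that was passed in, so how the second state is obtained for the current key in a real job is left as an assumption here and should be checked against the Flink version in use.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiConsumer;

// Plain-Java model only: none of these types are Flink classes.
final class KeyedStateMoveSketch {

    /** Hypothetical stand-in for one keyed MapState: stream key -> (map key -> value). */
    static final class KeyedMapState {
        private final Map<String, Map<String, String>> byKey = new HashMap<>();

        /** Returns the map scoped to the given stream key, creating it if absent. */
        Map<String, String> forKey(String key) {
            return byKey.computeIfAbsent(key, k -> new HashMap<>());
        }
    }

    /** Models the shape of applyToKeyedState: calls fn once per key with that key's state. */
    static void applyToKeyedState(KeyedMapState state,
                                  BiConsumer<String, Map<String, String>> fn) {
        for (Map.Entry<String, Map<String, String>> e : state.byKey.entrySet()) {
            fn.accept(e.getKey(), e.getValue());
        }
    }

    /** Moves every entry of every key from one keyed state to the other. */
    static void moveAll(KeyedMapState from, KeyedMapState to) {
        applyToKeyedState(from, (key, source) -> {
            // Assumption: the second state can be looked up for the same key.
            Map<String, String> target = to.forKey(key);
            target.putAll(source);
            // Clear only this key's entries; the key itself stays registered.
            source.clear();
        });
    }

    public static void main(String[] args) {
        KeyedMapState buffered = new KeyedMapState();
        KeyedMapState ready = new KeyedMapState();
        buffered.forKey("user-1").put("a", "1");
        buffered.forKey("user-2").put("b", "2");

        moveAll(buffered, ready);

        System.out.println("ready user-1: " + ready.forKey("user-1"));
        System.out.println("buffered user-1: " + buffered.forKey("user-1"));
    }
}
```

In this model the move is scoped per key, which mirrors the requirement in the question: entries buffered for one key never end up under another key in the second state.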