Christos Hadjinikolis
10/05/2023, 9:24 AM
…`KeyedCoProcessFunction` in PyFlink and ran into some challenges while trying to unit test its behavior, especially when it comes to handling `ListState`.
For context, here's a simple `KeyedCoProcessFunction` I'm using:
```python
from pyflink.common import Types
from pyflink.datastream import KeyedCoProcessFunction
from pyflink.datastream.state import ListStateDescriptor

class SimpleCoProcessFunction(KeyedCoProcessFunction):
    def open(self, runtime_context):
        # State is looked up through a descriptor (name + element type info)
        self.voyage_state = runtime_context.get_list_state(
            ListStateDescriptor("voyage-state", Types.PICKLED_BYTE_ARRAY()))
    def process_element1(self, value, ctx):
        current_state = list(self.voyage_state.get())
        current_state.append(value)
        self.voyage_state.update(current_state)
        yield current_state  # PyFlink process functions emit results by yielding
    def process_element2(self, value, ctx):
        # Abstract in KeyedCoProcessFunction, so it must exist; input 2 is ignored here
        pass
```
Now, to test this function, I've had to mock the behavior of `ListState`, but the process feels a bit involved without native PyFlink testing utilities:
```python
from unittest.mock import Mock

def test_simple_coprocess_function():
    # Given
    voyage = "SampleVoyageData"

    # Hand-rolled in-memory stand-in for ListState
    class FakeListState:
        def __init__(self):
            self.state = []
        def get(self):
            return self.state
        def update(self, values):
            self.state = list(values)

    # Mock runtime context that hands out the fake state
    mock_runtime_context = Mock()
    mock_runtime_context.get_list_state.return_value = FakeListState()
    func = SimpleCoProcessFunction()
    func.open(mock_runtime_context)

    # When: process_element1 is a generator, so drain it to collect the output
    outputs = list(func.process_element1(voyage, None))

    # Then
    assert outputs == [["SampleVoyageData"]]
```
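The `FakeListState` in the test only implements `get` and `update`. A slightly fuller in-memory fake can also cover `add`, `add_all`, and `clear`, which PyFlink's `ListState` also provides, so the same helper can back functions that append rather than rewrite the whole list. This is a test-only sketch, not a PyFlink utility; the class name and the copy-on-`get` behavior are assumptions of the sketch.

```python
class FakeListState:
    """Hypothetical test helper: in-memory stand-in for a single key's ListState."""

    def __init__(self, initial=None):
        self._values = list(initial or [])

    def get(self):
        # Return a copy so callers cannot change stored state without update()
        return list(self._values)

    def add(self, value):
        self._values.append(value)

    def add_all(self, values):
        self._values.extend(values)

    def update(self, values):
        self._values = list(values)

    def clear(self):
        self._values = []


# Usage: exercise the fake the way a process function would
state = FakeListState()
state.add("voyage-1")
state.add_all(["voyage-2", "voyage-3"])

snapshot = state.get()
snapshot.append("voyage-4")      # mutating the copy does not touch stored state
print(state.get())               # ['voyage-1', 'voyage-2', 'voyage-3']

state.update(snapshot)           # only an explicit update persists the change
print(state.get())               # ['voyage-1', 'voyage-2', 'voyage-3', 'voyage-4']

state.clear()
print(state.get())               # []
```

Returning a copy from `get()` is a deliberate choice: a function that mutates the list it read but forgets to call `update()` fails the test instead of passing by accident.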
Given the above, I have a few questions:
1. Are there any out-of-the-box tools or utilities in recent versions of PyFlink that make testing these scenarios easier?
2. If not, has anyone in the community come up with an elegant solution or best practices for testing stateful Flink functions in PyFlink?
3. Are there any plans or ongoing efforts to introduce such testing utilities to PyFlink?
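The `FakeListState` in the test holds one list for the whole function, while state in a `KeyedCoProcessFunction` is scoped to the current key, so that fake cannot catch one key's data leaking into another. A minimal sketch of a keyed fake, assuming the function only calls `get_list_state` on its runtime context: `FakeRuntimeContext`, `KeyedFakeListState`, `current_key`, and `snapshot` are hypothetical test helpers, not PyFlink APIs. To stay runnable without PyFlink installed, `VoyageCollector` is a plain-Python stand-in mirroring the state logic of `SimpleCoProcessFunction`; like PyFlink's Python process functions, it emits results with `yield`.

```python
from collections import defaultdict


class KeyedFakeListState:
    """Hypothetical test helper: list state scoped to the context's current key."""

    def __init__(self, context):
        self._context = context
        self._per_key = defaultdict(list)

    def get(self):
        return list(self._per_key[self._context.current_key])

    def add(self, value):
        self._per_key[self._context.current_key].append(value)

    def update(self, values):
        self._per_key[self._context.current_key] = list(values)

    def clear(self):
        self._per_key.pop(self._context.current_key, None)

    def snapshot(self, key):
        # Test-side inspection of one key's state without switching current_key
        return list(self._per_key.get(key, []))


class FakeRuntimeContext:
    """Hypothetical stand-in for a runtime context; only get_list_state is faked."""

    def __init__(self):
        self.current_key = None  # set by the test before each call, playing the keyed stream
        self.list_states = {}

    def get_list_state(self, descriptor_or_name):
        # Accept a plain name or an object with a .name attribute (assumed for descriptors)
        name = getattr(descriptor_or_name, "name", descriptor_or_name)
        if name not in self.list_states:
            self.list_states[name] = KeyedFakeListState(self)
        return self.list_states[name]


class VoyageCollector:
    """Plain-Python stand-in mirroring SimpleCoProcessFunction's state logic."""

    def open(self, runtime_context):
        self.voyage_state = runtime_context.get_list_state("voyage-state")

    def process_element1(self, value, ctx):
        current = list(self.voyage_state.get())
        current.append(value)
        self.voyage_state.update(current)
        yield current


context = FakeRuntimeContext()
func = VoyageCollector()
func.open(context)

context.current_key = "ship-A"
first = list(func.process_element1("v1", None))   # [['v1']]
context.current_key = "ship-B"
second = list(func.process_element1("v2", None))  # [['v2']]
context.current_key = "ship-A"
third = list(func.process_element1("v3", None))   # [['v1', 'v3']]
```

Switching `current_key` between calls stands in for the keyed stream, and `context.list_states["voyage-state"].snapshot("ship-B")` lets a test assert on a single key's state directly (`['v2']` here).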
Thanks in advance for any insights!
Christos Hadjinikolis
10/05/2023, 8:28 PM
RootedLabs
10/05/2023, 10:12 PM
Christos Hadjinikolis
10/06/2023, 1:29 PM
Christos Hadjinikolis
10/06/2023, 2:34 PM
```java
final Processor<ID, Value> processorUnderTest = new MyStateFullProcessor();
final MockProcessorContext context = new MockProcessorContext();
final KeyValueStore<String, MyPojo> state =
    Stores.keyValueStoreBuilder(
            Stores.inMemoryKeyValueStore("State-store-name"),
            Serdes.String(),
            myPojoSerde) // placeholder for your own Serde<MyPojo>
        .withLoggingDisabled() // Changelog is not supported by MockProcessorContext.
        .build();
```
and then do something like:
```java
@Before
public void before() {
    state.init(context, state);
    context.register(state, /* parameter unused in mock */ null);
    processorUnderTest.init(context);
}
```
and then you can use these in your tests.
Christos Hadjinikolis
10/06/2023, 2:35 PM
Christos Hadjinikolis
10/06/2023, 2:35 PM
Christos Hadjinikolis
10/06/2023, 2:36 PM
RootedLabs
10/06/2023, 7:27 PM
Christos Hadjinikolis
10/06/2023, 7:32 PM
Christos Hadjinikolis
10/06/2023, 7:32 PM
RootedLabs
10/06/2023, 7:34 PM
RootedLabs
10/06/2023, 7:36 PM