# troubleshooting
c
👋 Hey everyone! I've been working with a `KeyedCoProcessFunction` in PyFlink and ran into some challenges while trying to unit test its behavior, especially when it comes to handling `ListState`. For context, here's a simple `KeyedCoProcessFunction` I'm using:
from pyflink.datastream import KeyedCoProcessFunction

class SimpleCoProcessFunction(KeyedCoProcessFunction):

    def open(self, runtime_context):
        self.voyage_state = runtime_context.get_list_state("voyage-state")

    def process_element1(self, value, ctx, out):
        current_state = list(self.voyage_state.get())
        current_state.append(value)
        self.voyage_state.update(current_state)
        out.collect(current_state)
Now, to test this function, I've had to mock the behavior of `ListState`, but I find the process a bit involved without native PyFlink testing utilities:
from unittest.mock import Mock

def test_simple_coprocess_function():
    # Given
    voyage = "SampleVoyageData"

    # Mock ListState
    class FakeListState:
        def __init__(self):
            self.state = []

        def get(self):
            return self.state

        def update(self, values):
            self.state = values

    # Set up mock runtime context
    mock_runtime_context = Mock()
    mock_runtime_context.get_list_state.return_value = FakeListState()

    func = SimpleCoProcessFunction()
    func.open(mock_runtime_context)

    # When
    output_collector = Mock()
    func.process_element1(voyage, None, output_collector)

    # Then
    assert output_collector.collect.call_args_list[0][0][0] == ["SampleVoyageData"]
Given the above, I have a few questions:
1. Are there any out-of-the-box tools or utilities in recent versions of PyFlink that make testing these scenarios easier?
2. If not, has anyone in the community come up with an elegant solution or best practices for testing stateful Flink functions in PyFlink?
3. Are there any plans or ongoing efforts to introduce such testing utilities to PyFlink?
Thanks in advance for any insights! 😊
I see no responses here, so I'll assume I can't get any help. Are there any contributors in this channel? If so, would anyone be interested in having a chat with me about PyFlink over Zoom? We are thinking of adopting it at my company and I am responsible for a PoC. I could use some good advice and insights on where the project is headed.
r
I think you might get a better response if you clarify what was a bit more involved while mocking ListState. I suggest writing it up to explain:
◦ current pain point
◦ expectation: what you would like to see supported natively within PyFlink.
c
It was fine, to be honest. The pain point is exactly that, though: there are no out-of-the-box tools to do this without duck-typing or mocking, right?
✅ 1
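To make the duck-typing approach concrete, here is a minimal sketch of a reusable in-memory stand-in for `ListState`. `InMemoryListState` and its `current_key` attribute are hypothetical names, not part of PyFlink. The method names (`get`, `add`, `add_all`, `update`, `clear`) are meant to mirror PyFlink's `ListState` interface, and the per-key scoping is only an approximation of what the runtime does.

```python
from collections import defaultdict


class InMemoryListState:
    """Hypothetical test double for ListState (not shipped with PyFlink)."""

    def __init__(self):
        self._values_by_key = defaultdict(list)
        # Assumption: the test sets this by hand to emulate keyed-state scoping.
        self.current_key = None

    def get(self):
        # Return a copy so callers cannot mutate the stored list by accident.
        return list(self._values_by_key[self.current_key])

    def add(self, value):
        self._values_by_key[self.current_key].append(value)

    def add_all(self, values):
        self._values_by_key[self.current_key].extend(values)

    def update(self, values):
        # Replace the stored list for the current key with a copy of `values`.
        self._values_by_key[self.current_key] = list(values)

    def clear(self):
        self._values_by_key.pop(self.current_key, None)


# Usage: state written under one key is invisible under another.
state = InMemoryListState()
state.current_key = "vessel-1"
state.add("voyage-a")
state.current_key = "vessel-2"
state.update(["voyage-b", "voyage-c"])
state.current_key = "vessel-1"
snapshot = state.get()
```

Returning a copy from `get()` keeps a test from passing by accident just because the function under test mutated the list it was handed.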
Let me offer an example from kafka-streams (a Java library). With kafka-streams, if you want to test a stateful operator, you can instantiate a state store as follows:
final Processor<ID, Value> processorUnderTest = new MyStateFullProcessor();
final MockProcessorContext context = new MockProcessorContext();
final KeyValueStore<String, MyPojo> state =
    Stores.keyValueStoreBuilder(
            Stores.inMemoryKeyValueStore("State-store-name"),
            Serdes.String(),
            Serde.MyPojo())
        .withLoggingDisabled() // Changelog is not supported by MockProcessorContext.
        .build();
and then do something like:
@Before
public void before() {
  state.init(context, state);
  context.register(state, /*parameter unused in mock*/ null);
  processorUnderTest.init(context);
}
and then you can use these in your tests
✅ 1
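A rough Python analogue of the Kafka Streams setup above might look like the sketch below: a fake runtime context that hands out named state, plus an output collector that records what was emitted. `FakeRuntimeContext`, `FakeListState`, `CollectingOutput`, and `VoyageCollector` are hypothetical helpers written for this example (none of them ship with PyFlink). `VoyageCollector` is a plain-Python stand-in for `SimpleCoProcessFunction`, so the snippet runs without PyFlink installed.

```python
class FakeListState:
    """Hypothetical in-memory stand-in exposing only get() and update()."""

    def __init__(self):
        self._values = []

    def get(self):
        return list(self._values)

    def update(self, values):
        self._values = list(values)


class FakeRuntimeContext:
    """Hypothetical context that returns the same fake state per name."""

    def __init__(self):
        self._states = {}

    def get_list_state(self, name):
        # The example in this thread passes a plain string. As far as I know,
        # real PyFlink jobs pass a ListStateDescriptor here instead.
        if name not in self._states:
            self._states[name] = FakeListState()
        return self._states[name]


class CollectingOutput:
    """Hypothetical collector that records every emitted element."""

    def __init__(self):
        self.records = []

    def collect(self, record):
        self.records.append(record)


class VoyageCollector:
    """Stand-in for SimpleCoProcessFunction, minus the PyFlink base class."""

    def open(self, runtime_context):
        self.voyage_state = runtime_context.get_list_state("voyage-state")

    def process_element1(self, value, ctx, out):
        current_state = list(self.voyage_state.get())
        current_state.append(value)
        self.voyage_state.update(current_state)
        out.collect(current_state)


# Equivalent of the @Before setup: wire the context, then drive the function.
runtime_context = FakeRuntimeContext()
func = VoyageCollector()
func.open(runtime_context)
out = CollectingOutput()
func.process_element1("voyage-a", None, out)
func.process_element1("voyage-b", None, out)
```

After the two calls, `out.records` holds one emitted list per element, and the state for `"voyage-state"` can be inspected through `runtime_context` without reaching into the function's attributes.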
Without an equivalent in PyFlink, I end up mocking the behaviour of a state object that I have no real implementation of, whereas kafka-streams provides everything out of the box.
✅ 1
The confidence I have in my own testing implementation is lower than it would be if I were using a default PyFlink test-harness tool.
Does this make sense @RootedLabs?
r
I totally understand. IMHO, when it comes to stream processing, Python is not my fav. I think Java has much better support, but hey, that's an opportunity to contribute and take one for the team. I'd recommend reaching out to the community owners here to help you connect with folks who have deeper exposure to PyFlink. I think folks must have tried to solve this and can provide better input.
c
Anyone in particular in mind?
I would love to connect and contribute
r
tbh, I'm not really sure of anyone in particular, but I would start from here: community@flink.apache.org (based on this page: https://flink.apache.org/community/)
looks like you can try to post in #C03FAEU4MJB based on the page above.
After creating an account in Slack, don't forget to introduce yourself in #introductions.
👍 1