# troubleshooting
hi, I'm using PyFlink 1.13.2 and running into an odd issue with `KeyedProcessFunction` in unit tests. When I unit test the function on its own (`from_collection` source and test sink), everything works fine. However, when it is mixed into a bigger program, only `process_element` is called and `on_timer` is not called at all. The source and sink in these 2 cases are exactly the same (`from_collection` and test sink). What are the possible causes of `on_timer` not being called?
Update: in a bigger program with 2 `KeyedProcessFunction`s, only the 1st function's `on_timer` is called. The 2nd function's is not.
My program looks like this, where `per_device_enricher` and `x_device_enricher` are 2 `KeyedProcessFunction`s and `user_id_enricher` is a `CoFlatMapFunction`. `on_timer` is called on `per_device_enricher` but not on `x_device_enricher`.
```python
event_src = self.env.from_collection(
    event_stream,
    type_info=source_event_types
)

per_device_sessionising = (
    event_src
    .key_by(
        key_selector=src_anon_id_key_selector,
        key_type=Types.STRING()
    )
    .process(per_device_enricher, output_type=session_event_types)
)
(
    event_src
    .flat_map(
        DeviceUserFlatMap(),
        output_type=device_mapping_event_types
    )
    .connect(per_device_sessionising)
    .key_by(
        mapping_anon_id_key_selector,
        sess_anon_id_key_selector,
        key_type=Types.STRING()
    )
    .flat_map(user_id_enricher, output_type=session_event_types)
    .key_by(sess_user_id_key_selector, key_type=Types.STRING())
    .process(x_device_enricher, output_type=session_event_types)
    .add_sink(self.test_sink)
    .name(str(uuid.uuid4()))
)
```
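For context on the topology above: in Flink, an operator with two inputs (such as the `connect(...)` + `flat_map` step that feeds `x_device_enricher`) advances its event-time clock to the minimum of the watermarks received on its inputs, so it cannot move forward until every input has sent one. The pure-Python toy below models only that rule; `TwoInputWatermarkTracker` is a hypothetical name, not a PyFlink API.

```python
# Toy model (not PyFlink code): how a two-input operator combines watermarks.
# Each input starts at "minus infinity"; the operator's watermark is the
# minimum over its inputs and never moves backwards.
import math


class TwoInputWatermarkTracker:
    """Hypothetical helper mimicking Flink's min-of-inputs watermark rule."""

    def __init__(self, num_inputs: int = 2) -> None:
        self.input_watermarks = [-math.inf] * num_inputs
        self.current = -math.inf

    def on_watermark(self, input_index: int, watermark: float) -> float:
        # Per-input watermarks are monotonic: ignore regressions.
        if watermark > self.input_watermarks[input_index]:
            self.input_watermarks[input_index] = watermark
        # The combined watermark only advances when the slowest input advances.
        combined = min(self.input_watermarks)
        if combined > self.current:
            self.current = combined
        return self.current


tracker = TwoInputWatermarkTracker()
print(tracker.on_watermark(0, 1_000))  # -inf: input 1 has sent nothing yet
print(tracker.on_watermark(1, 500))    # 500
print(tracker.on_watermark(0, 2_000))  # 500: still held back by input 1
```

In this model, an input that never emits a watermark holds the combined watermark at minus infinity for everything downstream of it.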
I'm trying to trigger `on_timer` with:
```python
def process_element(self, event, ctx: "KeyedProcessFunction.Context"):
    current_ts = ctx.timestamp()
    ...
    # Flink timestamps are epoch milliseconds, so + 200 means 200 ms later.
    ctx.timer_service().register_event_time_timer(
        current_ts + 200
    )
```
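Event-time timers like the one registered above fire only once the operator's watermark reaches the timer's timestamp (Flink fires a timer when its timestamp is `<=` the new watermark). The toy timer service below is a pure-Python sketch of that rule, not the PyFlink API; `ToyTimerService` is a hypothetical name. Like Flink, it keeps at most one timer per key and timestamp.

```python
# Toy model (not PyFlink code): event-time timers fire when the watermark
# reaches their timestamp. Timestamps are in milliseconds, as in Flink.
import heapq
from typing import List, Set, Tuple


class ToyTimerService:
    def __init__(self) -> None:
        self._queue: List[Tuple[int, str]] = []  # (timestamp, key) min-heap
        self._registered: Set[Tuple[int, str]] = set()

    def register_event_time_timer(self, key: str, timestamp: int) -> None:
        # One timer per (key, timestamp); duplicates are ignored.
        if (timestamp, key) not in self._registered:
            self._registered.add((timestamp, key))
            heapq.heappush(self._queue, (timestamp, key))

    def advance_watermark(self, watermark: int) -> List[Tuple[int, str]]:
        """Return the (timestamp, key) timers that fire, in timestamp order."""
        fired = []
        while self._queue and self._queue[0][0] <= watermark:
            timer = heapq.heappop(self._queue)
            self._registered.discard(timer)
            fired.append(timer)  # this is where on_timer(timestamp, ...) would run
        return fired


timers = ToyTimerService()
event_ts = 1_000
timers.register_event_time_timer("device-a", event_ts + 200)  # 200 ms later
print(timers.advance_watermark(1_100))  # []: watermark has not reached 1200
print(timers.advance_watermark(1_200))  # [(1200, 'device-a')]
```

If the watermark never passes `current_ts + 200`, no timer fires. Note that when a bounded source such as `from_collection` finishes, Flink emits a final `Long.MAX_VALUE` watermark, which in this model would fire every pending timer.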
@Duc Anh Khu Have you defined a watermark generation strategy? Is the watermark advancing normally?
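For reference on what a watermark generation strategy computes: Flink's built-in bounded-out-of-orderness generator (`WatermarkStrategy.for_bounded_out_of_orderness(...)` in PyFlink) emits `max_timestamp_seen - out_of_orderness - 1`, and `WatermarkStrategy.for_monotonous_timestamps()` is the same with zero delay. The pure-Python toy below reproduces only that arithmetic; `ToyBoundedOutOfOrdernessWatermarks` is a hypothetical name.

```python
# Toy model (not PyFlink code) of the bounded-out-of-orderness watermark
# arithmetic: watermark = max timestamp seen - allowed delay - 1 (all in ms).
from typing import Optional


class ToyBoundedOutOfOrdernessWatermarks:
    def __init__(self, out_of_orderness_ms: int) -> None:
        self.out_of_orderness_ms = out_of_orderness_ms
        self.max_timestamp: Optional[int] = None

    def on_event(self, timestamp: int) -> None:
        if self.max_timestamp is None or timestamp > self.max_timestamp:
            self.max_timestamp = timestamp

    def current_watermark(self) -> Optional[int]:
        # No events seen yet: no meaningful watermark (Flink uses Long.MIN_VALUE).
        if self.max_timestamp is None:
            return None
        return self.max_timestamp - self.out_of_orderness_ms - 1


gen = ToyBoundedOutOfOrdernessWatermarks(out_of_orderness_ms=100)
for ts in (1_000, 1_300, 1_250):
    gen.on_event(ts)
print(gen.current_watermark())  # 1199
```

Under this arithmetic, a timer registered at `event_ts + 200` can only fire from a regular watermark once some event with a timestamp of at least `event_ts + 201 + out_of_orderness` has been seen.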
@Dian Fu, I have tried both with and without a watermark. The result is the same: `on_timer` is called on the 1st `.process` (`per_device_enricher`) but not on the 2nd one (`x_device_enricher`). I also tried using `per_device_enricher` in both `.process` calls and got the same result: `on_timer` is called only on the 1st function. I also tried setting the watermark to something very small (<200ms) and passing a smaller value to `register_event_time_timer`, and the result is still the same.
@Duc Anh Khu Could you set the configuration `python.fn-execution.bundle.size: 1` to see if it works?
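`python.fn-execution.bundle.size` is the maximum number of elements in one bundle for Python user-defined function execution (default 100000); one bundle is processed before the next one starts. The toy below (pure Python, hypothetical `ToyBundler`, not PyFlink's implementation) only shows the general buffering effect: with a large bundle size, results appear when the bundle fills or is explicitly finished, while a size of 1 flushes after every element.

```python
# Toy model (not PyFlink's implementation) of bundle-based buffering:
# elements are held until the bundle is full or finish_bundle() is called.
from typing import Callable, List


class ToyBundler:
    def __init__(self, max_bundle_size: int, process: Callable[[int], int]) -> None:
        self.max_bundle_size = max_bundle_size
        self.process = process
        self.buffer: List[int] = []
        self.output: List[int] = []

    def add(self, element: int) -> None:
        self.buffer.append(element)
        if len(self.buffer) >= self.max_bundle_size:
            self.finish_bundle()

    def finish_bundle(self) -> None:
        # Process the whole bundle at once and emit its results.
        self.output.extend(self.process(e) for e in self.buffer)
        self.buffer.clear()


big = ToyBundler(max_bundle_size=100_000, process=lambda x: x * 10)
small = ToyBundler(max_bundle_size=1, process=lambda x: x * 10)
for x in (1, 2, 3):
    big.add(x)
    small.add(x)
print(big.output)    # []: still buffered
print(small.output)  # [10, 20, 30]
```

This illustrates buffering in general; how PyFlink 1.13 interleaves bundles with watermarks and timers is not modelled here.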
that works! thanks. I'm guessing the Python code is bundled for execution. But in tests, since the input comes from a small collection, does the app (somehow?) terminate before the Python code is executed?
@Duc Anh Khu Could you share the whole job? (I want to take a look at how you submitted the job.) There is a `wait()` method which can be used to wait until the job finishes.
my test is based on your example here. The rest of the test looks like this:
```python
import uuid

from pyflink.common.configuration import Configuration
from pyflink.common.typeinfo import Types
from pyflink.datastream.tests.test_util import DataStreamTestSinkFunction
from pyflink.testing.test_case_utils import PyFlinkStreamingTestCase
from pyflink.util.java_utils import get_j_env_configuration

# Project-specific names (config, MockedSessionGenerator, the enrichers,
# DeviceUserFlatMap, the *_SCHEMA objects and key selectors) are defined elsewhere.


class AllEnrichersTests(PyFlinkStreamingTestCase):
    """AllEnrichersTests"""

    mock_session_generator = None

    def setUp(self) -> None:
        super(AllEnrichersTests, self).setUp()
        self.mock_session_generator = MockedSessionGenerator()
        self.test_sink = DataStreamTestSinkFunction()
        # https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/dev/python/python_config/#configuration
        # This config is required for the application to run properly in tests
        pf_config = Configuration(j_configuration=get_j_env_configuration(self.env._j_stream_execution_environment))
        pf_config.set_integer("python.fn-execution.bundle.size", 1)

    def tearDown(self) -> None:
        self.test_sink.clear()

    def enrich_events(self, event_stream):
        per_device_enricher = PerDeviceSessionEnricher(config, self.mock_session_generator)
        user_id_enricher = UserIdBackProp()
        x_device_enricher = XDeviceSessionEnricher(config)
        _, source_event_types = SOURCE_TYPE_SCHEMA.zipped()
        _, session_event_types = SESSION_EVENT_SCHEMA.zipped()
        _, device_mapping_event_types = DEVICE_USER_MAPPING_SCHEMA.zipped()
        event_src = self.env.from_collection(
            event_stream,
            type_info=source_event_types
        )

        per_device_sessionising = (
            event_src
            .key_by(
                key_selector=src_anon_id_key_selector,
                key_type=Types.STRING()
            )
            .process(per_device_enricher, output_type=session_event_types)
        )
        user_id_enriched = (
            event_src
            .flat_map(
                DeviceUserFlatMap(),
                output_type=device_mapping_event_types
            )
            .connect(per_device_sessionising)
            .key_by(
                mapping_anon_id_key_selector,
                sess_anon_id_key_selector,
                key_type=Types.STRING()
            )
            .flat_map(user_id_enricher, output_type=session_event_types)
        )
        (
            user_id_enriched
            .key_by(sess_user_id_key_selector, key_type=Types.STRING())
            .process(x_device_enricher, output_type=session_event_types)
            .add_sink(self.test_sink)
            .name(str(uuid.uuid4()))
        )
        self.env.execute()
```
how do I use `wait()` with the stream API?
hi @Dian Fu, I have an issue slightly related to PyFlink tests and would much appreciate it if you could point me in the right direction. My tests, which use the same test utils as above, sometimes hang randomly without any errors or output. Do you know why that happens and how to mitigate it? Many thanks