Duc Anh Khu
04/07/2023, 10:19 PM
I'm testing a KeyedProcessFunction in unit tests. When I unit test the function on its own (from_collection source and test sink), everything works fine. However, when it is combined into a bigger program, only process_element is called and on_timer is never called. The sources and sink in the two cases are exactly the same (from_collection and test sink). What could cause on_timer not to be called?
Update: in a bigger program with two KeyedProcessFunctions, only the first function's on_timer is called. The second function's is not.
Duc Anh Khu
04/07/2023, 10:22 PM
per_device_enricher and x_device_enricher are two KeyedProcessFunctions, and user_id_enricher is a CoFlatMapFunction. on_timer is called on per_device_enricher but not on x_device_enricher.
event_src = self.env.from_collection(
    event_stream,
    type_info=source_event_types
)
per_device_sessionising = (
    event_src
    .key_by(
        key_selector=src_anon_id_key_selector,
        key_type=Types.STRING()
    )
    .process(per_device_enricher, output_type=session_event_types)
)
(
    event_src
    .flat_map(
        DeviceUserFlatMap(),
        output_type=device_mapping_event_types
    )
    .connect(per_device_sessionising)
    .key_by(
        mapping_anon_id_key_selector,
        sess_anon_id_key_selector,
        key_type=Types.STRING()
    )
    .flat_map(user_id_enricher, output_type=session_event_types)
    .key_by(sess_user_id_key_selector, key_type=Types.STRING())
    .process(x_device_enricher, output_type=session_event_types)
    .add_sink(self.test_sink)
    .name(str(uuid.uuid4()))
)
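One possible cause worth ruling out, offered as an assumption rather than a diagnosis: x_device_enricher sits downstream of connect() and the CoFlatMapFunction, and a Flink operator with two inputs forwards the minimum of its inputs' watermarks. Event-time timers in x_device_enricher can only fire once that combined watermark passes their timestamps. The plain-Python model below is not PyFlink code; `TwoInputWatermark`, `MIN_WATERMARK` and `MAX_WATERMARK` are hypothetical names used only to illustrate the rule.

```python
import sys

# Hypothetical stand-ins for Flink's Long.MIN_VALUE / Long.MAX_VALUE watermarks.
MIN_WATERMARK = -sys.maxsize - 1
MAX_WATERMARK = sys.maxsize


class TwoInputWatermark:
    """Toy model of how a two-input operator combines its input watermarks."""

    def __init__(self) -> None:
        self.inputs = [MIN_WATERMARK, MIN_WATERMARK]
        self.current = MIN_WATERMARK

    def on_watermark(self, input_index: int, watermark: int):
        """Record a watermark on one input.

        Returns the new combined watermark if it advanced, otherwise None.
        """
        # The watermark of a single input never moves backwards.
        self.inputs[input_index] = max(self.inputs[input_index], watermark)
        # The operator can only be as far along as its slowest input.
        combined = min(self.inputs)
        if combined > self.current:
            self.current = combined
            return combined
        return None


op = TwoInputWatermark()
print(op.on_watermark(0, 1_000))          # None: input 1 is still at MIN_WATERMARK
print(op.on_watermark(1, 500))            # 500: held back by the slower input
print(op.on_watermark(1, MAX_WATERMARK))  # 1000: now input 0 is the slower one
print(op.on_watermark(0, MAX_WATERMARK))  # MAX_WATERMARK: both inputs finished
```

Here both inputs of connect() derive from event_src, so each should eventually carry the final end-of-input watermark that a bounded source such as from_collection emits. If x_device_enricher never fires at all, checking whether both co-inputs actually deliver that watermark may help narrow the problem down.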
Duc Anh Khu
04/07/2023, 10:27 PM
I'm triggering on_timer with:
def process_element(self, event, ctx: "KeyedProcessFunction.Context"):
    current_ts = ctx.timestamp()
    ...
    # Event-time timestamps are epoch milliseconds, so 200 ms later is + 200.
    ctx.timer_service().register_event_time_timer(
        current_ts + 200
    )
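A sketch of the semantics this registration relies on, assuming the job runs in event time: an event-time timer fires once the operator's watermark is greater than or equal to the timer's timestamp, Flink timestamps are epoch milliseconds (so 200 ms later is `current_ts + 200`), and registering the same key and timestamp twice yields a single timer. The model is plain Python; `EventTimeTimers` is a hypothetical class, not part of PyFlink.

```python
import heapq


class EventTimeTimers:
    """Toy model of a keyed event-time timer service (timestamps in ms)."""

    def __init__(self) -> None:
        self._heap = []        # (timestamp_ms, key) pairs, earliest first
        self._pending = set()  # at most one timer per (timestamp_ms, key)

    def register(self, key: str, timestamp_ms: int) -> None:
        # Registering an identical (key, timestamp) timer again is a no-op.
        if (timestamp_ms, key) not in self._pending:
            self._pending.add((timestamp_ms, key))
            heapq.heappush(self._heap, (timestamp_ms, key))

    def advance_watermark(self, watermark_ms: int):
        """Fire (return) every timer whose timestamp is <= the new watermark."""
        fired = []
        while self._heap and self._heap[0][0] <= watermark_ms:
            timestamp_ms, key = heapq.heappop(self._heap)
            self._pending.discard((timestamp_ms, key))
            fired.append((key, timestamp_ms))
        return fired


timers = EventTimeTimers()
current_ts = 1_000                              # event timestamp in ms
timers.register("device-a", current_ts + 200)
timers.register("device-a", current_ts + 200)   # duplicate, ignored
print(timers.advance_watermark(1_199))          # []
print(timers.advance_watermark(1_200))          # [('device-a', 1200)]
```

In this model, as in Flink, nothing fires until the watermark reaches the timer, no matter how small the delay is; a timer that never fires therefore points at the watermark not advancing in that operator rather than at the delay value.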
Dian Fu
04/09/2023, 2:31 PM
Duc Anh Khu
04/10/2023, 8:07 AM
on_timer is called on the first .process (per_device_enricher) but not on the second one (x_device_enricher).
I also tried using per_device_enricher in both .process calls and got the same result: on_timer is called only on the first function. I also tried setting the watermark to something very small (<200 ms) and passing a smaller value to register_event_time_timer, and the result is still the same.
Dian Fu
04/10/2023, 9:50 AM
Could you try setting python.fn-execution.bundle.size: 1 to see if it works?
Duc Anh Khu
04/11/2023, 6:39 AM
Dian Fu
04/11/2023, 8:00 AM
There is a wait() method that can be used to wait until the job has finished.
Duc Anh Khu
04/11/2023, 9:09 AM
import uuid

from pyflink.common import Configuration, Types
from pyflink.datastream.tests.test_util import DataStreamTestSinkFunction
from pyflink.testing.test_case_utils import PyFlinkStreamingTestCase
from pyflink.util.java_utils import get_j_env_configuration

# config, the *_SCHEMA objects, the key selectors, DeviceUserFlatMap,
# MockedSessionGenerator and the enricher classes come from the project's own code.


class AllEnrichersTests(PyFlinkStreamingTestCase):
    """AllEnrichersTests"""
    mock_session_generator = None

    def setUp(self) -> None:
        super(AllEnrichersTests, self).setUp()
        self.mock_session_generator = MockedSessionGenerator()
        self.test_sink = DataStreamTestSinkFunction()
        # <https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/dev/python/python_config/#configuration>
        # This config is required to make the application run properly in tests.
        # The wrapper points at the environment's Java configuration, so set_integer writes into it.
        pf_config = Configuration(j_configuration=get_j_env_configuration(self.env._j_stream_execution_environment))
        pf_config.set_integer("python.fn-execution.bundle.size", 1)

    def tearDown(self) -> None:
        self.test_sink.clear()

    def enrich_events(self, event_stream):
        per_device_enricher = PerDeviceSessionEnricher(config, self.mock_session_generator)
        user_id_enricher = UserIdBackProp()
        x_device_enricher = XDeviceSessionEnricher(config)
        _, source_event_types = SOURCE_TYPE_SCHEMA.zipped()
        _, session_event_types = SESSION_EVENT_SCHEMA.zipped()
        _, device_mapping_event_types = DEVICE_USER_MAPPING_SCHEMA.zipped()
        event_src = self.env.from_collection(
            event_stream,
            type_info=source_event_types
        )
        per_device_sessionising = (
            event_src
            .key_by(
                key_selector=src_anon_id_key_selector,
                key_type=Types.STRING()
            )
            .process(per_device_enricher, output_type=session_event_types)
        )
        user_id_enriched = (
            event_src
            .flat_map(
                DeviceUserFlatMap(),
                output_type=device_mapping_event_types
            )
            .connect(per_device_sessionising)
            .key_by(
                mapping_anon_id_key_selector,
                sess_anon_id_key_selector,
                key_type=Types.STRING()
            )
            .flat_map(user_id_enricher, output_type=session_event_types)
        )
        (
            user_id_enriched
            .key_by(sess_user_id_key_selector, key_type=Types.STRING())
            .process(x_device_enricher, output_type=session_event_types)
            .add_sink(self.test_sink)
            .name(str(uuid.uuid4()))
        )
        self.env.execute()
Duc Anh Khu
04/11/2023, 1:07 PM
How do I use wait() with the DataStream API?
Duc Anh Khu
04/19/2023, 4:25 PM