# random
j
Hello, I have a quick question on TimestampAssigner. The document says:
Specifying a TimestampAssigner is optional and in most cases you don’t actually want to specify one. For example, when using Kafka or Kinesis you would get timestamps directly from the Kafka/Kinesis records.
Do the timestamps from Kafka records indicate when the records were logged (created)? What if an IoT device goes offline and fails to send messages for a while? I guess it is safer to assign timestamps from the record values if they exist?
o
Actually, this depends on how the timestamps are set when sending the records to Kafka. One option is to send the records with as little information as possible, which means that the KafkaProducer will add a timestamp of “now”. However, you can also extract the timestamp from the record and add that as the timestamp in the ProducerRecord.
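For example, a minimal sketch of the second option, setting the timestamp explicitly on the ProducerRecord (the topic, key, payload, and extractCreatedAt helper are hypothetical names, not from this thread):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

Properties props = new Properties();
props.put("bootstrap.servers", "broker:9092");
props.put("key.serializer", StringSerializer.class.getName());
props.put("value.serializer", StringSerializer.class.getName());

try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
    // Timestamp taken from the event payload itself (hypothetical helper),
    // e.g. the time the device actually created the reading.
    long createdAtMillis = extractCreatedAt(payloadJson);
    // ProducerRecord(topic, partition, timestamp, key, value): passing the event's own
    // creation time here instead of letting the producer stamp it with "now".
    producer.send(new ProducerRecord<>("iot-events", null, createdAtMillis, "device-42", payloadJson));
}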
j
I'm reading a book and it has an example that stresses the importance of event time. A mobile game user plays a game and records are not sent while the phone is offline (e.g. while going through a tunnel). Instead, the messages are buffered on the device. Once it gets back online, the messages are sent to Kafka, but the record timestamps of those messages lag behind. I also had a couple of cases where the Debezium connector went wrong and it took several hours or even days until it came back online. The connector reports two timestamps (ts_ms): the record timestamp and the source timestamp. The former is when the connector processed the event, while the latter is when the database transaction was made. I guess what the document refers to is the former, and I'm not sure it is reliable in this situation.
r
Event time vs. processing time is a critical decision point before building the system. The problem with event time is that it may not be consistent across all your devices or users, as they may be spread across different time zones.
• If event time is important for your use case, then it is probably the better option; if not, building the system on processing time might in general be a better approach, but it depends heavily on your use case, volume, scale, and other factors.
o
Your mileage may vary, but using processing time is usually not what you want. The reason is that when replaying data, processing time usually yields different (and wrong) results for aggregations. Using (and storing) ingestion time is then a better choice if event time is not available.
💯 1
r
+1, this is a really good point. ^^ Understand the different times available with Flink; there are a few of them: event time, ingestion time, processing time.
• https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/table/concepts/time_attributes/
• https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/concepts/time/
^^ Something to read through.
👍 1
j
Actually, my question is whether specifying a TimestampAssigner would be better because Kafka/Kinesis record timestamps can lag (e.g. due to a network outage), even though the document says it is optional.
WatermarkStrategy
        .<Tuple2<Long, String>>forBoundedOutOfOrderness(Duration.ofSeconds(20))
        .withTimestampAssigner((event, timestamp) -> event.f0);
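For context, here is roughly how that assigner could be wired to a Kafka source so the event time comes from the record value rather than the Kafka record timestamp (the Event POJO, its eventTimeMillis field, EventDeserializationSchema, topic, and broker address are all hypothetical):

import java.time.Duration;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Hypothetical Event POJO whose eventTimeMillis is set by the device when the event was created.
KafkaSource<Event> source = KafkaSource.<Event>builder()
        .setBootstrapServers("broker:9092")
        .setTopics("iot-events")
        .setValueOnlyDeserializer(new EventDeserializationSchema()) // hypothetical schema
        .build();

WatermarkStrategy<Event> strategy = WatermarkStrategy
        .<Event>forBoundedOutOfOrderness(Duration.ofSeconds(20))
        // Take event time from the record value, not from the Kafka record timestamp.
        .withTimestampAssigner((event, kafkaTimestamp) -> event.eventTimeMillis);

DataStream<Event> events = env.fromSource(source, strategy, "iot-events");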
r
It is a bit of a complex topic. I am not sure if you have seen this video, but it breaks it down pretty well:

https://youtu.be/sdhwpUAjqaI?si=jgTpMb-1FmnywDix

👍 1
Afaik, the default is no watermark if the partition is empty or idle, or 5 minutes for each partition (not across all partitions). If your system generates an out-of-order event beyond 5 minutes, Flink SQL will ignore it by default, whereas the DataStream API gives you a bit more control.
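For example, a rough sketch of getting that extra control in the DataStream API with withIdleness, building on the snippet above (the durations are just illustrative):

WatermarkStrategy
        .<Tuple2<Long, String>>forBoundedOutOfOrderness(Duration.ofSeconds(20))
        .withTimestampAssigner((event, timestamp) -> event.f0)
        // If a partition emits nothing for a minute, mark it idle so it does not
        // hold back the watermark for the whole job.
        .withIdleness(Duration.ofMinutes(1));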
j
Thanks @RootedLabs! Let me take a look!