# troubleshooting
h
When using Python and Flink with the SQL API, I want to do a filter on the current timestamp, basically this:
Select count(*) from tbl where time_col >= current_timestamp - interval '5' minute
But this never removes entries that no longer fit the window; the count only increases when new data comes in. I guess this is because the filter is evaluated once and then left alone. Is something like this possible, or do I need to fall back to windows and possibly the DataStream API?
d
if you’re ok with aggregating counts in fixed windows, then you can use tumbling or hop windows instead
hop windows let you slide over the data with a certain step size while aggregating over a fixed-length window, e.g.
```sql
SELECT
    HOP_START(time_col, INTERVAL '1' MINUTE, INTERVAL '5' MINUTE) AS window_start,
    COUNT(*) AS cnt
FROM tbl
GROUP BY HOP(time_col, INTERVAL '1' MINUTE, INTERVAL '5' MINUTE)
```
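if you want to try it quickly, a minimal PyFlink sketch for running that query could look like this (untested, and it assumes a table `tbl` with a watermarked `time_col` is already registered):
```python
# rough sketch: run the hop-window query from PyFlink
# assumes a streaming table `tbl` with a watermarked TIMESTAMP(3)
# column `time_col` is already registered in the catalog
from pyflink.table import EnvironmentSettings, TableEnvironment

t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())

result = t_env.sql_query("""
    SELECT
        HOP_START(time_col, INTERVAL '1' MINUTE, INTERVAL '5' MINUTE) AS window_start,
        COUNT(*) AS cnt
    FROM tbl
    GROUP BY HOP(time_col, INTERVAL '1' MINUTE, INTERVAL '5' MINUTE)
""")

# prints results to stdout as they are produced
result.execute().print()
```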
If you need something more precise than this, then yes, you’re right that with the DataStream API you can tailor it exactly as needed.
h
Hmm, that’s a shame… Tbh it does feel a little like a bug, because clearly the “current_timestamp” filter is not applied properly during the group by. I’d guess that if you showed this query to anyone not knee-deep in Flink, they’d guess the wrong behavior here. State management for this wouldn’t be trickier than for a window, right? Another reason this bums me out is that this would have been a great way to limit the state for some of our use cases (where we now have to rely on state TTL)
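(for context, the state TTL fallback I mean is roughly this; `table.exec.state.ttl` is the actual Flink option, everything else is just illustrative:)
```python
# illustrative sketch: bounding SQL state with idle-state TTL instead of a window
# (assumes a recent PyFlink where TableConfig.set() is available)
from pyflink.table import EnvironmentSettings, TableEnvironment

t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
# state that has been idle for more than 5 minutes becomes eligible for cleanup
t_env.get_config().set("table.exec.state.ttl", "5 min")
```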
d
I assume your source is Kafka right?
h
It is, yeah
d
Just create your own window, something like this. I’ll just show reading from a CSV, but you can read from Kafka instead.
```python
from pyflink.dataset import ExecutionEnvironment
from pyflink.table import TableConfig, DataTypes, BatchTableEnvironment
from pyflink.table.descriptors import Schema, OldCsv, FileSystem

env = ExecutionEnvironment.get_execution_environment()
t_config = TableConfig()
t_env = BatchTableEnvironment.create(env, t_config)

# assuming you read from a CSV file or another source
t_env.connect(FileSystem().path("path_to_your_csv")) \
    .with_format(OldCsv()
                 .field("time_col", DataTypes.TIMESTAMP(3))
                 # add other fields as needed
                 ) \
    .with_schema(Schema()
                 .field("time_col", DataTypes.TIMESTAMP(3))
                 # define other fields here as well
                 ) \
    .register_table_source("mySource")

t_env.create_temporary_view("myTable", t_env.from_path("mySource"))

# tumbling windows of 1 second, keeping only rows from the last 5 minutes
result = t_env.sql_query("""
    SELECT
        TUMBLE_START(time_col, INTERVAL '1' SECOND) AS window_start,
        COUNT(*) AS cnt
    FROM myTable
    WHERE time_col >= NOW() - INTERVAL '5' MINUTE
    GROUP BY TUMBLE(time_col, INTERVAL '1' SECOND)
""")

# to_pandas() triggers execution and collects the result locally
print(result.to_pandas())
```
This is just an approximation but shows the basic approach.
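For the Kafka side, the source could be defined with DDL, something like this (topic, brokers, and format are placeholders, and you’d use a streaming environment rather than the batch one above):
```python
# hypothetical Kafka source definition; connection details are made up
t_env.execute_sql("""
    CREATE TABLE myTable (
        time_col TIMESTAMP(3),
        -- add other fields as needed
        WATERMARK FOR time_col AS time_col - INTERVAL '5' SECOND
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'my_topic',
        'properties.bootstrap.servers' = 'localhost:9092',
        'scan.startup.mode' = 'latest-offset',
        'format' = 'json'
    )
""")
```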
If someone knows a way around creating a custom window with the DataStream API, that would be even better, because this is going to need a lot of testing too!
h
Thanks! In my case there are some complications that make this harder than standard windows can easily accommodate. Basically, different events have different expiry times, based on some calculation, which means the field I’d need to window on is not the watermark/time field… I have found a workaround that somewhat works: using a subquery with a max(kafka_time) instead of the current_timestamp. This works, but it produces weird retractions that are hard to get rid of… I think a KeyedProcessFunction is my only option here, unfortunately.
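For anyone hitting the same thing, a rough, untested sketch of the KeyedProcessFunction I have in mind (the `(key, expiry_ms)` record layout is made up for illustration):
```python
# sketch: keep a live count per key, decrementing when each event's own
# expiry timestamp passes, via event-time timers
from pyflink.common import Types
from pyflink.datastream.functions import KeyedProcessFunction, RuntimeContext
from pyflink.datastream.state import ValueStateDescriptor, MapStateDescriptor


class ExpiringCount(KeyedProcessFunction):

    def open(self, runtime_context: RuntimeContext):
        self.live = runtime_context.get_state(
            ValueStateDescriptor("live", Types.LONG()))
        # timers fire at most once per (key, timestamp), so track how many
        # events share each expiry instant
        self.expiring = runtime_context.get_map_state(
            MapStateDescriptor("expiring", Types.LONG(), Types.LONG()))

    def process_element(self, value, ctx: 'KeyedProcessFunction.Context'):
        # value is assumed to be (key, expiry_timestamp_ms)
        live = (self.live.value() or 0) + 1
        self.live.update(live)
        expiry = value[1]
        if self.expiring.contains(expiry):
            self.expiring.put(expiry, self.expiring.get(expiry) + 1)
        else:
            self.expiring.put(expiry, 1)
        ctx.timer_service().register_event_time_timer(expiry)
        yield ctx.get_current_key(), live

    def on_timer(self, timestamp, ctx: 'KeyedProcessFunction.OnTimerContext'):
        # some events just expired: subtract them from the live count
        expired = self.expiring.get(timestamp) if self.expiring.contains(timestamp) else 0
        self.expiring.remove(timestamp)
        live = max((self.live.value() or 0) - expired, 0)
        self.live.update(live)
        yield ctx.get_current_key(), live
```
You’d key the stream and assign watermarks on kafka_time first, so the event-time timers actually fire.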
d
yes, that is a special case.