Hi, I have a problem with STATE_TTL hint with DIST...
# troubleshooting
m
Hi, I have a problem with STATE_TTL hint with DISTINCT. I have the following query:
SELECT /*+ STATE_TTL('joined'='1s') */ *
FROM t1
INNER JOIN t2 FOR SYSTEM_TIME AS OF t1.proc_time AS joined ON t1.id = t2.id;
but when I try to add DISTINCT * in the select I suddenly get:
org.apache.flink.table.api.ValidationException: The options of following hints cannot match the name of input tables or views: 
`joined` in `STATE_TTL`
any idea why?
d
In your first query, you apply the STATE_TTL hint to the alias joined, which names t2 as the right-hand input of the join. This works because joined is a named input within the scope of the query, so the hint key has something to match.
Once you add DISTINCT * to your SELECT clause, the semantics change. DISTINCT operates after the join and projects a set of unique rows from the result set. It doesn’t correspond directly to an input table or a named view in the same way t1 and joined do. Instead, it’s an operation applied on the output of the join.
The error message indicates that joined is no longer a valid reference for the STATE_TTL hint once you’ve introduced DISTINCT *, because the DISTINCT operation creates a new logical “table” that is not named and thus cannot be referenced directly by the STATE_TTL hint.
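One way to give the hint a name it can match is to wrap the join in an aliased subquery and put the hint on the outer aggregation. This is a minimal sketch, assuming a Flink version where the STATE_TTL hint is supported for group aggregates, and it should be tested before relying on it. The alias `j` and the columns `name` and `category` are hypothetical, and DISTINCT is written as an equivalent GROUP BY over all selected columns.

```sql
-- Sketch only: `j`, `name`, and `category` are hypothetical names.
-- The join is wrapped in a subquery so the aggregation has a named
-- input (`j`) that the STATE_TTL hint key can match.
SELECT /*+ STATE_TTL('j' = '1h') */ id, name, category
FROM (
  SELECT t1.id, t1.name, joined.category
  FROM t1
  INNER JOIN t2 FOR SYSTEM_TIME AS OF t1.proc_time AS joined
    ON t1.id = joined.id
) AS j
-- Grouping by every selected column is equivalent to SELECT DISTINCT.
GROUP BY id, name, category;
```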
m
So there is no way to control the TTL of distinct queries? The state will accumulate endlessly?
d
Well, you could restructure your data processing pipeline a bit using a temporary table/view.
You could perform the join as you are doing now,
then insert the joined data into a temporary table where you can apply the distinct operation.
I think you could apply the distinct in a subsequent step, or maybe during temporary table creation.
Set the TTL on the new temporary table if possible.
The other options might involve using windowing
I think you might end up needing to use Flink Table API for more control
Next level down is DataStream API. Handling distinct records with TTL in streaming context is a complex task and needs to be tested out.
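The temporary view approach could look roughly like the following. This is a sketch under assumptions: the view name `joined_rows` and the columns `name` and `category` are hypothetical, and the TTL is set job-wide through the `table.exec.state.ttl` option rather than on the view itself, so it applies to every stateful operator in the pipeline, not just the distinct.

```sql
-- Job-wide state retention: covers all stateful operators in this job,
-- including the DISTINCT aggregation. The 1 h value is only an example.
SET 'table.exec.state.ttl' = '1 h';

-- Hypothetical view name and columns; the join is the one from the question.
CREATE TEMPORARY VIEW joined_rows AS
SELECT t1.id, t1.name, joined.category
FROM t1
INNER JOIN t2 FOR SYSTEM_TIME AS OF t1.proc_time AS joined
  ON t1.id = joined.id;

-- The distinct runs as a separate step on top of the named view.
SELECT DISTINCT id, name, category
FROM joined_rows;
```

Setting the option job-wide is the coarser choice: it is simple, but it cannot give the distinct a different retention than other stateful operators in the same job.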
m
So in theory I define temporary table x with ttl 1 hour. Then I select distinct from that table, the distinct will always reflect the state of x, if data stops arriving for an hour and the state is expired, both tables will be empty
d
Yes, you might need to separate out the data expiration and distinct records maintenance concerns.
In this case you would probably also maintain a distinct records table that is periodically updated, or you maintain this data in Kafka.
m
I sink the data out into a DB. I guess the distinct will emit a changelog stream with delete records, but I'll just ignore them.
d
yes that works too
m
Thanks for all the help 🙌
d
Sure, good luck on it!