milk
05/22/2023, 8:24 AMDavid Anderson
05/22/2023, 9:47 AMmilk
05/22/2023, 10:43 AMTime timeout
as session window end timeout. But I'm not sure how to use it.
Should I add a timeout state and register a timeout timer at the beginning of the session window in the onElement function, and then add a condition check in the onEventTime function so that, once 48 hours have passed, onEventTime returns TriggerResult.FIRE?
Here is my code:
milk
05/22/2023, 10:44 AM
package Test.WindowAssigner;
import java.text.SimpleDateFormat;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.Window;
/**
 * A {@link Trigger} that fires repeatedly at a fixed event-time interval while a window is open,
 * and fires once more when the watermark reaches the end of the window. All firing is driven by
 * {@link org.apache.flink.streaming.api.watermark.Watermark Watermarks}.
 *
 * <p>The trigger reports {@link #canMerge()} as {@code true}, so it can be attached to merging
 * windows (for example session windows). When windows merge, the earliest pending periodic fire
 * timestamp of the merged windows is kept.
 *
 * <p>NOTE(review): the {@code timeout} value passed to {@link #of(Time, Time)} is stored and
 * exposed through {@link #getTimeout()}, but no callback in this class reads it yet, so it
 * currently has no effect on when the trigger fires. TODO: decide the intended semantics (e.g.
 * a single extra firing versus fire-and-purge once the session is older than the timeout)
 * before wiring it into {@code onElement}/{@code onEventTime}.
 *
 * @see org.apache.flink.streaming.api.watermark.Watermark
 * @param <W> The type of {@link Window Windows} on which this trigger can operate.
 */
@PublicEvolving
public class ContinuousSessionTrigger<W extends Window> extends Trigger<Object, W> {

    private static final long serialVersionUID = 1L;

    /** Distance in milliseconds between two periodic firings; always strictly positive. */
    private final long interval;

    /** Session timeout in milliseconds. Stored only; see the class-level review note. */
    private final long timeout;

    /**
     * Holds the next periodic fire timestamp of a window. When merging we take the lowest of all
     * fire timestamps as the new fire timestamp.
     */
    private final ReducingStateDescriptor<Long> stateDesc =
            new ReducingStateDescriptor<>("fire-time", new Min(), LongSerializer.INSTANCE);

    /**
     * Creates the trigger. Use {@link #of(Time, Time)} from outside this class.
     *
     * @param interval periodic firing interval in milliseconds; must be greater than zero
     * @param timeout session timeout in milliseconds (currently stored only)
     * @throws IllegalArgumentException if {@code interval} is zero or negative
     */
    private ContinuousSessionTrigger(long interval, long timeout) {
        // Fail fast: a zero interval would throw ArithmeticException in onElement
        // (timestamp % interval), and a negative one would make registerNextFireTimestamp
        // schedule timers that move backwards in event time.
        if (interval <= 0) {
            throw new IllegalArgumentException(
                    "interval must be positive, but was " + interval + " ms");
        }
        this.interval = interval;
        this.timeout = timeout;
    }

    /**
     * Called for every element added to {@code window}.
     *
     * <p>If the watermark has already passed the end of the window the trigger fires immediately.
     * Otherwise it makes sure an end-of-window timer exists and, when no periodic fire timestamp
     * is stored yet, schedules the first periodic firing.
     *
     * @return {@link TriggerResult#FIRE} when the watermark is at or past the window's max
     *     timestamp, {@link TriggerResult#CONTINUE} otherwise
     */
    @Override
    public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx)
            throws Exception {
        if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
            // The watermark is already past the end of the window: fire right away.
            return TriggerResult.FIRE;
        }
        ctx.registerEventTimeTimer(window.maxTimestamp());

        ReducingState<Long> fireTimestampState = ctx.getPartitionedState(stateDesc);
        if (fireTimestampState.get() == null) {
            // Start counting from the element timestamp with the remainder of the division by
            // the interval removed, so periodic firings land on multiples of the interval.
            long alignedStart = timestamp - (timestamp % interval);
            registerNextFireTimestamp(alignedStart, window, ctx, fireTimestampState);
        }
        return TriggerResult.CONTINUE;
    }

    /**
     * Called when an event-time timer registered by this trigger fires.
     *
     * @return {@link TriggerResult#FIRE} if {@code time} is the window's max timestamp or equals
     *     the stored periodic fire timestamp (in which case the next periodic firing is
     *     scheduled); {@link TriggerResult#CONTINUE} for any other timer
     */
    @Override
    public TriggerResult onEventTime(long time, W window, TriggerContext ctx) throws Exception {
        if (time == window.maxTimestamp()) {
            return TriggerResult.FIRE;
        }

        ReducingState<Long> fireTimestampState = ctx.getPartitionedState(stateDesc);
        Long fireTimestamp = fireTimestampState.get();
        if (fireTimestamp != null && fireTimestamp == time) {
            // Clear before re-adding: the state reduces with Min, so the old (smaller) value
            // would otherwise win over the next fire timestamp.
            fireTimestampState.clear();
            registerNextFireTimestamp(time, window, ctx, fireTimestampState);
            return TriggerResult.FIRE;
        }
        return TriggerResult.CONTINUE;
    }

    /** Processing-time timers are not used by this trigger; always continues. */
    @Override
    public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx)
            throws Exception {
        return TriggerResult.CONTINUE;
    }

    /**
     * Removes the pending periodic timer recorded in state, if any, and clears that state. The
     * end-of-window timer registered in {@code onElement} is not deleted here.
     */
    @Override
    public void clear(W window, TriggerContext ctx) throws Exception {
        ReducingState<Long> fireTimestamp = ctx.getPartitionedState(stateDesc);
        Long timestamp = fireTimestamp.get();
        if (timestamp != null) {
            ctx.deleteEventTimeTimer(timestamp);
            fireTimestamp.clear();
        }
    }

    /** This trigger keeps only mergeable state, so it supports merging windows. */
    @Override
    public boolean canMerge() {
        return true;
    }

    /**
     * Merges the periodic fire timestamps of the merged windows (the lowest one is kept, see
     * {@link Min}) and registers a timer for the resulting timestamp, if there is one.
     */
    @Override
    public void onMerge(W window, OnMergeContext ctx) throws Exception {
        ctx.mergePartitionedState(stateDesc);
        Long nextFireTimestamp = ctx.getPartitionedState(stateDesc).get();
        if (nextFireTimestamp != null) {
            ctx.registerEventTimeTimer(nextFireTimestamp);
        }
    }

    @Override
    public String toString() {
        return "ContinuousSessionTrigger(" + interval + ")";
    }

    /** Returns the periodic firing interval in milliseconds. */
    @VisibleForTesting
    public long getInterval() {
        return interval;
    }

    /** Returns the configured session timeout in milliseconds. */
    @VisibleForTesting
    public long getTimeout() {
        return timeout;
    }

    /**
     * Creates a trigger that continuously fires based on the given interval.
     *
     * @param interval The time interval at which to fire; must be greater than zero.
     * @param timeout The session timeout. It is stored on the trigger but not yet applied to
     *     the firing logic (see the class-level note).
     * @param <W> The type of {@link Window Windows} on which this trigger can operate.
     * @return a new trigger configured with the given interval and timeout
     * @throws IllegalArgumentException if {@code interval} is zero or negative
     */
    public static <W extends Window> ContinuousSessionTrigger<W> of(Time interval, Time timeout) {
        return new ContinuousSessionTrigger<>(interval.toMilliseconds(), timeout.toMilliseconds());
    }

    /** Reduce function that keeps the smaller of two timestamps. */
    private static class Min implements ReduceFunction<Long> {
        private static final long serialVersionUID = 1L;

        @Override
        public Long reduce(Long value1, Long value2) throws Exception {
            return Math.min(value1, value2);
        }
    }

    /**
     * Stores and registers the next periodic firing after {@code time}. The firing time is capped
     * at the window's max timestamp so no periodic timer is scheduled past the end of the window.
     */
    private void registerNextFireTimestamp(
            long time, W window, TriggerContext ctx, ReducingState<Long> fireTimestampState)
            throws Exception {
        long nextFireTimestamp = Math.min(time + interval, window.maxTimestamp());
        fireTimestampState.add(nextFireTimestamp);
        ctx.registerEventTimeTimer(nextFireTimestamp);
    }
}