Hi All, Is there a way to add a fixed window end t...
# troubleshooting
m
Hi All, Is there a way to add a fixed window end time for the session window? For example, after the session window lasts 48 hours, force close it.
d
The built-in session window doesn't have a timeout, but you could implement a custom trigger to make this happen.
m
Thank you for your advice! Your approach has been really helpful to me. However, I'm facing some challenges in actually writing the code. I write a custom trigger based on ContinuousSessionTrigger, I add
Time timeout
as session window end timeout. But I'm not sure how to use it. Should I add a timeout state and register a timeout timer at beginning of session window in onElement function, and add a condition check in onEventTime function, if 48 hours have passed, onEventTime return TriggerResult.FIRE ? Here is my code:
Copy code
package Test.WindowAssigner;

import java.text.SimpleDateFormat;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.Window;

/**
 * A {@link Trigger} that continuously fires based on a given time interval.
 * This fires based on
 * {@link org.apache.flink.streaming.api.watermark.Watermark Watermarks}.
 *
 * @see org.apache.flink.streaming.api.watermark.Watermark
 * @param <W> The type of {@link Window Windows} on which this trigger can
 *            operate.
 */
@PublicEvolving
public class ContinuousSessionTrigger<W extends Window> extends Trigger<Object, W> {
    private static final long serialVersionUID = 1L;

    private final long interval;
    private final long timeout;

    /**
     * When merging we take the lowest of all fire timestamps as the new fire
     * timestamp.
     */
    private final ReducingStateDescriptor<Long> stateDesc = new ReducingStateDescriptor<>("fire-time", new Min(),
            LongSerializer.INSTANCE);

    private ContinuousSessionTrigger(long interval, long timeout) {
        this.interval = interval;
        this.timeout = timeout;
    }

    @Override
    public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx)
            throws Exception {

        if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
            return TriggerResult.FIRE;
        } else {
            ctx.registerEventTimeTimer(window.maxTimestamp());
        }

        ReducingState<Long> fireTimestampState = ctx.getPartitionedState(stateDesc);
        if (fireTimestampState.get() == null) {
            registerNextFireTimestamp(
                    timestamp - (timestamp % interval), window, ctx, fireTimestampState);
        }

        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onEventTime(long time, W window, TriggerContext ctx) throws Exception {

        if (time == window.maxTimestamp()) {
            return TriggerResult.FIRE;
        }

        ReducingState<Long> fireTimestampState = ctx.getPartitionedState(stateDesc);

        Long fireTimestamp = fireTimestampState.get();

        if (fireTimestamp != null && fireTimestamp == time) {
            fireTimestampState.clear();
            registerNextFireTimestamp(time, window, ctx, fireTimestampState);
            return TriggerResult.FIRE;
        }

        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx)
            throws Exception {
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(W window, TriggerContext ctx) throws Exception {

        ReducingState<Long> fireTimestamp = ctx.getPartitionedState(stateDesc);
        Long timestamp = fireTimestamp.get();
        if (timestamp != null) {
            ctx.deleteEventTimeTimer(timestamp);
            fireTimestamp.clear();
        }
    }

    @Override
    public boolean canMerge() {
        return true;
    }

    @Override
    public void onMerge(W window, OnMergeContext ctx) throws Exception {
        ctx.mergePartitionedState(stateDesc);
        Long nextFireTimestamp = ctx.getPartitionedState(stateDesc).get();
        if (nextFireTimestamp != null) {
            ctx.registerEventTimeTimer(nextFireTimestamp);
        }
    }

    @Override
    public String toString() {
        return "ContinuousSessionTrigger(" + interval + ")";
    }

    @VisibleForTesting
    public long getInterval() {
        return interval;
    }

    /**
     * Creates a trigger that continuously fires based on the given interval.
     *
     * @param interval The time interval at which to fire.
     * @param <W>      The type of {@link Window Windows} on which this trigger can
     *                 operate.
     */
    public static <W extends Window> ContinuousSessionTrigger<W> of(Time interval, Time timeout) {
        return new ContinuousSessionTrigger<>(interval.toMilliseconds(), timeout.toMilliseconds());
    }

    private static class Min implements ReduceFunction<Long> {
        private static final long serialVersionUID = 1L;

        @Override
        public Long reduce(Long value1, Long value2) throws Exception {
            return Math.min(value1, value2);
        }
    }

    private void registerNextFireTimestamp(
            long time, W window, TriggerContext ctx, ReducingState<Long> fireTimestampState)
            throws Exception {
        long nextFireTimestamp = Math.min(time + interval, window.maxTimestamp());

        fireTimestampState.add(nextFireTimestamp);
        ctx.registerEventTimeTimer(nextFireTimestamp);
    }
}