Eric Liu
10/03/2023, 9:31 PMEric Liu
10/03/2023, 9:32 PMDataStream<OfferUsageAggregatedHourly> hourlyAggregationStream = timestampedStream
.keyBy(event -> event.getOfferId())
.window(TumblingEventTimeWindows.of(Time.hours(1)))
.allowedLateness(Time.days(config.allowedLatenessDays))
.trigger(new TumblingWindowIncTrigger(config.triggerIntervalSeconds))
.aggregate(new HourlyAggregationFunction(), new BucketAssigner()).uid("Hourly Aggregation")
.keyBy(event -> event.getOfferId());
Total aggregation:
DataStream<OfferUsageAggregatedTotal> totalAggregationStream = timestampedStream
.keyBy(event -> event.getOfferId())
.window(GlobalWindows.create())
.trigger(new GlobalWindowIncTrigger(config.triggerIntervalSeconds))
.aggregate(new TotalAggregationFunction()).uid("Total Aggregation")
.keyBy(event -> event.getOfferId());
Eric Liu
10/03/2023, 9:34 PMpublic class TumblingWindowIncTrigger extends Trigger<ChangeEventDiff, TimeWindow> {
Integer updateInterval;
public TumblingWindowIncTrigger(Integer updateInterval) {
this.updateInterval = updateInterval;
}
@Override
public TriggerResult onElement(ChangeEventDiff element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
long now = ctx.getCurrentProcessingTime();
ctx.registerProcessingTimeTimer(now + updateInterval);
return TriggerResult.CONTINUE;
}
@Override
public TriggerResult onProcessingTime(long ts, TimeWindow w, TriggerContext ctx) throws Exception {
return TriggerResult.FIRE;
}
@Override
public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
return TriggerResult.CONTINUE;
}
@Override
public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
}
}
Eric Liu
10/03/2023, 9:34 PMpublic class GlobalWindowIncTrigger extends Trigger<ChangeEventDiff, GlobalWindow> {
Integer updateInterval;
public GlobalWindowIncTrigger(Integer updateInterval) {
this.updateInterval = updateInterval;
}
@Override
public TriggerResult onElement(ChangeEventDiff element, long timestamp, GlobalWindow window, TriggerContext ctx) throws Exception {
long now = ctx.getCurrentProcessingTime();
ctx.registerProcessingTimeTimer(now + updateInterval);
return TriggerResult.CONTINUE;
}
@Override
public TriggerResult onProcessingTime(long ts, GlobalWindow w, TriggerContext ctx) throws Exception {
return TriggerResult.FIRE;
}
@Override
public TriggerResult onEventTime(long time, GlobalWindow window, TriggerContext ctx) throws Exception {
return TriggerResult.CONTINUE;
}
@Override
public void clear(GlobalWindow window, TriggerContext ctx) throws Exception {
}
}
Vineeth Naroju
10/05/2023, 11:39 PM