# troubleshooting
e
Hi team, I have a Flink job with two aggregations: one uses hourly tumbling windows and the other uses a global window. I sink the hourly and total aggregation outputs into two separate Kafka topics. Both aggregations share the same source and transformation logic and differ only in the aggregation step, and both use processing-time triggers (every 5s). I expected the two sink topics to receive the same number of messages, since both aggregations consume the same data stream with the same trigger interval. However, the total aggregation emits fewer messages than the hourly one, almost half as many, which is quite counter-intuitive. Does anyone know whether this is expected and, if not, what could be causing it?
Hourly aggregation:
DataStream<OfferUsageAggregatedHourly> hourlyAggregationStream = timestampedStream
            .keyBy(event -> event.getOfferId())
            .window(TumblingEventTimeWindows.of(Time.hours(1)))
            .allowedLateness(Time.days(config.allowedLatenessDays))
            .trigger(new TumblingWindowIncTrigger(config.triggerIntervalSeconds))
            .aggregate(new HourlyAggregationFunction(), new BucketAssigner()).uid("Hourly Aggregation")
            .keyBy(event -> event.getOfferId());
Total aggregation:
DataStream<OfferUsageAggregatedTotal> totalAggregationStream = timestampedStream
            .keyBy(event -> event.getOfferId())
            .window(GlobalWindows.create())
            .trigger(new GlobalWindowIncTrigger(config.triggerIntervalSeconds))
            .aggregate(new TotalAggregationFunction()).uid("Total Aggregation")
            .keyBy(event -> event.getOfferId());
Trigger for the hourly aggregation:
public class TumblingWindowIncTrigger extends Trigger<ChangeEventDiff, TimeWindow> {

    Integer updateInterval;

    public TumblingWindowIncTrigger(Integer updateInterval) {
        this.updateInterval = updateInterval;
    }
    @Override
    public TriggerResult onElement(ChangeEventDiff element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {

        long now = ctx.getCurrentProcessingTime();
        ctx.registerProcessingTimeTimer(now + updateInterval);

        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long ts, TimeWindow w, TriggerContext ctx) throws Exception {
        return TriggerResult.FIRE;
    }

    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
    }
}
Trigger for the total aggregation:
public class GlobalWindowIncTrigger extends Trigger<ChangeEventDiff, GlobalWindow> {

    Integer updateInterval;

    public GlobalWindowIncTrigger(Integer updateInterval) {
        this.updateInterval = updateInterval;
    }
    @Override
    public TriggerResult onElement(ChangeEventDiff element, long timestamp, GlobalWindow window, TriggerContext ctx) throws Exception {

        long now = ctx.getCurrentProcessingTime();
        ctx.registerProcessingTimeTimer(now + updateInterval);

        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long ts, GlobalWindow w, TriggerContext ctx) throws Exception {
        return TriggerResult.FIRE;
    }

    @Override
    public TriggerResult onEventTime(long time, GlobalWindow window, TriggerContext ctx) throws Exception {
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(GlobalWindow window, TriggerContext ctx) throws Exception {
    }
}
v
As a preliminary check, you can look at the number of records in/out for each operator in the JobManager UI. That should show where the two job graphs differ, which will probably help identify how often the bucketing happens.
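One possible explanation to check against those counts, not confirmed anywhere in this thread: Flink keeps processing-time timers per key and per window, and de-duplicates timers that share the same key, window and timestamp. With hourly tumbling windows, a key whose events land in two different event-time hours gets a timer (and therefore a `FIRE`) for each hour window, while the global window only ever has one window per key. Here is a minimal plain-Java sketch of that counting, with no Flink dependency. The `TimerFireCount` class, the `Event` type and the sample data are all made up for illustration.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class TimerFireCount {
    public static final long HOUR_MS = 3_600_000L;

    /** Hypothetical event: key, event-time timestamp and processing-time arrival (all in ms). */
    public static final class Event {
        final String key;
        final long eventTimeMs;
        final long arrivalMs;

        public Event(String key, long eventTimeMs, long arrivalMs) {
            this.key = key;
            this.eventTimeMs = eventTimeMs;
            this.arrivalMs = arrivalMs;
        }
    }

    /** Distinct (key, hour window, timer timestamp) triples: one fire per triple. */
    public static int hourlyFires(List<Event> events, long intervalMs) {
        Set<List<Object>> timers = new HashSet<>();
        for (Event e : events) {
            // Start of the one-hour tumbling window the event falls into.
            long windowStart = e.eventTimeMs - Math.floorMod(e.eventTimeMs, HOUR_MS);
            timers.add(List.<Object>of(e.key, windowStart, e.arrivalMs + intervalMs));
        }
        return timers.size();
    }

    /** Distinct (key, timer timestamp) pairs: the global window is the same for every event. */
    public static int globalFires(List<Event> events, long intervalMs) {
        Set<List<Object>> timers = new HashSet<>();
        for (Event e : events) {
            timers.add(List.<Object>of(e.key, e.arrivalMs + intervalMs));
        }
        return timers.size();
    }

    public static void main(String[] args) {
        // Made-up data: one key, events from two different event-time hours
        // arriving at the same processing-time instants.
        List<Event> events = List.of(
            new Event("offer-1", 10 * HOUR_MS + 1_000, 500),
            new Event("offer-1", 11 * HOUR_MS + 2_000, 500),
            new Event("offer-1", 10 * HOUR_MS + 3_000, 900),
            new Event("offer-1", 11 * HOUR_MS + 4_000, 900));
        System.out.println("hourly fires: " + hourlyFires(events, 5_000)); // hourly fires: 4
        System.out.println("global fires: " + globalFires(events, 5_000)); // global fires: 2
    }
}
```

If the real input regularly spans about two hour windows per key at any moment (late data within the allowed lateness would have that effect), the hourly topic would see roughly twice as many messages as the total topic. Separately, `ctx.getCurrentProcessingTime()` returns milliseconds, so if `config.triggerIntervalSeconds` really holds seconds, `now + updateInterval` schedules the timer 5 ms ahead rather than 5 s. That is worth double-checking too.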