Hi, I am trying to use CEP with DataStream, now th...
# random
k
Hi, I am trying to use CEP with DataStream, now the bellow code works fine for Processing time but it is not producing any result with event time. I guess there is some issue with Watermarking can any one help what is that I am doing wrong. Also how can I handle out of order events? Event.java
Copy code
package org.example.examples;

import lombok.*;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.typeutils.TypeExtractor;

@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
@EqualsAndHashCode
@ToString
public class Event {
    private int id;
    private String name;
    private double price;
    private long timestamp;

    public static TypeSerializer<Event> createTypeSerializer() {
        TypeInformation<Event> typeInformation = TypeExtractor.createTypeInfo(Event.class);

        return typeInformation.createSerializer(new ExecutionConfig());
    }
}
PatternMatchDataStream.java
Copy code
public class PatternMatchDataStream {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.getConfig().setAutoWatermarkInterval(10000);

        KafkaSource<Event> logSource = KafkaSource.<Event>builder()
                .setTopics("log")
                .setBootstrapServers("localhost:9092")
                .setStartingOffsets(OffsetsInitializer.latest())
                .setValueOnlyDeserializer(new LogDeserializer())
                .build();

        DataStream<Event> input =
                executionEnvironment.fromSource(
                        logSource,
                        WatermarkStrategy.noWatermarks(),
                        "Kafka Source"
                ).assignTimestampsAndWatermarks(
                        WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(10))
                                .withTimestampAssigner((event, ts) -> event.getTimestamp())
                );

        Pattern<Event, ?> pattern =
                Pattern.<Event>begin("start")
                        .where(SimpleCondition.of(value -> value.getName().equals("one")));

        DataStream<String> result =
                CEP.pattern(input, pattern)
                        .inEventTime()
                        .process(new PatternProcessFunction<Event, String>() {
                            @Override
                            public void processMatch(Map<String, List<Event>> match, Context ctx, Collector<String> out) throws Exception {
                                System.out.println(match);
                                // collect the result
                            }
                        });

        input.print();
        result.print();

        executionEnvironment.execute("DataStream Pattern Match");
    }
}