Krutik Gadhiya
09/13/2023, 4:50 AMpackage org.example.examples;
import lombok.*;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.typeutils.TypeExtractor;
@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
@EqualsAndHashCode
@ToString
public class Event {
private int id;
private String name;
private double price;
private long timestamp;
public static TypeSerializer<Event> createTypeSerializer() {
TypeInformation<Event> typeInformation = TypeExtractor.createTypeInfo(Event.class);
return typeInformation.createSerializer(new ExecutionConfig());
}
}
PatternMatchDataStream.java
public class PatternMatchDataStream {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
executionEnvironment.getConfig().setAutoWatermarkInterval(10000);
KafkaSource<Event> logSource = KafkaSource.<Event>builder()
.setTopics("log")
.setBootstrapServers("localhost:9092")
.setStartingOffsets(OffsetsInitializer.latest())
.setValueOnlyDeserializer(new LogDeserializer())
.build();
DataStream<Event> input =
executionEnvironment.fromSource(
logSource,
WatermarkStrategy.noWatermarks(),
"Kafka Source"
).assignTimestampsAndWatermarks(
WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(10))
.withTimestampAssigner((event, ts) -> event.getTimestamp())
);
Pattern<Event, ?> pattern =
Pattern.<Event>begin("start")
.where(SimpleCondition.of(value -> value.getName().equals("one")));
DataStream<String> result =
CEP.pattern(input, pattern)
.inEventTime()
.process(new PatternProcessFunction<Event, String>() {
@Override
public void processMatch(Map<String, List<Event>> match, Context ctx, Collector<String> out) throws Exception {
System.out.println(match);
// collect the result
}
});
input.print();
result.print();
executionEnvironment.execute("DataStream Pattern Match");
}
}