Gaurav Gupta
06/08/2023, 6:22 PM
package org.example;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.List;
import java.util.Map;
/**
 * Minimal Flink CEP job: feeds three in-memory events through a
 * "start" -> "middle" -> "end" pattern and prints the ids of every match.
 *
 * <p>NOTE(review): {@code Event} is not declared in this snippet; it is assumed to
 * expose {@code getId()} and {@code getName()} and to be a type Flink can serialize.
 */
public class FlinkCEPExample {

    /**
     * Builds the pipeline and runs it.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if job submission or execution fails
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Event> events = env.fromElements(
                new Event(1, "start"),
                new Event(2, "middle"),
                new Event(3, "end"));

        // Constant-first equals(): an event with a null name simply does not match
        // instead of throwing a NullPointerException inside the condition.
        Pattern<Event, ?> pattern = Pattern.<Event>begin("start")
                .where(SimpleCondition.of(event -> "start".equals(event.getName())))
                .followedBy("middle")
                .where(SimpleCondition.of(event -> "middle".equals(event.getName())))
                .followedBy("end")
                .where(SimpleCondition.of(event -> "end".equals(event.getName())));

        DataStream<String> output = CEP.pattern(events, pattern)
                // CEP evaluates in event time by default. The elements above carry no
                // timestamps or watermarks, so in event time they would all be treated
                // as late and dropped, producing no output. Processing time avoids that.
                .inProcessingTime()
                .select(new PatternSelectFunction<Event, String>() {
                    /**
                     * Renders one match as the matched event ids joined by "-".
                     *
                     * @param match matched events keyed by pattern stage name
                     * @return ids of the "start" events, then the first "middle" id,
                     *         then the first "end" id, separated by "-"
                     */
                    @Override
                    public String select(Map<String, List<Event>> match) throws Exception {
                        StringBuilder result = new StringBuilder();
                        for (Event event : match.get("start")) {
                            result.append(event.getId()).append("-");
                        }
                        result.append(match.get("middle").get(0).getId()).append("-");
                        result.append(match.get("end").get(0).getId());
                        return result.toString();
                    }
                });

        output.print();

        // Execute the job
        env.execute("Flink CEP Pattern Example");
    }
}
Gaurav Gupta
06/08/2023, 6:24 PM
Gaurav Gupta
06/08/2023, 9:16 PM