Hello Everyone, I am trying to explore flink-cep l...
# troubleshooting
g
Hello Everyone, I am trying to explore flink-cep library and trying to run a sample job to start, But its not producing any output. Please let me know if i am doing something wrong.
Copy code
package org.example;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.List;
import java.util.Map;

public class FlinkCEPExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Event> events = env.fromElements(
                new Event(1, "start"),
                new Event(2, "middle"),
                new Event(3, "end")
        );

        Pattern<Event, ?> pattern = Pattern.<Event>begin("start")
                .where(SimpleCondition.of(event -> event.getName().equals("start")))
                .followedBy("middle")
                .where(SimpleCondition.of(event -> event.getName().equals("middle")))
                .followedBy("end")
                .where(SimpleCondition.of(event -> event.getName().equals("end")));

        DataStream<String> output = CEP.pattern(events, pattern)
                .select(new PatternSelectFunction<Event, String>() {
                    @Override
                    public String select(Map<String, List<Event>> pattern) throws Exception {
                        StringBuilder result = new StringBuilder();
                        for (Event event : pattern.get("start")) {
                            result.append(event.getId()).append("-");
                        }
                        result.append(pattern.get("middle").get(0).getId()).append("-");
                        result.append(pattern.get("end").get(0).getId());
                        return result.toString();
                    }
                });
        output.print();
        // Execute the job
        env.execute("Flink CEP Pattern Example");
    }
}
the job is running successful, but not producing any output. Also intelliJ IDEA also not helping, its not going inside patternSelect function at all
Also same worked with 1.11.6 version of libraries but not above of that