Hello Team, I am developing a Flink Java application with the following requirement: consume events from a Kafka source, do some processing, sink the results to MongoDB, and then source/fetch the same collection back from MongoDB and print the fetched data to the console. How can I create this flow? I can source and sink individually, but when I call env.execute() it starts the Flink job and gets stuck on the last one; I need this flow to react on the arrival of each event. Please suggest a solution. The code example is:
public class CarStreams_SinkSource implements Serializable {
public static final String SECURITY_PROTOCOL_PROPERTY = "SASL_SSL";
public static final String SECURITY_MECHANISM_PROPERTY = "PLAIN";
public static final String SECURITY_JAAS_CONFIG_PROPERTY = "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"Endpoint=sb://a-test-eventhub-eastus.servicebus.windows.net/;SharedAccessKeyName=L-PP;SharedAccessKey=Cre+rthtrh55rgegagg+6rtgrt+dgrdg/RY=\";";
public static final String CONSUMER_TIMEOUT_PROPERTY = "300000";
private static final String CAR_EVENT_AA_SCHEDULE = "car-event-schedule";
private static final String BOOTSTRAP_SERVERS = "a-test-eventhub-eastus.servicebus.windows.net:9093";
private static final boolean LOCAL_TESTING = false;
public CarEventProcessor carEventProcessor;
private final String CAR_COLLECTION_NAME ="A_CarSinkSourceTest";
ObjectMapper mapper = new ObjectMapper();
private static final String URI = "mongodb+srv://car:gdfgd8fex@pp-np-eastus-mdb.xgpgt.mongodb.net/?authSource=admin&retryWrites=true&w=majority";
private static final String MONGO_DATABASE = "pp-car";
public CarStreams_SinkSource() {
carEventProcessor = new CarEventProcessor();
}
public static void main(String[] args) {
CarStreams_SinkSource carStreams = new CarStreams_SinkSource();
carStreams.initStreams();
}
public void initStreams() {
mapper.configure(MapperFeature.INFER_PROPERTY_MUTATORS, false);
mapper.configure(MapperFeature.ACCEPT_CASE_INSENSITIVE_PROPERTIES, true);
try {
System.setProperty("org.apache.flink.java.opts", "--add-opens java.base/java.lang=ALL-UNNAMED");
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
env.setParallelism(1);
env.enableCheckpointing(5000); // checkpoint interval in ms
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
DataStream<String> rawCarEventStream;
KafkaSource<String> aaOhTestEventhubCarEastus = KafkaSource.<String>builder()
.setBootstrapServers(BOOTSTRAP_SERVERS)
.setTopics(CAR_EVENT_AA_SCHEDULE, CAR_EVENT_AA_TIME, CAR_EVENT_AA_AIRCRAFT, CAR_EVENT_AA_MISC,
CAR_EVENT_AA_DEPARTURE_ARRIVAL, CAR_EVENT_AA_LOAD, CAR_EVENT_AA_FUEL)
.setGroupId("pp")
.setStartingOffsets(OffsetsInitializer.earliest())
.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SECURITY_PROTOCOL_PROPERTY)
.setProperty(SaslConfigs.SASL_MECHANISM, SECURITY_MECHANISM_PROPERTY)
.setProperty(SaslConfigs.SASL_JAAS_CONFIG, SECURITY_JAAS_CONFIG_PROPERTY)
.setProperty(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, CONSUMER_TIMEOUT_PROPERTY)
.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName())
.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName())
.setValueOnlyDeserializer(new SimpleStringSchema())
.build();
rawCarEventStream = env.fromSource(aaOhTestEventhubCarEastus,
WatermarkStrategy.noWatermarks(), "aa-oh-test-eventhub-car")
.name("carProcessor-rawCarEventStream");
DataStream<CarRawEvent> carRawEventStream = rawCarEventStream.map((MapFunction<String, CarRawEvent>) value -> {
CarRawEvent carRawEvent = mapper.readValue(value, CarRawEvent.class);
createCarKey(carRawEvent);
return carRawEvent;
}).name("car-rawEvent-map").keyBy(CarRawEvent::getCarKey);
processCarEvents(carRawEventStream, env);
} catch (Exception e) {
log.error("Failed with exception " + e);
e.printStackTrace();
}
log.info("Exit CarStreams::processCarRawEvents");
}
void processCarEvents(DataStream<CarRawEvent> carRawEventStream, StreamExecutionEnvironment env) throws Exception {
DataStream<Car> carStream = carEventProcessor.processCarEvents(carRawEventStream).keyBy(Car::getCarKey);
carStream.sinkTo(sinkToMongoDB(CAR_COLLECTION_NAME, Car.class)).name("car-mongoSink");
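// NOTE: this is where the flow gets stuck -- execute() blocks while the streaming job runs,
// so the MongoDB read-back below never starts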
env.execute("CarProcessor Job");
List<Car> cars = getCarFromMongoDB();
System.out.println("cars=>"+cars.size());
env.execute("CarProcessor Job");
}
public List<com.flink.datamodel.Car> getCarFromMongoDB() throws Exception {
List<com.flink.datamodel.Car> carList = new ArrayList<>();
MongoSource<String> source = createMongoSource(CAR_COLLECTION_NAME);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
env.setParallelism(1);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(3, org.apache.flink.api.common.time.Time.seconds(10)));
DataStream<String> carDataStream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "MongoDB-car-source");
carDataStream.print("after fetch:");
DataStream<com.flink.datamodel.Car> carStream = carDataStream.map((MapFunction<String, com.flink.datamodel.Car>) value -> {
com.flink.datamodel.Car filteredcar = mapper.readValue(value, com.flink.datamodel.Car.class);
return filteredcar;
}).name("getCarFromMongoDB-filtered-map").keyBy(com.flink.datamodel.Car::getCarKey);
env.execute("Read from MongoDB-getCarFromMongoDB");
Iterator<Car> myOutput = DataStreamUtils.collect(carStream);
while (myOutput.hasNext())
carList.add(myOutput.next());
return carList;
}
public MongoSource<String> createMongoSource(String collectionName) {
MongoSource<String> source = MongoSource.<String>builder()
.setUri(URI)
.setDatabase(MONGO_DATABASE)
.setCollection(collectionName)
.setNoCursorTimeout(true)
.setPartitionStrategy(PartitionStrategy.SAMPLE)
.setPartitionSize(MemorySize.ofMebiBytes(64))
.setSamplesPerPartition(10)
.setDeserializationSchema(new MongoDeserializationSchema<String>() {
@Override
public String deserialize(BsonDocument document) {
return document.toJson();
}
@Override
public TypeInformation<String> getProducedType() {
return BasicTypeInfo.STRING_TYPE_INFO;
}
})
.build();
return source;
}
}
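
For reference, this is roughly the single-job flow I think I need: one topology where the same stream is written to MongoDB and also printed to the console, so there is only one env.execute() and each event is handled on arrival. The sketch below is simplified and self-contained, not my real pipeline; the bootstrap servers, MongoDB URI, topic and collection names and the JSON handling are placeholders, and the MongoSink usage reflects my reading of the flink-connector-mongodb docs rather than tested code:

// Simplified, self-contained sketch of the intended single-job flow (placeholder settings).
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.connector.mongodb.sink.MongoSink;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.bson.BsonDocument;
import com.mongodb.client.model.InsertOneModel;

public class SingleJobSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.enableCheckpointing(5000);

        // Unbounded Kafka source (placeholder servers/topic).
        KafkaSource<String> kafka = KafkaSource.<String>builder()
                .setBootstrapServers("localhost:9092")
                .setTopics("car-event-schedule")
                .setGroupId("pp")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        DataStream<String> rawEvents =
                env.fromSource(kafka, WatermarkStrategy.noWatermarks(), "car-events");

        // MongoDB sink inserting each event as a document (assumes the events are JSON).
        MongoSink<String> mongoSink = MongoSink.<String>builder()
                .setUri("mongodb://localhost:27017")
                .setDatabase("pp-car")
                .setCollection("A_CarSinkSourceTest")
                .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
                .setSerializationSchema(
                        (json, ctx) -> new InsertOneModel<>(BsonDocument.parse(json)))
                .build();

        // The same stream is persisted AND printed in one topology,
        // so there is no second job and no second execute() call.
        rawEvents.sinkTo(mongoSink).name("car-mongoSink");
        rawEvents.print("car");

        // Single blocking execute(); the job keeps running and handles each event on arrival.
        env.execute("CarProcessor single-job flow");
    }
}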