Hello Team, I am developing a Flink Java application...
# Troubleshooting
Hello Team, I am developing a Flink Java application with the following requirement: consume events from a Kafka source, do some processing, sink the results to MongoDB, and then source/fetch the same collection from MongoDB and print the fetched data to the console. How can I create this flow? I can source and sink individually, but when I call `env.execute()` it starts the Flink job and gets stuck on the first one; I need this flow to run on the arrival of each event. Please suggest a solution. The code example is:

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import com.fasterxml.jackson.databind.MapperFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.flink.datamodel.Car;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.connector.mongodb.source.MongoSource;
import org.apache.flink.connector.mongodb.source.enumerator.splitter.PartitionStrategy;
import org.apache.flink.connector.mongodb.source.reader.deserializer.MongoDeserializationSchema;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamUtils;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.bson.BsonDocument;

// NOTE: the imports and @Slf4j annotation are reconstructed; CarRawEvent,
// CarEventProcessor, createCarKey(), sinkToMongoDB() and the remaining
// CAR_EVENT_AA_* topic constants live elsewhere in the project (not shown).
@Slf4j
public class CarStreams_SinkSource implements Serializable {

    public static final String SECURITY_PROTOCOL_PROPERTY = "SASL_SSL";
    public static final String SECURITY_MECHANISM_PROPERTY = "PLAIN";
    public static final String SECURITY_JAAS_CONFIG_PROPERTY =
            "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"Endpoint=sb://a-test-eventhub-eastus.servicebus.windows.net/;SharedAccessKeyName=L-PP;SharedAccessKey=Cre+rthtrh55rgegagg+6rtgrt+dgrdg/RY=\";";
    public static final String CONSUMER_TIMEOUT_PROPERTY = "300000";
    private static final String CAR_EVENT_AA_SCHEDULE = "car-event-schedule";
    private static final String BOOTSTRAP_SERVERS = "a-test-eventhub-eastus.servicebus.windows.net:9093";
    private static final boolean LOCAL_TESTING = false;

    public CarEventProcessor carEventProcessor;
    private final String CAR_COLLECTION_NAME = "A_CarSinkSourceTest";
    ObjectMapper mapper = new ObjectMapper();

    private static final String URI = "mongodb+srv://car:gdfgd8fex@pp-np-eastus-mdb.xgpgt.mongodb.net/?authSource=admin&retryWrites=true&w=majority";
    private static final String MONGO_DATABASE = "pp-car";

    public CarStreams_SinkSource() {
        carEventProcessor = new CarEventProcessor();
    }

    public static void main(String[] args) {
        CarStreams_SinkSource carStreams = new CarStreams_SinkSource();
        carStreams.initStreams();
    }

    public void initStreams() {
        mapper.configure(MapperFeature.INFER_PROPERTY_MUTATORS, false);
        mapper.configure(MapperFeature.ACCEPT_CASE_INSENSITIVE_PROPERTIES, true);
        try {
            System.setProperty("org.apache.flink.java.opts", "--add-opens java.base/java.lang=ALL-UNNAMED");
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
            env.setParallelism(1);
            env.enableCheckpointing(5000); // checkpoint every 5 seconds
            env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
            env.getCheckpointConfig().setExternalizedCheckpointCleanup(
                    CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

            KafkaSource<String> aaOhTestEventhubCarEastus = KafkaSource.<String>builder()
                    .setBootstrapServers(BOOTSTRAP_SERVERS)
                    .setTopics(CAR_EVENT_AA_SCHEDULE, CAR_EVENT_AA_TIME, CAR_EVENT_AA_AIRCRAFT,
                            CAR_EVENT_AA_MISC, CAR_EVENT_AA_DEPARTURE_ARRIVAL, CAR_EVENT_AA_LOAD,
                            CAR_EVENT_AA_FUEL)
                    .setGroupId("pp")
                    .setStartingOffsets(OffsetsInitializer.earliest())
                    .setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SECURITY_PROTOCOL_PROPERTY)
                    .setProperty(SaslConfigs.SASL_MECHANISM, SECURITY_MECHANISM_PROPERTY)
                    .setProperty(SaslConfigs.SASL_JAAS_CONFIG, SECURITY_JAAS_CONFIG_PROPERTY)
                    .setProperty(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, CONSUMER_TIMEOUT_PROPERTY)
                    .setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName())
                    .setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName())
                    .setValueOnlyDeserializer(new SimpleStringSchema())
                    .build();

            DataStream<String> rawCarEventStream =
                    env.fromSource(aaOhTestEventhubCarEastus, WatermarkStrategy.noWatermarks(), "aa-oh-test-eventhub-car")
                            .name("carProcessor-rawCarEventStream");

            DataStream<CarRawEvent> carRawEventStream = rawCarEventStream
                    .map((MapFunction<String, CarRawEvent>) value -> {
                        CarRawEvent carRawEvent = mapper.readValue(value, CarRawEvent.class);
                        createCarKey(carRawEvent);
                        return carRawEvent;
                    })
                    .name("car-rawEvent-map")
                    .keyBy(CarRawEvent::getCarKey);

            processCarEvents(carRawEventStream, env);
        } catch (Exception e) {
            log.error("Failed with exception " + e);
            e.printStackTrace();
        }
        log.info("Exit CarStreams::processCarRawEvents");
    }

    void processCarEvents(DataStream<CarRawEvent> carRawEventStream, StreamExecutionEnvironment env) throws Exception {
        DataStream<Car> carStream = carEventProcessor.processCarEvents(carRawEventStream).keyBy(Car::getCarKey);
        carStream.sinkTo(sinkToMongoDB(CAR_COLLECTION_NAME, Car.class)).name("car-mongoSink");
        env.execute("CarProcessor Job"); // blocks here: the unbounded streaming job never finishes
        List<Car> cars = getCarFromMongoDB(); // never reached
        System.out.println("cars=>" + cars.size());
        env.execute("CarProcessor Job");
    }

    public List<com.flink.datamodel.Car> getCarFromMongoDB() throws Exception {
        List<com.flink.datamodel.Car> carList = new ArrayList<>();
        MongoSource<String> source = createMongoSource(CAR_COLLECTION_NAME);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        env.setParallelism(1);
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setExternalizedCheckpointCleanup(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(3,
                org.apache.flink.api.common.time.Time.seconds(10)));

        DataStream<String> carDataStream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "MongoDB-car-source");
        carDataStream.print("after fetch:");

        DataStream<com.flink.datamodel.Car> carStream = carDataStream
                .map((MapFunction<String, com.flink.datamodel.Car>) value ->
                        mapper.readValue(value, com.flink.datamodel.Car.class))
                .name("getCarFromMongoDB-filtered-map")
                .keyBy(com.flink.datamodel.Car::getCarKey);

        env.execute("Read from MongoDB-getCarFromMongoDB");
        Iterator<com.flink.datamodel.Car> myOutput = DataStreamUtils.collect(carStream); // collect() triggers its own execution
        while (myOutput.hasNext()) {
            carList.add(myOutput.next());
        }
        return carList;
    }

    public MongoSource<String> createMongoSource(String collectionName) {
        return MongoSource.<String>builder()
                .setUri(URI)
                .setDatabase(MONGO_DATABASE)
                .setCollection(collectionName)
                .setNoCursorTimeout(true)
                .setPartitionStrategy(PartitionStrategy.SAMPLE)
                .setPartitionSize(MemorySize.ofMebiBytes(64))
                .setSamplesPerPartition(10)
                .setDeserializationSchema(new MongoDeserializationSchema<String>() {
                    @Override
                    public String deserialize(BsonDocument document) {
                        return document.toJson();
                    }

                    @Override
                    public TypeInformation<String> getProducedType() {
                        return BasicTypeInfo.STRING_TYPE_INFO;
                    }
                })
                .build();
    }
}
```
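From reading the Flink docs, two things seem relevant here, so I am sketching them for anyone answering: the same `DataStream` can feed both a sink and `print()` inside one topology (so the console output would not need a second job or a MongoDB round trip), and `executeAsync()` submits a job without blocking the caller. Below is a minimal, self-contained sketch of that idea, not my real code: the `SingleJobSketch` class and the `fromElements` stand-in source are hypothetical placeholders for the Kafka/Mongo pipeline above.

```java
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SingleJobSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Hypothetical stand-in for the Kafka source above; the real job would use
        // env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafka-source").
        DataStream<String> events = env.fromElements("car-1", "car-2", "car-3");

        // Fan the SAME stream out to two consumers inside ONE topology:
        // events.sinkTo(mongoSink).name("car-mongoSink"); // 1) persist each event (sink omitted here)
        events.print("car");                               // 2) echo each event to the console as it arrives

        // executeAsync() submits the job and returns a JobClient immediately,
        // instead of blocking the calling thread the way execute() does.
        JobClient client = env.executeAsync("CarProcessor Job");
        System.out.println("submitted job " + client.getJobID());
    }
}
```

If I understand the execution model correctly, this layout would give a single long-running job that handles every event as it arrives, with the console output coming from the live stream rather than from re-reading the MongoDB collection. Is that the right approach, or is there a supported way to chain a second job after `env.execute()`?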