han liu
09/11/2023, 3:01 PM
Rion Williams
09/11/2023, 3:04 PM
You could use the connect() function, constructing a stream from the elements you read from the database:
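// Get your names
val names = getNamesFromDatabase()
// Create a stream of your names (fromCollection emits one record per name;
// fromElements(names) would emit the whole list as a single record)
val nameStream = streamEnv.fromCollection(names)
// Connect your existing source with your names and do something
// (process() on connected streams expects a CoProcessFunction)
streamEnv
    .fromSource(...)
    .connect(nameStream)
    .process(DoSomethingFunction())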
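For reference, here is a slightly fuller sketch of the same idea, in case it helps to see what the function passed to process() might look like. Everything specific in it is an assumption for illustration: the body of getNamesFromDatabase(), the in-memory fromElements(...) stand-in for the real source, and the filtering logic inside DoSomethingFunction are all hypothetical. It assumes the names come back as a List<String> and that the main stream also carries strings.

```kotlin
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.api.functions.co.CoProcessFunction
import org.apache.flink.util.Collector

// Hypothetical stand-in for the real database lookup.
fun getNamesFromDatabase(): List<String> = listOf("alice", "bob")

// Hypothetical function: remembers every name seen on the second input and
// forwards events from the first input only if they match a remembered name.
class DoSomethingFunction : CoProcessFunction<String, String, String>() {
    // Plain in-memory set, not Flink managed state, so it is NOT checkpointed.
    private val knownNames = HashSet<String>()

    override fun processElement1(
        event: String,
        ctx: CoProcessFunction<String, String, String>.Context,
        out: Collector<String>
    ) {
        if (event in knownNames) out.collect(event)
    }

    override fun processElement2(
        name: String,
        ctx: CoProcessFunction<String, String, String>.Context,
        out: Collector<String>
    ) {
        knownNames.add(name)
    }
}

fun main() {
    val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment()

    // One record per name read from the database.
    val nameStream = streamEnv.fromCollection(getNamesFromDatabase())

    // Assumed stand-in for the real source of the job.
    val events = streamEnv.fromElements("alice", "carol", "bob")

    events
        // broadcast() sends every name to every parallel instance of the function.
        .connect(nameStream.broadcast())
        .process(DoSomethingFunction())
        .print()

    streamEnv.execute("connect-names-sketch")
}
```

Note that nothing here guarantees the names arrive before the events, so an event can be dropped if it is processed before its name has been seen. That is the timing concern: if it matters for your job, you would need to buffer events or use broadcast state instead.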
There are lots of ways to accomplish this depending on how you want to construct your job, timing, etc., but this is a very naive way to join two separate streams.
han liu
09/11/2023, 3:17 PM
Okay, thank you for your suggestion. I'll give it a try and let you know if it works.