Oscar Perez
06/09/2023, 3:14 PM
val accountChangedStream = accountDataStreamFactory.get().keyBy { it.accountUserId }
val accountActivityStream = accountActivityDataStreamFactory.get().keyBy { it.userId }
val csvStream = factory.get().keyBy { it.userId }.process(ReadFirstTimeProcessor())
val joinedStream = accountActivityStream.union(csvStream)
val clusterChanges: SingleOutputStreamOperator<CustomerClusterEvent> =
    accountChangedStream
        .connect(joinedStream)
        .process(ActivityClusteringProcessor())
        .name(dataStreamName)
Oscar Perez
06/09/2023, 3:15 PM
Oscar Perez
06/09/2023, 3:15 PM
Vitor Leal
06/09/2023, 3:16 PM