Hi! We are building a new job that, on its first run only, reads data from a CSV file to populate some initial state. After that it no longer needs the CSV file: it runs a KeyedCoProcessFunction on two streams and updates that original state. How can I build the initial state from the CSV file only the first time? My original idea was to create three streams and union the CSV stream with the account-change stream, since they carry the same kind of data. See the code snippet for reference:
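```kotlin
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator

val accountChangedStream = accountDataStreamFactory.get().keyBy { it.accountUserId }
val accountActivityStream = accountActivityDataStreamFactory.get().keyBy { it.userId }
val csvStream = factory.get().keyBy { it.userId }.process(ReadFirstTimeProcessor())
// union() returns a plain DataStream, so re-key it before connecting:
// a KeyedCoProcessFunction needs both connected inputs to be KeyedStreams.
val joinedStream = accountActivityStream.union(csvStream).keyBy { it.userId }
val clusterChanges: SingleOutputStreamOperator<CustomerClusterEvent> =
    accountChangedStream
        .connect(joinedStream)
        .process(ActivityClusteringProcessor())
        .name(dataStreamName)
```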
The problem is that, done this way, the csvStream is read every time the job starts, and the records then have to be discarded in the process function (ReadFirstTimeProcessor).
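The discard approach described above can be modelled in plain Kotlin to make its cost visible. This is a minimal sketch, not the actual ReadFirstTimeProcessor (whose code is not shown): `CsvRow`, `ReadFirstTimeModel` and `runJob` are hypothetical names, a `MutableMap` stands in for a Flink keyed `ValueState<Boolean>`, and it assumes one CSV row per user, so any later row for an already-seeded user is dropped.

```kotlin
// Hypothetical CSV record type; the real record type is not shown in the post.
data class CsvRow(val userId: String, val payload: String)

// Plain-Kotlin stand-in for a keyed ValueState<Boolean>: one "seeded" flag per userId.
// In a real Flink job this would be managed keyed state restored from a checkpoint or savepoint.
class ReadFirstTimeModel(private val seeded: MutableMap<String, Boolean> = mutableMapOf()) {

    // Forwards the row if its key has not been seeded yet; returns null (discard) otherwise.
    fun process(row: CsvRow): CsvRow? {
        if (seeded[row.userId] == true) return null
        seeded[row.userId] = true
        return row
    }

    // Copies the flags, roughly what a checkpoint would persist.
    fun snapshot(): MutableMap<String, Boolean> = seeded.toMutableMap()
}

// Feeds every CSV row through the model; returns (rows read, rows emitted).
fun runJob(model: ReadFirstTimeModel, rows: List<CsvRow>): Pair<Int, List<CsvRow>> {
    var read = 0
    val emitted = rows.mapNotNull { row ->
        read++
        model.process(row)
    }
    return read to emitted
}
```

Calling `runJob` a second time with a model built from `snapshot()` (standing in for a restart from a checkpoint) emits nothing, yet it still reads every row of the file, which is exactly the overhead this question is about.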
Is there any way to avoid reading the CSV stream after the first time? Thanks!