# troubleshooting
Hello, apologies in advance for the lengthy question.

----------------------------------------------------

I have a problem and I need some opinions on how to deal with it; the less intrusive the solution, the better. I have some redesign ideas, but I would like to avoid them: they have a wide impact and, more importantly, there are political reasons that would cause problems if I pushed for a redesign.

----------------------------------------------------

Intro to the problem

For our business needs, we want the KafkaSource to read from a topic only while a predicate is satisfied, and that predicate can change (true/false) while the job is running. For example, in pseudocode:
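```java
// Pseudocode: run(), pause() and resume() describe the intended behavior;
// they are not actual KafkaSource methods.
void kafkaConsumeLogic() {

  kafkaSource.run();
  boolean kafkaSourcePaused = false;

  boolean okState;

  while (true) {

    okState = areWeInDesiredState();

    // Predicate turned false: stop consuming (only once per transition).
    if (!okState && !kafkaSourcePaused) {
      kafkaSource.pause();
      kafkaSourcePaused = true;
    }

    // Predicate turned true again: resume consuming.
    if (okState && kafkaSourcePaused) {
      kafkaSource.resume();
      kafkaSourcePaused = false;
    }

    paceWait(); // wait a bit before re-checking the predicate
  }

}
```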
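The loop above is an edge-triggered toggle: it only calls pause() or resume() when the predicate disagrees with the current paused state. A minimal, Flink-free sketch of just that decision logic might look like the following; the `PauseResumeGate` class and its `Action` enum are hypothetical, and a `BooleanSupplier` stands in for `areWeInDesiredState()`.

```java
import java.util.function.BooleanSupplier;

/** Hypothetical helper modeling the pause/resume decision from the pseudocode (no Flink involved). */
final class PauseResumeGate {

    /** What the caller should do to the (hypothetical) consumer after one check. */
    enum Action { NONE, PAUSE, RESUME }

    private final BooleanSupplier desiredState; // stands in for areWeInDesiredState()
    private boolean paused = false;             // consumption starts unpaused, as after run()

    PauseResumeGate(BooleanSupplier desiredState) {
        this.desiredState = desiredState;
    }

    /** One iteration of the while-loop body, without the paceWait() call. */
    Action tick() {
        boolean ok = desiredState.getAsBoolean();
        if (!ok && !paused) {   // predicate became false: pause once
            paused = true;
            return Action.PAUSE;
        }
        if (ok && paused) {     // predicate became true again: resume once
            paused = false;
            return Action.RESUME;
        }
        return Action.NONE;     // no transition, nothing to do
    }

    boolean isPaused() {
        return paused;
    }
}
```

Keeping the decision separate from the actual pause/resume calls makes the transition logic easy to test in isolation.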
So we achieved the above by leveraging HybridSource (https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/hybridsource/), using the first registered source as a bounded predicate-check source:

- it checks the areWeInDesiredState() predicate/condition, and if all is OK...
- ...the pollNext method of the SourceReader it provides returns InputStatus.END_OF_INPUT, so the next registered source, the KafkaSource, takes over.

So the pseudocode above has been implemented as follows:
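```java
class PredicateSourceReader<T> implements SourceReader<T, MySourceSplit> {

    // ....

    @Override
    public InputStatus pollNext(ReaderOutput<T> output) {
        if (areWeInDesiredState()) {
            // Finishing this bounded source lets HybridSource switch to the KafkaSource.
            return InputStatus.END_OF_INPUT;
        }
        // Note: MORE_AVAILABLE tells the runtime it may call pollNext() again immediately,
        // so while the predicate is false this branch effectively busy-polls it.
        return InputStatus.MORE_AVAILABLE;
    }

    // .....

}
```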
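In Flink's `SourceReader` contract, returning `InputStatus.NOTHING_AVAILABLE` makes the runtime wait on the future returned by `isAvailable()` instead of calling `pollNext` again right away, which would avoid the busy polling described in the comment above. The sketch below models that idea without any Flink dependency; `GateReaderModel`, its nested `Status` enum (a stand-in for a subset of `InputStatus`) and the `recheck()` hook are hypothetical, and in a real reader something such as a periodic timer would have to call `recheck()`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.BooleanSupplier;

/** Hypothetical, Flink-free model of a gate reader that waits instead of busy-polling. */
final class GateReaderModel {

    /** Stand-in for a subset of Flink's InputStatus. */
    enum Status { MORE_AVAILABLE, NOTHING_AVAILABLE, END_OF_INPUT }

    private final BooleanSupplier predicate; // stands in for areWeInDesiredState()
    private CompletableFuture<Void> available = new CompletableFuture<>();

    GateReaderModel(BooleanSupplier predicate) {
        this.predicate = predicate;
    }

    /** Mirrors pollNext(): finish once the predicate holds, otherwise report "nothing yet". */
    Status pollNext() {
        if (predicate.getAsBoolean()) {
            return Status.END_OF_INPUT;
        }
        if (available.isDone()) {
            available = new CompletableFuture<>(); // re-arm after a wake-up that came too early
        }
        return Status.NOTHING_AVAILABLE;
    }

    /** Mirrors isAvailable(): the caller waits on this future instead of re-polling. */
    CompletableFuture<Void> isAvailable() {
        return available;
    }

    /** Hypothetical hook (e.g. run by a periodic timer) that wakes the caller when the predicate holds. */
    void recheck() {
        if (predicate.getAsBoolean()) {
            available.complete(null);
        }
    }
}
```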
And the wiring:

```java
PredicateSourceReader<SomeType> sourceActivator = new PredicateSourceReader<>(....);
// Simplified: Source is an interface (with three type parameters), so this stands for
// a custom Source implementation whose createReader(...) returns a PredicateSourceReader.
Source<SomeType> initialSource = new Source<>(sourceActivator);

KafkaSource<SomeType> kafkaSource = buildKafkaSource(....);

HybridSource<SomeType> hybridsource = HybridSource
                .builder(initialSource)
                .addSource(context -> kafkaSource, Boundedness.CONTINUOUS_UNBOUNDED)
                .build();
```
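HybridSource reads its sources one after another and switches to the next one when the current bounded source finishes. The toy model below illustrates that hand-over with plain iterators instead of real sources; `SequentialSourcesModel` is hypothetical. The gate source emits nothing and ends, after which records come from the second source. The model keeps an index of the active source; Flink's HybridSource enumerator likewise tracks the current source index and includes it in its checkpointed state (`HybridSourceEnumeratorState`), which may be relevant when reasoning about what a restart restores.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/** Hypothetical toy of HybridSource's "read sources one after another" behavior (no Flink involved). */
final class SequentialSourcesModel<T> {

    private final List<Iterator<T>> sources = new ArrayList<>();
    private int current = 0; // index of the active source in this toy

    SequentialSourcesModel<T> add(Iterator<T> source) {
        sources.add(source);
        return this;
    }

    /** Returns the next record, or null once every source is exhausted. */
    T next() {
        while (current < sources.size()) {
            Iterator<T> active = sources.get(current);
            if (active.hasNext()) {
                return active.next();
            }
            current++; // the active bounded source ended: hand over to the next one
        }
        return null;
    }

    int currentIndex() {
        return current;
    }
}
```

Unlike the toy's second iterator, a real KafkaSource is unbounded and never hands over to anything else.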

----------------------------------------------------

Problem

Unfortunately, in my eyes the above is not the most elegant or correct usage of HybridSource, but more importantly it does not work in the following case, which is where I need some opinions (or where I may be missing something).

Based on the above, once the condition/predicate is satisfied we switch to the KafkaSource. However, there is another business need: when we detect another bad condition/predicate, we intentionally fail with an exception. Due to the restart strategy (exponential) and the failover strategy (region), this HybridSource gets restarted. Since it is re-initialized, I would expect it to start again from the first source (initialSource) and the KafkaSource not to run, but that is not the case: we still see Kafka consumption taking place.

I have some ideas in mind, but first I want to understand why, after a region restart, the KafkaSource is still open and not closed.

I also tried to extend KafkaSourceReader and wire the logic into its pollNext method, but KafkaSource is very hard to extend: it is only accessible through KafkaSourceBuilder, which, from a framework user's point of view, does not offer many options.
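Since subclassing the Kafka reader is awkward, an alternative to inheritance is composition: wrap an existing reader and only delegate to it while the predicate holds. The sketch below shows the pattern with a hypothetical, simplified `SimpleReader` interface rather than Flink's `SourceReader`, and `GatedReader` is likewise hypothetical. A real wrapper would also have to delegate the other `SourceReader` methods (for example `isAvailable()`, `addSplits()`, `snapshotState()`), and its availability future must complete when the predicate flips, otherwise the wrapped reader could stall. In principle such a wrapper could be produced by a custom `Source` that delegates to a `KafkaSource`, because `createReader(...)` is part of the public `Source` interface, but that wiring is an untested assumption.

```java
import java.util.function.BooleanSupplier;
import java.util.function.Consumer;

/** Hypothetical, simplified reader contract loosely shaped like SourceReader#pollNext. */
interface SimpleReader<T> {

    enum Status { MORE_AVAILABLE, NOTHING_AVAILABLE, END_OF_INPUT }

    Status pollNext(Consumer<T> output);
}

/** Wraps another reader and only delegates to it while the predicate holds. */
final class GatedReader<T> implements SimpleReader<T> {

    private final SimpleReader<T> delegate;
    private final BooleanSupplier predicate; // stands in for areWeInDesiredState()

    GatedReader(SimpleReader<T> delegate, BooleanSupplier predicate) {
        this.delegate = delegate;
        this.predicate = predicate;
    }

    @Override
    public Status pollNext(Consumer<T> output) {
        if (!predicate.getAsBoolean()) {
            return Status.NOTHING_AVAILABLE; // hold back without touching the wrapped reader
        }
        return delegate.pollNext(output); // predicate holds: behave exactly like the wrapped reader
    }
}
```

Because the gate is re-evaluated on every poll, this shape could in principle express both pausing and resuming, which the one-way HybridSource switch cannot.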