nick christidis
04/07/2023, 2:20 PM
void kafkaConsumeLogic() {
    kafkaSource.run();
    boolean kafkaSourcePaused = false;
    boolean okState;
    while (true) {
        okState = areWeInDesiredState();
        if (!okState && !kafkaSourcePaused) {
            kafkaSource.pause();
            kafkaSourcePaused = true;
        }
        if (okState && kafkaSourcePaused) {
            kafkaSource.resume();
            kafkaSourcePaused = false;
        }
        paceWait();
    }
}
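For illustration, one iteration of the loop above could be pulled out into a small step function, so the pause/resume transitions can be checked without the while (true). This is only a sketch under assumptions: PausableSource and PauseResumeGate are hypothetical names (not Flink APIs), and the areWeInDesiredState() check is passed in as a BooleanSupplier.

```java
import java.util.function.BooleanSupplier;

/** Hypothetical stand-in for the pausable source in the pseudocode; not a Flink API. */
interface PausableSource {
    void pause();
    void resume();
}

/** Holds the "paused" flag and performs one iteration of the pause/resume loop. */
final class PauseResumeGate {
    private final PausableSource source;
    private final BooleanSupplier desiredState; // plays the role of areWeInDesiredState()
    private boolean paused = false;

    PauseResumeGate(PausableSource source, BooleanSupplier desiredState) {
        this.source = source;
        this.desiredState = desiredState;
    }

    /**
     * Evaluates the predicate once and calls pause() or resume() only when
     * the state actually changes. Returns true if the source is paused afterwards.
     */
    boolean step() {
        boolean ok = desiredState.getAsBoolean();
        if (!ok && !paused) {
            source.pause();
            paused = true;
        } else if (ok && paused) {
            source.resume();
            paused = false;
        }
        return paused;
    }
}
```

The original loop would then just call step() followed by paceWait() on every iteration.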
So, we have achieved the above by leveraging HybridSource (https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/hybridsource/)
and using the first registered source as a bounded, predicate-checking source
which:
• checks the areWeInDesiredState() predicate/condition, and if all is OK...
• ...returns InputStatus.END_OF_INPUT from the pollNext method of the SourceReader it provides, so the next registered source, which is the KafkaSource, takes over.
So the above pseudocode has been implemented like the following:
class PredicateSourceReader<T> implements SourceReader<T, MySourceSplit> {
    ....
    @Override
    public InputStatus pollNext(ReaderOutput<T> output) {
        if (areWeInDesiredState()) {
            return InputStatus.END_OF_INPUT;
        }
        return InputStatus.MORE_AVAILABLE;
    }
    .....
}
and the wiring:
PredicateSourceReader<SomeType> sourceActivator = new PredicateSourceReader<>(....);
Source<SomeType> initialSource = new Source<>(sourceActivator);
KafkaSource<SomeType> kafkaSource = buildKafkaSource(....);
HybridSource<SomeType> hybridsource = HybridSource
    .builder(initialSource)
    .addSource(context -> kafkaSource, Boundedness.CONTINUOUS_UNBOUNDED)
    .build();
----------------------------------------------------
Problem
Unfortunately, the above is not the most elegant or correct usage of HybridSource in my eyes, but most importantly it does not work in the following case, which is where I need some opinions, or to know if I am missing something:
So, based on the above description, if we satisfy the condition/predicate, we switch to the Kafka source. But there is another business need we have to satisfy:
when we identify another bad condition/predicate, we fail on purpose with an exception,
and due to the restart strategy (exponential) and failover strategy (region) this hybridSource gets restarted. Since it is re-initialized,
I would expect it to go back to the first source (initialSource) and kafkaSource not to run, but this is not the case, as we see Kafka consumption still taking place.
I have some ideas in mind, but first I want to understand why, after the region restart, the kafkaSource is still open and not closed.
I also tried to extend KafkaSourceReader and wire the logic into its pollNext method, but KafkaSource is very closed to extension and can only be built through KafkaSourceBuilder, which, from a framework user's point of view, does not give a lot of options.