Andre Luiz
07/18/2024, 8:44 PMSource<T>
interface which FlinkKinesisConsumer
does not (it uses RichParallelSourceFunction
interface). My question is: Is there any way of using the Kinesis Source Connector in a hybrid source either by adapting it to extend some Source
interface or the HybridSource supporting RichParallelSourceFunction
?D. Draco O'Brien
07/19/2024, 7:31 AMAndre Luiz
07/19/2024, 11:22 AMD. Draco O'Brien
07/19/2024, 11:33 AMD. Draco O'Brien
07/19/2024, 1:38 PMpublic class KinesisSourceWrapper<T> implements Source<T, ?, ?> {
private final DeserializationSchema<T> deserializationSchema;
private final Properties kinesisConfig;
public KinesisSourceWrapper(DeserializationSchema<T> deserializationSchema, Properties kinesisConfig) {
this.deserializationSchema = deserializationSchema;
this.kinesisConfig = kinesisConfig;
}
@Override
public Boundedness getBoundedness() {
// depending on your use case, return either BOUNDED or UNBOUNDED.
return BOUNDED; // Or UNBOUNDED
}
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
// Initialization logic if needed
}
@Override
public void run(SourceContext<T> ctx) throws Exception {
// instantiate FlinkKinesisConsumer and adapt its output to the SourceContext
FlinkKinesisConsumer<T> consumer = new FlinkKinesisConsumer<>(
"stream-name",
deserializationSchema,
kinesisConfig);
// you'll need to adapt the RichParallelSourceFunction behavior to fit into SourceContext.
// this might involve managing parallelism manually and emitting records to the ctx.
// Note: This is a high-level concept and actual implementation can be complex.
}
@Override
public void cancel() {
// clean up or cancel logic if necessary
}
}
D. Draco O'Brien
07/19/2024, 1:39 PMD. Draco O'Brien
07/19/2024, 1:39 PMAndre Luiz
07/22/2024, 12:45 PM