# troubleshooting
a
Hello everyone! I'm trying to create a hybrid source using multiple file sources and a Kinesis source connector. The problem is that `HybridSource` only accepts sources that implement the `Source<T>` interface, which `FlinkKinesisConsumer` does not (it extends `RichParallelSourceFunction`). My question is: is there any way to use the Kinesis source connector in a hybrid source, either by adapting it to implement the `Source` interface or by having `HybridSource` support `RichParallelSourceFunction`?
d
Hi Andre, as you noted, FlinkKinesisConsumer does not implement the Source<T> interface that HybridSource requires. No worries, you can still achieve your goal by wrapping the Kinesis consumer logic in a custom class that implements Source. Try it and let us know if you get stuck.
a
Hello Draco, are there any docs on how to do that? Sorry, newbie here.
d
I can provide something in about an hour.
OK, I think it's going to look something like this:
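```java
import java.util.Properties;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.core.io.SimpleVersionedSerializer;

// HybridSource needs the FLIP-27 Source interface (not SourceFunction's run/cancel).
// SplitT (e.g. one split per Kinesis shard) and EnumChkT (enumerator checkpoint state)
// are left generic here; a real implementation defines concrete types for them.
public class KinesisSourceWrapper<T, SplitT extends SourceSplit, EnumChkT>
        implements Source<T, SplitT, EnumChkT> {

    private final String streamName;
    private final DeserializationSchema<T> deserializationSchema;
    private final Properties kinesisConfig;

    public KinesisSourceWrapper(String streamName, DeserializationSchema<T> deserializationSchema,
            Properties kinesisConfig) {
        this.streamName = streamName;
        this.deserializationSchema = deserializationSchema;
        this.kinesisConfig = kinesisConfig;
    }

    @Override
    public Boundedness getBoundedness() {
        // Kinesis would normally be the last (unbounded) source in a HybridSource;
        // all sources before it must be BOUNDED.
        return Boundedness.CONTINUOUS_UNBOUNDED;
    }

    @Override
    public SplitEnumerator<SplitT, EnumChkT> createEnumerator(SplitEnumeratorContext<SplitT> context) {
        // Discover the shards of streamName and assign them to readers as splits.
        throw new UnsupportedOperationException("TODO: shard discovery and assignment");
    }

    @Override
    public SplitEnumerator<SplitT, EnumChkT> restoreEnumerator(
            SplitEnumeratorContext<SplitT> context, EnumChkT checkpoint) {
        throw new UnsupportedOperationException("TODO: restore enumerator state");
    }

    @Override
    public SourceReader<T, SplitT> createReader(SourceReaderContext readerContext) {
        // Poll the assigned shards and deserialize records with deserializationSchema.
        // FlinkKinesisConsumer is a SourceFunction, so it cannot simply be called here;
        // its shard polling and sequence-number checkpointing would have to be re-implemented.
        // Note: This is a high-level concept and the actual implementation can be complex.
        throw new UnsupportedOperationException("TODO: shard reader");
    }

    @Override
    public SimpleVersionedSerializer<SplitT> getSplitSerializer() {
        throw new UnsupportedOperationException("TODO: split serializer");
    }

    @Override
    public SimpleVersionedSerializer<EnumChkT> getEnumeratorCheckpointSerializer() {
        throw new UnsupportedOperationException("TODO: checkpoint serializer");
    }
}
```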
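For context, once a wrapper like this is fully implemented, plugging it into `HybridSource` behind the file sources could look roughly like the following. This is a minimal sketch, assuming Flink 1.15+ class names (`TextLineInputFormat`), string records, and a single bounded `FileSource` over all historical paths; the Kinesis side is accepted as any working `Source<String, ?, ?>`. `HybridJobSketch` and `filesThenKinesis` are hypothetical names.

```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.connector.base.source.hybrid.HybridSource;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Hypothetical helper: read historical files first, then switch to a Kinesis Source.
public class HybridJobSketch {

    public static DataStream<String> filesThenKinesis(
            StreamExecutionEnvironment env,
            Source<String, ?, ?> kinesisSource, // assumed: a working Source, e.g. a finished wrapper
            Path... historyPaths) {
        // Without monitorContinuously(...), a FileSource is bounded, which HybridSource
        // requires for every source except the last one.
        FileSource<String> history = FileSource
                .forRecordStreamFormat(new TextLineInputFormat(), historyPaths)
                .build();

        HybridSource<String> hybrid = HybridSource.builder(history)
                .addSource(kinesisSource) // the last source may be unbounded
                .build();

        // Pass the output type explicitly instead of relying on type extraction.
        return env.fromSource(
                hybrid, WatermarkStrategy.noWatermarks(), "files-then-kinesis", Types.STRING);
    }
}
```

The file paths and the Kinesis source are parameters so the switch-over order (files, then Kinesis) is the only thing this helper decides.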
You would need to test your implementation carefully before going into production.
Another option is to process the sources sequentially, or to read them as separate streams and merge them downstream in a Flink job.
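The merge-downstream option avoids `HybridSource` altogether: read the files with `FileSource`, read Kinesis with the existing `FlinkKinesisConsumer`, and `union` the two streams. A minimal sketch, assuming string records, Flink 1.15+ class names and the `flink-connector-kinesis` dependency; `MergeDownstreamJob`, the stream name, region and paths are placeholders. Unlike `HybridSource`, both inputs run at the same time, so file records are not guaranteed to arrive before Kinesis records.

```java
import java.util.Properties;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;

// Hypothetical job: files and Kinesis are read concurrently and merged with union().
public class MergeDownstreamJob {

    public static DataStream<String> filesPlusKinesis(
            StreamExecutionEnvironment env, String streamName, String region, Path... historyPaths) {
        FileSource<String> files = FileSource
                .forRecordStreamFormat(new TextLineInputFormat(), historyPaths)
                .build();
        DataStream<String> fromFiles =
                env.fromSource(files, WatermarkStrategy.noWatermarks(), "files");

        Properties kinesisConfig = new Properties();
        kinesisConfig.setProperty(AWSConfigConstants.AWS_REGION, region);
        kinesisConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");
        // The legacy SourceFunction-based consumer is still accepted by addSource().
        DataStream<String> fromKinesis = env.addSource(
                new FlinkKinesisConsumer<>(streamName, new SimpleStringSchema(), kinesisConfig),
                "kinesis");

        // union() interleaves both streams; it does not read the files first.
        return fromFiles.union(fromKinesis);
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        filesPlusKinesis(env, "my-stream", "us-east-1", new Path("s3://my-bucket/history/")).print();
        env.execute("files-plus-kinesis");
    }
}
```

If strict "files first, then Kinesis" ordering matters, the sequential route (a bounded job over the files, then a streaming job from Kinesis) or a working `Source` wrapper inside `HybridSource` is the safer choice.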
a
Hello Draco, thanks for that snippet!