# random
s
Can someone point me to a resource to help build a Flink source?
n
s
Thank you, been there. Not much help. Currently looking at this article https://medium.com/@SelimAbidin/how-flink-sources-work-and-how-to-implement-one-70b52fcfeb29, the Kafka source implementation, and the Flink connector project code.
If I have to, I'll fall back to the legacy RichSourceFunction, but I don't want to.
m
What type of source is it? Bounded or unbounded?
s
Unbounded, from a polling source.
I'm trying to build a source for NATS, I'm the lead Java developer for the NATS Java client
I'm close, but I was going around in circles yesterday.
Actually, unbounded from an async source, so I'll have to handle multiple messages arriving at once
and queue them up or something.
m
I'm giving a talk at Current next week about Flink connectors and the ecosystem, so I'm curious what you're running into. I'm not too familiar with NATS, though.
s
NATS is simple messaging: pub/sub, request/reply, and streaming. I'm working only on subscribe right now; I already have a sink working.
I have to connect to a server and listen. For instance, I'm not sure where I should make my connection. In the reader?
m
Any luck so far with the Splits/SplitEnumerator?
s
Yeah, I've got state, and I've got a split, which is just the subject I'm listening to.
My state has assigned and unassigned subjects; I'm working on getting the next one for handleSplitRequest,
so I'll pull one out of unassigned.
Just working through the enumerator interface.
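The bookkeeping described above (one split per subject, state holding assigned and unassigned subjects, handleSplitRequest pulling the next unassigned one) can be modelled in plain Java. This is a minimal sketch, not the Flink API: `SubjectSplit` and `SubjectAssignments` are hypothetical, the method names only mirror `SplitEnumerator`'s `handleSplitRequest` and `addSplitsBack`, and a real enumerator would call `context.assignSplit(...)` instead of returning the split.

```java
import java.util.ArrayDeque;
import java.util.Collection;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical split: one NATS subject per split; the subject doubles as the split id.
record SubjectSplit(String subject) {
    String splitId() {
        return subject;
    }
}

// Hypothetical enumerator state: subjects waiting for a reader, and subjects owned by a subtask.
class SubjectAssignments {
    private final Deque<SubjectSplit> unassigned = new ArrayDeque<>();
    private final Map<String, Integer> assigned = new HashMap<>();

    SubjectAssignments(Collection<String> subjects) {
        subjects.forEach(s -> unassigned.add(new SubjectSplit(s)));
    }

    // Mirrors handleSplitRequest(subtaskId, hostname): hand out the next unassigned subject, if any.
    Optional<SubjectSplit> handleSplitRequest(int subtaskId) {
        SubjectSplit next = unassigned.poll();
        if (next == null) {
            return Optional.empty();
        }
        assigned.put(next.splitId(), subtaskId);
        return Optional.of(next);
    }

    // Mirrors addSplitsBack(splits, subtaskId): a failed reader's subjects become unassigned again.
    void addSplitsBack(List<SubjectSplit> splits) {
        for (SubjectSplit split : splits) {
            assigned.remove(split.splitId());
            unassigned.addLast(split);
        }
    }

    int unassignedCount() {
        return unassigned.size();
    }

    Optional<Integer> ownerOf(String subject) {
        return Optional.ofNullable(assigned.get(subject));
    }
}
```

Since the split carries no mutable position, the only thing worth checkpointing in this model is the two collections on the enumerator side.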
m
Gotcha. Yeah we really lack a good blog on how to write a proper source and sink
Or docs
s
been there. My docs aren't awesome either.
That one article is helping, and looking at the Kafka code helps, but Kafka is more complicated than pub/sub. I will write an article when I'm done.
We are also discussing contributing this to the Flink project. A customer is paying for it, but the contract says it's OSS,
plus we know it's needed
m
I can see the benefits for the Flink community too
Have you looked at other sources that use the source interface, like Pulsar? There’s also new work being done on porting the Kinesis source to the new interface
Perhaps @Hong Teoh can share some of his thoughts on the latter
s
Thanks for responding. I'll try to keep working on this and ask for help as little as possible. I hadn't looked at those. Are those code bases near the Kafka connector? If so, I'll find them.
h
Interesting to find out about NATS!
> I have to connect to a server and listen. For instance not sure where I should make my connection. In the reader?
This needs to be done in the SplitReader. The threading model here will be interesting. Is there a client for NATS that handles polling records for a given “partition”?
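Following the suggestion that the connection belongs in the SplitReader, here is a minimal plain-Java sketch of that lifecycle: connect lazily when the first subjects are assigned, add one subscription per new subject, and close the connection in `close()`. `MessageClient` and `RecordingClient` are hypothetical stand-ins, not the NATS Java client API; only the names `handleSplitsChanges` and `close` echo Flink's `SplitReader`.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Supplier;

// Hypothetical stand-in for a client connection; NOT the real NATS Java client API.
interface MessageClient extends AutoCloseable {
    void subscribe(String subject);

    @Override
    void close();
}

// Hypothetical in-memory client that records calls, for demonstration only.
class RecordingClient implements MessageClient {
    final List<String> subscriptions = new ArrayList<>();
    boolean closed;

    public void subscribe(String subject) {
        subscriptions.add(subject);
    }

    public void close() {
        closed = true;
    }
}

// Sketch of a reader that owns the connection; method names loosely mirror SplitReader.
class SubjectReader implements AutoCloseable {
    private final Supplier<MessageClient> connector;
    private final Set<String> subjects = new LinkedHashSet<>();
    private MessageClient client; // created lazily when the first subjects arrive

    SubjectReader(Supplier<MessageClient> connector) {
        this.connector = connector;
    }

    // Called when the enumerator assigns new subjects to this reader.
    void handleSplitsChanges(List<String> newSubjects) {
        if (client == null) {
            client = connector.get();
        }
        for (String subject : newSubjects) {
            if (subjects.add(subject)) { // skip subjects this reader already serves
                client.subscribe(subject);
            }
        }
    }

    @Override
    public void close() {
        if (client != null) {
            client.close();
            client = null;
        }
    }
}
```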
s
Currently I can start a background thread that just gets messages for a subject.
I needed it to handle many messages arriving quickly.
h
IIUC, in the Kafka source, most of the heavy lifting is actually delegated to the KafkaConsumer. SplitReader.fetch() is called via a SingleThreadFetcherManager that polls a single KafkaConsumer instance (which has its own thread pool). This means any connection can be sync/blocking, because the thread pool actually making the connection is separate from the Flink job thread. Sounds like you're implementing something similar to the Kafka source, then.
For the Kinesis source we do something slightly less efficient 👀. Most of the heavy lifting is actually done in the Flink job itself: SplitReader.fetch() makes a call to the Kinesis endpoint to retrieve records (a single-threaded fetcher manager is used as well). We do this because we have another mode (EFOConsumer) that follows the same model as the KafkaConsumer (it manages its own thread pool).
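The threading model described above, where one fetcher thread calls fetch() in a loop and hands finished batches to the task thread, can be modelled with a single background thread and a bounded handover queue. This is a simplified sketch of the pattern behind `SingleThreadFetcherManager`, not Flink's implementation: `Fetch` and `FetcherLoop` are hypothetical, and Flink's own handover (`FutureCompletingBlockingQueue`) signals availability to the task thread rather than using a timed wait.

```java
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for SplitReader.fetch(): may block, returns one batch per call.
interface Fetch {
    List<String> fetch() throws InterruptedException;
}

// One dedicated fetcher thread; the consumer side only ever takes finished batches.
class FetcherLoop implements AutoCloseable {
    private final BlockingQueue<List<String>> handover = new ArrayBlockingQueue<>(2);
    private final Thread fetcher;

    FetcherLoop(Fetch reader) {
        fetcher = new Thread(() -> {
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    List<String> batch = reader.fetch(); // blocking is fine: not the task thread
                    if (!batch.isEmpty()) {
                        handover.put(batch); // blocks when the consumer falls behind
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // shutdown requested
            }
        }, "fetcher");
        fetcher.setDaemon(true);
        fetcher.start();
    }

    // Consumer side; a timed wait keeps the sketch simple (Flink's task thread does not block here).
    List<String> pollNext(long timeoutMs) {
        try {
            return handover.poll(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    @Override
    public void close() {
        fetcher.interrupt();
        try {
            fetcher.join(1000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```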
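For comparison, the Kinesis-style approach makes one synchronous request inside every fetch() and keeps a per-split cursor for the next request (Kinesis uses shard iterators for this). A plain-Java sketch under that assumption; `Page`, `Endpoint`, and `CursorReader` are hypothetical, and the test uses an in-memory endpoint in place of a real service.

```java
import java.util.List;

// Hypothetical page of records plus the cursor to use for the next request.
record Page(List<String> records, int nextCursor) {}

// Hypothetical remote endpoint; each call is one synchronous request.
interface Endpoint {
    Page getRecords(int cursor, int limit);
}

// Each fetch() performs exactly one blocking request and advances the reader-side cursor.
class CursorReader {
    private final Endpoint endpoint;
    private final int limit;
    private int cursor; // position of the next request, not the checkpointed position

    CursorReader(Endpoint endpoint, int startCursor, int limit) {
        this.endpoint = endpoint;
        this.cursor = startCursor;
        this.limit = limit;
    }

    List<String> fetch() {
        Page page = endpoint.getRecords(cursor, limit);
        cursor = page.nextCursor();
        return page.records();
    }

    int cursor() {
        return cursor;
    }
}
```

The trade-off is simplicity versus throughput: no extra threads to manage, but each fetch pays a full round trip.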
An initial simple implementation for NATS could be like the Kinesis one, then iteratively improved on. I made a simple diagram when trying to understand the source framework that you might find helpful! 😄 Key points:
• Green bits are the bits you have to implement for each Source
• SplitEnumerator -> Discovers the smallest unit (partition, shard, etc.)
• Split -> The actual unit (partition, shard)
• SplitReader -> Reads from the assigned shard
• SplitFetcherManager -> Spins up threads to run the SplitReaders. Both Kafka and Kinesis use single-threaded FetcherManagers.
• RecordEmitter -> Outputs records from the Source to the actual Flink job graph. The key point here is to ensure the Source state is only updated as "read" after emitting the records to the job graph; otherwise exactly-once semantics are violated.
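The RecordEmitter point above is about ordering: emit the record first, then advance the position that a checkpoint will capture. A plain-Java sketch, assuming each message carries a replayable sequence number; `Msg`, `SplitState`, `Output`, and `Emitter` are hypothetical stand-ins for a record type, mutable split state, `SourceOutput`, and `RecordEmitter`. As far as I know, in Flink both emitting and snapshotting run on the task thread, so a checkpoint cannot land between the two steps.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical record carrying a position that can be checkpointed.
record Msg(long sequence, String payload) {}

// Hypothetical mutable per-split state; only this is captured in a snapshot.
class SplitState {
    long lastEmittedSequence = -1;
}

// Hypothetical stand-in for SourceOutput.
class Output {
    final List<String> collected = new ArrayList<>();

    void collect(String value) {
        collected.add(value);
    }
}

class Emitter {
    // Order matters: emit first, then advance the state that will be checkpointed.
    void emitRecord(Msg msg, Output output, SplitState state) {
        output.collect(msg.payload());
        state.lastEmittedSequence = msg.sequence();
    }

    // What a checkpoint would store for the split: the last position downstream has received.
    long snapshot(SplitState state) {
        return state.lastEmittedSequence;
    }
}
```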
Sorry for the splurge 😆 Happy to answer any questions that you have! Especially around exactly once semantics / state handling
s
ok I'm in a meeting will look at this soon
So eventually it may be more like the Kafka source, once I'm talking to streams. But this isn't really helping much either. I have no state except my connection and the subject I'm listening to. My splits basically each cover a single subject (topic). Looking at the Kinesis code, I don't get this in handleSplitRequest:
// Do nothing, since we assign splits eagerly
What happens when splits are added back? Is something else running that reassigns a split?
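The "assign splits eagerly" comment describes a push model: the enumerator assigns every split as soon as readers register, so handleSplitRequest has nothing to do. A plain-Java sketch of how returned splits fit that model: splits given back after a failure wait in a pending map until their subtask's reader registers again, and are pushed then, so no separate process is needed. `EagerAssigner` is hypothetical, the hash-based owner rule is an assumption, and `addReader`/`addSplitsBack` only echo the `SplitEnumerator` methods.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Sketch of eager (push) assignment; handleSplitRequest would be a no-op in this model.
class EagerAssigner {
    private final int parallelism;
    private final Set<Integer> registeredReaders = new HashSet<>();
    private final Map<Integer, List<String>> pending = new HashMap<>();   // owner -> subjects not yet pushed
    private final Map<Integer, List<String>> delivered = new HashMap<>(); // stand-in for context.assignSplits(...)

    EagerAssigner(int parallelism, List<String> subjects) {
        this.parallelism = parallelism;
        subjects.forEach(this::addPending);
    }

    // Hypothetical ownership rule: a subject always maps to the same subtask.
    private int ownerOf(String subject) {
        return Math.floorMod(subject.hashCode(), parallelism);
    }

    private void addPending(String subject) {
        pending.computeIfAbsent(ownerOf(subject), k -> new ArrayList<>()).add(subject);
    }

    // Push everything pending to subtasks whose readers are registered.
    private void assignPending() {
        for (int subtask : registeredReaders) {
            List<String> splits = pending.remove(subtask);
            if (splits != null) {
                delivered.computeIfAbsent(subtask, k -> new ArrayList<>()).addAll(splits);
            }
        }
    }

    // Mirrors addReader(subtaskId): called when a reader registers, including after a restart.
    void addReader(int subtaskId) {
        registeredReaders.add(subtaskId);
        assignPending();
    }

    // Mirrors addSplitsBack(splits, subtaskId): splits wait until the reader registers again.
    void addSplitsBack(List<String> subjects, int subtaskId) {
        delivered.getOrDefault(subtaskId, new ArrayList<>()).removeAll(subjects);
        subjects.forEach(this::addPending);
    }

    List<String> deliveredTo(int subtaskId) {
        return delivered.getOrDefault(subtaskId, List.of());
    }
}
```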
And then in start(), there are a couple of context.callAsync calls. I mean, I feel like a complete idiot; I'm just not seeing it. I just want something to start a reader. If that reader fails, fine, let it be pulled, record somewhere that that subject/topic isn't being read, and start a new one.
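In enumerators like the Kinesis one, those `context.callAsync(callable, handler, initialDelay, period)` calls typically schedule periodic split discovery: the callable runs on a worker thread, and its result is handed to a handler that runs back on the coordinator thread, so enumerator state is only touched from one thread. With a fixed list of subjects, this may not be needed at all. A plain-Java model of that contract using `ScheduledExecutorService`; `PeriodicDiscovery` is hypothetical and is not Flink's implementation.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;

// Models the callAsync contract: callable on a worker thread, handler on one "coordinator" thread.
class PeriodicDiscovery implements AutoCloseable {
    private final ScheduledExecutorService worker = Executors.newSingleThreadScheduledExecutor();
    private final ExecutorService coordinator = Executors.newSingleThreadExecutor();

    <T> void callAsync(Callable<T> callable, BiConsumer<T, Throwable> handler, long initialDelayMs, long periodMs) {
        worker.scheduleAtFixedRate(() -> {
            T result = null;
            Throwable error = null;
            try {
                result = callable.call(); // e.g. list the subjects that should be read
            } catch (Throwable t) {
                error = t;
            }
            final T r = result;
            final Throwable e = error;
            coordinator.execute(() -> handler.accept(r, e)); // enumerator state is changed only here
        }, initialDelayMs, periodMs, TimeUnit.MILLISECONDS);
    }

    @Override
    public void close() {
        worker.shutdownNow();
        coordinator.shutdownNow();
    }
}
```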
Do I even need split state? I have none except the split itself
I can make my reader polling or async too; not sure which way to go. Polling is easier, but I don't want messages to get backed up.
With async, I could add a queue to hold messages as they're delivered and then hand them out when asked.
I think Kafka might be closer to async as described above.
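The queue idea above can be sketched as a bounded handoff between push and pull: the client's async callback puts each message into a `BlockingQueue`, and a polling fetch() waits briefly for the first message, then drains whatever else is already buffered. This assumes the client invokes a callback per message on its own thread; `AsyncToPollBridge` is hypothetical and not part of Flink or the NATS client.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Bridges push-style delivery (callbacks) to pull-style fetch() calls.
class AsyncToPollBridge {
    private final BlockingQueue<String> buffer;
    private final int maxBatch;

    AsyncToPollBridge(int capacity, int maxBatch) {
        this.buffer = new ArrayBlockingQueue<>(capacity);
        this.maxBatch = maxBatch;
    }

    // Called on the client's delivery thread; blocks while the buffer is full.
    // Returns false if interrupted before the message could be buffered.
    boolean onMessage(String message) {
        try {
            buffer.put(message);
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    // Called by the reader: wait up to timeoutMs for one message, then take what is already buffered.
    List<String> fetch(long timeoutMs) {
        List<String> batch = new ArrayList<>();
        try {
            String first = buffer.poll(timeoutMs, TimeUnit.MILLISECONDS);
            if (first == null) {
                return batch; // empty batch lets the caller check for wakeups or shutdown
            }
            batch.add(first);
            buffer.drainTo(batch, maxBatch - 1);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
        }
        return batch;
    }
}
```

The bound is the design choice: when the job falls behind, the callback blocks instead of buffering without limit on the heap. How the client reacts to a blocked callback (its own buffering, slow-consumer handling) is client-specific and worth checking.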