# troubleshooting
l
Hello team 👋 I have a job which builds segments and publishes them to deep store. Currently I have a problem: since I am building so many segments, I am bottlenecking on IOPS/throughput on my volumes. Is there any way to build the segments completely in memory, without flushing to disk? I have been looking around the APIs but can't seem to find anything like that. Any help appreciated 🙏
s
I suppose you are referring to building segments in realtime servers. There is a configuration that limits the total number of segment builds per server. You may want to try that.
realtime.max.parallel.segment.builds
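For illustration, this is how that key might look in the server configuration; the value here is an arbitrary example, not a recommendation, so tune it to your hardware:
```
# Limits how many segment builds a realtime server runs concurrently
# (example value, not a recommendation)
realtime.max.parallel.segment.builds=4
```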
l
No, I am referring to using the internal Pinot tooling for building segments myself. I am writing a backfill job using Flink, and I've written my own connector for Pinot. I just need some guidance on where to look in the codebase. Is it currently possible to build segments completely in memory? And if not, where would I need to change functionality to make that happen?
m
Afaik this feature does not exist, but it should be doable. You can check out the segment generation code. Wouldn't you run out of memory though?
l
I have a lot of memory at disposal, but we could also look into spilling to disk if memory usage is too high
Targeting 200 MB per segment and building ~16 segments concurrently, that would be 3.2 GB of memory, plus other temporary memory. If we add a bit of buffer to be safe, say a total memory usage of 6 GB, that isn't unreasonable for fully in-memory segment building. Does that sound right?
Even with 500 MB segments that would be 8 GB (let's say 16 GB with buffer), and that's still reasonable.
m
Are you able to create 16 segments concurrently today? Note that it will use a lot more heap to build the segments (compared to the size of the segment on disk). Also, when you say you want to store in memory, do you mean Java heap or direct memory?
l
I am able to create 16 segments concurrently
👍 1
I could use both heap or direct memory, it doesn't really matter.
I am currently processing roughly 3.2 billion rows and creating segments from them in 1 hour.
This results in ~108 segments.
I upload them using the two-stage commit protocol of the Pinot controller (segment replacement).
m
Wait, you are creating these segments outside of Pinot, right? If so, you can just push to the controller API; why two-stage commit?
l
I need atomic replacement of segments
It's a job which populates segments of a refresh table
And I need all the segments to be updated at the same time
It's working well, but I am hitting I/O limitations. I have ways around it, but it would be nice to have the option of building segments completely in memory.
m
I/O limitation where? Outside of Pinot, when building the segments? Suppose you keep them in memory in your Flink job, how do you plan to push to Pinot without writing to disk?
Sorry I am still unclear on your setup
l
No problem. During segment creation, the segments are currently written to disk. To persist them in Pinot, I would write the segments directly to deep store (S3 in my case).
The way the segment generation code is written at the moment, there doesn't seem to be a way to buffer in memory, only to disk.
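For context, a minimal sketch of the disk-based flow being described, using the segment generation driver. Package paths vary across Pinot versions, and the TableConfig/Schema/RecordReader wiring is assumed to come from the surrounding job:
```java
import java.io.File;

import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.readers.RecordReader;

final class DiskBasedSegmentBuild {
  // Builds one segment from the given reader. The driver writes the
  // finished segment under outDir on local disk; the current API has no
  // in-memory output option.
  static File buildSegment(TableConfig tableConfig, Schema schema, RecordReader reader)
      throws Exception {
    SegmentGeneratorConfig config = new SegmentGeneratorConfig(tableConfig, schema);
    config.setOutDir("/tmp/pinot-segments");  // local disk: the I/O bottleneck in question
    SegmentIndexCreationDriverImpl driver = new SegmentIndexCreationDriverImpl();
    driver.init(config, reader);              // collects stats, prepares index creation
    driver.build();                           // flushes all index files to outDir
    return driver.getOutputDirectory();       // segment directory, ready to tar and push
  }
}
```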
m
Ok, so you want a segment writer that directly outputs the segment to S3 bucket?
l
Yeah without buffering to disk that would be the idea
m
Ok, check out SegmentIndexCreationDriverImpl. It is an implementation of an interface; you can implement one that writes to S3 directly. Perhaps create a base class to reuse most of the code.
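One possible shape for that, sketched under the assumption that the build still happens locally and the result is then shipped to deep store. The PinotFS instance, the bucket URI, and the tar helper's exact name are assumptions to verify against your Pinot version:
```java
import java.io.File;
import java.net.URI;

import org.apache.pinot.common.utils.TarGzCompressionUtils;
import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
import org.apache.pinot.spi.filesystem.PinotFS;

// Sketch: reuse the existing driver for the build itself, then ship the
// result straight to deep store. A true "no local disk" variant would
// require changing the File-based interface, as discussed further below.
public class S3UploadingSegmentCreationDriver extends SegmentIndexCreationDriverImpl {
  private final PinotFS s3Fs;      // e.g. an initialized S3PinotFS (assumption)
  private final URI deepStoreDir;  // e.g. s3://my-bucket/segments/ (assumption; needs trailing slash)

  public S3UploadingSegmentCreationDriver(PinotFS s3Fs, URI deepStoreDir) {
    this.s3Fs = s3Fs;
    this.deepStoreDir = deepStoreDir;
  }

  @Override
  public void build() throws Exception {
    super.build();  // still writes the segment to the local outDir for now
    File segmentDir = getOutputDirectory();
    File tarFile = new File(segmentDir.getParentFile(), segmentDir.getName() + ".tar.gz");
    TarGzCompressionUtils.createTarGzFile(segmentDir, tarFile);  // helper name varies by version
    s3Fs.copyFromLocalFile(tarFile, deepStoreDir.resolve(tarFile.getName()));
  }
}
```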
l
Thanks Mayank, I'll have a look at that
m
As for atomic push, I think the right way to do so would be to build upon the existing push API. I think @Seunghyun did some work on atomic push.
l
I am using @Seunghyun's changes actually
👍 1
It works great
It's not documented anywhere, but it was as simple as telling the controller that I'm going to replace segments, uploading them, and then ending the replacement.
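For anyone following along, a rough sketch of that three-step flow. The endpoint paths and parameters are recalled from the segment-lineage work and should be verified against the controller API of your Pinot version:
```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public final class AtomicReplaceSketch {
  public static void main(String[] args) throws Exception {
    HttpClient http = HttpClient.newHttpClient();
    String controller = "http://pinot-controller:9000";  // assumption
    String table = "myTable_OFFLINE";                    // assumption

    // 1. Start the replacement; the response contains a segment lineage
    //    entry id (JSON parsing elided here).
    HttpRequest start = HttpRequest.newBuilder()
        .uri(URI.create(controller + "/segments/" + table + "/startReplaceSegments"))
        .header("Content-Type", "application/json")
        .POST(HttpRequest.BodyPublishers.ofString(
            "{\"segmentsFrom\":[],\"segmentsTo\":[\"seg_0\",\"seg_1\"]}"))
        .build();
    String startResponse = http.send(start, HttpResponse.BodyHandlers.ofString()).body();
    String lineageEntryId = extractLineageEntryId(startResponse);

    // 2. Upload the new segments through the regular segment upload API.

    // 3. End the replacement; the controller switches all segments atomically.
    HttpRequest end = HttpRequest.newBuilder()
        .uri(URI.create(controller + "/segments/" + table
            + "/endReplaceSegments?segmentLineageEntryId=" + lineageEntryId))
        .POST(HttpRequest.BodyPublishers.noBody())
        .build();
    http.send(end, HttpResponse.BodyHandlers.ofString());
  }

  // Placeholder: pull the lineage entry id out of the JSON response with
  // your JSON library of choice.
  private static String extractLineageEntryId(String json) {
    throw new UnsupportedOperationException("parse " + json);
  }
}
```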
m
Is that already completed? If not, perhaps contribute back? I think atomic push is a very useful feature
l
It's finished, I haven't made any changes to it
But I made my own custom flink connector for it
m
ah ok
l
The Flink connector in the Pinot repo at the moment is not really safe for production.
I would contribute the Flink connector I've written, but as it stands, it only works for refresh jobs where you replace all the segments in the table on every single run.
👍 1
If that's something you're interested in anyway, I could look at PRing it as a separate connector?
m
Yes. How is it different from the Flink sink @Yupeng Fu was working on?
l
The current Flink connector does not gracefully handle errors. If the connector fails in the middle, your table is left inconsistent, as it replaces segments as it goes.
The current connector also does not perform well; it bottlenecks on the sink writing. I've split out a lot of the heavy work to prevent this.
It also does not support atomic segment replacement, nor anything but TAR push of segments.
However, @Yupeng Fu's connector supports appending segments to a table; the connector I've written does not support that at the moment.
m
Does it make sense to unify both?
l
Yes
But it would be a lot of work
m
Cool. Maybe start with a GitHub issue. We can get some ideas there and then proceed.
l
They are actually decently similar. I've spent a lot of time on this connector now. I think if someone had their hands on both of them, they could take the good parts from each and make it work for all use cases.
Sure, sounds good.
👍 1
I could also PR it and try to unify it as best I can with the current Pinot APIs. Then, if that's okay, someone else could take on the task of unifying the two? It's up to you.
y
Cool, glad to see the proposed improvements. The current connector relies on the naming convention to be idempotent, so the next run will replace the segments by name.
s
One way to build segments completely in memory is to use a memory-based file system such as tmpfs
@Lars-Kristian Svenøy ^^
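A minimal sketch of the tmpfs idea, assuming a memory-backed mount such as /dev/shm is available; the mount point and its sizing are deployment-specific assumptions:
```java
import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.data.Schema;

final class TmpfsBuildConfig {
  // The driver still goes through the File API, but on a tmpfs mount the
  // bytes stay in memory, sidestepping the IOPS bottleneck on the volumes.
  static SegmentGeneratorConfig forTmpfs(TableConfig tableConfig, Schema schema) {
    SegmentGeneratorConfig config = new SegmentGeneratorConfig(tableConfig, schema);
    config.setOutDir("/dev/shm/pinot-segment-build");  // tmpfs mount point (assumption)
    return config;
  }
}
```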
l
Thanks @Subbu Subramaniam. I guess this is one possible way to go, but I also think it is fairly restrictive to require a specific file system implementation to make this work. Nothing wrong with having the option, but being able to control the memory directly would be great too.
I looked into @Mayank's idea of simply implementing the segment index creation driver, but unfortunately the interface declares File getOutputDirectory(). This would either have to change, or we would need a new interface.
Alternatively, abstract away File and use the Pinot filesystem to allow the output to be elsewhere.
In that case this should return a URI, which would then have to pass through PinotFS.
I think a general improvement to the architecture would be to return concrete types which have to pass through PinotFS. As it stands, since PinotFS takes a generic Java URI, there's no way of knowing whether a specific URI has to pass through this pattern.
A good inspiration for this would be to look at the way Flink handles abstracting away the file system.
It's fairly elegant
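A small sketch of the URI-based direction being proposed here, using PinotFS's scheme-based dispatch; the PinotFS registration and configuration are elided and assumed to have happened at startup:
```java
import java.io.File;
import java.net.URI;

import org.apache.pinot.spi.filesystem.PinotFS;
import org.apache.pinot.spi.filesystem.PinotFSFactory;

// Callers hand the writer a URI, and PinotFS decides whether that means
// local disk, S3, or some other registered file system.
final class PinotFsOutputSketch {
  static void copySegmentOut(File localSegmentTar, URI outputUri) throws Exception {
    // PinotFSFactory resolves the scheme (file://, s3://, ...) to an
    // implementation registered at startup (registration elided).
    PinotFS fs = PinotFSFactory.create(outputUri.getScheme());
    fs.copyFromLocalFile(localSegmentTar, outputUri);
  }
}
```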
m
Thanks @Lars-Kristian Svenøy, please do update the issue with a summary of your findings, so it is easily discoverable. I feel modifying the interface should be up for discussion.
y
cc @Rong R
l
Added more details to the issue there now