# general
e
Hey folks, I was curious about the flink segment writer[1][2]. Is column sorting supported when using this method of segment creation? Is that handled internally by IngestionUtils.buildSegment i.e. SegmentIndexCreationDriverImpl#build ? Or is sorting not supported when building segments with Flink? Thanks! [1] https://github.com/apache/pinot/blob/e41d86b4b1225a4857767960ce595d735d5ec620/pinot-connectors/pinot-flink-connector/src/main/java/org/apache/pinot/connector/flink/sink/FlinkSegmentWriter.java [2] https://github.com/apache/pinot/blob/e41d86b4b1225a4857767960ce595d735d5ec620/pinot-connectors/pinot-flink-connector/src/main/java/org/apache/pinot/connector/flink/sink/PinotSinkFunction.java
m
Likely not, but adding @Yupeng Fu to confirm.
e
Got it, I assumed that was the case. A few follow-up questions: 1. To what extent does sorting impact query latency? How much would one expect query latency to decrease with a sorted column vs. no sorted column, assuming that the column is always used for filtering? 2. Is strict “sorting” a requirement, i.e. all values in a column are in ascending/descending order, or is it sufficient that all rows with the same value for the sorting column are in a contiguous block, such that a start and end index for that sorting value could be computed?
3. Can segments be concatenated, or is there a construct like a partial segment that can be combined with other partials to create a segment?
m
1. A sorted index definitely helps in quickly identifying the rows that satisfy the predicates. An inverted index does the same. However, a sorted index provides locality (which can reduce disk fetches) and does not need additional space (the data and the index share the same storage).
2. Yes, strict sorting is required for fast lookups
3. Segments can be concatenated via segment merge minion job
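To make the trade-off in point 1 concrete, here is a rough sketch (illustrative only, not Pinot's internal representation) of what an inverted index stores: one posting list of doc IDs per value. Contrast that with a sorted column, where the matching rows are a single contiguous range and no extra structure is needed.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch of the contrast in point 1: an inverted index keeps an extra
// posting list of doc ids per value, while a sorted column needs no
// extra structure because matching rows form one contiguous range.
// Illustrative only, not Pinot's internal representation.
public class InvertedIndexSketch {
  public static Map<Integer, List<Integer>> build(int[] column) {
    Map<Integer, List<Integer>> postings = new HashMap<>();
    for (int docId = 0; docId < column.length; docId++) {
      // append this doc id to the posting list for its value
      postings.computeIfAbsent(column[docId], v -> new ArrayList<>()).add(docId);
    }
    return postings;
  }
}
```

The posting list can point at doc IDs scattered anywhere in the segment (no locality, extra space); the sorted-column equivalent is just a (start, end) pair implied by the data itself.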
e
2. A contrived example to clarify: if I had the sorting column `foo` holding INT values, and 5 rows with the values 1, 1, 2, 3, 3, could these rows appear in the segment in the order 2, 1, 1, 3, 3 and still be considered sorted (this assumes that the sorted index only holds start and end indexes)? The sorted index would look like:
Value   start_index   end_index
    2             0           0
    1             1           2
    3             3           4
3. Can segment concatenation be extracted or used externally from minions? I ask all this thinking about a system like Apache Beam/Apache Flink, where “sorting” (grouping, really) could be accomplished in parallel by using KeyedStreams (key-value pairs) keyed off the value of the sorting column, writing “partial segments” for each key, and concatenating the partial segments in any order, given that each partial segment would hold all the data for a single value of the sorting column.
The bit about “concatenating segments” is not a strict requirement; if locality is sufficient as opposed to all values in the sorting column being in order, I think sorting within a window of data in a streaming system could be accomplished with reasonable parallelism and without needing to hold the entire dataset in memory to sort it
Possibly even better would be if the concept of a “ConsumingSegment” were usable in a stream processing system, effectively using stream processing to create offline segments via the same mechanism that Pinot real-time consumption does (but without needing the cluster's resources for that computation).
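For what it's worth, the start/end lookup idea from the table above can be built in one pass over the column, as long as equal values sit in contiguous runs. This is a hypothetical illustration of that concept only, not Pinot's actual index format:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch of the table above: if equal values sit in
// contiguous runs, a (start, end) doc-id pair per value falls out of a
// single pass, regardless of whether the runs themselves are ordered.
// Not Pinot's actual index format.
public class RunLengthLookup {
  // Maps each value to its [startDocId, endDocId] run (inclusive).
  public static Map<Integer, int[]> build(int[] column) {
    Map<Integer, int[]> runs = new LinkedHashMap<>();
    for (int docId = 0; docId < column.length; docId++) {
      int value = column[docId];
      int[] run = runs.get(value);
      if (run == null) {
        runs.put(value, new int[]{docId, docId});  // start a new run
      } else if (run[1] == docId - 1) {
        run[1] = docId;  // extend the contiguous run
      } else {
        // the value reappears after a gap, so one (start, end) pair
        // can no longer describe it: contiguity is violated
        throw new IllegalStateException("value " + value + " is not contiguous");
      }
    }
    return runs;
  }
}
```

Running `build(new int[]{2, 1, 1, 3, 3})` yields 2 → [0, 0], 1 → [1, 2], 3 → [3, 4], matching the table above.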
m
2, 1, 1, 3, 3
is not a sorted sequence, so it won't be considered a sorted column in Pinot.
3. Concatenating segments built on a sorted column could potentially break the sort order. But IIRC there is a minion job that preserves sorted order. It has also been done outside of minions using Hadoop/Spark. I am not familiar with Apache Beam, but I anticipate there should be a way to maintain sort order. Along the same lines is partitioning, where you can take a bunch of input files and generate partitioned and sorted segments.
e
2, 1, 1, 3, 3
is not a sorted sequence, so it won't be considered a sorted column in Pinot.
Is that based on Pinot doing pre-flight checks of some kind to confirm whether data is sorted? Or is it a technical requirement? I'm sure that the examples in the docs[1] are simplified, but it does look like sorting all values in the column would not be as vital as ensuring that rows for a certain memberId are contiguous. So long as there is a start and end index for a given memberId, and all docs between that start/end have the same memberId, it seems like the intended functionality of the sorted index would be supported. Thoughts? [1] https://docs.pinot.apache.org/basics/indexing/forward-index#sorted-forward-index-with-run-length-encoding
m
There is indeed a check during segment generation that verifies the ordering of rows and marks the column as sorted in the metadata. Note, for realtime segment generation, sorting happens within Pinot.
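As a rough illustration of the kind of check described here, a single ascending-order scan over the column's values is enough. The class and method names are assumptions for illustration, not Pinot's actual code:

```java
// Sketch of the kind of check described above: one ascending-order scan
// over a column's values at segment-generation time. The name and shape
// are illustrative assumptions, not Pinot's actual implementation.
public class SortedCheck {
  public static boolean isSorted(int[] columnValues) {
    for (int i = 1; i < columnValues.length; i++) {
      if (columnValues[i] < columnValues[i - 1]) {
        return false;  // out of order: column is not marked sorted
      }
    }
    return true;  // non-decreasing throughout: marked sorted in metadata
  }
}
```

Note that a grouped-but-unsorted sequence like 2, 1, 1, 3, 3 fails this check even though each value's rows are contiguous.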
e
There is indeed a check during segment generation that checks ordering of rows and marks as sorted in the metadata.
Is this something that could be “forced” via a configuration setting of some kind? If data does not need to be globally sorted per segment, but rather just grouped per memberId, allowing this could be more flexible.
Note, for realtime segment generation, sorting happens within Pinot.
Right, that would be really awesome to take advantage of in a stream processing system: effectively transplanting realtime segment generation into Flink/Beam/Spark Streaming etc. That way all the features of segment creation (e.g. the sorted index) could be taken advantage of in distributed compute frameworks that operate per-element (e.g. Flink, Beam), so that processing large amounts of data for backfill could be done without using the resources of the Pinot cluster itself.
m
How do you perform binary search over unsorted (but contiguous) data?
e
Ah, I didn’t realize that the sorted index was used to perform binary search. Haha, binary search over unsorted data would be quite impressive 😂 I had mistakenly thought that the sorted index was strictly used as a lookup table to collect a list of doc IDs for a given memberId
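For context, on a strictly sorted column the doc-id range for a predicate value can be found with binary search and no lookup table at all. A minimal sketch (a hypothetical helper, not Pinot's implementation):

```java
import java.util.Arrays;

// Sketch of why strict ordering matters: on a fully sorted column the
// (startDocId, endDocId) range for a value falls out of binary search,
// with no separate lookup table. Hypothetical helper, not Pinot's code.
public class SortedRange {
  // Returns {start, end} (inclusive) of `value` in a sorted column,
  // or null if the value is absent.
  public static int[] docIdRange(int[] sortedColumn, int value) {
    int pos = Arrays.binarySearch(sortedColumn, value);
    if (pos < 0) {
      return null;  // value not present
    }
    // widen the hit to the full run of equal values
    int start = pos;
    while (start > 0 && sortedColumn[start - 1] == value) {
      start--;
    }
    int end = pos;
    while (end < sortedColumn.length - 1 && sortedColumn[end + 1] == value) {
      end++;
    }
    return new int[]{start, end};
  }
}
```

Binary search presupposes global order: over a grouped-but-unsorted column like 2, 1, 1, 3, 3, `Arrays.binarySearch`'s contract no longer holds, which is exactly the point made above.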
m
For your case, an inverted index will work as well, and will be better than a sorted index when the data is scattered.