# general
m
Hi Team, We have a REALTIME and an OFFLINE User table. We are pushing data to the User table and have set up a minion task to move data from the REALTIME to the OFFLINE table. When data is moved to the OFFLINE table, is there a way to create segments for the OFFLINE table based on a column value? For example: our User table is populated from 3 different sources. One of the columns of this table is Source, which tells us from which source a particular user's data was generated. So, is there a way to create segments for this table based on the Source column value? • The reason we are looking at such a use case: suppose the user data from one of the sources is wrong; then we can backfill only that particular source's segments, and not all the other segments.
m
When you say build segment based on column value what do you mean? How will the segment generation differ based on column?
m
By building the segment I mean: is there a way to generate segments in Pinot based on a column value? For example:

Name, Source
User1, Source1
User2, Source1
User3, Source2
User4, Source2

Is it possible to have records related to Source1 in one segment and records related to Source2 in another segment?
m
Currently there isn't. Can you partition the data upstream so that a source is present only in a specific subset of partitions?
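The upstream-partitioning suggestion can be sketched as follows. This is an illustrative Python sketch, not Pinot or Kafka code: with keyed partitioning, every record for a given source hashes to the same stream partition, so one source's data is confined to a fixed subset of partitions.

```python
import zlib

NUM_PARTITIONS = 4

def partition_for(source: str) -> int:
    # Deterministic hash (CRC32) so the source -> partition mapping is
    # stable across producers and runs.
    return zlib.crc32(source.encode()) % NUM_PARTITIONS

records = [("User1", "Source1"), ("User2", "Source1"),
           ("User3", "Source2"), ("User4", "Source2")]

by_partition = {}
for name, source in records:
    # All records with the same source key land in the same partition.
    by_partition.setdefault(partition_for(source), []).append(name)
```

Note this only guarantees "same source, same partition"; it does not prevent two different sources from sharing a partition, which is the gap raised in the next message.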
m
Even if we partition the data upstream, the data in the realtime table will go to a subset of partitions, and eventually the data from the same source will go to the same segment. However, when this data is moved from the REALTIME to the OFFLINE table via Minion, one segment might have data from multiple sources, right?
m
We have a minion job to repartition, but it is currently closed source. cc: @User
m
@User Have you got this use case handled? We also have a similar use case: we want the minion to generate segments based on some column.
m
@User, not yet. I haven't started on this one as of now.
m
OK, I have explored the possibility; it can be done. There is a `ColumnValuePartitioner` which can be used. How I have done it:
• Added the two new config properties below in `RealtimeToOfflineSegmentsTask` (subject is a column in the transcript example table):
```json
"RealtimeToOfflineSegmentsTask": {
  "partitionerType": "COLUMN_VALUE",
  "partitionColumn": "subject"
}
```
• Generate a `PartitionerConfig` of PartitionerType `COLUMN_VALUE` based on the above configs and set it in `SegmentProcessorConfig`
• Add the partitionId in the `custom.map` of the segment metadata inside the `SegmentProcessorFramework.build` method.
What do you think @User?
This additional configuration cannot be supplied in `tableIndexConfig.segmentPartitionConfig`, as that config only supports the `TABLE_PARTITION_CONFIG` partitioner type implicitly.
I think, with this approach, partition-based segment pruning won't be performed by the broker.
@User Could you please provide your feedback?
@User ^^
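Conceptually, what the COLUMN_VALUE step above does during realtime-to-offline conversion is group rows by the configured partition column and build one segment per distinct value. A minimal Python sketch of that grouping (illustrative only, not Pinot's `SegmentProcessorFramework`):

```python
def partition_rows(rows, partition_column):
    """Group rows (dicts) by the value of partition_column; each group
    would become one output segment in the RT->OFF conversion."""
    groups = {}
    for row in rows:
        groups.setdefault(row[partition_column], []).append(row)
    return groups

rows = [
    {"name": "User1", "subject": "Maths"},
    {"name": "User2", "subject": "Maths"},
    {"name": "User3", "subject": "English"},
]

# Two distinct subject values -> two groups -> two segments.
segments = partition_rows(rows, "subject")
```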
m
@User are you saying you are already able to partition, or are you proposing an implementation?
m
I am already able to partition segments in the `RealtimeToOfflineSegmentsTask` minion task with modified code that I have locally, and based on that I am proposing an implementation.
m
Thanks. @User
m
@User Can I go ahead and create a proper PR to merge this feature into the apache/pinot master branch?
m
@User
m
@User I was checking with @User who might already have this. Jackie could you comment?
j
Ideally we should group segments within the same partition on the task generator side. Also, the partition config should come from the OFFLINE table, because the broker checks the partition config in the OFFLINE table config while routing the query. @User is working on enhancing this task. Do you think we can also support partitioning in your new design?
x
Yeah, I think it can be supported, and grouping segments within the same partition on the task generator side is also doable (more of an improvement, not necessary for correctness). The extension suggested in the code snippet above to enable the COLUMN_VALUE partitioner in SegmentProcessorFramework would be necessary anyway. So maybe refactor it so that it works correctly for query routing as suggested by Jackie above, and merge that first.
m
It would be awesome if you could add support for the COLUMN_VALUE partitioner such that it can also be used by the broker to prune segments. Also, as of now, RealtimeToOfflineTaskExecutor and the broker's segment-pruning logic only support partitioning on one column; i.e., it is not possible to use hash-code partitioning on one column and COLUMN_VALUE partitioning on another. If we are refactoring a lot of the partitioning logic across the broker and minion, can we also support multi-column partitioning?
v
@User ^^
j
@User Multiple partitioning columns are possible for segment generation and pruning purposes, but we need some further brainstorming on integrating it into the realtime table (a realtime consumer partition can only map to one partition id) and also segment assignment (assigning segments based on the partition id). I would suggest limiting the scope of the enhancement so that it can be done faster. Supporting multiple partitioning columns can be discussed as a separate topic.
@User IIUC, the ask is to automatically partition the data if partitioning config is provided, without asking users to further config the partitioning within the task config? cc @User
m
Hi @User Yes, multi-column partitioning can wait.
And yes, you understood correctly. The requirement is to automatically partition the data based on a column value, without adding further config in the task config, such that the broker uses the segment partition metadata to prune segments.
This can be done in a generic way if `PartitionFunction.getPartition(Object value)` can return a `String`, and we implement a NoOpPartitionFunction that simply returns the column value as the partitionId (a column value can be of any type).
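The proposed NoOpPartitionFunction is just an identity mapping. A hypothetical sketch (this function does not exist in Pinot; it is the proposal being discussed):

```python
def noop_partition(value) -> str:
    # Identity mapping: the column value itself *is* the partition id,
    # so every distinct value forms its own partition. Note the id is a
    # string, not an integer in [0, numPartitions).
    return str(value)
```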
j
We cannot have `String` as the partition id (the partition id should be from `0` to `numPartitions - 1`). Any specific reason you want to directly use the column value as the partition id? Does hash-based or modulo-based partitioning work for your case?
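A sketch of why modulo/hash-based partitioning satisfies the `0` to `numPartitions - 1` constraint but cannot isolate each column value (illustrative Python, not Pinot's Murmur/hashCode implementations):

```python
NUM_PARTITIONS = 2

def modulo_partition(value: str) -> int:
    # Toy sum-of-bytes hash folded into [0, NUM_PARTITIONS) via modulo.
    return sum(value.encode()) % NUM_PARTITIONS

# Every id is a valid integer in range, but distinct values can collide:
# here "Source1" and "Source3" map to the same partition id, so their
# rows would end up in the same segment.
ids = {v: modulo_partition(v) for v in ["Source1", "Source2", "Source3"]}
```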
m
(attached: image.png)
The image explains the original requirement. We can't use hash/modulo/murmur-based partitioning because the partitionId could be the same for different values of the column, and those rows would end up in the same segment.
Hi @User and @User, since we cannot use hashCode/murmur-based partitioning, we have come up with another partition function called `BoundedColumnValue`, which is enum based: one can configure the different values of the partition column on which to partition segments. The partitionId remains an integer value, and the broker can also use this partition function to prune segments. An example config is below. Here, the user wants to partition segments on the three subjects given in `columnValues`. The partitionId would be `1` for Maths, `2` for English, and so on. PartitionId `0` is reserved for any other subject that is not present in the given config but may occur as a value of the column.
```json
"tableIndexConfig": {
  "segmentPartitionConfig": {
    "columnPartitionMap": {
      "subject": {
        "functionName": "BoundedColumnValue",
        "functionConfig": {
          "columnValues": "Maths|English|Chemistry"
        }
      }
    }
  }
}
```
The `functionConfig` is persisted along with the `functionName` into metadata.properties as well as in the segment metadata in ZooKeeper. In addition, I have also looked into multi-column partitioning for the offline table. Please have a look at the following PR and provide your feedback on the design: https://github.com/kmozaid/pinot/pull/1/files
j
@User The `BoundedColumnValue` partition function idea looks good to me. I would suggest just adding the new partition function, and trying to support multiple partition columns in a separate PR. Currently Pinot asserts that there is only one partition column in multiple places, and we need to revisit all of them to ensure multiple partition columns work properly.
👍 1
m
I will create a PR with a description. Thanks Jackie for the guidance.