# general
d
Could anyone explain why Pinot is using GroupByOrderByCombineOperator instead of GroupByCombineOperator for queries like:
select max(profit) from transcript group by strategy_id
?
x
This is used in SQL mode
If you read through the code in
CombinePlanNode
GroupByCombineOperator
is the old PQL implementation, which sorts results by the aggregated values. You can think of it as an operator that always appends the clause
ORDER BY max(profit) DESC
to your query.
GroupByOrderByCombineOperator is the new SQL implementation, which takes the sorting params from the query; they can be null, as in your sample query.
d
ok, got it! so using SQL it will never be called, right?
x
correct, SQL syntax will always call
GroupByOrderByCombineOperator
👍 1
d
@Xiang Fu for you, what is the correct place to put a method that must be called at "server level" (not broker) and can handle the whole sequence of blocks?
my aggregation needs the whole sequence of blocks to do its logic (instead of using the aggregate() method for each block)
x
hmm
d
i think it's in the CombineOperator we were talking about, right?
x
so you need all the blocks to compute and need to hold all the blocks in memory?
no intermediate state?
d
@Xiang Fu exactly... i know it could sound "strange" but i am implementing something like window functions in Pinot, so i must work with the entire sequence of blocks because i need to do a cumulative sum
x
CombineOperator is used to combine multiple other operators and expects the things to merge to be partial results
d
exactly, i thought after that merge i could work with the entire sequence of blocks, no?
x
I don’t think so, the place to look is the
InstanceResponseOperator
I suggest you go over the example query plan first
then we can think of how to do this
also it would be great to create an issue in github for window functions
with some sample queries, so we can get more insights and recommendations from the community
d
yeah i think it could be great. Druid for example has an extension for window functions
@Xiang Fu wait, if we move the logic into InstanceResponseOperator, does the Broker run that code?
x
I suggest we go through one example ?
instanceBlock is the block after all combines and just before converting it to dataTable and sending back to broker
one thing i’m a bit confused about is that if we cannot process partial data in a segment/block, then it means we cannot process it in the server either, as the server only holds partial data as well.
d
@Xiang Fu you are right, if we want to do something that always works we need to move the logic to the broker
but in my case i will use partitions so i know that all the data is inside a single node
x
then in the aggregationfunction, try to hold the data
and do your logic in reduce method
d
yes i can "concatenate" the blocks
extractFinalResult()
^^ there?
is that method not called from broker?
x
yes
in
extractFinalResult
but
extractFinalResult
will be called in broker
hmmm
d
but in that way i will move all the logic into the broker, so not good
exactly, that's the problem
i should avoid moving all the load into the broker
do you know Citus? in Citus there are distribution keys, so each shard has all the records of that specific partition key, i would like to do something like that in Pinot. Partitions could be good to isolate rows having the same key (i will use it in
group by
)
but moving all the load to the broker is bad because then it has to handle all the nodes' responses... heavy load.
x
true
then basically we need to have a way to do intra/inter server level aggregation (merge)
in that mode we need to ensure, via partitioning, that all the shard data is on the same server
d
yes, correct
x
hmm
d
so for that reason i was thinking where to put the logic
x
could you share the query somehow
d
in postgresql it is
x
I’m thinking maybe we can run query twice
d
```sql
explain analyze
WITH t2 AS (SELECT strategy_id, MAX(profit) OVER (PARTITION BY strategy_id ORDER BY id RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) - profit as drawdown
	FROM trades)
SELECT strategy_id, MAX(drawdown) max_drawdown
FROM t2
GROUP BY strategy_id
ORDER BY max_drawdown;
```
x
first round is to extract the sequence/stats, then the second query will do the aggregation
d
for a simple table like:
```sql
CREATE TABLE trades (
   id Serial,
   strategy_id Integer,
   profit Integer,
   PRIMARY KEY (strategy_id, id)
);
```
i need the running max and then i need to subtract a value from it
....and after all that, get the MAX
x
ic
so need to run
select strategy_id, max(profit) group by strategy_id
first
then do
```sql
SELECT strategy_id, MAX(max_profit - profit) as max_drawdown
FROM t2
GROUP BY strategy_id
```
just the
max_profit
values are different per
strategy_id
d
I need to do max(drawdown); the drawdown is the running max - profit
yes everything grouped by strategy_id
x
what’s the cardinality of
strategy_id
?
have you tried presto as a workaround right now ?
d
the max(profit) is the running max
x
the first max group by query should be able to push down to pinot to execute
d
no i haven't used Presto because, talking with Kishore, he told me that it will be slower because it moves everything to the Presto worker
x
the outer aggregation will be processed by presto
hmmm
true
as the outer query will also read from pinot
it’s actually a join on
strategy_id
🙂
d
what do you mean exactly? moving the running max to presto and then the rest to pinot?
x
I think it will not work
nvm
presto still needs to read the entire table in order to do the computation
d
doing this aggregation is easy if we work on the entire sequence. the steps are:
x
since the outer aggregation cannot be pushed down
d
1. working with sorted rows
2. just one loop over the entire sequence saving the running max
3. simple subtraction
sure... this logic applies for ALL the strategies (group by strategy_id); roughly, the loop is sketched below
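A minimal plain-Java sketch of those steps (assuming the profits are already sorted by id; the names here are just illustrative, not Pinot code):
```java
import java.util.List;

public class DrawdownSketch {
    // Max drawdown over an id-ordered profit sequence:
    // one pass keeping the running max, subtract the current profit, track the largest gap.
    static double maxDrawdown(List<Double> profitsSortedById) {
        double runningMax = Double.NEGATIVE_INFINITY;
        double maxDrawdown = Double.NEGATIVE_INFINITY;
        for (double profit : profitsSortedById) {
            runningMax = Math.max(runningMax, profit);
            maxDrawdown = Math.max(maxDrawdown, runningMax - profit);
        }
        return maxDrawdown;
    }

    public static void main(String[] args) {
        // profits 5, 2, 8, 2 -> running max 5, 5, 8, 8 -> drawdowns 0, 3, 0, 6 -> max drawdown 6
        System.out.println(maxDrawdown(List.of(5.0, 2.0, 8.0, 2.0)));
    }
}
```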
x
right
d
where can i do a fast test for it? i mean... maybe not the best solution, but where can i get the whole sequence to put this logic?
x
I feel we may need to add a new interface in AggregationFunction to separate the server and broker result-extraction logic
d
YEAH
that will be a very smart solution
x
we made an assumption that partial results are the same across server and broker
d
because of the partitions, right?
x
right
d
yes i think without partitions...the job must be done via broker
because all the rows with the same strategy_id must be on the same node
(mandatory)
x
check this method
```java
/**
 * Extracts the intermediate result from the aggregation result holder (aggregation only).
 * TODO: Support serializing/deserializing null values in DataTable and use null as the empty intermediate result
 */
IntermediateResult extractAggregationResult(AggregationResultHolder aggregationResultHolder);
```
in
AggregationFunction
you can create your own holder
which keeps all the blocks
just in
extractAggregationResult
you do the scan twice
also you can keep a separate max during the merge/scan
basically the
AggregationResultHolder
is a tuple of (Double maxProfit, List<Double> profit)
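For illustration, the held state could look roughly like this (a plain-Java sketch of the tuple idea; it is not wired into Pinot's actual holder classes and the names are made up):
```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the (maxProfit, List<profit>) tuple: every value from every block is appended
// here, and extractAggregationResult() runs only after all blocks on the server have been
// aggregated, so it can do the full running-max scan (previous sketch) over `profits` there.
public class DrawdownState {
    double maxProfit = Double.NEGATIVE_INFINITY;      // the separate max kept during the merge/scan
    final List<Double> profits = new ArrayList<>();   // all profit values seen so far

    void add(double profit) {                          // called per value from aggregate()/aggregateGroupBySV()
        maxProfit = Math.max(maxProfit, profit);
        profits.add(profit);
    }

    public static void main(String[] args) {
        DrawdownState state = new DrawdownState();
        for (double p : new double[]{5, 2, 8, 2}) {
            state.add(p);
        }
        System.out.println(state.maxProfit + " " + state.profits); // 8.0 [5.0, 2.0, 8.0, 2.0]
    }
}
```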
d
a fast recap: in aggregateGroupBySV() i will add all values into the groupByResultHolder, and then in extractAggregationResult i will have all the values i need for the computation, right?
x
yes
that’s the simplest thing you can give a try
one example you can follow is avg
the avg function keeps count and sum in the resultHolder
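The pattern behind that example is roughly the following (a sketch of the idea, not Pinot's actual avg implementation): the (sum, count) partials can be merged anywhere, server or broker, unlike the full sequence the drawdown needs.
```java
// Sketch of an avg-style intermediate result: (sum, count) pairs from different
// segments/servers can be merged, and the final average is only computed at the end.
public class AvgState {
    double sum;
    long count;

    void apply(double value) { sum += value; count++; }                      // per-row aggregate
    void merge(AvgState other) { sum += other.sum; count += other.count; }   // combine / broker merge
    double finish() { return count == 0 ? 0.0 : sum / count; }               // final result

    public static void main(String[] args) {
        AvgState a = new AvgState();
        AvgState b = new AvgState();
        a.apply(1); a.apply(2);  // "server 1"
        b.apply(3);              // "server 2"
        a.merge(b);
        System.out.println(a.finish()); // 2.0
    }
}
```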
d
ok so "extractAggregationResult() will be called after all the blocks
after all right?
i mean after that method Pinot will not process other blocks right?
x
yes
at server level
and you still need to save all the block values into your
AggregationResultHolder
e.g. use a set to remove duplicate values
d
yes ok, very good... but there is another important thing to consider... i MUST deal with an ordered sequence, for example my aggregator should be select... DRAWDOWN(id, profit), it means that i need to sort by id and then do the logic (running max) over the profit column
in this situation is it better to create a sorted index on the ID column?
x
yes, but that won’t guarantee the ordering across multiple segments
if those segments are processed in parallel
d
so blocks can have random order
x
right
d
ok but extractAggregationResult will be called ONE time only...so maybe i can do a sort() inside that method? or maybe when i add the values i can use a list that is already ordered
what do you think?
x
you can do that for now
or you need to alter the query plan to make the query execution sequential and order the segments as well
d
but in this case it will be slower for sure, no?
i think it is not a problem, maybe i can do it in parallel and then add a sort inside extractAggregationResult()
or as i told you i can use a sorted map in the AggregationResultHolder
@Xiang Fu maybe a SortedMap object?
x
you can try both 🙂
I think sorted map will work
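For example, the holder could key the values by id so they always come back in id order, no matter which segment was processed first (a sketch under that assumption; class and field names are made up):
```java
import java.util.TreeMap;

// Sketch: replace the List<Double> in the holder with a TreeMap keyed by id, so
// extractAggregationResult() can iterate the profit values in ascending id order
// even if the segments/blocks were aggregated in a random order.
public class OrderedHolder {
    final TreeMap<Long, Double> profitsById = new TreeMap<>();

    void add(long id, double profit) {
        profitsById.put(id, profit); // TreeMap keeps keys sorted; duplicate ids collapse to one entry
    }

    public static void main(String[] args) {
        OrderedHolder h = new OrderedHolder();
        h.add(3, 8.0); h.add(1, 5.0); h.add(2, 2.0);   // out-of-order inserts
        System.out.println(h.profitsById.values());    // [5.0, 2.0, 8.0] -> id order, ready for the running-max scan
    }
}
```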
d
ok
i try
thank you so much
@Xiang Fu still there? with batch ingestion how can i specify the partition of my segment? for example i have three files in /mydir/: a.csv, b.csv, c.csv. I know Pinot will create three segments, is there a way in the yaml to specify the partitions?
x
@Jackie could you provide more instructions here for data partitioning
j
@Damiano You can configure the partitioning for a table within the table config. When creating a segment, if the source data is already partitioned, Pinot can automatically figure out the partition of the segment
d
@Jackie ok I found the segmentPartitionConfig setting. Is that what you meant? However for batch ingestion I did not find a way to specify which segments to put in which partitions. Will Pinot understand it automatically by looking at the field I set in the config and the PartitionFunction (like MURMUR, Modulo etc), or do I need to format the input in some manner? (I use csv)
j
@Damiano Yes. In order to make partitioning work, the input data should already be partitioned with the PartitionFunction specified in the partition config. When creating a segment, Pinot will read the partition config and use it to figure out the partition of the input file.
d
@Jackie ok, just a fast example to understand it better. I create the schema of the table, setting the number of partitions (10) and the function i would like to use (Modulo). Then, regarding the batch ingestion, let's suppose a CSV with 10M rows; at the moment, to create the segments, I split the big csv with 10M rows into 100 smaller CSVs that have 100,000 rows each. Can I continue using this setup or do I have to format my csv in a different manner? In this example what does Pinot do internally? Will it assign those 100 segments to my 10 partitions? Or can I simply ingest the big csv, because Pinot will then assign each row to the correct partition by looking at the partition key? If true, how will Pinot create segments INSIDE each partition? Thank you!!
j
@Damiano When you split the CSV file, you should partition them with the same function configured in the Pinot config so that each file only contains rows for one partition. When Pinot creates the segments, it will use the partition config to match the input file and figure out which partition it belongs to, then assign it accordingly
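To make that concrete, the split could look roughly like this (a sketch assuming the Modulo partition function with 10 partitions on strategy_id, which is the second column of the example CSV; the file names and paths are made up):
```java
import java.io.IOException;
import java.io.PrintWriter;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;

// Sketch: split an input CSV into one file per partition, using the same Modulo function
// configured in the Pinot partition config, so every output file only contains rows of a
// single partition (Pinot then builds one segment per input file).
public class CsvPartitionSplitter {
    public static void main(String[] args) throws IOException {
        int numPartitions = 10;
        List<String> lines = Files.readAllLines(Path.of("/mydir/trades.csv")); // id,strategy_id,profit
        String header = lines.get(0);

        PrintWriter[] writers = new PrintWriter[numPartitions];
        for (int p = 0; p < numPartitions; p++) {
            writers[p] = new PrintWriter(Files.newBufferedWriter(Path.of("/mydir/partition_" + p + ".csv")));
            writers[p].println(header);
        }
        for (String line : lines.subList(1, lines.size())) {
            long strategyId = Long.parseLong(line.split(",")[1]); // strategy_id column
            int partition = (int) (strategyId % numPartitions);   // Modulo partition function
            writers[partition].println(line);
        }
        for (PrintWriter w : writers) {
            w.close();
        }
    }
}
```
Each of those per-partition files could then be split further if you want more than one segment per partition, as long as every piece still holds rows from only one partition.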
d
@Jackie do I have to use a specific name for the csv? I do not know, maybe 1.csv for partition one... 2.csv for partition two and so on... However, before using partitions I split the big csv to create segments. Now I have to split the csv to create partitions, right? In this case how can I create the segments inside the partitions? Because if I create 10 partitions there will be 10 CSVs, so how will Pinot create the segments inside them?
j
No, there is no requirement for the input file name. When you split the csv file, you can create multiple files for each partition (e.g. 10 segments per partition, or 9 for partition 0 and 11 for partition 1). Pinot will create one segment per input file
d
wait @Jackie i am still a bit confused about that, sorry. Before, you said that i need to split my big CSV into smaller files to "group" rows with the same partition key. So, for 10 partitions i should create 10 files. I missed the second part you just wrote. If i can create multiple files, for example splitting my 10M-row file into 1000 smaller files, it means that every file will have ~10k rows. I suppose that i should group in the same segment (csv file) the rows with the same partition key, and then Pinot will assign that segment (all the rows of that csv file) to the partition. Right? The important thing is putting in the same segment the rows with the same key, then Pinot internally will assign it to partition 0 or 1 or 2 etc... Ok? Is what i just said correct? Lastly, if i have an offline table where i ingest data every day (adding new rows via CSV), should i do a similar thing? I mean, my table has 10M rows, then tomorrow i need to add new data to that table, let's suppose 10 rows in total. In that case i should create 10 files (one per row) to match the partition keys, right? Obviously if two rows have the same partition i will put both into the same CSV, but is the logic correct? Thank you very much!
j
@Damiano There is only one requirement for partitioning to work: all the rows within the same input file are within the same partition.
But if the volume of the data is small (e.g. less than 1000 rows per day), then I don’t think the data is worth partitioning because of the overhead of processing tiny segments
d
@Jackie I read on Slack that Pinot will merge small segments. I think it is a feature under development. However, in my case I must use partitions to ensure all the rows of a partitioning key are on the same node and not spread across the whole cluster.
j
Can you elaborate more on your use case? What is the total data size and how much data will you generate every day?
d
@Jackie I am using Pinot on many projects, but the one where I need partitions is a project that needs to analyze stocks. I do backtests of strategies, using Pinot to retrieve statistics of the strategies' trades. As I told you, I need partitions because I am implementing logic similar to window functions, so I must deal with the entire sequence of rows (for example to calculate the running sum). For this reason I need "windows". Another example could be calculating moving averages. So the job is (a) adding the historical trades of the strategies to monitor, the 10M I said, and (b) adding new trades every day. Partitions are a must in this case because if I do not use them I must move the window logic from the server to the broker, and that's no good.
j
@Damiano I see. In that case, you need to create and push 10 segments (one per partition) every day. The reason why I asked about the data size is that for a small amount of data (e.g. ~10M rows) you can also consider refreshing the whole table every day instead of keeping on adding tiny segments.
d
@Jackie yes i can do that too, it is not a problem, it will take a few seconds. But i think with the next release this problem should be solved because they are implementing segment merging. However yes, i will refresh the entire table every day, thank you!