
Sadim Nadeem

06/09/2021, 7:39 AM
Is there any way to aggregate time-series data over a tumbling time window during ingestion in Pinot itself, i.e. bucket by time and update the records automatically, without needing a tumbling-window aggregation in my stream-processing job (e.g. a Samza job or Spark Streaming)? In other words, can Pinot keep updating the same countPerMin column automatically whenever new events arrive for the same time window, or does that have to be handled in the Samza/Spark Streaming job? cc: @Mohamed Hussain

Xiang Fu

06/09/2021, 8:26 AM
Right now you need to do this in a Samza or Flink job. Pinot consumes data from Kafka as-is and only supports row-level transformations.
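Since Pinot only applies row-level transforms at ingestion, the tumbling-window roll-up has to happen in the stream job before records reach the topic Pinot consumes. A minimal plain-Java sketch of that pre-aggregation (class, method, and key names are illustrative, not Samza's or Flink's actual windowing API):

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative tumbling-window pre-aggregation for the stream job.
// Events are bucketed into one-minute windows keyed by (userId, windowStart);
// on flush, one record per key is published to the topic Pinot consumes.
public class TumblingWindowCounter {
    private static final long WINDOW_MS = 60_000L; // one-minute tumbling window

    // (userId + "|" + windowStart) -> running page-view count for that window
    private final Map<String, Long> counts = new HashMap<>();

    public void onEvent(String userId, long eventTimeMs, long pageViews) {
        long windowStart = (eventTimeMs / WINDOW_MS) * WINDOW_MS; // align to bucket
        counts.merge(userId + "|" + windowStart, pageViews, Long::sum);
    }

    // Called when the window closes: emit each entry as e.g.
    // {"userid": "123", "time": <windowStart>, "countPerMin": <count>}.
    public Map<String, Long> flush() {
        Map<String, Long> out = new HashMap<>(counts);
        counts.clear();
        return out;
    }
}
```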

Sadim Nadeem

06/09/2021, 8:49 AM
Also, my topic is not partitioned by the key I am aggregating on (it was partitioned at creation time into, say, 20 partitions). So multiple events for the same key and the same time will land on the topic Pinot consumes from. For example, for the number of page views on 9th June at 14:13 IST, Pinot might ingest 5 records for the same user id with counts 10, 12, 4, 5, 3, because the topic was partitioned and multiple Samza consumers in the same consumer group were aggregating different sets of partitions, each emitting its own message to the output topic Pinot consumes from.

Can I upsert the Pinot record for the same user id by adding the existing value to the latest one? Say userid="123", time="09/06/2021 14:17 IST", countPerMin=5 is already there, and now a record userid="123", time="09/06/2021 14:17 IST", countPerMin=10 arrives on the Kafka topic Pinot consumes from. Will it update the existing record to userid="123", time="09/06/2021 14:17 IST", countPerMin=15? Is that kind of incremental upsert possible, maybe through some sort of user-defined function?

Xiang Fu

06/09/2021, 8:54 AM
For upsert, if you make user id (and time) the primary key, then you can do pre-aggregation in your streaming job and publish the second message: e.g. after `{userid="123", time="09/06/2021 14:17 IST", countPerMin=10}` you publish `{userid="123", time="09/06/2021 14:17 IST", countPerMin=15}`, and Pinot will upsert the field countPerMin.
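For reference, this maps to Pinot's upsert setup: the primary key is declared in the schema and upsert is enabled on the realtime table, so the latest record for a key replaces the previous one. A minimal sketch using the names from the example above, with all other required schema/table fields omitted:

```json
{
  "schemaName": "pageViews",
  "primaryKeyColumns": ["userid", "time"]
}
```

and in the realtime table config:

```json
{
  "upsertConfig": {
    "mode": "FULL"
  }
}
```

Upsert also expects the input Kafka topic to be partitioned by the primary key, which matters given the partitioning issue described above.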

Sadim Nadeem

06/09/2021, 8:58 AM
So here, will countPerMin become 15, or will it become 25? I don't want it just replaced with the latest value; I want the new value added to the existing one.

Xiang Fu

06/09/2021, 8:58 AM
The latest value: 15.

Sadim Nadeem

06/09/2021, 8:58 AM
Is there any way to add the new value to the existing one, via some UDF or any other means? My use case requires the record to become 25 rather than 15, since 10 + 15 = 25.

Xiang Fu

06/09/2021, 9:01 AM
Then you don't need upsert; you can just sum it up at runtime.

Sadim Nadeem

06/09/2021, 9:01 AM
You mean in my Samza job itself?

Xiang Fu

06/09/2021, 9:01 AM
Or you generate this 25 in your streaming job and use upsert.

Sadim Nadeem

06/09/2021, 9:02 AM
> you can just do it during runtime to sum up
How do I do that in Pinot?

Xiang Fu

06/09/2021, 9:02 AM
With a query.
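In other words, keep the partial counts as separate rows and let the query aggregate them. A sketch of what that could look like, assuming a table named pageViews with the columns from the example:

```sql
-- Sum the per-partition partial counts at query time instead of upserting.
-- "time" is quoted because TIME is a reserved word in Pinot's SQL dialect.
SELECT userid, "time", SUM(countPerMin) AS totalPerMin
FROM pageViews
WHERE userid = '123'
GROUP BY userid, "time"
```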

Sadim Nadeem

06/09/2021, 9:03 AM
OK, so before publishing each event to the output Kafka topic Pinot consumes from, I would have to query the Pinot broker for the existing value and then do the addition.
But that would cause too many DB calls to Pinot, and Samza's processing throughput may drop, since events can arrive at very high frequency, several lakhs per second. Making a network HTTP REST call per event won't be feasible.

Xiang Fu

06/09/2021, 9:04 AM
Or you just do the aggregation in the Samza job.
You can have a KV store to keep the local state.
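Roughly, that pattern in Samza's classic low-level task API looks like the sketch below: a local RocksDB-backed KeyValueStore keeps the running total per key, and the updated total is published to the Pinot topic so FULL upsert always sees the latest cumulative value. The store name, topic names, PageView POJO, and exact task signatures (which differ between Samza versions) are assumptions for the sketch:

```java
import org.apache.samza.config.Config;
import org.apache.samza.storage.kv.KeyValueStore;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.SystemStream;
import org.apache.samza.task.InitableTask;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.StreamTask;
import org.apache.samza.task.TaskContext;
import org.apache.samza.task.TaskCoordinator;

// Keeps a running countPerMin per (userid, minute) in a local RocksDB-backed
// store and emits the cumulative total to the Pinot topic, so Pinot's FULL
// upsert always replaces the row with the latest sum (e.g. 10, then 25).
public class RunningCountTask implements StreamTask, InitableTask {
    // Assumed names: the store must be declared in the job config, and the
    // output topic is whatever Pinot's realtime table consumes from.
    private static final SystemStream OUTPUT = new SystemStream("kafka", "pinot-page-views");
    private KeyValueStore<String, Long> store;

    @Override
    @SuppressWarnings("unchecked")
    public void init(Config config, TaskContext context) {
        store = (KeyValueStore<String, Long>) context.getStore("page-view-counts");
    }

    @Override
    public void process(IncomingMessageEnvelope envelope,
                        MessageCollector collector,
                        TaskCoordinator coordinator) {
        PageView pv = (PageView) envelope.getMessage(); // assumes a deserialized POJO
        String key = pv.userId + "|" + pv.minute;       // matches Pinot's primary key

        Long previous = store.get(key);
        long total = (previous == null ? 0L : previous) + pv.count;
        store.put(key, total);

        // Emit the cumulative total; Pinot upserts the row by (userid, time).
        collector.send(new OutgoingMessageEnvelope(OUTPUT, key,
                new PageView(pv.userId, pv.minute, total)));
    }

    // Minimal event POJO assumed for the sketch.
    public static class PageView {
        public final String userId;
        public final String minute; // e.g. "09/06/2021 14:17 IST"
        public final long count;

        public PageView(String userId, String minute, long count) {
            this.userId = userId;
            this.minute = minute;
            this.count = count;
        }
    }
}
```

This assumes the stream has first been repartitioned by user id (the "partition" half of "partition + rocksdb" below), so all events for a given key are handled by the same task and its local store.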

Sadim Nadeem

06/09/2021, 9:04 AM
OK, then I have to hold the entire state of my jobs in Samza's built-in state-management store, like LevelDB or RocksDB or another KV store.
Got it @Xiang Fu, thanks for the clarification, very helpful. Will come back if I have any more doubts on this.

Xiang Fu

06/09/2021, 9:06 AM
Yeah, partition + RocksDB should work.
👍 1

Sadim Nadeem

06/09/2021, 9:10 AM
Sure, will try that.