
lâm nguyễn hoàng

12/08/2020, 5:46 PM
please help me

Xiang Fu

12/08/2020, 5:47 PM
@Jackie do we have any specific memory requirements for upsert case?
@Yupeng Fu ^^
We tried turning these on, but that didn't help either:
pinot.server.instance.realtime.alloc.offheap=true
pinot.server.instance.realtime.alloc.offheap.direct=false/true

Yupeng Fu

12/08/2020, 6:13 PM
not sure if this is related to upsert
only upsert metadata is on heap
the rest is the same as normal segments
it would be helpful to use the debug endpoint to display the memory usage

Xiang Fu

12/08/2020, 6:15 PM
Oh? What's this endpoint?

Yupeng Fu

12/08/2020, 6:22 PM
MmapDebugResource

Xiang Fu

12/08/2020, 6:23 PM
@lâm nguyễn hoàng can you try this

lâm nguyễn hoàng

12/08/2020, 6:24 PM
How can I run this "MmapDebugResource"?

Xiang Fu

12/08/2020, 6:24 PM
should be on server admin port

Yupeng Fu

12/08/2020, 6:25 PM
debug/memory/offheap/table/{tableName}

lâm nguyễn hoàng

12/08/2020, 6:27 PM
message has been deleted
@Yupeng Fu

Jackie

12/08/2020, 6:32 PM
For upsert, there is a concurrent map storing the mapping from primary key to record location, which is on heap
If the cardinality of the primary key is not extremely high, it should be fine
@lâm nguyễn hoàng Is this the server rest port?

lâm nguyễn hoàng

12/08/2020, 6:34 PM
yes
message has been deleted
@Jackie help me

Jackie

12/08/2020, 6:42 PM
This seems like the controller rest port

Yupeng Fu

12/08/2020, 6:42 PM
that shows your controller
run it on server

lâm nguyễn hoàng

12/08/2020, 6:49 PM
message has been deleted
message has been deleted
Can't show @Yupeng Fu
message has been deleted

Yupeng Fu

12/08/2020, 6:50 PM
8000 is netty
not rest

Jackie

12/08/2020, 6:52 PM
8030 is the rest port per the config

lâm nguyễn hoàng

12/08/2020, 6:54 PM
message has been deleted
not found @Jackie

Jackie

12/08/2020, 6:59 PM
Is the host a pinot server?

lâm nguyễn hoàng

12/08/2020, 7:01 PM
yes
message has been deleted
Is the path debug/memory/offheap/table/{tableName} correct?
@Jackie

Jackie

12/08/2020, 7:03 PM
The path is correct
Can you also try /debug/memory/offheap ?

lâm nguyễn hoàng

12/08/2020, 7:04 PM
message has been deleted

Jackie

12/08/2020, 7:06 PM
Can you also share the schema of the table?

lâm nguyễn hoàng

12/08/2020, 7:07 PM
message has been deleted
{
  "schemaName": "bhx_bhx_forecast_forecast_item",
  "dimensionFieldSpecs": [
    { "name": "forecastpurchase", "dataType": "DOUBLE" },
    { "name": "createduser", "dataType": "STRING" },
    { "name": "inputquantity", "dataType": "DOUBLE" },
    { "name": "forecast", "dataType": "DOUBLE" },
    { "name": "forecastnopromotion", "dataType": "DOUBLE" },
    { "name": "storeid", "dataType": "LONG" },
    { "name": "storequantity", "dataType": "DOUBLE" },
    { "name": "isdeleted", "dataType": "INT" },
    { "name": "deleteduser", "dataType": "STRING" },
    { "name": "date_key", "dataType": "LONG" },
    { "name": "itemid", "dataType": "STRING" },
    { "name": "sellquantity", "dataType": "DOUBLE" },
    { "name": "forecast15", "dataType": "DOUBLE" },
    { "name": "forecast8", "dataType": "DOUBLE" },
    { "name": "forecastnopromo", "dataType": "DOUBLE" },
    { "name": "branchquantity", "dataType": "DOUBLE" },
    { "name": "updateduser", "dataType": "STRING" },
    { "name": "_DELETED", "dataType": "INT" }
  ],
  "dateTimeFieldSpecs": [
    { "name": "createddate", "dataType": "LONG", "format": "1MILLISECONDSEPOCH", "granularity": "1:MILLISECONDS" },
    { "name": "deleteddate", "dataType": "LONG", "format": "1MILLISECONDSEPOCH", "granularity": "1:MILLISECONDS" },
    { "name": "updateddate", "dataType": "LONG", "format": "1MILLISECONDSEPOCH", "granularity": "1:MILLISECONDS" },
    { "name": "_TIMESTAMP", "dataType": "LONG", "format": "1MILLISECONDSEPOCH", "granularity": "1:MILLISECONDS" }
  ],
  "primaryKeyColumns": [ "itemid", "storeid", "date_key" ]
}
{
  "REALTIME": {
    "tableName": "bhx_bhx_forecast_forecast_item_REALTIME",
    "tableType": "REALTIME",
    "segmentsConfig": {
      "timeType": "MILLISECONDS",
      "retentionTimeUnit": "DAYS",
      "retentionTimeValue": "9125",
      "segmentPushFrequency": "DAILY",
      "segmentPushType": "APPEND",
      "replication": "4",
      "replicasPerPartition": "4",
      "timeColumnName": "_TIMESTAMP",
      "schemaName": "bhx_bhx_forecast_forecast_item"
    },
    "tenants": { "broker": "DefaultTenant", "server": "DefaultTenant", "tagOverrideConfig": {} },
    "tableIndexConfig": {
      "streamConfigs": {
        "streamType": "kafka",
        "stream.kafka.consumer.type": "lowlevel",
        "stream.kafka.topic.name": "PINOT.BHX.bhx_forecast.forecast_item",
        "stream.kafka.table.tablename": "bhx_forecast.forecast_item",
        "stream.kafka.table.part.pattern": "_[0-9]+",
        "stream.kafka.cdc.format": "CDC",
        "stream.kafka.decoder.class.name": "com.mwg.pinot.realtime.KafkaCDCMessageDecoder",
        "stream.kafka.consumer.factory.class.name": "com.mwg.pinot.realtime.KafkaCDCConsumerFactory",
        "stream.kafka.broker.list": "datastore-broker01-kafka-ovm-6-769092,datastore broker02 kafka ovm 6 779093,datastore-broker03-kafka-ovm-6-78:9094",
        "stream.kafka.consumer.prop.auto.offset.reset": "smallest",
        "realtime.segment.flush.threshold.rows": "0",
        "realtime.segment.flush.threshold.time": "60m",
        "realtime.segment.flush.threshold.segment.size": "500M",
        "group.id": "bhx_bhx_forecast.forecast_item-PINOT_INGESTION",
        "max.partition.fetch.bytes": "167772160",
        "receive.buffer.bytes": "67108864",
        "isolation.level": "read_committed",
        "max.poll.records": "5000"
      },
      "noDictionaryColumns": [],
      "onHeapDictionaryColumns": [],
      "varLengthDictionaryColumns": [],
      "enableDefaultStarTree": false,
      "starTreeIndexConfigs": [],
      "enableDynamicStarTreeCreation": false,
      "aggregateMetrics": false,
      "nullHandlingEnabled": false,
      "autoGeneratedInvertedIndex": false,
      "createInvertedIndexDuringSegmentGeneration": false,
      "sortedColumn": [],
      "bloomFilterColumns": [],
      "loadMode": "MMAP",
      "rangeIndexColumns": []
    },
    "metadata": { "customConfigs": {} },
    "routing": { "instanceSelectorType": "strictReplicaGroup" },
    "instanceAssignmentConfigMap": {
      "CONSUMING": {
        "tagPoolConfig": { "tag": "inventory_REALTIME", "poolBased": false, "numPools": 0 },
        "replicaGroupPartitionConfig": {
          "replicaGroupBased": true,
          "numInstances": 0,
          "numReplicaGroups": 4,
          "numInstancesPerReplicaGroup": 5,
          "numPartitions": 0,
          "numInstancesPerPartition": 0
        }
      }
    },
    "upsertConfig": { "mode": "FULL" }
  }
}
@Jackie schema and table config above

Jackie

12/08/2020, 7:13 PM
I think the issue is that the primary key (itemid, storeid, date_key) is almost always unique, which will make the key map very big
What's the purpose of enabling upsert for this table?

lâm nguyễn hoàng

12/08/2020, 7:15 PM
The data in this table is recalculated daily

Jackie

12/08/2020, 7:18 PM
I don't follow
Do you need to replace the data every day, or just append the data for the new day?
@Yupeng Fu @lâm nguyễn hoàng The REST endpoint debug/memory/offheap/table/{tableName} was added recently (https://github.com/apache/incubator-pinot/pull/6172) and is not included in the latest release
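
Pulling the thread's pointers together (server admin REST port, 8030 per the config, and the two debug paths), here is a minimal Python sketch that builds both URLs and tries the table-level one first, falling back to /debug/memory/offheap when the server answers 404. The host name `pinot-server-1` is a placeholder, and whether {tableName} should carry the _REALTIME suffix is not stated in the thread, so treat both as assumptions.

```python
from urllib.error import HTTPError
from urllib.parse import quote
from urllib.request import urlopen


def offheap_debug_urls(server_host, admin_port, table_name):
    """Return candidate debug URLs on the server admin port, most specific first."""
    base = f"http://{server_host}:{admin_port}"
    return [
        f"{base}/debug/memory/offheap/table/{quote(table_name, safe='')}",
        f"{base}/debug/memory/offheap",
    ]


def fetch_first_available(urls, timeout=5):
    """Try each URL in order and return (url, body) for the first one that is not a 404."""
    for url in urls:
        try:
            with urlopen(url, timeout=timeout) as response:
                return url, response.read().decode("utf-8")
        except HTTPError as err:
            # Older releases do not have the table-level path; try the next one.
            if err.code != 404:
                raise
    return None, None


if __name__ == "__main__":
    # Placeholder host; 8030 is the server REST port mentioned in the thread.
    for url in offheap_debug_urls("pinot-server-1", 8030, "bhx_bhx_forecast_forecast_item"):
        print(url)
```

Calling `fetch_first_available(offheap_debug_urls(...))` from a machine that can reach the server would return whichever endpoint the running release exposes.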

Xiang Fu

12/08/2020, 7:24 PM
I think all the recomputed data are also pushed to kafka
hence the upsert

Yupeng Fu

12/08/2020, 7:29 PM
oh, i see. then use memory/offheap i think

Jackie

12/08/2020, 7:29 PM
The cardinality of the primary key is unbounded, which will make the upsert metadata map size unbounded
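
To illustrate the point, here is a toy Python sketch of a primary-key to record-location map. It is not Pinot's implementation; the class, the helper, and the sample rows are hypothetical. It only shows that when date_key is part of the key, each new day adds new entries, while recomputing an existing day overwrites entries without adding any.

```python
class ToyUpsertKeyMap:
    """Toy primary-key -> record-location map (illustration only, not Pinot code)."""

    def __init__(self, primary_key_columns):
        self.primary_key_columns = primary_key_columns
        self.locations = {}  # key tuple -> (segment name, doc id)

    def upsert(self, record, segment, doc_id):
        # A repeated key overwrites its entry; a new key adds one.
        key = tuple(record[column] for column in self.primary_key_columns)
        self.locations[key] = (segment, doc_id)

    def __len__(self):
        return len(self.locations)


def daily_records(date_key):
    """Hypothetical forecast rows: 2 items x 2 stores for one day."""
    return [
        {"itemid": item, "storeid": store, "date_key": date_key}
        for item in ("A", "B")
        for store in (1, 2)
    ]


key_map = ToyUpsertKeyMap(["itemid", "storeid", "date_key"])

# Three days of data: every day contributes 4 brand-new keys.
sizes = []
for date_key in (20201206, 20201207, 20201208):
    for doc_id, record in enumerate(daily_records(date_key)):
        key_map.upsert(record, f"segment_{date_key}", doc_id)
    sizes.append(len(key_map))

# Recomputing the last day replaces locations but adds no keys.
for doc_id, record in enumerate(daily_records(20201208)):
    key_map.upsert(record, "segment_recomputed", doc_id)
size_after_recompute = len(key_map)

print(sizes, size_after_recompute)
```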

Yupeng Fu

12/08/2020, 7:30 PM
it’s bounded by the msgs consumed?

Jackie

12/08/2020, 7:31 PM
"retentionTimeUnit": "DAYS",
      "retentionTimeValue": "9125",
About 25 years of data lol
If we want to re-compute the records for the previous day to fix the data every day, we should use the hybrid table approach, which is designed for this
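
The retention arithmetic can be checked in a couple of lines of Python. The growth estimate below is only a rough sketch: the daily key count is a made-up number, and it assumes, as the retention comment suggests, that key-map entries stay around until their segments expire.

```python
# Value taken from the table config shared in the thread.
retention_days = 9125
retention_years = retention_days / 365


def estimated_key_map_entries(new_keys_per_day, days):
    """Rough upper bound on map entries if every day adds only new keys."""
    return new_keys_per_day * days


# Hypothetical figure, not from the thread: distinct (itemid, storeid) pairs per day.
HYPOTHETICAL_NEW_KEYS_PER_DAY = 1_000_000

entries_at_full_retention = estimated_key_map_entries(
    HYPOTHETICAL_NEW_KEYS_PER_DAY, retention_days
)

print(f"{retention_days} days is about {retention_years:.0f} years")
print(f"entries at full retention: {entries_at_full_retention:,}")
```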

lâm nguyễn hoàng

12/08/2020, 7:41 PM
ok tks everybody @Xiang Fu @Yupeng Fu @Jackie
got it

Abhijeet Kushe

05/23/2023, 6:28 PM
@lâm nguyễn hoàng We are also using a realtime table in upsert mode with Kinesis. You mentioned you have 450 million records. Is that the total volume?