# troubleshooting
l
please help me
x
@Jackie do we have any specific memory requirements for upsert case?
@Yupeng Fu ^^
we tried to turn this on, but it doesn't work as well:
```
# allocates consuming-segment index buffers off-heap instead of on the JVM heap
pinot.server.instance.realtime.alloc.offheap=true
# tried with both values: direct allocation vs mmap-backed off-heap buffers
pinot.server.instance.realtime.alloc.offheap.direct=false/true
```
y
not sure if this is related to upsert
only upsert metadata is on heap
the rest is the same as normal segments
it would be helpful to use debug endpoint to display the memory use
x
oh ? what's this endpoint
y
MmapDebugResource
x
@lâm nguyễn hoàng can you try this
l
How can I run this "MmapDebugResource"?
x
should be on server admin port
y
debug/memory/offheap/table/{tableName}
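For illustration, a minimal sketch of calling that endpoint over HTTP, assuming a hypothetical server host name; the REST port (8030 per the config, as discussed below) and the table name from this thread are filled in:
```
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class OffHeapDebug {
    public static void main(String[] args) throws Exception {
        // Hypothetical host; use the Pinot *server* admin/REST port,
        // not the netty query port (see the port discussion below).
        String url = "http://pinot-server-host:8030"
                + "/debug/memory/offheap/table/bhx_bhx_forecast_forecast_item_REALTIME";
        HttpClient client = HttpClient.newHttpClient();
        HttpRequest req = HttpRequest.newBuilder(URI.create(url)).GET().build();
        HttpResponse<String> resp = client.send(req, HttpResponse.BodyHandlers.ofString());
        System.out.println(resp.statusCode());
        System.out.println(resp.body());
    }
}
```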
l
Screen Shot 2020-12-09 at 01.27.14.png
@Yupeng Fu
j
For upsert, there is a concurrent map storing the mapping from primary key to record location, which is on heap
If the cardinality of the primary key is not extremely high, it should be fine
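As a sketch (illustrative names, not Pinot's actual classes), the upsert metadata is conceptually this heap-resident map; repeated keys overwrite their entry, but mostly-unique keys add one entry per row:
```
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Illustrative sketch only -- not Pinot's actual implementation.
public class UpsertMetadataSketch {
    // Composite primary key, e.g. (itemid, storeid, date_key)
    record PrimaryKey(String itemId, long storeId, long dateKey) {}

    // Where the latest record for that key currently lives
    record RecordLocation(String segmentName, int docId) {}

    private final ConcurrentMap<PrimaryKey, RecordLocation> keyToLocation =
            new ConcurrentHashMap<>();

    // On ingestion, the newest record for a key replaces the previous location
    void upsert(PrimaryKey key, RecordLocation loc) {
        keyToLocation.put(key, loc);
    }

    public static void main(String[] args) {
        UpsertMetadataSketch sketch = new UpsertMetadataSketch();
        sketch.upsert(new PrimaryKey("item-1", 42L, 20201209L),
                new RecordLocation("segment__0__1__20201209", 17));
        System.out.println(sketch.keyToLocation); // one heap entry per unique key
    }
}
```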
@lâm nguyễn hoàng Is this the server rest port?
l
yes
Screen Shot 2020-12-09 at 01.35.05.png
@Jackie help me
j
This seems like the controller rest port
y
that shows your controller
run it on server
l
Screen Shot 2020-12-09 at 01.48.38.png
Screen Shot 2020-12-09 at 01.49.20.png
It doesn't show anything @Yupeng Fu
Screen Shot 2020-12-09 at 01.50.27.png
y
8000 is netty
not rest
j
8030 is the rest port per the config
l
Screen Shot 2020-12-09 at 01.54.25.png
not found @Jackie
j
Is the host a pinot server?
l
yes
Screen Shot 2020-12-09 at 02.01.29.png
Is the debug/memory/offheap/table/{tableName} path correct?
@Jackie
j
The path is correct
Can you also try
/debug/memory/offheap
?
l
Screen Shot 2020-12-09 at 02.04.34.png
j
Can you also share the schema of the table?
l
Screen Shot 2020-12-09 at 02.07.11.png
{ "schemaName": "bhx_bhx_forecast_forecast_item", "dimensionFieldSpecs": [ { "name": "forecastpurchase", "dataType": "DOUBLE" }, { "name": "createduser", "dataType": "STRING" }, { "name": "inputquantity", "dataType": "DOUBLE" }, { "name": "forecast", "dataType": "DOUBLE" }, { "name": "forecastnopromotion", "dataType": "DOUBLE" }, { "name": "storeid", "dataType": "LONG" }, { "name": "storequantity", "dataType": "DOUBLE" }, { "name": "isdeleted", "dataType": "INT" }, { "name": "deleteduser", "dataType": "STRING" }, { "name": "date_key", "dataType": "LONG" }, { "name": "itemid", "dataType": "STRING" }, { "name": "sellquantity", "dataType": "DOUBLE" }, { "name": "forecast15", "dataType": "DOUBLE" }, { "name": "forecast8", "dataType": "DOUBLE" }, { "name": "forecastnopromo", "dataType": "DOUBLE" }, { "name": "branchquantity", "dataType": "DOUBLE" }, { "name": "updateduser", "dataType": "STRING" }, { "name": "_DELETED", "dataType": "INT" } ], "dateTimeFieldSpecs": [ { "name": "createddate", "dataType": "LONG", "format": "1MILLISECONDSEPOCH", "granularity": "1:MILLISECONDS" }, { "name": "deleteddate", "dataType": "LONG", "format": "1MILLISECONDSEPOCH", "granularity": "1:MILLISECONDS" }, { "name": "updateddate", "dataType": "LONG", "format": "1MILLISECONDSEPOCH", "granularity": "1:MILLISECONDS" }, { "name": "_TIMESTAMP", "dataType": "LONG", "format": "1MILLISECONDSEPOCH", "granularity": "1:MILLISECONDS" } ], "primaryKeyColumns": [ "itemid", "storeid", "date_key" ] }
{ "REALTIME": { "tableName": "bhx_bhx_forecast_forecast_item_REALTIME", "tableType": "REALTIME", "segmentsConfig": { "timeType": "MILLISECONDS", "retentionTimeUnit": "DAYS", "retentionTimeValue": "9125", "segmentPushFrequency": "DAILY", "segmentPushType": "APPEND", "replication": "4", "replicasPerPartition": "4", "timeColumnName": "_TIMESTAMP", "schemaName": "bhx_bhx_forecast_forecast_item" }, "tenants": { "broker": "DefaultTenant", "server": "DefaultTenant", "tagOverrideConfig": {} }, "tableIndexConfig": { "streamConfigs": { "streamType": "kafka", "stream.kafka.consumer.type": "lowlevel", "stream.kafka.topic.name": "PINOT.BHX.bhx_forecast.forecast_item", "stream.kafka.table.tablename": "bhx_forecast.forecast_item", "stream.kafka.table.part.pattern": "_[0-9]+", "stream.kafka.cdc.format": "CDC", "stream.kafka.decoder.class.name": "com.mwg.pinot.realtime.KafkaCDCMessageDecoder", "stream.kafka.consumer.factory.class.name": "com.mwg.pinot.realtime.KafkaCDCConsumerFactory", "stream.kafka.broker.list": "datastore-broker01-kafka-ovm-6-769092,datastore broker02 kafka ovm 6 779093,datastore-broker03-kafka-ovm-6-78:9094", "stream.kafka.consumer.prop.auto.offset.reset": "smallest", "realtime.segment.flush.threshold.rows": "0", "realtime.segment.flush.threshold.time": "60m", "realtime.segment.flush.threshold.segment.size": "500M", "group.id": "bhx_bhx_forecast.forecast_item-PINOT_INGESTION", "max.partition.fetch.bytes": "167772160", "receive.buffer.bytes": "67108864", "isolation.level": "read_committed", "max.poll.records": "5000" }, "noDictionaryColumns": [], "onHeapDictionaryColumns": [], "varLengthDictionaryColumns": [], "enableDefaultStarTree": false, "starTreeIndexConfigs": [], "enableDynamicStarTreeCreation": false, "aggregateMetrics": false, "nullHandlingEnabled": false, "autoGeneratedInvertedIndex": false, "createInvertedIndexDuringSegmentGeneration": false, "sortedColumn": [], "bloomFilterColumns": [], "loadMode": "MMAP", "rangeIndexColumns": [] }, "metadata": { "customConfigs": {} }, "routing": { "instanceSelectorType": "strictReplicaGroup" }, "instanceAssignmentConfigMap": { "CONSUMING": { "tagPoolConfig": { "tag": "inventory_REALTIME", "poolBased": false, "numPools": 0 }, "replicaGroupPartitionConfig": { "replicaGroupBased": true, "numInstances": 0, "numReplicaGroups": 4, "numInstancesPerReplicaGroup": 5, "numPartitions": 0, "numInstancesPerPartition": 0 } } }, "upsertConfig": { "mode": "FULL" } } }
@Jackie schema and table config above
j
I think the issue is that the primary key (itemid, storeid, date_key) is almost always unique, which will make the key map very big
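A back-of-envelope sketch of why that hurts, assuming a rough ~100 bytes of heap per map entry (an assumption, not a measurement) and the ~450M records mentioned later in this thread:
```
public class UpsertHeapEstimate {
    public static void main(String[] args) {
        long uniqueKeys = 450_000_000L; // e.g. ~450M rows if keys almost never repeat
        long bytesPerEntry = 100L;      // assumed: key object + record location + map overhead
        System.out.printf("~%.0f GB of heap just for the upsert key map%n",
                uniqueKeys * bytesPerEntry / 1e9);
    }
}
```
That works out to roughly 45 GB of heap for the key map alone, which is why a mostly-unique primary key is a problem for upsert.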
What's the purpose of enabling upsert for this table?
l
The data in this table is recalculated daily
j
I don't follow
Do you need to replace the data every day, or just append the data for new day?
@Yupeng Fu @lâm nguyễn hoàng This rest endpoint
debug/memory/offheap/table/{tableName}
was added recently (https://github.com/apache/incubator-pinot/pull/6172) and is not included in the latest release
x
I think all the recomputed data are also pushed to Kafka
hence the upsert
y
oh, i see. then use
memory/offheap
i think
j
The cardinality of the primary key is unbounded, which will make the upsert metadata map size unbounded
y
it’s bounded by the msgs consumed?
j
```
"retentionTimeUnit": "DAYS",
"retentionTimeValue": "9125",
```
About 25 years of data lol
If we want to re-compute the records for the previous day to fix the data every day, we should use the hybrid table approach, which is designed for this
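For context, a hybrid setup pairs the existing REALTIME table with an OFFLINE table under the same name: the daily recomputed data is pushed as offline segments that replace the earlier days, and the broker splits queries at the time boundary. A minimal sketch of the OFFLINE side, reusing only names already in the config above:
```
{
  "tableName": "bhx_bhx_forecast_forecast_item",
  "tableType": "OFFLINE",
  "segmentsConfig": {
    "timeColumnName": "_TIMESTAMP",
    "timeType": "MILLISECONDS",
    "retentionTimeUnit": "DAYS",
    "retentionTimeValue": "9125",
    "replication": "4",
    "schemaName": "bhx_bhx_forecast_forecast_item"
  },
  "tenants": { "broker": "DefaultTenant", "server": "DefaultTenant" },
  "tableIndexConfig": { "loadMode": "MMAP" },
  "metadata": { "customConfigs": {} }
}
```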
l
ok tks everybody @Xiang Fu @Yupeng Fu @Jackie
got it
a
@lâm nguyễn hoàng We are also using a realtime table in upsert mode with Kinesis. You mentioned you have 450 million records. Is that the total volume?