Jai Patel
02/23/2021, 6:04 PM
select * from enriched_customer_orders_jp_upsert_realtime_streaming_v1
where normalized_order_id='62:1221247' and ofo_slug='fofo' and store_id='73f6975b-07e8-407a-97a1-580043094a68'
limit 10
Table Spec:
{
"REALTIME": {
"tableName": "enriched_customer_orders_jp_upsert_realtime_streaming_v1_REALTIME",
"tableType": "REALTIME",
"segmentsConfig": {
"segmentAssignmentStrategy": "BalanceNumSegmentAssignmentStrategy",
"timeColumnName": "updated_at_seconds",
"retentionTimeUnit": "DAYS",
"retentionTimeValue": "30",
"segmentPushType": "APPEND",
"replicasPerPartition": "3",
"schemaName": "enriched_customer_orders_jp_upsert_realtime_streaming_v1"
},
"tenants": {
"broker": "DefaultTenant",
"server": "DefaultTenant"
},
"tableIndexConfig": {
"createInvertedIndexDuringSegmentGeneration": true,
"bloomFilterColumns": [
"Filter1",
"Filter2"
],
"loadMode": "MMAP",
"streamConfigs": {
"streamType": "kafka",
"stream.kafka.consumer.type": "LowLevel",
"stream.kafka.topic.name": "topic-topic-topic-topic-topic",
"stream.kafka.decoder.class.name": "org.apache.pinot.plugin.inputformat.avro.confluent.KafkaConfluentSchemaRegistryAvroMessageDecoder",
"stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
"stream.kafka.broker.list": "kafka-host:9092",
"realtime.segment.flush.threshold.size": "1000",
"realtime.segment.flush.threshold.rows": "1000",
"realtime.segment.flush.threshold.time": "6h",
"realtime.segment.flush.desired.size": "200M",
"isolation.level": "read_committed",
"stream.kafka.consumer.prop.auto.offset.reset": "smallest",
"stream.kafka.consumer.prop.group.id": "enriched_customer_orders_jp_upsert_realtime_streaming_v1_8F6C7BAF-EEA7-441F-ABE3-50BF5F2C4F0A",
"stream.kafka.consumer.prop.client.id": "v1_732F3C29-4CDA-45AA-85F1-740A0176C6A5",
"stream.kafka.decoder.prop.schema.registry.rest.url": "http://confluent-host:8081"
},
"enableDefaultStarTree": false,
"enableDynamicStarTreeCreation": false,
"aggregateMetrics": true,
"nullHandlingEnabled": false,
"invertedIndexColumns": [
"store_id"
],
"autoGeneratedInvertedIndex": false
},
"metadata": {},
"routing": {
"instanceSelectorType": "strictReplicaGroup"
},
"upsertConfig": {
"mode": "FULL"
}
}
}
Below is a simplified version of our schema. There are many other columns, but I trimmed it down to something that would fit (all key columns are kept).
{
"schemaName": "enriched_customer_orders_jp_upsert_realtime_streaming_v1",
"dimensionFieldSpecs": [
{
"name": "store_id",
"dataType": "STRING"
},
{
"name": "updated_at",
"dataType": "LONG",
"defaultNullValue": 0
},
{
"name": "normalized_order_id",
"dataType": "STRING"
},
{
"name": "ofo_slug",
"dataType": "STRING"
}
],
"metricFieldSpecs": [
{
"name": "usd_exchange_rate",
"dataType": "DOUBLE"
},
{
"name": "total",
"dataType": "DOUBLE"
}
],
"dateTimeFieldSpecs": [
{
"name": "updated_at_seconds",
"dataType": "LONG",
"defaultNullValue": 0,
"transformFunction": "toEpochSeconds(updated_at)",
"format": "1:MILLISECONDS:EPOCH",
"granularity": "1:SECONDS"
}
],
"primaryKeyColumns": [
"ofo_slug",
"store_id",
"normalized_order_id"
]
}
Our Kafka key is:
store_id::ofo_slug::normalized_order_id
as a concatenation.
Kishore G
Yupeng Fu
02/23/2021, 6:22 PM
$segmentName
to your select clause, and check the values for the duplicate records
Jai Patel
02/23/2021, 6:25 PM
enriched_customer_orders_jp_upsert_realtime_streaming_v1__10__57__20210220T2243Z
enriched_customer_orders_jp_upsert_realtime_streaming_v1__10__61__20210221T0000Z
enriched_customer_orders_jp_upsert_realtime_streaming_v1__10__1__20210220T0807Z
enriched_customer_orders_jp_upsert_realtime_streaming_v1__10__70__20210221T0315Z
Jai Patel
02/23/2021, 6:26 PM
Yupeng Fu
02/23/2021, 6:34 PM
Yupeng Fu
02/23/2021, 6:35 PM
Yupeng Fu
02/23/2021, 6:36 PM
Jai Patel
02/23/2021, 6:40 PM
Yupeng Fu
02/23/2021, 6:42 PM
from enriched_customer_orders_jp_upsert_realtime_streaming_v1 option(skipUpsert=true)
Yupeng Fu
02/23/2021, 6:42 PM
Jai Patel
02/23/2021, 6:46 PM
Jai Patel
02/23/2021, 6:46 PM
Jai Patel
02/23/2021, 6:46 PM
Yupeng Fu
02/23/2021, 6:59 PM
Yupeng Fu
02/23/2021, 7:00 PM
disableUpsert was the previous deprecated name
Yupeng Fu
02/23/2021, 7:00 PM
Yupeng Fu
02/23/2021, 7:00 PM
Yupeng Fu
02/23/2021, 7:01 PM
Jai Patel
02/23/2021, 7:01 PM
Jai Patel
02/23/2021, 7:02 PM
Jai Patel
02/23/2021, 7:02 PM
Yupeng Fu
02/23/2021, 7:09 PM
Yupeng Fu
02/23/2021, 7:09 PM
Jai Patel
02/23/2021, 7:10 PM
Jai Patel
02/23/2021, 7:10 PM
Yupeng Fu
02/23/2021, 7:10 PM
Jai Patel
02/23/2021, 7:12 PM
Jackie
02/23/2021, 7:13 PM
Jai Patel
02/23/2021, 7:16 PM
Jackie
02/23/2021, 7:17 PM
Jackie
02/23/2021, 7:17 PM
Jai Patel
02/23/2021, 7:25 PM
Jai Patel
02/23/2021, 7:26 PM
Jackie
02/23/2021, 7:26 PM
"numDocsScanned": 1,
Jackie
02/23/2021, 7:26 PM
Jai Patel
02/23/2021, 7:27 PM
Jai Patel
02/23/2021, 7:27 PM
Jai Patel
02/23/2021, 7:28 PM
"exceptions": [],
"numServersQueried": 1,
"numServersResponded": 1,
"numSegmentsQueried": 3079,
"numSegmentsProcessed": 2796,
"numSegmentsMatched": 4,
"numConsumingSegmentsQueried": 12,
"numDocsScanned": 4,
"numEntriesScannedInFilter": 315,
"numEntriesScannedPostFilter": 448,
"numGroupsLimitReached": false,
"totalDocs": 254640,
"timeUsedMs": 719,
"segmentStatistics": [],
Jackie
02/23/2021, 7:30 PM
Jackie
02/23/2021, 7:30 PM
Jai Patel
02/23/2021, 7:31 PM
Jai Patel
02/23/2021, 7:31 PM
Jai Patel
02/23/2021, 7:32 PM
Jai Patel
02/23/2021, 7:32 PM
Jackie
02/23/2021, 7:34 PM
Kishore G
Jai Patel
02/23/2021, 7:36 PM
Jackie
02/23/2021, 7:40 PM
Jackie
02/23/2021, 7:40 PM
Kishore G
Kishore G
"ofo_slug",
"store_id",
"normalized_order_id"
Jai Patel
02/23/2021, 7:44 PM
Jai Patel
02/23/2021, 7:45 PM
Kishore G
Jai Patel
02/23/2021, 7:45 PM
Jai Patel
02/23/2021, 7:46 PM
Jai Patel
02/23/2021, 7:46 PM
Jackie
02/23/2021, 8:12 PM
"aggregateMetrics": true,
Jackie
02/23/2021, 8:20 PM
aggregateMetrics cannot be configured together with upsert. Can you please check the server log and see if it encounters exceptions when ingesting the records?
Jackie
02/23/2021, 9:39 PM
Jackie
02/23/2021, 9:39 PM
Jai Patel
02/23/2021, 9:39 PM
Jai Patel
02/23/2021, 9:40 PM
Jackie
02/23/2021, 9:41 PM
Jackie
02/23/2021, 9:41 PM
Jackie
02/23/2021, 9:44 PM
Jai Patel
02/23/2021, 10:25 PM
Jai Patel
02/23/2021, 10:25 PM
Jai Patel
02/23/2021, 10:26 PM
Jai Patel
02/23/2021, 10:42 PM
Elon
02/24/2021, 3:40 AM