# troubleshooting
j
I’m having some trouble with upserts: a query through the Pinot UI will sometimes return only the latest row and sometimes return all rows. Query:
Copy code
select * from enriched_customer_orders_jp_upsert_realtime_streaming_v1
where normalized_order_id='62:1221247' and ofo_slug='fofo' and store_id='73f6975b-07e8-407a-97a1-580043094a68'
limit 10
Table Spec:
Copy code
{
  "REALTIME": {
    "tableName": "enriched_customer_orders_jp_upsert_realtime_streaming_v1_REALTIME",
    "tableType": "REALTIME",
    "segmentsConfig": {
      "segmentAssignmentStrategy": "BalanceNumSegmentAssignmentStrategy",
      "timeColumnName": "updated_at_seconds",
      "retentionTimeUnit": "DAYS",
      "retentionTimeValue": "30",
      "segmentPushType": "APPEND",
      "replicasPerPartition": "3",
      "schemaName": "enriched_customer_orders_jp_upsert_realtime_streaming_v1"
    },
    "tenants": {
      "broker": "DefaultTenant",
      "server": "DefaultTenant"
    },
    "tableIndexConfig": {
      "createInvertedIndexDuringSegmentGeneration": true,
      "bloomFilterColumns": [
        "Filter1",
        "Filter2"
      ],
      "loadMode": "MMAP",
      "streamConfigs": {
        "streamType": "kafka",
        "stream.kafka.consumer.type": "LowLevel",
        "stream.kafka.topic.name": "topic-topic-topic-topic-topic",
        "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.inputformat.avro.confluent.KafkaConfluentSchemaRegistryAvroMessageDecoder",
        "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
        "stream.kafka.broker.list": "kafka-host:9092",
        "realtime.segment.flush.threshold.size": "1000",
        "realtime.segment.flush.threshold.rows": "1000",
        "realtime.segment.flush.threshold.time": "6h",
        "realtime.segment.flush.desired.size": "200M",
        "isolation.level": "read_committed",
        "stream.kafka.consumer.prop.auto.offset.reset": "smallest",
        "stream.kafka.consumer.prop.group.id": "enriched_customer_orders_jp_upsert_realtime_streaming_v1_8F6C7BAF-EEA7-441F-ABE3-50BF5F2C4F0A",
        "stream.kafka.consumer.prop.client.id": "v1_732F3C29-4CDA-45AA-85F1-740A0176C6A5",
        "stream.kafka.decoder.prop.schema.registry.rest.url": "<http://confluent-host:8081>"
      },
      "enableDefaultStarTree": false,
      "enableDynamicStarTreeCreation": false,
      "aggregateMetrics": true,
      "nullHandlingEnabled": false,
      "invertedIndexColumns": [
        "store_id"
      ],
      "autoGeneratedInvertedIndex": false
    },
    "metadata": {},
    "routing": {
      "instanceSelectorType": "strictReplicaGroup"
    },
    "upsertConfig": {
      "mode": "FULL"
    }
  }
}
Here’s a simplification of our schema. There are a lot of other columns, but I trimmed it down to something that would fit (kept all the key columns).
Copy code
{
  "schemaName": "enriched_customer_orders_jp_upsert_realtime_streaming_v1",
  "dimensionFieldSpecs": [
    {
      "name": "store_id",
      "dataType": "STRING"
    },
    {
      "name": "updated_at",
      "dataType": "LONG",
      "defaultNullValue": 0
    },
    {
      "name": "normalized_order_id",
      "dataType": "STRING"
    },
    {
      "name": "ofo_slug",
      "dataType": "STRING"
    }
  ],
  "metricFieldSpecs": [
    {
      "name": "usd_exchange_rate",
      "dataType": "DOUBLE"
    },
    {
      "name": "total",
      "dataType": "DOUBLE"
    }
  ],
  "dateTimeFieldSpecs": [
    {
      "name": "updated_at_seconds",
      "dataType": "LONG",
      "defaultNullValue": 0,
      "transformFunction": "toEpochSeconds(updated_at)",
      "format": "1:MILLISECONDS:EPOCH",
      "granularity": "1:SECONDS"
    }
  ],
  "primaryKeyColumns": [
    "ofo_slug",
    "store_id",
    "normalized_order_id"
  ]
}
Our Kafka key is a concatenation of the three primary key columns:
store_id::ofo_slug::normalized_order_id
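For example, with the filter values from the query above, the key for that order would presumably look like:
73f6975b-07e8-407a-97a1-580043094a68::fofo::62:1221247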
🙏 1
k
@Jackie @Yupeng Fu ^^
y
add $segmentName to your select clause, and check the values for the duplicate records
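A rough sketch of that diagnostic query, reusing the filters from the original query above (the column list is just illustrative):
select $segmentName, updated_at_seconds, normalized_order_id
from enriched_customer_orders_jp_upsert_realtime_streaming_v1
where normalized_order_id='62:1221247' and ofo_slug='fofo' and store_id='73f6975b-07e8-407a-97a1-580043094a68'
limit 10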
j
Copy code
enriched_customer_orders_jp_upsert_realtime_streaming_v1__10__57__20210220T2243Z
enriched_customer_orders_jp_upsert_realtime_streaming_v1__10__61__20210221T0000Z
enriched_customer_orders_jp_upsert_realtime_streaming_v1__10__1__20210220T0807Z
enriched_customer_orders_jp_upsert_realtime_streaming_v1__10__70__20210221T0315Z
I’m querying based on the 3 key columns, so the duplicate rows do seem to share the same primary key.
y
can you check which servers host those segments?
you can find this in the UI
btw, are you using master branch?
j
All 4 segments are on the same replica set:
• Server_pinot-us-central1-server-0.pinot-us-central1-server-headless.pinot.svc.cluster.local_8098
• Server_pinot-us-central1-server-1.pinot-us-central1-server-headless.pinot.svc.cluster.local_8098
• Server_pinot-us-central1-server-2.pinot-us-central1-server-headless.pinot.svc.cluster.local_8098
y
k. and if you use
from enriched_customer_orders_jp_upsert_realtime_streaming_v1 option( skipUpsert=true)
do you see a difference?
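(i.e. roughly the original query with the option appended; a sketch, assuming the OPTION(...) syntax available in 0.6.0:)
select * from enriched_customer_orders_jp_upsert_realtime_streaming_v1
where normalized_order_id='62:1221247' and ofo_slug='fofo' and store_id='73f6975b-07e8-407a-97a1-580043094a68'
limit 10
option(skipUpsert=true)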
j
Yeah. With skipUpsert I saw all 4 copies 100% of the time over 20 successive calls. Without it I saw 4 copies about 50% of the time over 20 calls.
Is it skipUpsert or disableUpsert?
skipUpsert consistently returns all 4 rows. disableUpsert doesn’t seem to work (I still see the ~50% consistency)
y
skipUpsert. disableUpsert was the previous, now-deprecated name
i need to fix the doc
hmm 50% of 4 and 50% of 1?
how many brokers do you have
j
I ran with skipUpsert 20 times. Of those 20 calls all of them return 4 rows.
I ran without skipUpsert 20 times. Of those 20 calls, 11 returned one row, 9 returned 4 rows. So I seem to be getting inconsistent deduplication with the table.
We have 2 tenants, 3 controllers, 3 brokers, 6 servers
y
hmm, this 50% rate is suspicious. I suspect some inconsistent broker/server config
or it’s either 1 or 4 copies?
j
I get either 1 row back or 4 rows back. When I get 4 rows back, the rows share the same key columns, but the other column values differ.
When I get one row back it is the one with the highest time column value
y
@Jackie any thoughts?
j
Is there a way to target the query at a specific broker in the UI?
j
When you get 4 rows, can you check the number of servers queried within the response metadata?
j
Can I get the response metadata in the UI?
j
Yes, you should be able to see the response metadata through the query console
Or choose to show the raw response json
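If the console doesn’t expose it, the same metadata comes back in the raw JSON response from the broker’s query endpoint; a minimal sketch, assuming the default broker port 8099 and a placeholder host:
curl -s -X POST "http://<broker-host>:8099/query/sql" \
  -H "Content-Type: application/json" \
  -d "{\"sql\": \"select * from enriched_customer_orders_jp_upsert_realtime_streaming_v1 where store_id='73f6975b-07e8-407a-97a1-580043094a68' limit 10\"}"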
j
"exceptions": [],
"numServersQueried": 1,
"numServersResponded": 1,
"numSegmentsQueried": 3072,
"numSegmentsProcessed": 2786,
"numSegmentsMatched": 1,
"numConsumingSegmentsQueried": 12,
"numDocsScanned": 1,
"numEntriesScannedInFilter": 315,
"numEntriesScannedPostFilter": 112,
"numGroupsLimitReached": false,
"totalDocs": 254020,
(used json output)
j
"numDocsScanned": 1,
There is only 1 row returned right?
j
this time, yeah… let me see what it looks like when i get 4
(noticed that too)
When I see 4 rows returned:
Copy code
"exceptions": [],
  "numServersQueried": 1,
  "numServersResponded": 1,
  "numSegmentsQueried": 3079,
  "numSegmentsProcessed": 2796,
  "numSegmentsMatched": 4,
  "numConsumingSegmentsQueried": 12,
  "numDocsScanned": 4,
  "numEntriesScannedInFilter": 315,
  "numEntriesScannedPostFilter": 448,
  "numGroupsLimitReached": false,
  "totalDocs": 254640,
  "timeUsedMs": 719,
  "segmentStatistics": [],
j
So only 1 server is queried, but the records are not deduped correctly
Did you enable upsert in the table config on the fly (by modifying the table config), or did you create the table with upsert enabled from the start?
j
directly created using swagger
realtime only (as specced), then started streaming content from our topic.
We changed our Kafka key to match the Pinot primary key set and truncated the old messages off the topic. Confirmed that by catting the first Kafka message from the topic and checking the key.
So the Kafka key matches what’s in the message itself.
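For reference, a sketch of one way to check that first message’s key with the stock Kafka console consumer (broker and topic names taken from the table spec above; the Avro value will print as raw bytes, but the key is readable):
kafka-console-consumer.sh --bootstrap-server kafka-host:9092 \
  --topic topic-topic-topic-topic-topic \
  --from-beginning --max-messages 1 \
  --property print.key=true --property key.separator=' | '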
j
Hmm.. Can you try restarting the servers and see if it fixes the issue?
k
let’s try to find the root cause before restarting
j
This has been pretty consistent. I’ve created 2 or 3 tables in very similar ways and I’ve seen this behavior every time.
j
All the tables are created with upsert enabled, and never updated?
Which version of Pinot are you running?
k
@Jai Patel what is the Kafka topic partitioned on?
Copy code
"ofo_slug",
    "store_id",
    "normalized_order_id"
j
store_id::ofo_slug::normalized_order_id << fixed sequence
we concatenate the strings separated by double colons
k
ok
j
and all the rows returned have identical values for those 3 columns and none are null.
CC: @Elon
we’re using 0.6.0
j
This could cause a problem:
Copy code
"aggregateMetrics": true,
Basically aggregateMetrics cannot be configured together with upsert. Can you please check the server log and see if it encounters exceptions when ingesting the records?
👀 2
Hmm, actually it will be turned off automatically because the metric fields are dictionary encoded
@Jai Patel Do you have some time for a zoom call to further debug the issue?
j
Errors are falling into these buckets:
1. Schema name does not match raw table name
2. Please reduce the rate of create, update, and delete requests
3. Could not move segment
4. Failed to move segment
5. Failed to find local segment file for segment
6. already exists. Replace it with segment:
There were some GCS issues, but those look unrelated.
@Jackie yes, I have time. But should I try recreating the table without aggregateMetrics to see if it repros under that condition?
j
Let's check some states first
@Jai Patel
j
Thanks Jackie for the call.
@Kishore G atm, the best theory is that I created an upsert table on top of a partially deleted non-upsert table (there were a number of errors in our logs about the delete during the GCS operations)
if restarting the servers resolves the issue, it would lend support to that theory.
restarting the server resolved the issue
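(For the record: since the servers run in Kubernetes, the restart was presumably a rolling restart of the server StatefulSet, something along these lines; the resource and namespace names below are guesses based on the pod hostnames above:)
kubectl -n pinot rollout restart statefulset pinot-us-central1-server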
👍 2
e
Hey everyone, apologies for the late reply. My power was out the entire day. Just catching up on messages. Thanks for the help with this!