# troubleshooting
a
Hi team, I'm trying to use the Pinot upsert feature. Part of my table config is like below:
Copy code
{
  "tableName": "upsert_test_local",
  "tableType": "REALTIME",
  "segmentsConfig": {
    "schemaName": "upsert_test_local",
    "timeColumnName": "created_on",
    "timeType": "MILLISECONDS",
    "allowNullTimeValue": true,
    "replicasPerPartition": "1",
    "retentionTimeUnit": "DAYS",
    "retentionTimeValue": "30",
    "segmentPushType": "APPEND",
    "completionConfig": {
      "completionMode": "DOWNLOAD"
    }
  },
  "tenants": {},
  "tableIndexConfig": {
    "loadMode": "MMAP",
    "aggregateMetrics": true,
    "nullHandlingEnabled": true,
    "streamConfigs": {
      "streamType": "kafka",
      "stream.kafka.consumer.type": "lowlevel",
      "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
      "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
      "stream.kafka.consumer.prop.auto.offset.reset": "largest",
      "realtime.segment.flush.threshold.time": "30m",
      "realtime.segment.flush.threshold.rows": "0",
      "realtime.segment.flush.threshold.segment.size": "100M",
      "realtime.segment.flush.autotune.initialRows": "1000000"
    }
  },
  "ingestionConfig": {
    "filterConfig": {
      "filterFunction": "Groovy({tablename != \"test_table_name\"}, tablename)"
    },
    "transformConfigs": [
      {
        "columnName": "id",
        "transformFunction": "Groovy({UUID.randomUUID().toString()}, tablename)"
      },
      {
        "columnName": "timestamp",
        "transformFunction": "jsonPathString(metrics, '$.timestamp')"
      },
      {
        "columnName": "created_on",
        "transformFunction": "Groovy({System.currentTimeMillis()}, tablename)"
      },
      {
        "columnName": "updated_on",
        "transformFunction": "Groovy({System.currentTimeMillis()}, tablename)"
      }
    ]
  },
  "metadata": {
    "customConfigs": {}
  },
  "routing": {
    "instanceSelectorType": "strictReplicaGroup"
  },
  "upsertConfig": {
    "mode": "PARTIAL",
    "defaultPartialUpsertStrategy": "OVERWRITE",
    "partialUpsertStrategies": {
      "created_on": "IGNORE"
    }
  }
}
And part of the schema is like this:
Copy code
"dateTimeFieldSpecs": [
  {
    "name": "timestamp",
    "dataType": "LONG",
    "format": "1:MILLISECONDS:EPOCH",
    "granularity": "1:MILLISECONDS"
  },
  {
    "name": "created_on",
    "dataType": "LONG",
    "format": "1:MILLISECONDS:EPOCH",
    "granularity": "1:MILLISECONDS"
  },
  {
    "name": "updated_on",
    "dataType": "LONG",
    "format": "1:MILLISECONDS:EPOCH",
    "granularity": "1:MILLISECONDS"
  }
],
"primaryKeyColumns": [
  "timestamp"
]
At first, upsert works as expected. But after a while, around 30 minutes later, when I query this table there are no records, even though totalDocs in the query response stats is not 0. Then I write some data to the same Kafka topic, query the table again, and some records show up, but the value of the created_on field is 0 instead of the current timestamp. Any idea which property is not set right here? Is it the timeColumnName property?
k
Hi, I see you are using the timestamp column as the primaryKey; is that expected? Ideally, for upsert to work, your primaryKey should be unique per record, and your input Kafka stream should be partitioned on the primary key.
@Jackie
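For example (purely illustrative; record_id is a hypothetical column that stays the same across updates of the same logical record), the schema would point the primary key at that column rather than at the time column, and the Kafka topic would be keyed by it:
Copy code
"primaryKeyColumns": [
  "record_id"
]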
a
Hi, I've replicated this error. The records are there (verified with the skipUpsert option), but they can't be queried.
k
Hi, when you apply the skipUpsert option in the query, do you get both old data + updated data, or just old data?
a
And when I add another 2 records to the same Kafka topic, they appear to be ingested into Pinot, based on the info in the following picture. The only problem is that created_on is 0 here; it should be a long-type timestamp.
When I apply the skipUpsert option, both old data and new data are shown.
j
How many servers do you have? Since it returns 2 records with the same timestamp, I suspect the source stream is not partitioned on the primary key.
a
Hi, only one Kafka topic partition and 6 servers.
My confusion is why 0 is written for the created_on field here, since I used System.currentTimeMillis() or now(). And why can the existing records not be queried?
The following picture may explain this issue more clearly. After a while, no records are found. With the skipUpsert option, those records are actually still there.
k
So I've tried reproducing this issue on both the 0.10 and master builds. The `No record(s) found` error does occur in both; need to check why that is the case. The created_on-being-0 issue is not reproducible so far; it shows correct values in all the permutations I tried.
Update: The issue seems to arise whenever the comparisonColumn is used with a partial upsert strategy set to IGNORE. If the column is set to OVERWRITE, it works as expected. If comparisonColumn is not specified, the timeColumn is used for comparison, which causes the same issue again, as in this case.
Ok. So I've understood why this is happening:
Copy code
We store the primaryKey to recordLocation mapping in memory. The recordLocation contains the comparison value as well.

The following happens:

Record A arrives with key "key" and created_on as 100, docId 0
Record B arrives with key "key" and created_on as 200, docId 1

The final state of the primaryKey store is "key" -> RecordLocation(comparisonValue -> 200, docId -> 1)
However, the actual record stored in the segment is ("key", docId -> 1, created_on -> 100) (since created_on is set to IGNORE)

When the consuming segment is getting committed, 'addSegment' gets called, which iterates upon all the previous records and creates a `validDocId` list.

For our case, it checks that `created_on` in the segment's stored record (100) is less than the one in the primaryKey map (200). So it simply ignores this record and doesn't add it to the `validDocId` list.

Hence, you don't get this record at query time once the segment commits.
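To make that concrete, here is a minimal, self-contained sketch of the same sequence in plain Java (stand-in names, not actual Pinot classes):
Copy code
import java.util.HashMap;
import java.util.Map;

// Minimal sketch of the sequence above (stand-in names, not Pinot code).
public class UpsertIgnoreSketch {

  // Stand-in for the in-memory record location: comparison value + docId per primary key.
  static class RecordLocation {
    final long comparisonValue;
    final int docId;
    RecordLocation(long comparisonValue, int docId) {
      this.comparisonValue = comparisonValue;
      this.docId = docId;
    }
  }

  public static void main(String[] args) {
    Map<String, RecordLocation> primaryKeyToLocation = new HashMap<>();

    // Record A: key "key", created_on 100, docId 0
    primaryKeyToLocation.put("key", new RecordLocation(100, 0));
    // Record B: key "key", created_on 200, docId 1; wins the in-memory comparison
    primaryKeyToLocation.put("key", new RecordLocation(200, 1));

    // What the segment actually stored for docId 1: created_on stays 100,
    // because the partial-upsert strategy for created_on is IGNORE.
    long storedComparisonValue = 100;

    // On commit, the record only lands in validDocIds if its stored comparison value
    // is not smaller than the one kept in the metadata map.
    RecordLocation location = primaryKeyToLocation.get("key");
    boolean queryableAfterCommit = storedComparisonValue >= location.comparisonValue;
    System.out.println("docId " + location.docId + " queryable after commit: " + queryableAfterCommit);
    // Prints false: the committed record is invisible, matching the behaviour reported above.
  }
}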
@Jackie this particular code segment is causing the issue: recordInfo is created before updateRecord (where we apply the partial update) is called. Should we be updating this object after updateRecord is called? Small change, but not sure what it may break.
Copy code
PartitionUpsertMetadataManager.RecordInfo recordInfo = getRecordInfo(row, numDocsIndexed);
GenericRow updatedRow = _partitionUpsertMetadataManager.updateRecord(row, recordInfo);
updateDictionary(updatedRow);
addNewRow(numDocsIndexed, updatedRow);
// Update number of documents indexed before handling the upsert metadata so that the record becomes queryable
// once validated
canTakeMore = numDocsIndexed++ < _capacity;
_partitionUpsertMetadataManager.addRecord(this, recordInfo);
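i.e., something roughly along these lines (illustrative only, showing the reordering being asked about; not a vetted change):
Copy code
// Illustrative only: rebuild the RecordInfo from the merged row so that the comparison
// value registered in the upsert metadata matches what is actually stored in the segment.
PartitionUpsertMetadataManager.RecordInfo recordInfo = getRecordInfo(row, numDocsIndexed);
GenericRow updatedRow = _partitionUpsertMetadataManager.updateRecord(row, recordInfo);
updateDictionary(updatedRow);
addNewRow(numDocsIndexed, updatedRow);
// Recompute the record info from the merged row before handling the upsert metadata
PartitionUpsertMetadataManager.RecordInfo updatedRecordInfo = getRecordInfo(updatedRow, numDocsIndexed);
canTakeMore = numDocsIndexed++ < _capacity;
_partitionUpsertMetadataManager.addRecord(this, updatedRecordInfo);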
j
@Kartik Khare Great analysis! IMO we should not allow modifying the comparisonColumn. We use the comparisonColumn to determine which record to keep, and allowing it to be modified can break that contract.
k
Yep. So I guess we should add validation on the table config so that it warns the user.
j
We can add the validation in TableConfigUtils.validateUpsertConfig() to reject table configs with an upsert strategy on primary key columns or the comparison column. In PartialUpsertHandler, we may log a warning if we find a strategy configured for these columns, and not add them to the mergers, in order to fix the tables that already exist.
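Roughly the kind of check being proposed (a sketch only; the names and signatures are illustrative, not the actual Pinot APIs):
Copy code
import java.util.List;
import java.util.Map;

// Sketch of the proposed validation (illustrative names, not the actual Pinot APIs):
// reject a table config whose partial-upsert strategies cover a primary key column
// or the comparison column, since those columns drive the upsert comparison itself.
public class UpsertConfigValidationSketch {

  static void validatePartialUpsertStrategies(Map<String, String> partialUpsertStrategies,
      List<String> primaryKeyColumns, String comparisonColumn) {
    for (String column : partialUpsertStrategies.keySet()) {
      if (primaryKeyColumns.contains(column) || column.equals(comparisonColumn)) {
        throw new IllegalStateException(
            "Partial-upsert strategy cannot be applied to primary key or comparison column: " + column);
      }
    }
  }

  public static void main(String[] args) {
    try {
      // Mirrors the config in this thread: created_on is the time/comparison column
      // and also has an IGNORE strategy, so validation should reject it.
      validatePartialUpsertStrategies(Map.of("created_on", "IGNORE"), List.of("timestamp"), "created_on");
    } catch (IllegalStateException e) {
      System.out.println(e.getMessage());
    }
  }
}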
k
cool. will raise the fix
👍 1