# troubleshooting
a
I have a question about adding dedupConfig to a table: can it be added to an existing table, or do you have to drop and recreate the table for it to take effect? We have an existing table we've added that config to. In testing, if we start with the table config / schema in this thread as a new table, everything works as expected (records are deduped on event_id). However, if that dedupConfig section is added to the existing table's config (through the Pinot UI), it doesn't dedupe and seems to ignore the config. I have verified that after adding the section, it shows up in the config and no errors are given in the UI.
TABLE CONFIG
```json
{
  "REALTIME": {
    "tableName": "immutable_events_REALTIME",
    "tableType": "REALTIME",
    "segmentsConfig": {
      "timeType": "MILLISECONDS",
      "schemaName": "immutable_events",
      "retentionTimeUnit": "DAYS",
      "retentionTimeValue": "4",
      "replicasPerPartition": "2",
      "timeColumnName": "event_timestamp",
      "minimizeDataMovement": false
    },
    "tenants": {
      "broker": "DefaultTenant",
      "server": "immutable-realtime"
    },
    "tableIndexConfig": {
      "rangeIndexVersion": 2,
      "autoGeneratedInvertedIndex": false,
      "createInvertedIndexDuringSegmentGeneration": false,
      "sortedColumn": [
        "tenant_name"
      ],
      "loadMode": "MMAP",
      "streamConfigs": {
        "streamType": "kafka",
        "stream.kafka.consumer.type": "lowlevel",
        "stream.kafka.topic.name": "immutable-enriched-events",
        "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.inputformat.avro.confluent.KafkaConfluentSchemaRegistryAvroMessageDecoder",
        "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
        "stream.kafka.broker.list": "ica-kafka-kafka-bootstrap:9092",
        "realtime.segment.flush.threshold.rows": "10",
        "realtime.segment.flush.autotune.initialRows": "100000",
        "realtime.segment.flush.threshold.time": "6h",
        "realtime.segment.flush.threshold.segment.size": "675M",
        "stream.kafka.consumer.prop.auto.offset.reset": "smallest",
        "stream.kafka.decoder.prop.schema.registry.rest.url": "<http://ica-schema-registry:8081>",
        "schema.registry.url": "<http://ica-schema-registry:8081>"
      },
      "enableDefaultStarTree": false,
      "enableDynamicStarTreeCreation": false,
      "aggregateMetrics": false,
      "nullHandlingEnabled": false,
      "optimizeDictionaryForMetrics": false,
      "noDictionarySizeRatioThreshold": 0
    },
    "metadata": {
      "customConfigs": {}
    },
    "task": {
      "taskTypeConfigsMap": {
        "RealtimeToOfflineSegmentsTask": {
          "bucketTimePeriod": "1d",
          "bufferTimePeriod": "1d",
          "roundBucketTimePeriod": "1d",
          "mergeType": "rollup",
          "user_raw_risk_score.aggregationType": "max",
          "user_risk_score.aggregationType": "max",
          "user_threat_score.aggregationType": "max",
          "maxNumRecordsPerSegment": "5000000",
          "schedule": "0 * * * * ?"
        }
      }
    },
    "routing": {
      "instanceSelectorType": "strictReplicaGroup"
    },
    "dedupConfig": {
      "dedupEnabled": true,
      "hashFunction": "NONE"
    },
    "isDimTable": false
  }
}
```
TABLE SCHEMA
```json
{
  "schemaName": "immutable_events",
  "dimensionFieldSpecs": [
    {
      "name": "event_id",
      "dataType": "STRING"
    },
    {
      "name": "event_type",
      "dataType": "STRING"
    },
    {
      "name": "event_type_source",
      "dataType": "STRING"
    },
    {
      "name": "account_type",
      "dataType": "STRING"
    },
    {
      "name": "action_taken",
      "dataType": "STRING"
    },
    {
      "name": "activity_type",
      "dataType": "STRING"
    },
    {
      "name": "browser",
      "dataType": "STRING"
    },
    {
      "name": "city",
      "dataType": "STRING"
    },
    {
      "name": "country",
      "dataType": "STRING"
    },
    {
      "name": "device",
      "dataType": "STRING"
    },
    {
      "name": "device_management_status",
      "dataType": "STRING"
    },
    {
      "name": "document_type",
      "dataType": "STRING"
    },
    {
      "name": "domain",
      "dataType": "STRING"
    },
    {
      "name": "elastica_user",
      "dataType": "STRING"
    },
    {
      "name": "external_recipients",
      "dataType": "STRING",
      "singleValueField": false
    },
    {
      "name": "host",
      "dataType": "STRING"
    },
    {
      "name": "internal_recipients",
      "dataType": "STRING",
      "singleValueField": false
    },
    {
      "name": "object_type",
      "dataType": "STRING"
    },
    {
      "name": "policy_type",
      "dataType": "STRING"
    },
    {
      "name": "policy_violated",
      "dataType": "STRING"
    },
    {
      "name": "region",
      "dataType": "STRING"
    },
    {
      "name": "resp_code",
      "dataType": "STRING"
    },
    {
      "name": "sender",
      "dataType": "STRING"
    },
    {
      "name": "service",
      "dataType": "STRING"
    },
    {
      "name": "severity",
      "dataType": "STRING"
    },
    {
      "name": "source",
      "dataType": "STRING"
    },
    {
      "name": "status",
      "dataType": "STRING"
    },
    {
      "name": "sub_feature",
      "dataType": "STRING"
    },
    {
      "name": "user",
      "dataType": "STRING"
    },
    {
      "name": "user_name",
      "dataType": "STRING"
    },
    {
      "name": "user_agent",
      "dataType": "STRING"
    },
    {
      "name": "user_type",
      "dataType": "STRING"
    },
    {
      "name": "user_company",
      "dataType": "STRING"
    },
    {
      "name": "user_country",
      "dataType": "STRING"
    },
    {
      "name": "user_department",
      "dataType": "STRING"
    },
    {
      "name": "user_domain",
      "dataType": "STRING"
    },
    {
      "name": "user_first_name",
      "dataType": "STRING"
    },
    {
      "name": "user_is_active",
      "dataType": "BOOLEAN"
    },
    {
      "name": "user_is_admin",
      "dataType": "BOOLEAN"
    },
    {
      "name": "user_last_name",
      "dataType": "STRING"
    },
    {
      "name": "user_org_unit",
      "dataType": "STRING"
    },
    {
      "name": "user_risk_rating",
      "dataType": "STRING"
    },
    {
      "name": "user_title",
      "dataType": "STRING"
    },
    {
      "name": "tenant_name",
      "dataType": "STRING"
    }
  ],
  "metricFieldSpecs": [
    {
      "name": "activity_count",
      "dataType": "INT"
    },
    {
      "name": "file_size",
      "dataType": "INT"
    },
    {
      "name": "req_size",
      "dataType": "INT"
    },
    {
      "name": "resp_size",
      "dataType": "INT"
    },
    {
      "name": "event_count",
      "dataType": "INT",
      "defaultNullValue": 1
    },
    {
      "name": "user_risk_score",
      "dataType": "INT"
    },
    {
      "name": "user_raw_risk_score",
      "dataType": "FLOAT"
    },
    {
      "name": "user_threat_score",
      "dataType": "INT"
    }
  ],
  "dateTimeFieldSpecs": [
    {
      "name": "event_timestamp",
      "dataType": "LONG",
      "format": "1:MILLISECONDS:EPOCH",
      "granularity": "1:DAYS"
    }
  ],
  "primaryKeyColumns": [
    "event_id"
  ]
}
```
m
If the config is added to an existing table, then you'd need to restart the nodes, and it will only take effect for new data (same as upsert).
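If you'd rather apply the change outside the UI, you can also PUT the updated table config to the controller's REST API. A minimal sketch, assuming the controller is reachable at ica-pinot-controller:9000 (hostname and file name are placeholders) and the file holds the table config object itself, not the outer "REALTIME" wrapper the UI shows:

```bash
# Sketch only: update the existing table config so it includes the dedupConfig block.
# The controller hostname and the local file name are placeholders for this example.
curl -X PUT "http://ica-pinot-controller:9000/tables/immutable_events" \
  -H "Content-Type: application/json" \
  -d @immutable_events_table_config.json   # the inner table config JSON, with dedupConfig added
```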
a
Just to confirm, you meant if it's added to an existing table, right? And when you say restart the nodes, do you mean scale down the Pinot server pods and then scale them back up?
I saw this line in the UpsertConfig doc, so I think that's what you meant:
> However, you need to restart all Pinot servers so that it can rebuild the primary key to record location map with the new columns.
I'll give that a try, thanks!
m
Yes, existing. By restart I mean simply restarting the Pinot server JVM, however you usually do it. No scale down or up needed.
n
Dunno if that will work for dedup. Upsert keeps all rows and just maintains the right metadata, whereas dedup skips duplicate rows at ingestion time. So I think restarting won't build the metadata correctly for a config added later on. Cc @saurabh dubey to confirm
s
Adding dedup to an existing realtime table won't affect rows that are already ingested, i.e. it won't look at already-ingested rows and remove duplicates amongst them. However, it will dedup new rows being ingested going forward. If the table had ingested duplicate PK rows before dedup was enabled, those will not be touched.
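If you want to see how many duplicate PKs were ingested before dedup was enabled, a query like this against the broker should surface them (the broker host/port and the LIMIT are just examples):

```bash
# Sketch only: count primary keys that appear more than once, via the broker's SQL endpoint.
# ica-pinot-broker:8099 is a placeholder -- use your own broker service and port.
curl -X POST "http://ica-pinot-broker:8099/query/sql" \
  -H "Content-Type: application/json" \
  -d '{"sql": "SELECT event_id, COUNT(*) AS copies FROM immutable_events GROUP BY event_id HAVING COUNT(*) > 1 LIMIT 10"}'
```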
a
I was able to get dedup to work for new rows by scaling the server pods down and back up so that the table config change got applied. I'm not sure how to restart just the JVM, so maybe we could script that instead of having to take the pods down. Either way, dedup does work once the server picks up the table config change. Thanks all!
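For anyone who lands here later, something like this is what we'd script for the rolling restart (assuming the servers run as a Kubernetes StatefulSet named pinot-server; adjust the name for your cluster):

```bash
# Sketch only: rolling restart of the server pods so they pick up the table config change,
# without scaling the StatefulSet down and back up. "pinot-server" is an assumed name.
kubectl rollout restart statefulset/pinot-server
kubectl rollout status statefulset/pinot-server   # blocks until every pod has been recycled
```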