# troubleshooting
k
Hi folks. I tested the ‘Stream Ingestion with Dedup’ feature on the latest 0.11.0-SNAPSHOT version and found that deduplication does not take effect. I added “dedupEnabled”: true when creating the table, but after the table is created there is no dedupConfig section in the table’s config file. Any idea what is going on?
s
Can you share the table config you uploaded?
k
{
  "tableName": "abc_stage_REALTIME",
  "tableType": "REALTIME",
  "segmentsConfig": {
    "timeType": "MILLISECONDS",
    "schemaName": "abc_stage",
    "retentionTimeUnit": "HOURS",
    "retentionTimeValue": "2",
    "timeColumnName": "timestamp",
    "replicasPerPartition": "1",
    "completionConfig": {
      "completionMode": "DOWNLOAD"
    },
    "segmentPushType": "APPEND"
  },
  "tenants": {
    "broker": "DefaultTenant",
    "server": "DefaultTenant"
  },
  "tableIndexConfig": {
    "invertedIndexColumns": [],
    "rangeIndexColumns": [
      "timestamp"
    ],
    "rangeIndexVersion": 2,
    "autoGeneratedInvertedIndex": false,
    "createInvertedIndexDuringSegmentGeneration": false,
    "sortedColumn": [
      "id"
    ],
    "loadMode": "MMAP",
    "noDictionaryColumns": [],
    "enableDefaultStarTree": false,
    "starTreeIndexConfigs": [
      {
        "dimensionsSplitOrder": [
          "id",
          "epoch_minute",
          "hour",
          "day",
          "month"
        ],
        "functionColumnPairs": [
          "AVG__duration",
          "MAX__duration",
          "MIN__duration",
          "SUM__duration",
          "PERCENTILE_EST__duration",
          "PERCENTILE_TDIGEST__duration",
          "COUNT__*"
        ],
        "maxLeafRecords": 5000000
      }
    ],
    "enableDynamicStarTreeCreation": true,
    "aggregateMetrics": true,
    "nullHandlingEnabled": true,
    "streamConfigs": {
      "streamType": "kafka",
      "stream.kafka.consumer.type": "lowlevel",
      "stream.kafka.topic.name": "abc",
      "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
      "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
      "stream.kafka.broker.list": "127.0.0.1:9092",
      "stream.kafka.consumer.prop.auto.offset.reset": "largest",
      "realtime.segment.flush.threshold.time": "30m",
      "realtime.segment.flush.threshold.rows": "0",
      "realtime.segment.flush.threshold.segment.size": "100M",
      "realtime.segment.flush.autotune.initialRows": "1000000"
    }
  },
  "metadata": {
    "customConfigs": {}
  },
  "task": {
    "taskTypeConfigsMap": {
      "RealtimeToOfflineSegmentsTask": {
        "bucketTimePeriod": "1h",
        "bufferTimePeriod": "12h",
        "schedule": "0 0 0/1 * * ?",
        "maxNumRecordsPerSegment": "1500000"
      }
    }
  },
  "ingestionConfig": {
    "filterConfig": {},
    "transformConfigs": [
      {
        "columnName": "timestamp",
        "transformFunction": "jsonPathLong(body, '$.timestamp')"
      },
      {
        "columnName": "duration",
        "transformFunction": "jsonPathLong(body, '$.duration')"
      },
      {
        "columnName": "epoch_minute",
        "transformFunction": "toEpochMinutes(jsonPathLong(body, '$.timestamp'))"
      },
      {
        "columnName": "hour",
        "transformFunction": "hour(jsonPathLong(body, '$.timestamp'))"
      },
      {
        "columnName": "day",
        "transformFunction": "day(jsonPathLong(body, '$.timestamp'))"
      },
      {
        "columnName": "month",
        "transformFunction": "month(jsonPathLong(body, '$.timestamp'))"
      },
      {
        "columnName": "year",
        "transformFunction": "year(jsonPathLong(body, '$.timestamp'))"
      }
    ]
  },
  "routing": {
    "instanceSelectorType": "strictReplicaGroup"
  },
  "dedupConfig": {
    "dedupEnabled": true,
    "hashFunction": "NONE"
  },
  "isDimTable": false
}
s
Config looks good. Which pinot version are you running this against? BTW, if you're using the latest pinot 0.11.x, the controller API for uploading the tableConfig will let you know if certain fields are being ignored, via "unrecognizedProperties".
Could you verify the controller response to check if anything is being ignored?
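For reference, an upload along these lines should echo that back (controller host/port and the config file name are just placeholders here):
# Illustrative only: adjust the controller address and config file path.
curl -s -X POST "http://localhost:9000/tables" \
  -H "Content-Type: application/json" \
  -d @abc_stage_table_config.json
# In recent builds the JSON response carries an "unrecognizedProperties" map;
# anything listed there was ignored by the controller.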
n
Have you set the primary key in the schema?
s
Without a primary key in the schema, the controller should fail the table creation when dedupConfig is set. That said, if you don't have a primary key in the schema and the table creation still succeeded, there's a chance you're running an older pinot version.
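For reference, a schema upload with the primary key set looks roughly like this (heavily trimmed sketch; only "id" is taken from your table config, the other fields and types are placeholders):
# Illustrative only: adjust the controller address and field specs to your schema.
curl -s -X POST "http://localhost:9000/schemas" \
  -H "Content-Type: application/json" \
  -d '{
        "schemaName": "abc_stage",
        "dimensionFieldSpecs": [{"name": "id", "dataType": "STRING"}],
        "metricFieldSpecs": [{"name": "duration", "dataType": "LONG"}],
        "dateTimeFieldSpecs": [{
          "name": "timestamp",
          "dataType": "LONG",
          "format": "1:MILLISECONDS:EPOCH",
          "granularity": "1:MILLISECONDS"
        }],
        "primaryKeyColumns": ["id"]
      }'
# "primaryKeyColumns" is what dedup keys on; without it the controller should
# reject a table config that has dedupConfig enabled.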
k
@saurabh dubey I uploaded it via the controller's tableConfig addTable API, and there was no “unrecognizedProperties” notification.
@Neha Pawar The primary key has been set in the schema.
s
You're on an older version of pinot then @Kevin Liu
k
I’m using the latest version of the master branch (0.11.0-SNAPSHOT), but I didn’t rebuild.
s
I'd suggest a full rebuild and then trying it out.
Clearly the controller isn't returning the new response for the table APIs, which means it's not the latest. The dedup change is much more recent than the controller API change.
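Something along these lines from the repo root should pick up the dedup changes (assumes a working Maven / JDK setup):
# Full rebuild of the distribution; -Pbin-dist packages the binary tarball.
mvn clean install -DskipTests -Pbin-dist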
k
Yes, I am rebuilding.
@saurabh dubey The rebuild has completed, and I restarted each service (Controller, broker, server, etc.), but the state after creating the table is “BAD”. The server log shows: Failed to find current state for instance: Server_127.0.0.1_8001, sessionId: 100000950cd000b, table: abc_stage_REALTIME. Sleeping 1 second waiting for all segments loaded for table: abc_stage_REALTIME
Thanks a lot, dedup has worked.
👍 1
s
Good to know @Kevin Liu. Do share if there are any issues / concerns around dedup and its functionality.