# troubleshooting
t
Hi, we have a realtime table that consumes a Kafka topic and creates new segments every hour (`realtime.segment.flush.threshold.time: "1h"`). Its replication is set to 2, and when I query the number of documents over a not-too-recent interval, I see two different numbers alternating. I understand that the Kafka offsets of the two servers consuming the same partition can drift. But when I select an interval that’s a couple of hours before the current time, so presumably querying a finished/closed segment, I’m still facing the same issue. According to the docs, shouldn’t the replica that has consumed fewer records download the committed segment, the one with more documents, after the segment is closed?
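One way to double-check the “presumably finished/closed” part is to look at the segment states the brokers route on. A minimal sketch, assuming a controller reachable at `localhost:9000` and the standard `GET /tables/{tableName}/externalview` controller endpoint (the response shape can vary slightly by Pinot version):
```python
import requests

CONTROLLER = "http://localhost:9000"  # assumption: controller host/port
TABLE = "mytable"

# The external view lists every segment's state on every server
# (ONLINE = committed and queryable, CONSUMING = still ingesting).
resp = requests.get(f"{CONTROLLER}/tables/{TABLE}/externalview")
resp.raise_for_status()
realtime_view = resp.json().get("REALTIME") or {}

for segment, servers in sorted(realtime_view.items()):
    print(f"{segment}: {servers}")
```
If every segment overlapping the queried window shows ONLINE on both replicas, the alternating count is coming from committed copies rather than from a consuming segment.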
k
What’s the query? Can you check if the `partialResponse` flag is true in the response?
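For checking that outside the Query Console, here is a rough sketch that posts the same SQL to the broker’s `POST /query/sql` endpoint and prints the full response metadata (exceptions, server/segment counters, and whatever partial-response indicator the running Pinot version exposes), assuming a broker at `localhost:8099`:
```python
import json
import requests

BROKER = "http://localhost:8099"  # assumption: broker host/port
SQL = ("select count(*) from mytable_REALTIME "
       "where startedAt < 1613220000000 and startedAt > 1613218000000 "
       "limit 10")

for attempt in range(4):
    resp = requests.post(f"{BROKER}/query/sql", json={"sql": SQL})
    resp.raise_for_status()
    body = resp.json()
    count = body["resultTable"]["rows"][0][0]
    # Keep everything except the rows: exceptions and query stats.
    meta = {k: v for k, v in body.items() if k != "resultTable"}
    print(f"attempt {attempt}: count={count}")
    print(json.dumps(meta, indent=2))
```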
t
The query’s like:
```sql
select count(*) from mytable_REALTIME
where startedAt < 1613220000000 and startedAt > 1613218000000
limit 10
```
and according to the Query Console, `partialResponse` is `-`.
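To narrow down which segment and which replica produce the differing counts, the same filter can be broken down by Pinot’s built-in virtual columns. A sketch assuming the `$segmentName` and `$hostName` virtual columns (auto-created for every table in recent Pinot versions) and a broker at `localhost:8099`; since the broker picks one replica per segment per query, the query is repeated to catch both replicas:
```python
from collections import defaultdict

import requests

BROKER = "http://localhost:8099"  # assumption: broker host/port
SQL = ("select $segmentName, $hostName, count(*) from mytable_REALTIME "
       "where startedAt < 1613220000000 and startedAt > 1613218000000 "
       "group by $segmentName, $hostName limit 1000")

# The broker routes each segment to a single replica per query, so repeating
# the query should eventually surface both replicas of every segment.
observed = defaultdict(set)
for _ in range(10):
    rows = requests.post(f"{BROKER}/query/sql",
                         json={"sql": SQL}).json()["resultTable"]["rows"]
    for segment, host, count in rows:
        observed[segment].add((host, count))

for segment, replicas in sorted(observed.items()):
    counts = {count for _, count in replicas}
    if len(counts) > 1:
        print(f"{segment} disagrees across replicas: {sorted(replicas)}")
```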
k
that does not make sense unless you still have data flowing in for that time range
t
That’s highly unlikely. Maybe there’s some misconfiguration in our table?
Our configuration (the indices are removed):
```json
{
  "tableName": "mytable",
  "tableType": "REALTIME",
  "segmentsConfig": {
    "schemaName": "mytable",
    "timeColumnName": "startedAt",
    "timeType": "MILLISECONDS",
    "replicasPerPartition": "2",
    "retentionTimeUnit": "DAYS",
    "retentionTimeValue": "1",
    "completionMode": "DOWNLOAD"
  },
  "tableIndexConfig": {
    "invertedIndexColumns": [],
    "createInvertedIndexDuringSegmentGeneration": false,
    "sortedColumn": [],
    "bloomFilterColumns": [],
    "starTreeIndexConfigs": [],
    "noDictionaryColumns": [],
    "onHeapDictionaryColumns": [],
    "varLengthDictionaryColumns": [],
    "loadMode": "HEAP",
    "columnMinMaxValueGeneratorMode": "ALL",
    "nullHandlingEnabled": false,
    "aggregateMetrics": true
  },
  "ingestionConfig": {
    "streamIngestionConfig": {
      "streamConfigMaps": [
        {
          "realtime.segment.flush.threshold.rows": "0",
          "realtime.segment.flush.threshold.time": "1h",
          "realtime.segment.flush.threshold.segment.size": "100M",
          "stream.kafka.broker.list": "${KAFKA_BOOTSTRAP_SERVERS}",
          "stream.kafka.consumer.prop.auto.offset.reset": "largest",
          "stream.kafka.consumer.type": "lowlevel",
          "security.protocol": "SASL_SSL",
          "sasl.mechanism": "PLAIN",
          "sasl.jaas.config": "${KAFKA_JAAS_CONFIG}",
          "isolation.level": "read_committed",
          "stream.kafka.topic.name": "pinot",
          "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
          "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
          "streamType": "kafka"
        }
      ]
    },
    "batchIngestionConfig": {
      "segmentPushType": "APPEND",
      "segmentPushFrequency": "HOURLY"
    }
  },
  "task": {
    "taskTypeConfigsMap": {
      "RealtimeToOfflineSegmentsTask": {
        "bucketTimePeriod": "4h",
        "bufferTimePeriod": "5m",
        "collectorType": "rollup",
        "length.aggregationType": "sum",
        "endPosition.aggregationType": "sum",
        "maxNumRecordsPerSegment": "10000000"
      }
    }
  },
  "tenants": {
    "broker": "DefaultTenant",
    "server": "DefaultTenant"
  },
  "metadata": {}
}
```
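On the “maybe there’s some misconfiguration” question, one low-effort check is to pull the table config the controller is actually serving and diff it against the config above. A minimal sketch assuming a controller at `localhost:9000` and the standard `GET /tables/{tableName}` controller endpoint:
```python
import json

import requests

CONTROLLER = "http://localhost:9000"  # assumption: controller host/port
TABLE = "mytable"

# The controller returns the stored table config(s), keyed by table type.
resp = requests.get(f"{CONTROLLER}/tables/{TABLE}")
resp.raise_for_status()
live = resp.json()

print(json.dumps(live.get("REALTIME", live), indent=2))
# Diff this against the config that was deployed, with an eye on
# replicasPerPartition, completionMode and the stream.kafka.* settings.
```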
x
I saw this is a rollup config. Is it possible that the rollup kicks in and impacts the count? Have you tried querying `sum(met)` to see if the results are the same?
Also, does restarting the broker help?
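A quick way to act on that suggestion is to run `count(*)` and a metric sum over the same window in one statement and see whether both values flip between runs; with `aggregateMetrics` enabled, ingestion-time rollup can make `count(*)` differ from the raw event count while sums stay intact. A sketch assuming a broker at `localhost:8099`; `met` is just the placeholder metric name from the message above:
```python
import requests

BROKER = "http://localhost:8099"  # assumption: broker host/port
# "met" is a placeholder for one of the table's metric columns.
SQL = ("select count(*), sum(met) from mytable_REALTIME "
       "where startedAt < 1613220000000 and startedAt > 1613218000000")

for attempt in range(4):
    row = requests.post(f"{BROKER}/query/sql",
                        json={"sql": SQL}).json()["resultTable"]["rows"][0]
    # If count(*) alternates while sum(met) stays stable, only the row counts
    # differ (consistent with rollup); if both alternate, the replicas really
    # hold different data.
    print(f"attempt {attempt}: count={row[0]}, sum(met)={row[1]}")
```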
k
The issue was that one of the committed segments had a different number of rows across the replicas. My guess is that the Kafka brokers have inconsistent data and that is getting reflected in Pinot.
Basically, both copies of the segment have the same start/end offsets but a different number of rows.
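For reference, the offsets and document count recorded at commit time can be read back from the segment metadata the controller stores. A sketch assuming a controller at `localhost:9000` and the `GET /segments/{tableName}/{segmentName}/metadata` controller endpoint; the exact path, whether the table name needs the `_REALTIME` suffix, and the field names all vary by Pinot version, and the segment name below is only a placeholder:
```python
import json

import requests

CONTROLLER = "http://localhost:9000"         # assumption: controller host/port
TABLE = "mytable_REALTIME"                   # assumption: type suffix may not be required
SEGMENT = "mytable__0__42__20210213T1200Z"   # placeholder segment name

resp = requests.get(f"{CONTROLLER}/segments/{TABLE}/{SEGMENT}/metadata")
resp.raise_for_status()

# Look for the start/end offset and total-document fields and compare them
# with the counts each replica actually serves (see the $segmentName sketch).
print(json.dumps(resp.json(), indent=2))
```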
s
Two possible scenarios:
(1) The Kafka brokers experienced a network partition and somehow did not sync up correctly. (2) The table config was changed to point to a different Kafka cluster or topic (but with the same fields).
t
Although I’m not 100% sure, it looks like the root cause has been found: I had set the cleanup policy of the Kafka topic Pinot consumes from to `compact,delete` without understanding how topic compaction works. In our case the partition key can be the same for multiple records, so compaction can drop the older records that share a key. Since switching to `delete` only, we’ve not seen this record-count discrepancy in Pinot.
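A minimal sketch of one way to verify a topic’s cleanup policy programmatically, assuming the `confluent-kafka` Python client, a locally reachable broker, and the topic name `pinot` from the table config (the same check can be done with the `kafka-configs.sh` CLI):
```python
from confluent_kafka.admin import AdminClient, ConfigResource

# Assumptions: broker address and security settings; in practice, reuse
# whatever the Pinot stream config points at.
admin = AdminClient({"bootstrap.servers": "localhost:9092"})

resource = ConfigResource(ConfigResource.Type.TOPIC, "pinot")
for res, future in admin.describe_configs([resource]).items():
    configs = future.result()
    policy = configs["cleanup.policy"].value
    print(f"{res}: cleanup.policy = {policy}")
    # Any policy containing "compact" lets Kafka drop older records that share
    # a partition key, so two consumers reading the same offset range at
    # different times can see different numbers of records; "delete" alone
    # only removes whole old log segments via retention.
```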
👏 1
k
👍