Tamás Nádudvari
02/13/2021, 11:09 AM
We have a realtime table (with realtime.segment.flush.threshold.time: "1h"). Its replica count is set to 2, and when I query the number of documents over a not-too-recent interval, I can see two different numbers alternating. I understand that the Kafka offsets of the two servers consuming the same partition can drift. But when I select an interval that’s a couple of hours before the current time, so presumably querying a finished/closed segment, I’m still facing the same issue. According to the docs, shouldn’t the other replica, the one that has consumed fewer records, pick up the committed segment with the larger document count after it’s closed?
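If it helps to pin down which replica serves what, here is a rough sketch against the controller REST API (assuming a controller at localhost:9000; the segment name below is a placeholder):
# which server hosts which replica of each segment, and in what state
curl -s "http://localhost:9000/tables/mytable/externalview"
# list the table's segments
curl -s "http://localhost:9000/segments/mytable?type=REALTIME"
# committed metadata (including the document count) for one completed segment
curl -s "http://localhost:9000/segments/mytable/<completedSegmentName>/metadata"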
Kishore G
Tamás Nádudvari
02/13/2021, 2:30 PM
select count(*) from mytable_REALTIME
where startedAt < 1613220000000 and startedAt > 1613218000000
limit 10
and according to the Query Console the partialResponse is -.
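The same stats are visible outside the Query Console too; a quick sketch, assuming a broker listening on localhost:8099, is to post the query to the broker and compare numServersQueried, numServersResponded and totalDocs across a few runs:
curl -s -X POST -H "Content-Type: application/json" \
  -d '{"sql": "select count(*) from mytable_REALTIME where startedAt < 1613220000000 and startedAt > 1613218000000 limit 10"}' \
  http://localhost:8099/query/sql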
Kishore G
Tamás Nádudvari
02/13/2021, 2:54 PM
Tamás Nádudvari
02/13/2021, 2:58 PM
{
  "tableName": "mytable",
  "tableType": "REALTIME",
  "segmentsConfig": {
    "schemaName": "mytable",
    "timeColumnName": "startedAt",
    "timeType": "MILLISECONDS",
    "replicasPerPartition": "2",
    "retentionTimeUnit": "DAYS",
    "retentionTimeValue": "1",
    "completionMode": "DOWNLOAD"
  },
  "tableIndexConfig": {
    "invertedIndexColumns": [],
    "createInvertedIndexDuringSegmentGeneration": false,
    "sortedColumn": [],
    "bloomFilterColumns": [],
    "starTreeIndexConfigs": [],
    "noDictionaryColumns": [],
    "onHeapDictionaryColumns": [],
    "varLengthDictionaryColumns": [],
    "loadMode": "HEAP",
    "columnMinMaxValueGeneratorMode": "ALL",
    "nullHandlingEnabled": false,
    "aggregateMetrics": true
  },
  "ingestionConfig": {
    "streamIngestionConfig": {
      "streamConfigMaps": [
        {
          "realtime.segment.flush.threshold.rows": "0",
          "realtime.segment.flush.threshold.time": "1h",
          "realtime.segment.flush.threshold.segment.size": "100M",
          "stream.kafka.broker.list": "${KAFKA_BOOTSTRAP_SERVERS}",
          "stream.kafka.consumer.prop.auto.offset.reset": "largest",
          "stream.kafka.consumer.type": "lowlevel",
          "security.protocol": "SASL_SSL",
          "sasl.mechanism": "PLAIN",
          "sasl.jaas.config": "${KAFKA_JAAS_CONFIG}",
          "isolation.level": "read_committed",
          "stream.kafka.topic.name": "pinot",
          "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
          "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
          "streamType": "kafka"
        }
      ]
    },
    "batchIngestionConfig": {
      "segmentPushType": "APPEND",
      "segmentPushFrequency": "HOURLY"
    }
  },
  "task": {
    "taskTypeConfigsMap": {
      "RealtimeToOfflineSegmentsTask": {
        "bucketTimePeriod": "4h",
        "bufferTimePeriod": "5m",
        "collectorType": "rollup",
        "length.aggregationType": "sum",
        "endPosition.aggregationType": "sum",
        "maxNumRecordsPerSegment": "10000000"
      }
    }
  },
  "tenants": {
    "broker": "DefaultTenant",
    "server": "DefaultTenant"
  },
  "metadata": {}
}
Xiang Fu
Xiang Fu
Kishore G
Kishore G
Subbu Subramaniam
02/16/2021, 9:28 PM
Subbu Subramaniam
02/16/2021, 9:29 PM
Tamás Nádudvari
02/23/2021, 10:30 AM
We had set our Kafka topic’s cleanup policy to compact,delete without understanding how compacting topics works. The partition keys can be the same for multiple records in our case, so compaction could drop all but the latest record for a key. Since using only delete, we’ve not seen this record number discrepancy in Pinot.
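In case anyone else hits this, switching the topic back to delete-only cleanup looks roughly like this (a sketch using Kafka’s stock CLI, reusing the broker list from the table config above):
# change cleanup.policy from compact,delete to delete on the topic named pinot
kafka-configs.sh --bootstrap-server "$KAFKA_BOOTSTRAP_SERVERS" \
  --entity-type topics --entity-name pinot \
  --alter --add-config cleanup.policy=delete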
Kishore G