# general
a
Hi team. Could you please give some suggestions here? I’ve assigned 3 servers for a stream-type table with about 200m records and a reported size of 25G. Each server has 64G memory. Some queries returned “servers not responded” with “errorCode”: 427. The Kafka topic has 3 partitions and the partition count is not going to change. Does it help if I scale the Pinot servers up to 6 for this table in this case?
h
200m records and 25G do not seem like a large amount of data. Could you please provide more details about the table configuration, the server configuration, and the queries that return errors?
a
The query that returned the 427 code is like the following, and the timestamp field is defined as LONG in the table config. There are about 40m records in this table with timestamps between 04-01 and 04-11.
select count(*), ToDateTime("timestamp", 'yyyy-MM-dd') AS dts from performance_test where ToDateTime("timestamp", 'yyyy-MM-dd') >= '2022-04-01' and ToDateTime("timestamp", 'yyyy-MM-dd') < '2022-04-11' group by ToDateTime("timestamp", 'yyyy-MM-dd') order by ToDateTime("timestamp", 'yyyy-MM-dd')
Table config is:
{
  "tableName": "performance_test",
  "tableType": "REALTIME",
  "segmentsConfig": {
    "schemaName": "performance_test",
    "timeColumnName": "timestamp",
    "timeType": "MILLISECONDS",
    "allowNullTimeValue": true,
    "replicasPerPartition": "1",
    "retentionTimeUnit": "DAYS",
    "retentionTimeValue": "90",
    "segmentPushType": "APPEND",
    "completionConfig": { "completionMode": "DOWNLOAD" }
  },
  "tenants": { "broker": "RestApiTest", "server": "RestApiTest" },
  "tableIndexConfig": {
    "optimizeDictionaryForMetrics": true,
    "noDictionaryColumns": ["id", "request_id", "timestamp", "response_body"],
    "sortedColumn": ["entity_id"],
    "rangeIndexColumns": ["timestamp"],
    "invertedIndexColumns": [],
    "loadMode": "MMAP",
    "aggregateMetrics": true,
    "nullHandlingEnabled": true,
    "streamConfigs": {
      "streamType": "kafka",
      "stream.kafka.consumer.type": "lowlevel",
      "stream.kafka.topic.name": "xxx",
      "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
      "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
      "stream.kafka.broker.list": "xxxx:9092",
      "stream.kafka.consumer.prop.auto.offset.reset": "smallest",
      "realtime.segment.flush.threshold.time": "15m",
      "realtime.segment.flush.threshold.rows": "0",
      "realtime.segment.flush.threshold.segment.size": "100M",
      "realtime.segment.flush.autotune.initialRows": "1000000"
    }
  },
  "ingestionConfig": {
    "filterConfig": {
      "filterFunction": "Groovy({dbTable != \"xxxxxx\"}, dbTable)"
    },
    "transformConfigs": [
      { "columnName": "id", "transformFunction": "jsonPathString(metrics, '$.id')" },
      { "columnName": "entity_id", "transformFunction": "jsonPathString(metrics, '$.entity_id')" },
      { "columnName": "tenant_id", "transformFunction": "jsonPathString(metrics, '$.tenant_id')" },
      { "columnName": "sub_tenant_id", "transformFunction": "jsonPathString(metrics, '$.sub_tenant_id')" },
      { "columnName": "service_name", "transformFunction": "jsonPathString(metrics, '$.service_name')" },
      { "columnName": "type", "transformFunction": "jsonPathString(metrics, '$.type')" },
      { "columnName": "status", "transformFunction": "jsonPathString(metrics, '$.status')" },
      { "columnName": "request_id", "transformFunction": "jsonPathString(metrics, '$.request_id')" },
      { "columnName": "api", "transformFunction": "jsonPathString(metrics, '$.api')" },
      { "columnName": "http_method", "transformFunction": "jsonPathString(metrics, '$.http_method')" },
      { "columnName": "http_status", "transformFunction": "jsonPathString(metrics, '$.http_status')" },
      { "columnName": "request_body", "transformFunction": "jsonPathString(metrics, '$.request_body')" },
      { "columnName": "response_body", "transformFunction": "jsonPathString(metrics, '$.response_body')" },
      { "columnName": "request_path", "transformFunction": "jsonPathString(metrics, '$.request_path')" },
      { "columnName": "filebeat_timestamp", "transformFunction": "jsonPathString(metrics, '$.filebeat_timestamp')" },
      { "columnName": "response_code", "transformFunction": "jsonPathString(metrics, '$.response_code')" },
      { "columnName": "latency", "transformFunction": "jsonPathString(metrics, '$.latency')" },
      { "columnName": "timestamp", "transformFunction": "jsonPathString(metrics, '$.timestamp')" },
      { "columnName": "created_on", "transformFunction": "jsonPathString(metrics, '$.created_on')" },
      { "columnName": "updated_on", "transformFunction": "jsonPathString(metrics, '$.updated_on')" }
    ]
  },
  "metadata": { "customConfigs": {} },
  "routing": { "instanceSelectorType": "strictReplicaGroup" },
  "upsertConfig": {
    "mode": "PARTIAL",
    "defaultPartialUpsertStrategy": "OVERWRITE",
    "partialUpsertStrategies": { "created_on": "IGNORE" }
  }
}
server configuration:
Allocatable: cpu: 15890m, ephemeral-storage: 18242267924, hugepages-1Gi: 0, hugepages-2Mi: 0, memory: 61358360Ki
JVM opts: -Xms8G -Xmx32G -XX:+UseG1GC -XX:MaxGCPauseMillis=200
"dateTimeFieldSpecs": [
  { "name": "timestamp", "dataType": "LONG", "format": "1:MILLISECONDS:EPOCH", "granularity": "1:MILLISECONDS" },
  { "name": "created_on", "dataType": "LONG", "format": "1:MILLISECONDS:EPOCH", "granularity": "1:MILLISECONDS" },
  { "name": "updated_on", "dataType": "LONG", "format": "1:MILLISECONDS:EPOCH", "granularity": "1:MILLISECONDS" }
],
"primaryKeyColumns": ["entity_id", "timestamp", "request_id"]
h
just to be super clear: other queries work fine and do not return the same error code?
a
Shortening the timestamp range makes this query work fine. Queries that filter by a timestamp range are the most common use for this table. I will add more here if other kinds of queries hit this error later.
m
A couple of suggestions:
- Use derived columns to store the different time granularities you want at query time; this avoids the expensive UDF (see the sketch after this list).
- For the primary time column as well: unless you will query at millisecond precision, don't store it in millis; store it at the lowest granularity you will query.
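To make the derived-column suggestion concrete, here is a minimal sketch. It assumes a hypothetical STRING column named dts that is added to the schema and filled at ingestion time from the same metrics payload (and that $.timestamp in the payload is numeric epoch millis); jsonPathLong and toDateTime are Pinot built-in scalar functions, but verify they are usable as ingestion transforms in your Pinot version. Schema addition, under dimensionFieldSpecs:

```json
{ "name": "dts", "dataType": "STRING" }
```

Table config addition, under ingestionConfig.transformConfigs:

```json
{
  "columnName": "dts",
  "transformFunction": "toDateTime(jsonPathLong(metrics, '$.timestamp'), 'yyyy-MM-dd')"
}
```

With that column in place the query can drop the UDF entirely, e.g. `select count(*), dts from performance_test where dts >= '2022-04-01' and dts < '2022-04-11' group by dts order by dts`; string comparison on 'yyyy-MM-dd' values sorts chronologically, so the range filter stays correct. Depending on the Pinot version, existing segments may need a reload or backfill before the new column is populated.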
👍 1
a
I found that JVM heap usage is high. Is it due to the upsert mode? Any tuning suggestions?
m
Increase partitions and split them across more servers. cc @Kartik Khare @Jackie for improving memory for upsert
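For context on why the heap grows and why more partitions is the lever: with upsert enabled, each server keeps an in-memory (heap) map from primary key to the latest record location for every stream partition it serves, and partial upsert additionally reads back the previous row to merge each update. Upsert also requires all data of a given stream partition to live on the same server (hence the strictReplicaGroup routing), so a 3-partition topic with replicasPerPartition 1 can only ever be spread across 3 servers; adding servers beyond that will not move this table's data or upsert metadata. As a very rough, assumption-laden estimate: with a 3-column composite key (entity_id, timestamp, request_id), each map entry can easily cost on the order of 100 bytes, so ~200m unique keys would be roughly 20 GB of heap split across only 3 servers, i.e. several GB per server just for upsert metadata.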
a
I believe increasing the partitions is a good way to go. Thanks, @Mayank
👍 1