# general
h
Hi, I have a setup with the apachepinot/pinot:0.10.0-SNAPSHOT-df1c2681fd-20220207-jdk11 docker image to run my Pinot servers. I am running 4 pinot-servers on 4 m5a.large instances. It is the only container running in each box (I have given 6GB to the JVM in the JVM args). What I expect: the ingestion rate of events should stay constant. What I observed: when I run this cluster, the ingestion is > 5000 RPS. However, it drops below 100 RPS after 1 day of continuous running. If I restart all the pinot-servers, the ingestion rate goes back to 5K+ RPS again. Am I missing some setting which is causing this degradation?
k
Hi Harish, can you share the tableConfig and schema for this?
h
{
    "schemaName": "aggregate_v1",
    "dimensionFieldSpecs": [
        {
            "name": "ext_event_type",
            "dataType": "STRING"
        },
        {
            "name": "dim__channel",
            "dataType": "STRING"
        },
        {
            "name": "dim__pipeline",
            "dataType": "STRING"
        },
        {
            "name": "dim__internal",
            "dataType": "STRING"
        },
        {
            "name": "provider",
            "dataType": "STRING"
        },
        {
            "name": "status",
            "dataType": "STRING"
        },
        {
            "name": "year",
            "dataType": "INT"
        },
        {
            "name": "month",
            "dataType": "INT"
        },
        {
            "name": "day",
            "dataType": "INT"
        },
        {
            "name": "hour",
            "dataType": "INT"
        }        
    ],
    "dateTimeFieldSpecs": [
        {
            "name": "eventTime",
            "dataType": "TIMESTAMP",
            "format": "1:MILLISECONDS:EPOCH",
            "granularity": "1:MILLISECONDS"
        }
    ]
}




{
    "tableName": "aggregate_v1",
    "tableType": "REALTIME",
    "segmentsConfig": {
        "schemaName": "aggregate_v1",
        "replication": "2",
        "timeColumnName": "eventTime",
        "allowNullTimeValue": false,
        "replicasPerPartition": "2",
        "retentionTimeUnit": "DAYS",
        "retentionTimeValue": "30"
    },
    "tenants": {
        "broker": "DefaultTenant",
        "server": "DefaultTenant",
        "tagOverrideConfig": {}
    },
    "tableIndexConfig": {
        "noDictionaryColumns": [],
        "invertedIndexColumns": [           
        ],
        "streamConfigs": {
            "streamType": "kafka",
            "stream.kafka.topic.name": "MY TOPIC",
            "stream.kafka.broker.list": "MY BROKER",
            "stream.kafka.consumer.type": "lowlevel",
            "stream.kafka.consumer.prop.auto.offset.reset": "smallest",
            "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
            "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
            "realtime.segment.flush.threshold.rows": "0",
            "realtime.segment.flush.threshold.time": "24h",
            "realtime.segment.flush.segment.size": "100M"
        },
        "rangeIndexColumns": [],
        "rangeIndexVersion": 1,
        "autoGeneratedInvertedIndex": false,
        "createInvertedIndexDuringSegmentGeneration": false,
        "sortedColumn": [],
        "bloomFilterColumns": [],
        "loadMode": "MMAP",
        "onHeapDictionaryColumns": [],
        "varLengthDictionaryColumns": [],
        "enableDefaultStarTree": false,
        "enableDynamicStarTreeCreation": false,
        "aggregateMetrics": false,
        "nullHandlingEnabled": true,
        "starTreeIndexConfigs": [
            {
                "dimensionsSplitOrder": [
                    "ext_event_type",
                    "dim__channel",
                    "status"                    
                ],
                "skipStarNodeCreationForDimensions": [],
                "functionColumnPairs": [
                    "COUNT__*"
                ],
                "maxLeafRecords": 1
            }
        ]
    },
    "metadata": {},
    "quota": {},
    "routing": {},
    "query": {},
    "ingestionConfig": {
        "transformConfigs": [
            {
                "columnName": "year",
                "transformFunction": "year(eventTime, 'Asia/Kolkata')"
            },
            {
                "columnName": "month",
                "transformFunction": "month(eventTime, 'Asia/Kolkata')"
            },
            {
                "columnName": "day",
                "transformFunction": "day(eventTime, 'Asia/Kolkata')"
            },
            {
                "columnName": "hour",
                "transformFunction": "hour(eventTime, 'Asia/Kolkata')"
            }
        ]
    },
    "isDimTable": false,
    "upsertConfig": {}
}
k
@User can you help here? The flush configs seem to be normal as well.
n
@User how do you restart your servers? And have you checked if the segments are rolling over? Asking because I notice 2 things: 1. flush threshold rows = 0 (not sure what the expected behavior is, probably it ignores the row threshold), while the flush threshold time is 24h. 2. the stream's offset reset criteria is set to smallest. I am guessing that when you restart you don't have completed segments that are flushed to disk. So, before restarting, can you check if the table has completed segments?
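One way to check for completed segments is to look at the table's external view through the controller REST API; segments in ONLINE state have been completed and flushed, while CONSUMING ones are still being built in memory. This is only a sketch, and the controller host/port is an assumption for your deployment:

curl "http://<controller-host>:9000/tables/aggregate_v1/externalview"
# Look at the REALTIME section of the response: ONLINE = completed segment, CONSUMING = still in memory.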
n
Thresholds look right (0 means use size based and ignore row based, like Navina said). My hunch is also the same, so just adding on top of what Navina said: when you set smallest, Pinot consumes from the earliest offset in Kafka, hence the high rate of ingestion in the beginning, which then stabilizes. And when you restart, Pinot will not flush what was in memory; instead it re-consumes it when it comes back up, hence the high rate of ingestion after restart. It's not a cause for concern, unless you are noticing lag in event arrival.
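To check whether there actually is lag in event arrival, one rough option is to compare the newest ingested event time with the current wall-clock time. A sketch assuming the broker's SQL endpoint on port 8099 (host is a placeholder):

curl -X POST "http://<broker-host>:8099/query/sql" \
  -H "Content-Type: application/json" \
  -d '{"sql": "SELECT MAX(eventTime) FROM aggregate_v1"}'
# Compare the returned timestamp with the current time to estimate how far behind ingestion is.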
h
I restart servers with "kubectl rollout restart -n pinot statefulset.apps/pinot-server". It restarts servers one by one (stateful set in K8s). I can confirm that the sequence of restart is server-3, server-2, server-1, server-0 (with 4 nodes), one at a time. Is there a way to enforce flushing segments to disk before restart?
One more question - the offset smallest means read from offsets which have not been consumed already, right? I don't expect that Pinot will start from the beginning of the topic. It should be reading from offset = "last offset which is flushed to disk" + 1. Is this understanding correct?
++ Since we are using the "lowlevel" consumer type, it has the consumer group as null (the log also shows group=null), so we cannot see the Kafka consumer offset.
n
reading from offset = "last offset which is flushed to disk" + 1. Is this understanding correct?
@User: if there is a segment that was successfully flushed to disk, it should not continue reading from smallest. Do you see completed segments on the server? You can also check the zookeeper state under /<clusterName>/PROPERTYSTORE/SEGMENTS/<tableName>_REALTIME/ in the controller UI.
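If you prefer the REST API over the UI, recent controller versions also expose a Zookeeper browse endpoint; the /zk/ls path, cluster name, and port below are assumptions that may need adjusting for your deployment:

curl "http://<controller-host>:9000/zk/ls?path=/<clusterName>/PROPERTYSTORE/SEGMENTS/aggregate_v1_REALTIME"
# Each child node is a segment's ZK metadata; completed segments should show a DONE realtime status,
# while segments still being consumed show IN_PROGRESS.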
h
No, it is not doing that. I was checking my understanding that after a restart it should only read from Kafka where data has not yet been pushed to disk.
Going back to the original question - I can see why I see a spike in reads after a reboot.. that part is clear.
However one thing is not clear: why is the ingestion slowing down to less than 100 RPS after 1-2 days (there are lots of messages in Kafka available for consumption)?
One question - what if I set "stream.kafka.consumer.prop.auto.offset.reset=largest"? Then it will read from the latest offset the first time when the table is created. After that, once a segment is written, subsequent reboots will read data from the last read offset (i.e. the data in memory will not be lost, or if a reboot takes 5-10 min it will not miss data).
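For reference, this would be the only change in the streamConfigs block of the table config above (everything else copied as-is); as discussed in this thread, the offset reset criterion only applies when there is no prior consumption state for a partition:

"streamConfigs": {
    "streamType": "kafka",
    "stream.kafka.topic.name": "MY TOPIC",
    "stream.kafka.broker.list": "MY BROKER",
    "stream.kafka.consumer.type": "lowlevel",
    "stream.kafka.consumer.prop.auto.offset.reset": "largest",
    "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
    "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
    "realtime.segment.flush.threshold.rows": "0",
    "realtime.segment.flush.threshold.time": "24h",
    "realtime.segment.flush.segment.size": "100M"
}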
m
Yes, largest will read from the latest offset for the first time. If you set it to largest, what does the initial consumption rate look like? Trying to ensure that there are events coming in at > 100 eps. Note that when there are messages backed up, the consumers go at full speed (unless throttling is enabled), which could explain the initial start and the restarts needing to catch up, and then the consumption dropping to a steady state. Also, how do you measure the consumption rate?
Also, how many partitions are there, and what are the Xms/Xmx values that you have set?
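One rough way to measure the actual consumption rate is to sample the total row count twice and divide by the elapsed time; a sketch assuming the broker SQL endpoint on port 8099 (host is a placeholder):

curl -X POST "http://<broker-host>:8099/query/sql" \
  -H "Content-Type: application/json" \
  -d '{"sql": "SELECT COUNT(*) FROM aggregate_v1"}'
# Run this twice, e.g. 60 seconds apart, and divide the difference in counts by 60
# to approximate the rows/second actually being ingested.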
h
I have 6 brokers and 40 partitions for the topic. This is what is set on the server -
-Xms512M -Xmx4G -XX:+UseG1GC -XX:MaxGCPauseMillis=200 -Xlog:gc*:file=/opt/pinot/gc-pinot-server.log
I can go up to 8GB.. Should I?
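Regardless of what max heap is chosen, one low-risk tweak is to set Xms equal to Xmx so the heap is not resized under load; a sketch based on the current flags above (heap size kept at the existing 4G, adjust if you do go higher):

-Xms4G -Xmx4G -XX:+UseG1GC -XX:MaxGCPauseMillis=200 -Xlog:gc*:file=/opt/pinot/gc-pinot-server.log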
m
Looks like you only have two Pinot servers per replica, so each will consume 20 partitions (40 partitions x 2 replicas spread across 4 servers = 20 consuming partitions per server). And iirc m5.large has 2 cores and 8GB? That seems over-partitioned or under-resourced. But before changing either of those, how are you measuring the consumption rate, and how do you know it could be higher but is not? I'd like to first establish that there is indeed a degradation, and not just the catching up that you see.