
Kishore G

01/12/2021, 4:30 PM
That’s expected with the high-level stream consumer.

vmarchaud

01/12/2021, 4:33 PM
Not sure I understand why?

Kishore G

01/12/2021, 4:42 PM
The high-level stream consumer combines all partitions of a stream into one stream. Splitting it into multiple segments will result in inconsistency and data duplication.
This video explains the problems with the high-level stream consumer and why we chose to implement the partition-level consumer.
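(For context on what the two modes look like in config: the consumer type is chosen in the table’s streamConfigs. A minimal sketch using the built-in Kafka plugin for illustration; the topic, broker, and server values are made up, not from this thread:)
'streamConfigs': {
  'streamType': 'kafka',
  'stream.kafka.consumer.type': 'lowlevel',  // partition-level consumer; 'highlevel' selects the stream-level one
  'stream.kafka.topic.name': 'myTopic',
  'stream.kafka.broker.list': 'localhost:9092',
  'stream.kafka.consumer.factory.class.name': 'org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory',
  'stream.kafka.decoder.class.name': 'org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder'
}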

vmarchaud

01/12/2021, 5:04 PM
“splitting it into multiple segments will result in inconsistency and data duplication”
Well, I agree on this one; that’s why I don’t get why we have multiple segments.

Kishore G

01/12/2021, 5:09 PM
Multiple parallel segments are required for scaling.
If the event rate is in the hundreds of events/s, splitting is not needed.
But once you reach thousands, it helps.
It’s also the unit of parallelism at query time.
It just gives you more options as you scale on the ingestion or the query side.
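(To put illustrative numbers on that: at 500 events/s a single consumer keeps up fine, but at, say, 50,000 events/s over a topic with 8 partitions, a partition-level consumer gives 8 consuming segments at roughly 6,250 events/s each, and a query over recent data can fan out across those 8 segments in parallel.)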

vmarchaud

01/12/2021, 5:14 PM
Most of our segments will not be getting more than 500 events/s (and if so, that would last only a few minutes).
I don’t see where I can force having only one segment for the realtime table.

Subbu Subramaniam

01/12/2021, 5:19 PM
@vmarchaud we also had multiple operational issues with high-level streams. Consider the case where you have 4 replicas and one of the hosts goes down. You will need to bring up a new host and wait until it catches up with the latest offset before you can send queries to it. We also had operational issues when hosts were mistakenly tagged with the same tag, thus splitting the stream between the two.
I don’t know what you mean by "force to have only one segment". For high-level stream consumption, each consumer builds its own segments and keeps them locally, since it can never be guaranteed that the rows consumed by one replica are the same as the rows consumed by any other.

vmarchaud

01/12/2021, 5:22 PM
I meant to only have one consumer.
From my understanding I have 3 segments (one on each of my servers), so I get 3 different consumers.

Subbu Subramaniam

01/12/2021, 5:30 PM
If you are using high-level consumers, as I understand you do, then you should have one segment in progress and the others completed. The older segments will be removed when the retention time is over.

vmarchaud

01/12/2021, 5:37 PM
I do use high-level consumers, and I’ve got 3 realtime segments (for the same realtime table), all of them in progress.
Is there any other place I can check to verify I have only one consuming?
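(One place to check: the controller REST API exposes the segment-to-state map via GET /tables/{tableName}/externalview. A hypothetical response shape, with made-up segment and server names; with a single high-level consumer you would expect exactly one segment in CONSUMING state:)
{
  "OFFLINE": null,
  "REALTIME": {
    "myTable_REALTIME_completed_0": { "Server_10.0.0.1_8098": "ONLINE" },
    "myTable_REALTIME_inprogress_0": { "Server_10.0.0.1_8098": "CONSUMING" }
  }
}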

Kishore G

01/12/2021, 5:43 PM
Whatever you are seeing is the expected behavior... can you paste your table config?

vmarchaud

01/12/2021, 8:13 PM
{
    tableName: 'XXXXXX',
    tableType: 'REALTIME',
    quota: {},
    routing: {},
    segmentsConfig: {
      schemaName: 'YYYYY',
      timeColumnName: 'ZZZZZ',
      timeType: 'ZZZZZ',
      replication: 1,
      replicasPerPartition: 1,
      segmentPushType: 'APPEND',
      segmentPushFrequency: 'HOURLY'
    },
    tableIndexConfig: {
      streamConfigs: {
        'streamType': 'pubsub',
        'stream.pubsub.consumer.type': 'highlevel',
        'stream.pubsub.decoder.class.name': 'com.reelevant.pinot.plugins.stream.pubsub.PubSubMessageDecoder',
        'stream.pubsub.consumer.factory.class.name': 'com.reelevant.pinot.plugins.stream.pubsub.PubSubConsumerFactory',
        'stream.pubsub.project.id': 'XXXXXX',
        'stream.pubsub.topic.name': 'unused', // unused but required, because the plugin extends the Kafka one
        'stream.pubsub.subscription.id': 'ZZZZZ',
        'realtime.segment.flush.threshold.time': '15d',
        'realtime.segment.flush.threshold.rows': '390000' // 390k rows ~ 200MB (513 bytes / row)
        // 'realtime.segment.flush.threshold.segment.size': '200M' // this option needs `realtime.segment.flush.threshold.rows` to be 0 and doesn't work in 0.6.0 (`Illegal memory allocation 0 for segment ...`)
      },
      nullHandlingEnabled: true,
      invertedIndexColumns: [],
      sortedColumn: [],
      loadMode: 'mmap'
    },
    tenants: {},
    metadata: {}
  }
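(An aside on the commented-out option above: on versions where size-based flushing works, the usual pattern is to zero out the rows threshold so the size threshold takes over; a sketch, with the caveat that the size-based threshold is, as far as we know, designed for the partition-level consumer, which may be why it fails here:)
'realtime.segment.flush.threshold.rows': '0',            // 0 delegates flushing to the size-based threshold
'realtime.segment.flush.threshold.segment.size': '200M'  // target size for completed segments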
From the docs:
Depending on the configured number of replicas, multiple stream-level consumers are created, taking care that no two replicas exist on the same server host. Therefore you need to provision exactly as many hosts as the number of replicas configured.
However, in our setup we have one replica with one partition, so I expect to only have one segment (and so one consumer).