# kinesis_help
  • Neha Pawar (10/07/2021, 10:11 PM)
    Hey Kartik, please meet Abhijeet
  • Neha Pawar (10/07/2021, 10:11 PM)
    he’s facing some issues when using kinesis
  • Neha Pawar (10/07/2021, 10:11 PM)
    i wanted to verify one behavior from you
  • Neha Pawar (10/07/2021, 10:13 PM)
he’s set shardIteratorType to LATEST and realtime.segment.flush.threshold.time: 6h
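    For context, both settings live under streamConfigs in the realtime table config; a minimal excerpt, with the other required keys omitted:
    "streamConfigs": {
      "streamType": "kinesis",
      "shardIteratorType": "LATEST",
      "realtime.segment.flush.threshold.time": "6h"
    }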
  • Neha Pawar (10/07/2021, 10:14 PM)
    now say a segment was CONSUMING, and then the server got restarted. when the server comes back up, will the KinesisConsumer consume from the startOffset recorded in the segment metadata, or will it consume from the latest sequence number?
  • Neha Pawar (10/07/2021, 10:16 PM)
    i have a hunch, the latter must be happening. if so, we have a bug. after restart, we should always consume from the startOffset of the segment
  • Abhijeet Kushe (10/08/2021, 8:46 PM)
    I did delete the OFFLINE table and removed the realtime-to-offline task in QA, but I still see the task listed in the ZooKeeper console. Also, the controller keeps printing:
    No job to purge for the queue TaskQueue_RealtimeToOfflineSegmentsTask
    I just restarted the entire cluster and I still see the above message. I have started the table again with the shard iterator at AT_SEQUENCE_NUMBER. I see the iterator is stuck at 66K seconds ago (since we have 1-day retention). I have noticed in the past that this iterator does not shift for a long time. Will update later if I don't see it change.
  • Abhijeet Kushe (10/08/2021, 8:46 PM)
    (attachment: Screen Shot 2021-10-08 at 3.29.42 PM.png)
  • Abhijeet Kushe (10/08/2021, 8:46 PM)
    @channel
  • Kartik Khare (10/10/2021, 5:16 AM)
    @User yes, I think this is a bug. I guess the solution is to not honour the shardIteratorType in config if a valid offset to consume the records is available.
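    A minimal sketch of such a fix, assuming the AWS SDK v2 Kinesis client; the helper and its wiring are hypothetical, not the actual Pinot patch:
    import software.amazon.awssdk.services.kinesis.KinesisClient;
    import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest;
    import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;

    // Hypothetical helper: prefer the offset checkpointed in segment metadata over
    // the configured shardIteratorType, so a restarted server resumes from the
    // segment's startOffset instead of jumping to LATEST.
    static String shardIterator(KinesisClient client, String stream, String shardId,
                                String checkpointSeqNo, ShardIteratorType configuredType) {
      GetShardIteratorRequest.Builder req =
          GetShardIteratorRequest.builder().streamName(stream).shardId(shardId);
      if (checkpointSeqNo != null) {
        // Resume exactly where the CONSUMING segment left off.
        req.shardIteratorType(ShardIteratorType.AT_SEQUENCE_NUMBER)
           .startingSequenceNumber(checkpointSeqNo);
      } else {
        // No checkpoint yet: honour the table config (e.g. LATEST).
        req.shardIteratorType(configuredType);
      }
      return client.getShardIterator(req.build()).shardIterator();
    }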
  • Kartik Khare (10/10/2021, 5:16 AM)
    @User Meanwhile, can you try with AFTER_SEQUENCE_NUMBER as well? We have tested using that and it seems to work.
  • Abhijeet Kushe (10/11/2021, 1:07 PM)
    It actually does work: the moment it consumes the latest message, the iterator age drops to 0. I am not aware how the iterator age is supposed to behave when the shard iterator is AT_SEQUENCE_NUMBER.
  • Abhijeet Kushe (10/11/2021, 1:08 PM)
    (attachment: Screen Shot 2021-10-11 at 9.08.33 AM.png)
  • Neha Pawar (10/11/2021, 4:41 PM)
    cool
  • Abhijeet Kushe (01/25/2022, 2:45 PM)
    @User @User our GetIterator age metric in CloudWatch for the Kinesis stream went out of sync ever since we made a Pinot 0.9.1 release fix last month. Any ideas what caused it?
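    One way to pull that metric for inspection, sketched against the AWS SDK v2 CloudWatch client; the stream name and time window are placeholders:
    import java.time.Instant;
    import software.amazon.awssdk.services.cloudwatch.CloudWatchClient;
    import software.amazon.awssdk.services.cloudwatch.model.Dimension;
    import software.amazon.awssdk.services.cloudwatch.model.GetMetricStatisticsRequest;
    import software.amazon.awssdk.services.cloudwatch.model.Statistic;

    public class IteratorAgeCheck {
      public static void main(String[] args) {
        // Maximum GetRecords.IteratorAgeMilliseconds per 5-minute window.
        CloudWatchClient cw = CloudWatchClient.create();
        cw.getMetricStatistics(GetMetricStatisticsRequest.builder()
                .namespace("AWS/Kinesis")
                .metricName("GetRecords.IteratorAgeMilliseconds")
                .dimensions(Dimension.builder().name("StreamName").value("my-stream").build())
                .statistics(Statistic.MAXIMUM)
                .startTime(Instant.parse("2022-01-24T00:00:00Z"))
                .endTime(Instant.parse("2022-01-25T00:00:00Z"))
                .period(300)
                .build())
            .datapoints().forEach(System.out::println);
      }
    }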
  • Abhijeet Kushe (11/28/2022, 8:21 PM)
    @here We have noticed an increase in read throughput in the past few months and want to increase the number of shards. Wanted to know whether you recommend doubling the provisioned shards or using on-demand provisioning. I was trying to find documentation on on-demand support but did not find any. Should we just double or quadruple the shards, and will that need a restart, or does Pinot support on-demand provisioning?
  • Neha Pawar (11/29/2022, 12:26 AM)
    it will not need a restart. you can just increase based on your increased event rate. pinot will automatically detect the new shards once all parent shards are read
  • Abhijeet Kushe (11/29/2022, 2:14 AM)
    so I can use on-demand provisioning in Kinesis? I am using 0.9.1
  • Neha Pawar (11/29/2022, 4:17 AM)
    By on-demand provisioning, if you mean auto-scaling on the Kinesis side, then yes
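    Both options are changes on the Kinesis side only. A sketch with the AWS SDK v2, where the stream name and ARN are placeholders:
    import software.amazon.awssdk.services.kinesis.KinesisClient;
    import software.amazon.awssdk.services.kinesis.model.ScalingType;
    import software.amazon.awssdk.services.kinesis.model.StreamMode;
    import software.amazon.awssdk.services.kinesis.model.StreamModeDetails;
    import software.amazon.awssdk.services.kinesis.model.UpdateShardCountRequest;
    import software.amazon.awssdk.services.kinesis.model.UpdateStreamModeRequest;

    public class Reshard {
      public static void main(String[] args) {
        KinesisClient kinesis = KinesisClient.create();

        // Option 1: double the provisioned shard count in place.
        kinesis.updateShardCount(UpdateShardCountRequest.builder()
            .streamName("my-stream")
            .targetShardCount(8)  // e.g. 4 -> 8
            .scalingType(ScalingType.UNIFORM_SCALING)
            .build());

        // Option 2: switch the stream to on-demand capacity mode instead.
        kinesis.updateStreamMode(UpdateStreamModeRequest.builder()
            .streamARN("arn:aws:kinesis:us-east-1:123456789012:stream/my-stream")
            .streamModeDetails(
                StreamModeDetails.builder().streamMode(StreamMode.ON_DEMAND).build())
            .build());
      }
    }
    As noted above, Pinot then picks up the child shards automatically once the parent shards are fully consumed.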
  • Neha Pawar (11/29/2022, 4:18 AM)
    @Seunghyun @Mayank fyi
  • Abhijeet Kushe (11/29/2022, 1:13 PM)
    Thanks @Neha Pawar will try it out
  • Rakesh Bobbala (02/10/2023, 7:54 PM)
    Hello team, I'm pushing data from Spark Streaming into Kinesis streams, and Pinot is ingesting the data from Kinesis. In order to measure the latency from Spark to Pinot, I'm appending an epoch timestamp before writing to the Kinesis stream, and in the Pinot table config I'm adding another field with the transformation function now(). When I query the table and check the latency using the difference between the Spark write time and Pinot's row-creation time, I get an average latency of around 9-10 seconds. So I want to know if we can reduce the latency here by making changes to the configuration. Table config:
    {
      "tableName": "backend_pre_processed",
      "tableType": "REALTIME",
      "segmentsConfig": {
        "schemaName": "backend_pre_processed",
        "retentionTimeUnit": "DAYS",
        "retentionTimeValue": "90",
        "replication": "1",
        "replicasPerPartition": "1",
        "timeColumnName": "spark_write_timestamp",
        "minimizeDataMovement": false
      },
      "tenants": {
        "broker": "DefaultTenant",
        "server": "DefaultTenant",
        "tagOverrideConfig": {}
      },
      "tableIndexConfig": {
        "invertedIndexColumns": [],
        "noDictionaryColumns": [],
        "autoGeneratedInvertedIndex": false,
        "createInvertedIndexDuringSegmentGeneration": false,
        "sortedColumn": [],
        "bloomFilterColumns": [],
        "loadMode": "MMAP",
        "streamConfigs": {
          "streamType": "kinesis",
          "stream.kinesis.topic.name": "backend-processed-events",
          "stream.kinesis.consumer.type": "lowlevel",
          "stream.kinesis.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
          "stream.kinesis.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kinesis.KinesisConsumerFactory",
          "realtime.segment.flush.threshold.time": "7d",
          "realtime.segment.flush.threshold.rows": "1000000",
          "region": "us-east-1",
          "maxRecordsToFetch": "3",
          "shardIteratorType": "LATEST",
          "stream.kinesis.fetch.timeout.millis": "30000"
        },
        "onHeapDictionaryColumns": [],
        "varLengthDictionaryColumns": [],
        "enableDefaultStarTree": false,
        "enableDynamicStarTreeCreation": false,
        "aggregateMetrics": false,
        "nullHandlingEnabled": false,
        "optimizeDictionaryForMetrics": false,
        "noDictionarySizeRatioThreshold": 0,
        "rangeIndexColumns": [],
        "rangeIndexVersion": 2
      },
      "metadata": {},
      "quota": {},
      "routing": {},
      "query": {},
      "ingestionConfig": {
        "segmentTimeValueCheck": true,
        "transformConfigs": [
          {
            "columnName": "spark_write_timestamp_epoch",
            "transformFunction": "FromDateTime(spark_write_time, 'yyyy-MM-dd''T''HH:mm:ss.SSS''Z')"
          },
          {
            "columnName": "spark_write_timestamp",
            "transformFunction": "TRIM(spark_write_time)"
          },
          {
            "columnName": "row_created_time",
            "transformFunction": "now()"
          },
          {
            "columnName": "m_event",
            "transformFunction": "TRIM(event)"
          }
        ],
        "continueOnError": false,
        "rowTimeValueCheck": false
      },
      "isDimTable": false
    }
    Also I want to know if this is the right way to test the ingestion time.
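    One way to read the measured latency back out, assuming the column names from the config above (a sketch, not a verified benchmark query):
    -- row_created_time (now() at ingestion) and spark_write_timestamp_epoch are both epoch millis
    select avg(row_created_time - spark_write_timestamp_epoch) as avg_latency_ms
    from backend_pre_processed
    If the latency needs to come down, the fetch-related keys in streamConfigs ("maxRecordsToFetch": "3", "stream.kinesis.fetch.timeout.millis": "30000") are the obvious knobs to experiment with, though the right values depend on your throughput.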
  • Rajat Gupta (03/01/2024, 7:33 PM)
    Hi team, I am a newbie to Pinot and have a very basic question. I have ingested 15M records into a Pinot table from a Kinesis stream. When I try to query all the results using
    select * from <TableName>
    I get the query response below. Can someone help answer the two questions after it?
    "requestId": "1862848077000000379",
      "brokerId": "Broker_pinot-broker-2.pinot-broker-headless.pinot.svc.cluster.local_8099",
      "exceptions": [],
      "numServersQueried": 3,
      "numServersResponded": 3,
      "numSegmentsQueried": 28,
      "numSegmentsProcessed": 3,
      "numSegmentsMatched": 3,
      "numConsumingSegmentsQueried": 8,
      "numConsumingSegmentsProcessed": 0,
      "numConsumingSegmentsMatched": 0,
      "numDocsScanned": 30,
      "numEntriesScannedInFilter": 0,
      "numEntriesScannedPostFilter": 270,
      "numGroupsLimitReached": false,
      "maxRowsInJoinReached": false,
      "totalDocs": 15367504,
      "timeUsedMs": 4,
      "offlineThreadCpuTimeNs": 0,
      "realtimeThreadCpuTimeNs": 0,
      "offlineSystemActivitiesCpuTimeNs": 0,
      "realtimeSystemActivitiesCpuTimeNs": 0,
      "offlineResponseSerializationCpuTimeNs": 0,
      "realtimeResponseSerializationCpuTimeNs": 0,
      "offlineTotalCpuTimeNs": 0,
      "realtimeTotalCpuTimeNs": 0,
      "brokerReduceTimeMs": 0,
      "segmentStatistics": [],
      "traceInfo": {},
      "partialResult": false,
      "numSegmentsPrunedByBroker": 0,
      "numRowsResultSet": 10,
      "minConsumingFreshnessTimeMs": 1709241001083,
      "numSegmentsPrunedByServer": 25,
      "numSegmentsPrunedInvalid": 0,
      "numSegmentsPrunedByLimit": 25,
      "numSegmentsPrunedByValue": 0,
      "explainPlanNumEmptyFilterSegments": 0,
      "explainPlanNumMatchAllFilterSegments": 0
    }
    1. Why is the number of docs scanned only 30, given that I haven't added any filter to the query? Is it because the records are flushed to disk and hence not queried?
    2. I am seeing that some records are stuck in Kinesis and are not being read. They only get read when I restart the pinot-server, and after some time it again stops reading messages. How can I debug this? I am seeing the iterator age increase in Kinesis.
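    On question 1, the response itself points at a likely cause other than flushing: a Pinot SELECT without an explicit LIMIT returns only 10 rows (numRowsResultSet: 10), so servers stop scanning once the limit is satisfied and prune the remaining segments (numSegmentsPrunedByLimit: 25). A quick hypothetical check:
    -- numDocsScanned should grow with the limit if limit-based pruning is the cause
    select * from <TableName> limit 1000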