# general
a
I am interested in getting the latest updates on the Kinesis integration. This issue https://github.com/apache/incubator-pinot/issues/5648 mentions joining #kinesis-integration, but I don't see the channel here in Slack. Can someone point me to the right place to get more details?
m
I believe this is available now, cc @User @User
n
Yes it is. You'll find a page about it in the docs. It's a very new feature, so you might face some hiccups along the way. Would be great if you can try it out
a
Yes, I did try it out @User, but I was not able to consume any messages from a Kinesis stream after creating a schema and a table. I am trying to implement this use case: https://docs.pinot.apache.org/basics/data-import/pinot-stream-ingestion. I am new to Pinot, so I wanted to see whether anyone has reported any issues.
r
Hi Abhijeet, I was able to connect to Kinesis. One extra step I needed to do, was grant permissions to EKS to read the stream. (if you are using EKS)
n
Thanks @User. In addition to trying that, can you share your table config here @User? Have you set these properties: https://docs.pinot.apache.org/basics/data-import/pinot-stream-ingestion/amazon-kinesis? And any exceptions in the logs?
a
I tried ingesting into a local Pinot instance using 1-hour AWS STS credentials and token. I was able to connect to Kinesis, but when I added some records to Kinesis and queried them through the Pinot console, I did not get any records. This is the table config.
Copy code
{
  "tableName": "transcript",
  "tableType": "REALTIME",
  "segmentsConfig": {
    "timeColumnName": "timestampInEpoch",
    "timeType": "MILLISECONDS",
    "schemaName": "transcript",
    "replicasPerPartition": "1"
  },
  "tenants": {},
  "tableIndexConfig": {
    "loadMode": "MMAP",
    "streamConfigs": {
      "streamType": "kinesis",
      "stream.kinesis.topic.name": "cdp_metric_events_poc",
      "region": "us-east-1",
      "endpoint": "<https://kinesis.us-east-1.amazonaws.com>",
      "shardIteratorType": "AFTER_SEQUENCE_NUMBER",
      "stream.kinesis.consumer.type": "lowlevel",
      "stream.kinesis.fetch.timeout.millis": "30000",
      "stream.kinesis.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
      "stream.kinesis.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kinesis.KinesisConsumerFactory",
      "realtime.segment.flush.threshold.size": "1000000",
      "realtime.segment.flush.threshold.time": "1s"
    }
  },
  "metadata": {
    "customConfigs": {}
  }
}
No exception in the logs
Ok I changed the realtime.segment.flush.threshold.time to 1000 and it did consume records
@User you talked about facing some hiccups. Wanted to know if we can use this for implementing a feature in our prod env
m
@User yes please, we are here to help to get you there
a
thanks @User
n
Typically the time should be set to something like 2h or 6h depending on the rate of ingestion. Otherwise you will end up with too many small Pinot segments
a
I see. We do have an SLA, so 2 hours will be too slow. The max we can go to is 15 min
Is there a way to address the small-segment issue by having near-realtime ingestion followed by a nightly batch ingestion which might compact these segments?
n
the 2h or 6h will not affect the freshness of the data. you will still be able to query the records as soon as they are ingested
this setting only determines at what cadence the in-memory consumed events are converted to a Pinot segment on disk
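For reference, the only property that needs to change for that is the flush time in streamConfigs — a minimal sketch based on the table config above; pick the value based on your ingestion rate:
Copy code
"realtime.segment.flush.threshold.time": "6h"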
a
OK, I see, so the records will be consumed in real time but they will be pushed to disk every 6h
n
yes
this video might help with some realtime ingestion general concepts:

https://youtu.be/WoruCQgPhSA

a
Thanks will go through it
Hi, I am having the same problem again where the records are not being read from Kinesis. This time I am trying the upsert feature with Kinesis: https://docs.pinot.apache.org/basics/data-import/upsert. I don't see any errors in the console. When I try to query I don't see any records. Schema:
Copy code
{
  "schemaName": "transcript",
  "dimensionFieldSpecs": [
    {
      "name": "studentID",
      "dataType": "INT"
    },
    {
      "name": "firstName",
      "dataType": "STRING"
    },
    {
      "name": "lastName",
      "dataType": "STRING"
    },
    {
      "name": "gender",
      "dataType": "STRING"
    },
    {
      "name": "subject",
      "dataType": "STRING"
    }
  ],
  "primaryKeyColumns": [
    "studentID",
    "subject"
  ],
  "metricFieldSpecs": [
    {
      "name": "score",
      "dataType": "FLOAT"
    }
  ],
  "dateTimeFieldSpecs": [
    {
      "name": "timestamp",
      "dataType": "LONG",
      "format": "1:MILLISECONDS:EPOCH",
      "granularity": "1:MILLISECONDS"
    }
  ]
}
Table Config
Copy code
{
  "tableName": "transcript",
  "tableType": "REALTIME",
  "segmentsConfig": {
    "timeColumnName": "timestamp",
    "timeType": "MILLISECONDS",
    "schemaName": "transcript",
    "replicasPerPartition": "1"
  },
  "tenants": {},
  "tableIndexConfig": {
    "loadMode": "MMAP",
    "streamConfigs": {
      "streamType": "kinesis",
      "stream.kinesis.topic.name": "cdp_metric_events_poc",
      "region": "us-east-1",
      "shardIteratorType": "AFTER_SEQUENCE_NUMBER",
      "stream.kinesis.consumer.type": "lowlevel",
      "stream.kinesis.fetch.timeout.millis": "30000",
      "stream.kinesis.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
      "stream.kinesis.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kinesis.KinesisConsumerFactory",
      "realtime.segment.flush.threshold.size": "1000000",
      "realtime.segment.flush.threshold.time": "21600" # Also tried 6h did not work
    }
  },
  "upsertConfig": {
    "mode": "FULL"
  },
  "routing": {
    "instanceSelectorType": "strictReplicaGroup"
  },
  "metadata": {
    "customConfigs": {}
  }
}
@User
n
Was it working before setting config for upsert?
a
No, it has only worked once, when I set realtime.segment.flush.threshold.size to "1000". But since you mentioned that "6h" is the right value, otherwise it will create small segments,
I tried "6h" and it did not work. After that, when I switched back to 1000, it did not work that time as well
OK, it works now, but I am still getting duplicates. But I think this is an incorrect config, since the document also mentions that the segmentAssignmentStrategy has to be partitioned-replica-group-instance-assignment and not BalanceNumSegmentAssignmentStrategy. Although there was no error when I created this:
Copy code
"segmentsConfig": {
 "timeColumnName": "mtime",
 "timeType": "MILLISECONDS",
 "retentionTimeUnit": "DAYS",
 "retentionTimeValue": "1",
 "segmentPushType": "APPEND",
 "segmentAssignmentStrategy": "BalanceNumSegmentAssignmentStrategy",
 "schemaName": "transcript",
 "replicasPerPartition": "1"
 }
n
does your stream have events coming in? are they past the retention of the stream?
@User can you take a look too? i can continue looking later today
a
I am doing a POC, so I send the events on an ad-hoc basis
n
btw you don't need to set segmentAssignmentStrategy at all
are there any exceptions in the server/controller log?
and could you elaborate on what you mean by it’s not working? the number of records in your stream doesn’t match what’s in your pinot table?
a
Hi Neha, you are right. I removed BalanceNumSegmentAssignmentStrategy and it is working. But what I observed is that when I create a table at first with BalanceNumSegmentAssignmentStrategy and then make the change later on, it does not work directly. When I tore down the cluster and recreated another one from scratch without BalanceNumSegmentAssignmentStrategy, it worked. And there were no exceptions, which is why it was hard to debug the issue
m
Thanks @User for finding that out. Do you mind documenting this in a github issue?
It might be expected behavior, so either we will fix it or make it more clear in the documentation.
a
Ok I will keep track of the issues I have seen and will log github issues
@User I have a question on deduplication. I have used the streaming upsert feature and saw that the records are indeed deduped. Now I was trying to create offline segments from S3 JSON files which are present in the following locations: s3://test_bucket/pinot_poc/2021/07/26/00, s3://test_bucket/pinot_poc/2021/07/26/01, s3://test_bucket/pinot_poc/2021/07/27/00, s3://test_bucket/pinot_poc/2021/07/27/01, s3://test_bucket/pinot_poc/2021/07/28/00, s3://test_bucket/pinot_poc/2021/07/28/01 and so on. There are duplicate events present across each folder. I have created segments using https://docs.pinot.apache.org/basics/data-import/batch-ingestion/spark by specifying s3://test_bucket/pinot_poc as the input. I have 3 questions:
• Partitioning - the segment tar files mirrored the same s3://test_bucket/pinot_poc/2021/07/26/00 layout (at hour granularity), whereas I want the option of 1 tar file per day, not per hour (or even 1 segment for the entire batch). Can I achieve that via configuration?
• Duplicates - when I queried the OFFLINE table via the console I still got duplicates. Is there a way to eliminate that? (REALTIME does eliminate duplicates)
• Is there a way to compact the real-time segment files into offline, so that the hybrid table manages this segment creation behind the scenes from realtime purely based on configuration, which would make this very seamless?
@User
m
Right now one input file becomes one segment; we are working on making it more flexible. You will need to eliminate duplicates in your offline job, before generating Pinot segments. Yes, check out the managed offline flow doc that uses Minion
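For the dedup step, one option is a window function in the Spark job before you generate the Pinot segments — a minimal sketch, assuming the input JSON is registered as a Spark SQL view named raw_events with the transcript schema's columns:
Copy code
-- keep only the latest event per primary key (studentID, subject)
SELECT studentID, firstName, lastName, gender, subject, score, `timestamp`
FROM (
  SELECT *,
         ROW_NUMBER() OVER (PARTITION BY studentID, subject ORDER BY `timestamp` DESC) AS rn
  FROM raw_events
) t
WHERE rn = 1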
a
thanks for your reply....will look at Minion
n
Hi Abhijeet, just checking if you were able to move ahead? This is the doc that Mayank is referring to for moving real-time data to offline tables automatically https://docs.pinot.apache.org/operators/operating-pinot/pinot-managed-offline-flows
a
Yes, I did find that document. I will let you know if I run into any issues
Hi Neha, I did implement the managed offline flow, but I am not able to see any records when I query the OFFLINE table. Is there a way to find out from the console or logs whether the RealtimeToOfflineSegmentsTask has run?
n
you should see some logs in controller, about the task being scheduled. and then some logs in minion, about the task being executed
you can also use the Task section on Swagger APIs to check if any tasks were scheduled
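For example, the two task endpoints you would typically hit look like this (assuming the RealtimeToOfflineSegmentsTask task type and the transcript_REALTIME table):
Copy code
tasks/scheduler/jobDetails?tableName=transcript_REALTIME&taskType=RealtimeToOfflineSegmentsTask
tasks/RealtimeToOfflineSegmentsTask/state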
can you see segments in the offline table?
a
No, I don't see any segments in the offline table. I will query Swagger for any scheduled tasks
n
what's the config you set in the realtime table for this?
a
Realtime table
Copy code
{
  "tableName": "transcript",
  "tableType": "REALTIME",
  "segmentsConfig": {
    "timeColumnName": "timestamp",
    "timeType": "MILLISECONDS",
    "schemaName": "transcript",
    "replicasPerPartition": "1",
    "retentionTimeUnit": "DAYS",
    "retentionTimeValue": "2",
    "segmentPushType": "APPEND"
  },
  "tenants": {},
  "tableIndexConfig": {
    "loadMode": "MMAP",
    "streamConfigs": {
      "streamType": "kinesis",
      "stream.kinesis.topic.name": "cdp_metric_events_poc",
      "region": "us-east-1",
      "shardIteratorType": "LATEST",
      "stream.kinesis.consumer.type": "lowlevel",
      "stream.kinesis.fetch.timeout.millis": "30000",
      "stream.kinesis.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
      "stream.kinesis.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kinesis.KinesisConsumerFactory",
      "realtime.segment.flush.threshold.size": "1000000",
      "realtime.segment.flush.threshold.time": "6h"
    }
  },
  "upsertConfig": {
    "mode": "FULL"
  },
  "routing": {
    "instanceSelectorType": "strictReplicaGroup"
  },
  "metadata": {
    "customConfigs": {}
  },
  "task": {
    "taskTypeConfigsMap": {
      "RealtimeToOfflineSegmentsTask": {
        "bucketTimePeriod": "1d",
        "bufferTimePeriod": "1h",
        "roundBucketTimePeriod": "1h",
        "mergeType": "rollup",
        "score.aggregationType": "max",
        "maxNumRecordsPerSegment": "100000"
      }
    }
  }
}
Offline
Copy code
{
  "tableName": "transcript",
  "tableType": "OFFLINE",
  "segmentsConfig": {
    "replication": 1,
    "timeColumnName": "timestamp",
    "timeType": "MILLISECONDS",
    "retentionTimeUnit": "DAYS",
    "retentionTimeValue": 365
  },
  "tenants": {
    "broker": "DefaultTenant",
    "server": "DefaultTenant"
  },
  "tableIndexConfig": {
    "loadMode": "MMAP"
  },
  "metadata": {}
}
these are the configs
I also queried from swagger
Copy code
tasks/scheduler/jobDetails?tableName=transcript_REALTIME&taskType=RealtimeToOfflineSegmentsTask
Copy code
{
  "code": 404,
  "error": "Unable to find job detail for table name - transcript_REALTIME, task type - RealtimeToOfflineSegmentsTask"
}
n
when did you set this up? Maybe you don’t have any segments older than 1 day timestamp?
a
Actually the controller does not have this property: "controller.task.frequencyInSeconds": 3600
So the task manager is not enabled
👍 1
Step 3 of the document
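(i.e., the controller config was missing a line like the following — using the value mentioned above from Step 3 of the doc:)
Copy code
controller.task.frequencyInSeconds=3600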
@User I do see the task now from Swagger, but the state is NOT_STARTED. I created the table yesterday and there are records which I inserted before 10am. There are no segments yet in the offline table
Copy code
tasks/RealtimeToOfflineSegmentsTask/state
Config is the same that I messaged last Friday
n
are minions up and healthy?
can you share logs from controller and then minion, regarding this task?
a
I will check
status says alive
n
did you find the log statements about task creation on controller? and possibly about task execution/failure on minion?
a
On minion these are the only logs (yesterdays)
Copy code
2021/08/08 13:36:04.468 INFO [ParticipantHealthReportTask] [Start a Pinot [MINION]] Start HealthCheckInfoReportingTask
2021/08/08 13:36:04.468 INFO [CallbackHandler] [Start a Pinot [MINION]] initializing CallbackHandler: org.apache.helix.manager.zk.CallbackHandler@713b237a content: CallbackHandler{_watchChild=false, _preFetchEnabled=false, _batchModeEnabled=false, _path='/pinot-quickstart/INSTANCES/Minion_pinot-minion-0.pinot-minion-headless.default.svc.cluster.local_9514/MESSAGES', _listener=org.apache.helix.messaging.handling.HelixTaskExecutor@1ae627f3, _changeType=MESSAGE, _manager=org.apache.helix.manager.zk.ZKHelixManager@101067e8, _zkClient=org.apache.helix.manager.zk.ZkClient@
I also see this on the minion
Copy code
[Start a Pinot [MINION]] Initialized TaskExecutorFactoryRegistry with 5 task executor factories: [MergeRollupTask, RealtimeToOfflineSegmentsTask, ConvertToRawIndexTask, PurgeTask, SegmentGenerationAndPushTask]
Copy code
2021/08/08 13:35:28.072 INFO [TaskFactoryRegistry] [Start a Pinot [MINION]] Registering RealtimeToOfflineSegmentsTask with task executor factory: RealtimeToOfflineSegmentsTaskExecutorFactory, event observer factory: DefaultMinionEventObserverFactory
2021/08/08 13:35:28.072 INFO [TaskFactoryRegistry] [Start a Pinot [MINION]] Registering ConvertToRawIndexTask with task executor factory: ConvertToRawIndexTaskExecutorFactory, event observer factory: DefaultMinionEventObserverFactory
2021/08/08 13:35:28.072 INFO [TaskFactoryRegistry] [Start a Pinot [MINION]] Registering PurgeTask with task executor factory: PurgeTaskExecutorFactory, event observer factory: DefaultMinionEventObserverFactory
2021/08/08 13:35:28.072 INFO [TaskFactoryRegistry] [Start a Pinot [MINION]] Registering SegmentGenerationAndPushTask with task executor factory: SegmentGenerationAndPushTaskExecutorFactory, event observer factory: DefaultMinionEventObserverFactory
Controller logs. This says Window data overflows into consuming segments.
Copy code
2021/08/09 14:40:42.266 INFO [PeriodicTaskScheduler] [pool-7-thread-9] Starting PinotTaskManager with running frequency of 3600 seconds.
2021/08/09 14:40:42.267 INFO [BasePeriodicTask] [pool-7-thread-9] Start running task: PinotTaskManager
2021/08/09 14:40:42.276 INFO [PinotTaskManager] [pool-7-thread-9] Trying to schedule task type: RealtimeToOfflineSegmentsTask, isLeader: true
2021/08/09 14:40:42.276 INFO [RealtimeToOfflineSegmentsTaskGenerator] [pool-7-thread-9] Start generating task configs for table: transcript_REALTIME for task: RealtimeToOfflineSegmentsTask
2021/08/09 14:40:42.283 INFO [RealtimeToOfflineSegmentsTaskGenerator] [pool-7-thread-9] Window data overflows into CONSUMING segments for partition of segment: transcript__0__1__20210805T2101Z. Skipping task generation: RealtimeToOfflineSegmentsTask
2021/08/09 14:40:42.283 INFO [RealtimeToOfflineSegmentsTaskGenerator] [pool-7-thread-9] Found no eligible segments for task: RealtimeToOfflineSegmentsTask with window [1628380800000 - 1628467200000). Skipping task generation
2021/08/09 14:40:42.283 INFO [PinotTaskManager] [pool-7-thread-9] No task to schedule for task type: RealtimeToOfflineSegmentsTask
2021/08/09 14:40:42.283 INFO [BasePeriodicTask] [pool-7-thread-9] Finish running task: PinotTaskManager in 16ms
2021/08/09 14:41:00.445 INFO [PeriodicTaskScheduler] [pool-7-thread-1] Starting SegmentStatusChecker with running frequency of 300 seconds.
2021/08/09 14:41:00.445 INFO [BasePeriodicTask] [pool-7-thread-1] Start running task: SegmentStatusChecker
2021/08/09 14:41:00.445 INFO [ControllerPeriodicTask] [pool-7-thread-1] Processing 2 tables in task: SegmentStatusChecker
2021/08/09 14:41:00.454 INFO [ControllerPeriodicTask] [pool-7-thread-1] Finish processing 2/2 tables in task: SegmentStatusChecker
2021/08/09 14:41:00.454 INFO [BasePeriodicTask] [pool-7-thread-1] Finish running task: SegmentStatusChecker in 9ms
The last record I created yesterday was Sunday, August 8, 2021 2:11:07 PM GMT
Same controller logs at 2021/08/09 16:41:00.695 UTC and 2021/08/09 17:40:42.331 UTC
What is "Window data overflows into CONSUMING segments"?
The realtime table has the following config
Copy code
"segmentsConfig": {
      "timeType": "MILLISECONDS",
      "schemaName": "transcript",
      "retentionTimeUnit": "DAYS",
      "retentionTimeValue": "2",
      "timeColumnName": "timestamp",
      "segmentPushType": "APPEND",
      "replicasPerPartition": "1"
    },
n
it means your data is still in CONSUMING segments. It cannot be moved unless the segment becomes a completed segment
some more about the behavior here

https://www.youtube.com/watch?v=V_KNUUrS6DA&amp;t=1977s

a
thanks will go through
n
can you share the table External View (from ZK Browser)
a
Copy code
{
  "id": "transcript_REALTIME",
  "simpleFields": {
    "BATCH_MESSAGE_MODE": "false",
    "BUCKET_SIZE": "0",
    "IDEAL_STATE_MODE": "CUSTOMIZED",
    "INSTANCE_GROUP_TAG": "transcript_REALTIME",
    "MAX_PARTITIONS_PER_INSTANCE": "1",
    "NUM_PARTITIONS": "2",
    "REBALANCE_MODE": "CUSTOMIZED",
    "REPLICAS": "1",
    "STATE_MODEL_DEF_REF": "SegmentOnlineOfflineStateModel",
    "STATE_MODEL_FACTORY_NAME": "DEFAULT"
  },
  "mapFields": {
    "transcript__0__1__20210805T2101Z": {
      "Server_pinot-server-0.pinot-server-headless.default.svc.cluster.local_8098": "ONLINE"
    },
    "transcript__0__2__20210808T1446Z": {
      "Server_pinot-server-0.pinot-server-headless.default.svc.cluster.local_8098": "CONSUMING"
    }
  },
  "listFields": {}
}
n
and
select $segmentName, min(\"timestamp\"), max(\"timestamp\") from transcript_REALTIME group by $segmentName
can you run this? This will tell you the time ranges in your segments. You have set bucketTime=1d. So the job will pick the least time and use least time + 1 day as the bucket. If this bucket overlaps with the CONSUMING segment, that means it does not have complete data for the bucket, and it will wait
my hunch is that your [minTime, minTime + 1d] bucket is partly in the CONSUMING segment
a
Where do you want me to run this query? I don't see any output: select $segmentName, min(timeColumn), max(timeColumn) from transcript_REALTIME group by $segmentName
n
you have to set the right time column name in the query
select $segmentName, min("timestamp"), max("timestamp") from transcript_REALTIME group by $segmentName
a
transcript__0__1__20210805T2101Z 1628431856000 1628431867000
This is the only value that I get for this query... Since I am the only one sending events, and I have not sent any events since 10 AM EST yesterday
1628431867000 => Sunday, August 8, 2021 10:11:07 AM GMT-04:00 DST
n
hmm, the way this works is that it will find the min time in your table, which is 1628431856000, and because you've set bucketTime=1d, the window will be [1628431856000, 1628431856000+1d]. It will find all completed segments which have data in this window. In your case this is transcript__0__1. But even the CONSUMING segment has data for this window. So we don't have all segments for this window at this time. https://docs.pinot.apache.org/operators/operating-pinot/pinot-managed-offline-flows#how-this-works
if this is just for test purposes, you can send events with timestamp across several days. so send events for 5th Aug, 6th, 7th, 8th, 9th. Set the realtime table flush threshold to something low, so segments complete often. Then it should work
a
Yes, this was for test purposes, but I also wanted to understand how this works… my flush threshold is 6 hours
So you mean this segment will end at 1d + 6 hours, i.e. bucketTime + flush threshold?
n
Your segment will complete every 6h
Unless you don't publish events, then it will be open for longer
a
hmm so then why is it still consuming
n
No events published to that segment?
a
Ohh I see, so if I send an event now with a different timestamp it will immediately create a segment
No, I published events in a 5-min range yesterday; it has been 30+ hours now and it is still consuming
n
Can you check if your consuming segment has any events? You can run a select count(*) from table query to see total docs, and then select count(*), $segmentName from table group by $segmentName to see how many are in the completed segment. If they match, then the segment is open because no events are published
a
12 transcript__0__1__20210805T2101Z
total 12 events that i ingested yesterday
n
then it adds up, right. Unless you publish more events, the consuming segment cannot complete (we cannot complete the empty segment). So publish some with the current timestamp, then retrigger the realtimeToOffline job
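To retrigger it without waiting for the next scheduled run, the controller also exposes a schedule endpoint on Swagger — a sketch, assuming the task type and table used here:
Copy code
POST /tasks/schedule?taskType=RealtimeToOfflineSegmentsTask&tableName=transcript_REALTIME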
a
OK, I just sent an event but I will wait for the segment to be auto-generated. But I am curious: I read in this blog that realtime segments are not time bound (https://medium.com/@surajkmth29/apache-pinot-tables-and-segments-a72dc5854876), so why do they need terminal events to flush the segments?
n
it’s an optimization, so that we dont keep creating empty segments. There is no way for the consumers to know that no more events are expected. In a practical setup, you’re not going to have a stream with just a couple of events here and there, you would typically have a steady stream of events and there wouldn’t be a stream that is no longer going to see events (unless merged/split of course)
a
Ok makes sense
The OFFLINE segment was created successfully, thanks. One more config-related question: does the retention period of the REALTIME table have to be bigger than or equal to bucket period + flush time?
n
bucket + buffer period + flush time. You should set buffer time based on how late you expect events for a given time frame. For example, if your events for hour 10:00:00 can come in up to 12:00:00, then bufferTime can be as low as 2h.
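As a worked example with the task config shared above (bucketTimePeriod=1d, bufferTimePeriod=1h) and the 6h flush time:
Copy code
1d (bucket) + 1h (buffer) + 6h (flush) ≈ 1d 7h
so the 2 DAYS retention currently set on the realtime table is sufficient.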
a
Thanks, I kept the buffer time at 1d for now. I am persisting records from Kinesis. These are records from last month that I have been sending to Kinesis with 2 shards. I am not seeing any records being ingested. I verified that by querying the REALTIME table from the console. When I looked at the Pinot server logs I see the below exception.
Copy code
INFO [LLRealtimeSegmentDataManager_metrics__1__0__20210810T1937Z] [metrics__1__0__20210810T1937Z] Consumed 0 events from (rate:0.0/s), currentOffset={"shardId-000000000001":"49620943664941832421991682937699078913900748813098811410"}, numRowsConsumedSoFar=142679, numRowsIndexedSoFar=0
2021/08/10 20:25:18.748 WARN [KinesisConsumer] [pool-2263-thread-1]
Looks like the problem was an incorrect field definition in the schema. I have corrected the schema but I still see the same error. Do I need to delete all tables and start from scratch?
n
that is not really an error, just a periodic log about consumption status. Not sure why it's not consuming, it looks healthy. Any error messages/exceptions in the log?
a
It was not consuming because of a datatype conversion error (I had a field defined as LONG whereas it was a string). I did modify the schema but I am still having the same conversion error. Should I delete the tables and create them from scratch?
I just deleted the tables and I am now able to query the records
I would like to ask about the transcript table (from the Apache Pinot tutorials). The offline tables were successfully created, but for select count(*) from transcript_OFFLINE I get 12. (Because the offline segment has not been generated, I will have to send a terminal event for the segment.) But the confusing part is that when I query select count(*) from transcript I get 1 as the output and not 13. My assumption was that simply running the query against transcript would combine the output of both REALTIME and OFFLINE, but apparently I am only seeing the transcript_REALTIME output.
n
Results are not combined. Take a look at this https://docs.pinot.apache.org/basics/components/broker
a
It says they are merged. Am I misreading it?
Copy code
In case of hybrid tables, the brokers ensure that the overlap between realtime and offline segment data is queried exactly once, by performing offline and realtime federation
"""""
The broker then merges results from both these queries before returning back to the client.
n
did you look at the explanation and diagram about the 2 queries that are constructed?
a
Ohh, so this is only for the overlap between realtime and offline; so transcript is just an alias for REALTIME + overlap?
n
yes, a query to transcript will fetch as much as it can from REALTIME, and whatever timestamp the realtime table stops at, the remaining will be taken from OFFLINE
a
No, but that is not happening; the offline table has more records. The definition is confusing
Do I need to query offline and realtime separately?
The offline table has 12 records but those are not displayed in transcript
Transcript only shows 1 record
n
number of records doesn’t matter, the 2 queries underneath are constructed based on the timestamp boundary
you might want to play with a more realistic dataset, if you want to test out the correctness
generally, in a production setup, you will have a continual timeline in the offline table, say day 1, day 2, day 3, day 4, day 5… day 20. And the realtime table will have only a few days, say day 18, day 19, day 20, day 21. In such a case, you don't want to be counting day 18, day 19 and day 20 twice. So we will look at the latest timestamp in offline (in this case day 20), and a query like select * from table will become select * from table_OFFLINE where timestamp < day 20 and select * from table_REALTIME where timestamp >= day 20
a
yeah this is what I also expected ..there is 1 day gap between offline and realtime timestamp
n
now depending on what this timestamp comes to in your test data, these 2 queries will be constructed. You can run some simple queries to verify: select "timestamp" from transcript_OFFLINE and select "timestamp" from transcript_REALTIME, and then observe the broker logs when you run select * from transcript
a
Thanks a lot for your replies so far. I appreciate all your feedback. I will certainly look at the Pinot broker logs to get more details. This is based on the understanding I had about how this works underneath: the min(timestamp) is the start time of the REALTIME segment. Based on the below queries I expected 13 as the count for select count(*) FROM transcript, but I see 1:
Copy code
select count(*) FROM transcript
count(*)
1

select $segmentName, min("timestamp"), max("timestamp") from transcript_REALTIME group by $segmentName
$segmentName	                    min(timestamp)	max(timestamp)
transcript__0__2__20210808T1446Z	1628552862000	1628552862000

select count(*) FROM transcript_REALTIME WHERE "timestamp" >= 1628552862000
count(*)
1

select count(*) FROM transcript_OFFLINE WHERE "timestamp" < 1628552862000
count(*)
12
n
what is the result of select $segmentName, min("timestamp"), max("timestamp") from transcript_OFFLINE group by $segmentName ?
a
It is
Copy code
transcript_1628431200000_1628431200000_0	1628431200000	1628431200000
n
and what are the 2 queries generated in the broker log?
also, what is it that you are trying to verify/achieve? The transcript/test data is only good for getting started. For any serious setup or benchmarking, it is better to take a realistic dataset, like what your use case will use
as for this setup, the problem is that you have just 1 timestamp in the entire segment. The time boundary is calculated using the max time in OFFLINE, which in this case is 1628431200000. So the 2 queries are going to be select * from table_OFFLINE where timestamp <= (1628431200000 - 1d) and select * from table_REALTIME where timestamp > (1628431200000 - 1d). Refer to the example above and also that doc picture, which both show that the latest day is always favored from realtime, because you might still be receiving events for that day. The 1d is determined by the segmentPushFrequency setting in the table config (DAILY/HOURLY).
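Carrying out that arithmetic (1d = 86400000 ms), the time boundary is 1628431200000 - 86400000 = 1628344800000, so the two generated queries would look roughly like:
Copy code
select count(*) from transcript_OFFLINE where "timestamp" <= 1628344800000
select count(*) from transcript_REALTIME where "timestamp" > 1628344800000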
a
Thanks for your response and your clarification about how this works. I don't see any queries in the broker logs. Regarding your question, the production data will not be of this nature, but the QA data we have unfortunately follows this pattern, i.e. we will run tests for an hour and then for a day there will be no events generated. This is why I wanted to understand how this works. The logic for determining which timestamp to pick while running the query is good insight if this situation is seen in the QA environment. One way to solve this might be to schedule realtimeToOffline tasks at the end of a week or after a couple of days in the lower environments, so that output is retained for a longer duration
n
certainly, you can do that
a
@User I am still having issues with creating offline segments via the managed offline flow. I created a generator which creates 30 events every hour, and I also see 10 segments in total, but none have been moved to offline. I see the below logs in the controller. There is nothing in the minion logs (no entry since August 10)
Copy code
2021/08/17 19:41:05.170 INFO [RealtimeToOfflineSegmentsTaskGenerator] [pool-7-thread-3] Partitions: [0] have no completed segments. Table: metrics_REALTIME is not ready for RealtimeToOfflineSegmentsTask. Skipping task generation.
2021/08/17 20:41:05.190 INFO [RealtimeToOfflineSegmentsTaskGenerator] [pool-7-thread-4] Start generating task configs for table: metrics_REALTIME for task: RealtimeToOfflineSegmentsTask
2021/08/17 20:41:05.204 INFO [RealtimeToOfflineSegmentsTaskGenerator] [pool-7-thread-4] Partitions: [0] have no completed segments. Table: metrics_REALTIME is not ready for RealtimeToOfflineSegmentsTask. Skipping task generation.
External view in the Zookeeper browser has this info
Copy code
"metrics__0__0__20210813T1710Z": {
      "Server_pinot-server-0.pinot-server-headless.default.svc.cluster.local_8098": "CONSUMING"
    },
    "metrics__1__10__20210816T0511Z": {
      "Server_pinot-server-0.pinot-server-headless.default.svc.cluster.local_8098": "ONLINE"
    },
    "metrics__1__11__20210816T1112Z": {
      "Server_pinot-server-0.pinot-server-headless.default.svc.cluster.local_8098": "ONLINE"
    },
    "metrics__1__12__20210816T1712Z": {
      "Server_pinot-server-0.pinot-server-headless.default.svc.cluster.local_8098": "ONLINE"
    },
    "metrics__1__13__20210816T2313Z": {
      "Server_pinot-server-0.pinot-server-headless.default.svc.cluster.local_8098": "ONLINE"
    },
    "metrics__1__14__20210817T0513Z": {
      "Server_pinot-server-0.pinot-server-headless.default.svc.cluster.local_8098": "ONLINE"
    },
    "metrics__1__15__20210817T1113Z": {
      "Server_pinot-server-0.pinot-server-headless.default.svc.cluster.local_8098": "ONLINE"
    },
    "metrics__1__16__20210817T1713Z": {
      "Server_pinot-server-0.pinot-server-headless.default.svc.cluster.local_8098": "CONSUMING"
    },
    "metrics__1__8__20210815T1711Z": {
      "Server_pinot-server-0.pinot-server-headless.default.svc.cluster.local_8098": "ONLINE"
    },
    "metrics__1__9__20210815T2311Z": {
      "Server_pinot-server-0.pinot-server-headless.default.svc.cluster.local_8098": "ONLINE"
    }
Any ideas ?
n
you can see the log says
Copy code
Partitions: [0] have no completed segments. Table: metrics_REALTIME is not ready for RealtimeToOfflineSegmentsTask. Skipping task generation.
why aren't any events going into partition 0?
a
Yeah, that's the issue; how do I solve it? My Kinesis stream has 2 shards... OK, the problem might be the randomization of accounts, but we still have 2 shards in Kinesis
n
2 shards is not the problem. Partition 0 doesn't seem to move forward at all?
a
So I heard in a YouTube video that each shard maps to 1 partition... that's how a realtime table works, am I right?
I actually don't know why it is not going forward; are there any debugging tips?
n
you can look at the server logs to see if there’s any issues with segment completion for that CONSUMING segment
another possibility is that your event generation job is not sending events uniformly to the 2 shards
a
Hmm I will verify if all shards are getting records
Hi Neha, there was a partition key issue due to which the shards were unbalanced. After fixing that issue I can see the RealtimeToOffline is not working for metrics. Thanks again for your quick response
n
did you mean to say it is now working? 🙂 or is it again not working? 😞
a
it is working 🙂
👌 1
Curious, is there a document on the different segment states like ONLINE, CONSUMING?
n
I don't think there is a document, but you can look at this: https://github.com/apache/pinot/blob/master/pinot-server/src/main/java/org/apache/[…]server/starter/helix/SegmentOnlineOfflineStateModelFactory.java Here you will find all the states and the transitions
a
thanks
@User I wanted some advice on how to implement this feature. We store workflow metadata in Pinot. The workflow states change, and every state is captured with a separate timestamp. I am including the timestamp as part of the key for upserts, as we are planning separate use cases for transitions. But we also want to query the count of workflows in a specific task state; for that we would need to rank each event by the timestamp and then get a count of all the latest events. I looked at many functions on this page but none of them seem to work: https://docs.pinot.apache.org/users/user-guide-query/supported-aggregations. This is how the sample data looks:
Copy code
account_id      type    task_name    task_state timestamp       workflow_id  
7119049663464	task	test_task_0	running	    1630361707931	f4b91f52-16f2-4a2c-8865-bf5039ac4643
7119049663464	task	test_task_2	completed	1630361712931	f4b91f52-16f2-4a2c-8865-bf5039ac4643
7119049663464	task	test_task_1	skipped	    1630361717931	f4b91f52-16f2-4a2c-8865-bf5039ac4643
7119049663464	task	test_task_0	timeout	    1630361722931	f4b91f52-16f2-4a2c-8865-bf5039ac4643
7119049663464	task	test_task_2	failed	    1630361727931	f4b91f52-16f2-4a2c-8865-bf5039ac4643
expected count for each task
Copy code
test_task_0  1
test_task_1  1
test_task_2  1
but we get
Copy code
test_task_1  1
test_task_2  1
test_task_0  1
test_task_1  1
test_task_2  1
Is there a way we can preserve the state transitions and also consider only the latest event while running the count query?
m
Right now there isn't, but this might help when available: https://github.com/apache/pinot/issues/7315
a
Thanks, is there an expected release timeline for this?
m
If someone picks it up, then maybe the next release, say a couple of months
👍 1
a
@User Thanks for your support in answering all the questions. We are planning a pilot release soon, handling the latest-event case using upsert. I had some questions about data migration. Since this is a pilot, we are not aware of the exact volume and frequency of events. It is possible we will increase the Kinesis shards later on, which is not supported by the version of Pinot we are using, due to which we might need to recreate new tables. Wanted to know 3 things:
1. How can we migrate the data from the old table? We keep a copy of all events in a landing bucket. Can we write a Spark job to create segments in the offline table? We are using the managed offline flow, so will that be supported?
2. We have created all tables in the default tenant. In case we switch tenants, will the data be automatically moved to the new tenants, or do we need to migrate data using Spark?
3. Is there a way to take a dump of the data stored in Pinot?
n
A Kinesis shard increase should be supported. I suggest you test that case out in your current setup first, since Kinesis is a new feature
a
Yeah, I tested it and I did not see the number of segments increase… but I would still think data migration is a good feature… Can you let me know how to do points 1 to 3, as we are not planning to change shards
n
1. Migrating data from one table to another is not supported out of the box. You could download the segments and then simply upload them back to the new table, using a data push command: https://docs.pinot.apache.org/configuration-reference/job-specification#push-job-spec. If the 2 tables have the same name, then this will work right away. If they have different names, you will have to change the name inside the segments first, again manually.
2. Switching tenants is easy; you update the tenants and then call a rebalance: https://docs.pinot.apache.org/basics/getting-started/frequent-questions/operations-faq#how-to-run-a-rebalance-on-a-table
3. There is no way currently to take a dump of the data.
@User perhaps we need to create some issues for some of these features. I've seen them being requested more than once. Thoughts?
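For point 2, the rebalance itself is an API call on the controller — a sketch, assuming the REALTIME type and one of the tables from this thread:
Copy code
POST /tables/transcript/rebalance?type=REALTIME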
the increase in number of segments will not be immediate. First the shard that was split should finish being completely consumed, only then will the new shards be picked up by Pinot. This is designed as such to ensure we don’t ingest newer events before older ones
a
OK, we are using version 0.8.0; is that supported there? I remember seeing an open issue when I first started to work on this
Also, thanks for the answers. One final question regarding the same topic... if I am using the managed offline flow and find I need more compacted segments, will using Spark to create them work, regardless of the fact that this is managed offline? I haven't tried it, but I read somewhere that backfill is not supported; that's why I am asking
n
it is supported in 0.8.0 as far as I remember. It's just not immediate
a
That's great I will test it out
n
For creating more compacted segments:
1. In the managed flows you can increase the number of rows per segment, btw.
2. Yes, you can upload to the offline table using Spark jobs. But you need to delete the segments you are trying to replace.
3. @User is the merge feature ready for use?
j
Merge feature is available in the current master, but not in release 0.8.0
We have tested it in the staging environment, and will post the documentation soon
a
Thanks, I will look at https://docs.pinot.apache.org/configuration-reference/job-specification#push-job-spec/. Regarding maxNumRecordsPerSegment, which allows me to increase the number of rows, I see this in the description:
Control the *number of records you want in a segment generated*. Useful if the time window has many records, but you don't want them all in the same segment.
This means that it is the time window or the max number of rows, whichever happens first. I was referring to compacting across time windows, which you answered: I will have to delete the segments first before replacing them
n
Can you give me an example? Mainly: 1. What is the bucketTime set in RealtimeToOffline, and what window do the resulting segments have? 2. What is the time window that you want after compacting?
a
Sorry @User for the delayed response. Firstly, thanks for all your responses; this has really helped in moving the work ahead. This is the task config. Currently for our pilot we have decided to start with a small load, but we will definitely have a higher load later, so it is not possible to predict what the right settings would be.
"task": {
  "taskTypeConfigsMap": {
    "RealtimeToOfflineSegmentsTask": {
      "bucketTimePeriod": "1d",
      "bufferTimePeriod": "1d",
      "roundBucketTimePeriod": "1h",
      "mergeType": "rollup",
      "score.aggregationType": "max",
      "maxNumRecordsPerSegment": "100000"
    }
  }
}
The time window for the compaction would be variable based on the size of the segments, but it is too soon to make that prediction. The reason why I was asking the questions now was just to get some idea whether certain things are not possible at all, or whether we can make these changes seamlessly once we determine the right settings
j
The maxNumRecordsPerSegment might be too small. I'd recommend at least 1M (it is 5M by default), or the task might generate too many segments per day
a
Thanks @User Can this value be changed after the table has been created ?
or is it only at the time of creation
j
You can change it and the next task scheduling will pick it up
a
alright thanks
@User We have made a push to prod and I wanted to check some of the configs. I create both Realtime and Offline tables. I have seen a change in count (1515 to 1484). This might be related to the conversation we had about timestamp boundaries, so I wanted to ensure I have the right config
Copy code
{
  "tableName": "workflowEvents",
  "tableType": "REALTIME",
  "segmentsConfig": {
    "timeColumnName": "eventTimestamp",
    "timeType": "MILLISECONDS",
    "schemaName": "workflowEvents",
    "replicasPerPartition": "1",
    "retentionTimeUnit": "DAYS",
    "retentionTimeValue": "10",
    "segmentPushType": "APPEND"
  },
  "tenants": {
    "broker": "DefaultTenant",
    "server": "DefaultTenant"
  },
  "tableIndexConfig": {
    "loadMode": "MMAP",
    "streamConfigs": {
      "streamType": "kinesis",
      "stream.kinesis.topic.name": "qa-rel-metrics-stream",
      "region": "us-east-1",
      "shardIteratorType": "LATEST",
      "stream.kinesis.consumer.type": "lowlevel",
      "stream.kinesis.fetch.timeout.millis": "30000",
      "stream.kinesis.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
      "stream.kinesis.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kinesis.KinesisConsumerFactory",
      "realtime.segment.flush.threshold.size": "1000000",
      "realtime.segment.flush.threshold.time": "6h"
    }
  },
  "upsertConfig": {
    "mode": "FULL"
  },
  "routing": {
    "instanceSelectorType": "strictReplicaGroup"
  },
  "metadata": {
    "customConfigs": {}
  },
  "task": {
    "taskTypeConfigsMap": {
      "RealtimeToOfflineSegmentsTask": {
        "bucketTimePeriod": "7d",
        "bufferTimePeriod": "1d",
        "roundBucketTimePeriod": "1h",
        "mergeType": "rollup",
        "score.aggregationType": "max",
        "maxNumRecordsPerSegment": "5000000"
      }
    }
  }
}
Copy code
{
  "tableName": "workflowEvents",
  "tableType": "OFFLINE",
  "segmentsConfig": {
    "replication": 1,
    "timeColumnName": "eventTimestamp",
    "timeType": "MILLISECONDS",
    "retentionTimeUnit": "DAYS",
    "retentionTimeValue": 365
  },
  "tenants": {
    "broker": "DefaultTenant",
    "server": "DefaultTenant"
  },
  "tableIndexConfig": {
    "loadMode": "MMAP"
  },
  "ingestionConfig": {
    "batchIngestionConfig": {
      "segmentIngestionType": "APPEND",
      "segmentIngestionFrequency": "DAILY"
    }
  },
  "metadata": {}
}
The table started having data on September 27, and the Offline table does not have any segments created yet due to the bufferTimePeriod being 1d
Firstly, should the segmentIngestionFrequency be changed to WEEKLY? Secondly, I am trying to find the reason for the count decrease
All the REALTIME segments have the status ONLINE in the zookeeper external view
n
it doesn't look right that the offline table doesn't have any data yet. 27th Sept is 10 days ago. You might want to check in the logs if the realtimeToOffline is failing for some reason
the count reduction sounds like it's happening because realtime segments got deleted due to retention. Sept 27 is exactly 10 days ago
a
Actually, I just verified the min time for the first segment was September 30, not Sept 27
so it is still within 7 days + 1 day; I do get that message in the logs about no data in the window
n
what's the ideal state?
a
By ideal state do you mean the expected response?
n
ideal state of the table, from zookeeper
a
Copy code
{
  "id": "workflowEvents_REALTIME",
  "simpleFields": {
    "BATCH_MESSAGE_MODE": "false",
    "IDEAL_STATE_MODE": "CUSTOMIZED",
    "INSTANCE_GROUP_TAG": "workflowEvents_REALTIME",
    "MAX_PARTITIONS_PER_INSTANCE": "1",
    "NUM_PARTITIONS": "39",
    "REBALANCE_MODE": "CUSTOMIZED",
    "REPLICAS": "1",
    "STATE_MODEL_DEF_REF": "SegmentOnlineOfflineStateModel",
    "STATE_MODEL_FACTORY_NAME": "DEFAULT"
  },
  "mapFields": {
    "workflowEvents__0__0__20210927T1932Z": {
      "Server_cdp-dl-pinot-k8s-server-0.cdp-dl-pinot-k8s-server-headless.dp-metrics-pinot.svc.cluster.local_8098": "ONLINE"
    },
    "workflowEvents__0__10__20211004T1923Z": {
      "Server_cdp-dl-pinot-k8s-server-0.cdp-dl-pinot-k8s-server-headless.dp-metrics-pinot.svc.cluster.local_8098": "ONLINE"
    },
    "workflowEvents__0__11__20211005T0123Z": {
      "Server_cdp-dl-pinot-k8s-server-0.cdp-dl-pinot-k8s-server-headless.dp-metrics-pinot.svc.cluster.local_8098": "ONLINE"
    },
    "workflowEvents__0__12__20211005T1023Z": {
      "Server_cdp-dl-pinot-k8s-server-0.cdp-dl-pinot-k8s-server-headless.dp-metrics-pinot.svc.cluster.local_8098": "ONLINE"
    },
    "workflowEvents__0__13__20211005T1624Z": {
      "Server_cdp-dl-pinot-k8s-server-0.cdp-dl-pinot-k8s-server-headless.dp-metrics-pinot.svc.cluster.local_8098": "ONLINE"
    },
    "workflowEvents__0__14__20211005T2224Z": {
      "Server_cdp-dl-pinot-k8s-server-0.cdp-dl-pinot-k8s-server-headless.dp-metrics-pinot.svc.cluster.local_8098": "ONLINE"
    },
    "workflowEvents__0__15__20211006T0424Z": {
      "Server_cdp-dl-pinot-k8s-server-0.cdp-dl-pinot-k8s-server-headless.dp-metrics-pinot.svc.cluster.local_8098": "ONLINE"
    },
    "workflowEvents__0__16__20211006T1524Z": {
      "Server_cdp-dl-pinot-k8s-server-0.cdp-dl-pinot-k8s-server-headless.dp-metrics-pinot.svc.cluster.local_8098": "ONLINE"
    },
    "workflowEvents__0__17__20211006T2125Z": {
      "Server_cdp-dl-pinot-k8s-server-0.cdp-dl-pinot-k8s-server-headless.dp-metrics-pinot.svc.cluster.local_8098": "ONLINE"
    },
    "workflowEvents__0__18__20211007T0325Z": {
      "Server_cdp-dl-pinot-k8s-server-0.cdp-dl-pinot-k8s-server-headless.dp-metrics-pinot.svc.cluster.local_8098": "ONLINE"
    },
    "workflowEvents__0__19__20211007T1725Z": {
      "Server_cdp-dl-pinot-k8s-server-0.cdp-dl-pinot-k8s-server-headless.dp-metrics-pinot.svc.cluster.local_8098": "CONSUMING"
    },
    "workflowEvents__0__1__20210930T1921Z": {
      "Server_cdp-dl-pinot-k8s-server-0.cdp-dl-pinot-k8s-server-headless.dp-metrics-pinot.svc.cluster.local_8098": "ONLINE"
    },
    "workflowEvents__0__2__20211001T0121Z": {
      "Server_cdp-dl-pinot-k8s-server-0.cdp-dl-pinot-k8s-server-headless.dp-metrics-pinot.svc.cluster.local_8098": "ONLINE"
    },
    "workflowEvents__0__3__20211001T0722Z": {
      "Server_cdp-dl-pinot-k8s-server-0.cdp-dl-pinot-k8s-server-headless.dp-metrics-pinot.svc.cluster.local_8098": "ONLINE"
    },
    "workflowEvents__0__4__20211001T1322Z": {
      "Server_cdp-dl-pinot-k8s-server-0.cdp-dl-pinot-k8s-server-headless.dp-metrics-pinot.svc.cluster.local_8098": "ONLINE"
    },
    "workflowEvents__0__5__20211001T1922Z": {
      "Server_cdp-dl-pinot-k8s-server-0.cdp-dl-pinot-k8s-server-headless.dp-metrics-pinot.svc.cluster.local_8098": "ONLINE"
    },
    "workflowEvents__0__6__20211002T0122Z": {
      "Server_cdp-dl-pinot-k8s-server-0.cdp-dl-pinot-k8s-server-headless.dp-metrics-pinot.svc.cluster.local_8098": "ONLINE"
    },
    "workflowEvents__0__7__20211002T1122Z": {
      "Server_cdp-dl-pinot-k8s-server-0.cdp-dl-pinot-k8s-server-headless.dp-metrics-pinot.svc.cluster.local_8098": "ONLINE"
    },
    "workflowEvents__0__8__20211002T1722Z": {
      "Server_cdp-dl-pinot-k8s-server-0.cdp-dl-pinot-k8s-server-headless.dp-metrics-pinot.svc.cluster.local_8098": "ONLINE"
    },
    "workflowEvents__0__9__20211004T1323Z": {
      "Server_cdp-dl-pinot-k8s-server-0.cdp-dl-pinot-k8s-server-headless.dp-metrics-pinot.svc.cluster.local_8098": "ONLINE"
    },
    "workflowEvents__1__0__20210927T1932Z": {
      "Server_cdp-dl-pinot-k8s-server-0.cdp-dl-pinot-k8s-server-headless.dp-metrics-pinot.svc.cluster.local_8098": "ONLINE"
    },
    "workflowEvents__1__10__20211004T1923Z": {
      "Server_cdp-dl-pinot-k8s-server-0.cdp-dl-pinot-k8s-server-headless.dp-metrics-pinot.svc.cluster.local_8098": "ONLINE"
    },
    "workflowEvents__1__11__20211005T0123Z": {
      "Server_cdp-dl-pinot-k8s-server-0.cdp-dl-pinot-k8s-server-headless.dp-metrics-pinot.svc.cluster.local_8098": "ONLINE"
    },
    "workflowEvents__1__12__20211005T1424Z": {
      "Server_cdp-dl-pinot-k8s-server-0.cdp-dl-pinot-k8s-server-headless.dp-metrics-pinot.svc.cluster.local_8098": "ONLINE"
    },
    "workflowEvents__1__13__20211005T2024Z": {
      "Server_cdp-dl-pinot-k8s-server-0.cdp-dl-pinot-k8s-server-headless.dp-metrics-pinot.svc.cluster.local_8098": "ONLINE"
    },
    "workflowEvents__1__14__20211006T0225Z": {
      "Server_cdp-dl-pinot-k8s-server-0.cdp-dl-pinot-k8s-server-headless.dp-metrics-pinot.svc.cluster.local_8098": "ONLINE"
    },
    "workflowEvents__1__15__20211006T1425Z": {
      "Server_cdp-dl-pinot-k8s-server-0.cdp-dl-pinot-k8s-server-headless.dp-metrics-pinot.svc.cluster.local_8098": "ONLINE"
    },
    "workflowEvents__1__16__20211006T2025Z": {
      "Server_cdp-dl-pinot-k8s-server-0.cdp-dl-pinot-k8s-server-headless.dp-metrics-pinot.svc.cluster.local_8098": "ONLINE"
    },
    "workflowEvents__1__17__20211007T0225Z": {
      "Server_cdp-dl-pinot-k8s-server-0.cdp-dl-pinot-k8s-server-headless.dp-metrics-pinot.svc.cluster.local_8098": "ONLINE"
    },
    "workflowEvents__1__18__20211007T1626Z": {
      "Server_cdp-dl-pinot-k8s-server-0.cdp-dl-pinot-k8s-server-headless.dp-metrics-pinot.svc.cluster.local_8098": "CONSUMING"
    },
    "workflowEvents__1__1__20210930T1921Z": {
      "Server_cdp-dl-pinot-k8s-server-0.cdp-dl-pinot-k8s-server-headless.dp-metrics-pinot.svc.cluster.local_8098": "ONLINE"
    },
    "workflowEvents__1__2__20211001T0121Z": {
      "Server_cdp-dl-pinot-k8s-server-0.cdp-dl-pinot-k8s-server-headless.dp-metrics-pinot.svc.cluster.local_8098": "ONLINE"
    },
    "workflowEvents__1__3__20211001T0722Z": {
      "Server_cdp-dl-pinot-k8s-server-0.cdp-dl-pinot-k8s-server-headless.dp-metrics-pinot.svc.cluster.local_8098": "ONLINE"
    },
    "workflowEvents__1__4__20211001T1322Z": {
      "Server_cdp-dl-pinot-k8s-server-0.cdp-dl-pinot-k8s-server-headless.dp-metrics-pinot.svc.cluster.local_8098": "ONLINE"
    },
    "workflowEvents__1__5__20211001T1922Z": {
      "Server_cdp-dl-pinot-k8s-server-0.cdp-dl-pinot-k8s-server-headless.dp-metrics-pinot.svc.cluster.local_8098": "ONLINE"
    },
    "workflowEvents__1__6__20211002T0122Z": {
      "Server_cdp-dl-pinot-k8s-server-0.cdp-dl-pinot-k8s-server-headless.dp-metrics-pinot.svc.cluster.local_8098": "ONLINE"
    },
    "workflowEvents__1__7__20211002T1122Z": {
      "Server_cdp-dl-pinot-k8s-server-0.cdp-dl-pinot-k8s-server-headless.dp-metrics-pinot.svc.cluster.local_8098": "ONLINE"
    },
    "workflowEvents__1__8__20211002T1722Z": {
      "Server_cdp-dl-pinot-k8s-server-0.cdp-dl-pinot-k8s-server-headless.dp-metrics-pinot.svc.cluster.local_8098": "ONLINE"
    },
    "workflowEvents__1__9__20211004T1323Z": {
      "Server_cdp-dl-pinot-k8s-server-0.cdp-dl-pinot-k8s-server-headless.dp-metrics-pinot.svc.cluster.local_8098": "ONLINE"
    }
  },
  "listFields": {}
}
n
Unrelated, but I see you have an upsert config. Upsert doesn't work for hybrid tables
a
hmm it is working actually
i mean we do get the correct counts
at least in the Realtime table
n
only thing i can think of for decreased count is - did you restart the pinot-servers?
a
Yes, I was going to ask; I just had to update Kubernetes by adding JMX Prometheus metrics
Will that create an issue?
n
then the consuming segment gets dropped and starts reconsuming
a
Ohh, is there a way to do this update without losing that data?
n
That's in-memory data, so no, it will not persist across restarts. But the reconsumption should generally be very quick. One issue could be if your shard retention is set to lower than 6h; that could result in the unpersisted data never getting consumed again, because the shard doesn't have it anymore
a
my retention is 6h
n
of the kinesis topic?
a
Ohh, you mean Kinesis; will confirm
1 day
is the retention period
n
what’s the query you’re using to check the count?
a
SELECT count(*) FROM workflowEvents
total count
I think it could be the consuming segment, since realtime.segment.flush.threshold.time is 6h
So that means that segment is stored in memory?
n
yes, the consuming segment will be in memory for a max of 6 hours.
you may have lost 6 hours of data in the restart, but after the restart, it should have ingested all that instantly
a
Ohh, you mean it should get all the data again from Kinesis
n
yup, unless Kinesis doesn't have that data anymore. Which is why I was asking you about Kinesis-side retention
a
it is 1 day
n
I'm not too sure about the behavior of Kinesis when ingesting events from the past. I don't know if it throttles and emits them slowly. At least in Kafka, I've seen the reconsumption happen instantly
a
So the issue is that the "shardIteratorType": "LATEST"
Is there a way in which I can flush the segment from the API, stop sending events temporarily, and do the prod push?
n
Actually you might be right about the "LATEST"
can you change it to
Copy code
AT_SEQUENCE_NUMBER
and then restart again?
a
Do I need to restart, or can I just update the table config?
n
you’ll have to restart, so that consuming segment drops, and then starts again using AT SEQUENCE
a
OK, I will try; since we are using upsert this might work. Btw, regarding the upsert, do you think I will have issues when offline segments are created?
n
@User ^^ what happens to an upsert enabled realtime table, when segments start coming into the offline table? will it fail? or will it be incorrect?
just checked with Jackie. upserts will not work on the offline table. So the results will be incorrect on the offline side
if upserts are important to you, you should disable realtimeToOffline and just use the realtime table
a
Hmm OK, and should I then have a larger flush time to avoid creating small segments?
Wanted a clarification: what if no record from the offline segment is updated, will it work then? I mean, if I know a record will not be updated after 10 days, I could have a larger bucket window
n
Upsert is solved at the query level, not at the ingestion level. So if you send a duplicate record, it will still exist in the Pinot table. Only the query will smartly pick the latest. So when data is moved to offline, it will contain the duplicates
a
ohh i see thanks
So I just need to create a table without the task and drop the offline table... that should work, right?
n
yes
a
Thanks... I will try both. Kinesis with the sequence-number iterator might create duplicates, but hopefully the upsert logic will handle it
I did delete the OFFLINE table and removed the realtime-to-offline task in QA, but I still see the task listed in the Zookeeper console. Also the controller keeps printing:
Copy code
No job to purge for the queue TaskQueue_RealtimeToOfflineSegmentsTask
I have not restarted the server yet
I just restarted the entire cluster and I still see the above message. I have started the table again with the shard iterator at AT_SEQUENCE_NUMBER. I see the iterator is stuck at 66K seconds ago (since we have 1 day retention). I have noticed in the past that this iterator does not shift for a long time. Will update later on if I don't see it change
It actually does work, as the moment it consumes the latest message the iterator age drops to 0. I am not aware how the iterator age is supposed to behave when the shard iterator is AT_SEQUENCE_NUMBER.
m
@User just to confirm, the consumption is happening in Pinot as expected?
n
As far as I understood, yes. We've been chatting separately in a DM too, with Kartik. Hence no replies here
@User ^
m
Thanks @User for confirming. Is there anything to document here for future Kinesis-Pinot users?
a
Yes, it is consuming... I do need to make some changes, i.e. switch to only using the Realtime table for upsert. I have modified the flush time. I wanted to know if this requires a restart
n
should not require restart. It will apply to the next consuming segment
a
thanks
n
FYI here’s the Kinesis issue: https://github.com/apache/pinot/issues/7555 @User
a
@User Regarding the data loss in the Realtime table due to server restarts: is it possible to avoid it if pinot.server.instance.realtime.alloc.offheap.direct => true? That would store even the consuming segments in main memory, so the data would be retained across restarts. pinot.server.instance.realtime.alloc.offheap is true by default, which I believe stores the completed segments in main memory
n
That setting only controls whether the consuming segments use direct or mmap allocation for the segment's internal structures. No matter what, consuming segments will not be retained across restarts
a
Thanks @User. I am asking this because we have a very low volume of data... we were generating very small segments (4KB). I modified the flush time from 6h to 7 days (the Kinesis max retention period) and I do see 14KB segments now being created, but they are still small. I wanted to increase the flush time to 30 days, but I was worried about losing data in case there are restarts. Right now we are not seeing any performance drop as the usage is low. I do know that segment merge and roll-up is being rolled out for offline tables. Once it is available for realtime we can use that. In the meantime do you have any suggestions?

https://www.youtube.com/watch?v=uYtLCQ07OcI

n
you could use that once available in realtime. What’s your overall pinot table retention? It’s okay that your daily segments are this small. We usually insist on making bigger segments, if the small size is happening for smaller windows e.g. total for the day could have been 240M but the flush threshold is set such that only 12M segments are being created. In your case, the data volume is just small.
a
Right now I have set our retention window to 365d…we took this call since we realized upsert is not supported for offline tables so we have to use realtime only for our usecase
n
so max you’ll have 365 segments if you do per day. In such case it’s okay if it’s small
a
I see. We did start with 6h and we have 2 shards, which is why we have a large number of segments
But this is good knowledge; maybe I can roll back to 1d or even 2d and then wait for the merge and rollup task to be available