# troubleshooting
t
hi all, running into some issues getting the RealtimeToOfflineSegmentsTask running… I’ve been following the guide and added the task config, but the task stays at the status NOT_STARTED with a {} task config in the task view, and gives a 404 error when trying to run. Any idea what is not correctly configured?
m
@Xiaobing ^^
n
Can you check minion logs for any errors?
t
what should I search for, I am only seeing the following lines:
Initialized TaskExecutorFactoryRegistry with 5 task executor factories: [MergeRollupTask, RealtimeToOfflineSegmentsTask, ConvertToRawIndexTask, PurgeTask, SegmentGenerationAndPushTask] in 1132ms

Registering RealtimeToOfflineSegmentsTask with task executor factory: RealtimeToOfflineSegmentsTaskExecutorFactory, event observer factory: DefaultMinionEventObserverFactory
[screenshot: Screen Shot 2022-09-28 at 11.41.55 AM.png]
@Neha Pawar I don’t see any logs from the minion indicating that the job was submitted to it
I get the following response when manually trying to execute it: get task config returns a 500 error
seems to be this Task/getTaskMetadataByTable API that is giving the 404 error when the task object is defined in the table
x
for that 404 error, I think it’s because no tasks have ever run, so no metadata got created yet. do you have access to controller logs? you could search for the table name where the task is defined and see if there was any exception (particularly from the PinotTaskManager class)
you could also try this swagger API to see if there are any errors when scheduling the task (it was added recently though)
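As a minimal sketch, the scheduling endpoint is just an HTTP POST to the controller. The controller address below is a placeholder, and the exact query-parameter names are assumptions; check your controller’s Swagger UI for the signature in your version:

```python
import urllib.parse
import urllib.request

CONTROLLER = "http://localhost:9000"  # placeholder controller address

def schedule_task_request(task_type: str, table: str) -> urllib.request.Request:
    """Build (but don't send) a POST to the controller's task-scheduling API."""
    query = urllib.parse.urlencode({"taskType": task_type, "tableName": table})
    return urllib.request.Request(
        f"{CONTROLLER}/tasks/schedule?{query}", method="POST"
    )

req = schedule_task_request("RealtimeToOfflineSegmentsTask", "uplinkpayloadevent_REALTIME")
print(req.get_method(), req.full_url)
# send it with: urllib.request.urlopen(req)
```

The response body (or the controller log) should then show whether task generation was attempted for the table.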
t
Okay, so I do see the job getting executed there; it’s saying
Trying to schedule task type: RealtimeToOfflineSegmentsTask, isLeader: false
Start generating task configs for table: uplinkpayloadevent_REALTIME for task: RealtimeToOfflineSegmentsTask
No realtime-completed segments found for table: uplinkpayloadevent_REALTIME, skipping task generation: RealtimeToOfflineSegmentsTask
but my table has also reached its realtime.segment.flush.segment.size, so I am surprised it is not considered “completed”
This is the generator/table task debug
x
got it.. both the API response and the job logs mean the tasks got generated, just that it didn’t run much since it found no completed RT segments 🤔
t
that’s what it sounds like… not sure how the segment can be marked as “completed”… currently it is set to “consuming” but has reached its “flush segments” limit, so it isn’t growing in size anymore even though more events are flowing to it from Kafka
x
just to confirm: are there any completed segments in the table so far?
t
Correct, it seems like there is only one segment (status: good), with one replica set (status: consuming)
metadata for the replica set is:
{
  "segment.realtime.numReplicas": "1",
  "segment.creation.time": "1664297019057",
  "segment.flush.threshold.size": "100000",
  "segment.realtime.startOffset": "0",
  "segment.realtime.status": "IN_PROGRESS"
}
where that segment currently contains (over) 100000 records
x
try this forceCommit and see if anything happens? (in theory, it should auto-commit and move on) could you share the streaming config part of the table config? just wondering if any other knobs take control of the flush.
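For reference, forceCommit is a plain POST against the controller; the path matches the endpoint visible in the controller log further down the thread, and the host below is a placeholder:

```python
import urllib.request

CONTROLLER = "http://localhost:9000"  # placeholder controller address

def force_commit_request(table: str) -> urllib.request.Request:
    """Build (but don't send) POST /tables/<table>/forceCommit."""
    return urllib.request.Request(
        f"{CONTROLLER}/tables/{table}/forceCommit", method="POST"
    )

req = force_commit_request("uplinkpayloadevent")
print(req.get_method(), req.full_url)
# send it with: urllib.request.urlopen(req)
```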
t
I will give it a shot… here is the realtime table config:
{
  "REALTIME": {
    "tableName": "uplinkpayloadevent_REALTIME",
    "tableType": "REALTIME",
    "segmentsConfig": {
      "schemaName": "uplinkpayloadevent",
      "replication": "1",
      "replicasPerPartition": "1",
      "timeColumnName": "time_string",
      "minimizeDataMovement": false
    },
    "tenants": {
      "broker": "DefaultTenant",
      "server": "DefaultTenant",
      "tagOverrideConfig": {}
    },
    "tableIndexConfig": {
      "invertedIndexColumns": [],
      "noDictionaryColumns": [],
      "streamConfigs": {
        "streamType": "kafka",
        "stream.kafka.topic.name": "<kafka topic>",
        "stream.kafka.broker.list": "<kafka servers>",
        "stream.kafka.consumer.type": "lowlevel",
        "stream.kafka.consumer.prop.auto.offset.reset": "smallest",
        "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
        "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
        "realtime.segment.flush.threshold.rows": "0",
        "realtime.segment.flush.threshold.time": "1m",
        "realtime.segment.flush.segment.size": "100K"
      },
      "rangeIndexColumns": [
        "key_range"
      ],
      "rangeIndexVersion": 2,
      "autoGeneratedInvertedIndex": false,
      "createInvertedIndexDuringSegmentGeneration": false,
      "sortedColumn": [
        "app_tok",
        "moduleaddress"
      ],
      "bloomFilterColumns": [
        "key_hash"
      ],
      "loadMode": "MMAP",
      "onHeapDictionaryColumns": [],
      "varLengthDictionaryColumns": [],
      "enableDefaultStarTree": false,
      "enableDynamicStarTreeCreation": false,
      "aggregateMetrics": false,
      "nullHandlingEnabled": false,
      "optimizeDictionaryForMetrics": false,
      "noDictionarySizeRatioThreshold": 0
    },
    "metadata": {},
    "quota": {},
    "task": {
      "taskTypeConfigsMap": {
        "RealtimeToOfflineSegmentsTask": {
          "bufferTimePeriod": "2h",
          "bucketTimePeriod": "24h",
          "roundBucketTimePeriod": "1m",
          "mergeType": "dedup",
          "maxNumRecordsPerSegment": "1000000",
          "schedule": "0 * * * * ?"
        }
      }
    },
    "routing": {},
    "query": {},
    "fieldConfigList": [],
    "ingestionConfig": {},
    "isDimTable": false
  }
}
doesn’t seem like the force commit worked, unfortunately; I tried it with both the table name and the REALTIME variant of it
Handled request from <ip> POST http://<url>/tables/uplinkpayloadevent/forceCommit, content-type null status code 200 OK
85 START: CallbackHandler 23, INVOKE /pinot-dev/INSTANCES/Server_pinot-server-0.pinot-server-headless.datalake.svc.cluster.local_8098/MESSAGES listener: org.apache.helix.controller.GenericHelixController@6e829e50 type: CALLBACK
CallbackHandler 23 subscribing changes listener to path: /pinot-dev/INSTANCES/Server_pinot-server-0.pinot-server-headless.datalake.svc.cluster.local_8098/MESSAGES, callback type: CALLBACK, event types: [NodeChildrenChanged], listener: org.apache.helix.controller.GenericHelixController@6e829e50, watchChild: false
CallbackHandler23, Subscribing to path: /pinot-dev/INSTANCES/Server_pinot-server-0.pinot-server-headless.datalake.svc.cluster.local_8098/MESSAGES took: 0
n
have you read the “How this works” section? it will process one window at a time (in your case minTime to minTime+24hr). If any of that data trickles into the CONSUMING segment, we cannot move it to offline, as that data is not persisted yet: https://docs.pinot.apache.org/operators/operating-pinot/pinot-managed-offline-flows#how-this-works
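The windowing rule described above can be sketched like this (a deliberate simplification, not Pinot’s actual implementation: a window is only eligible once its end falls outside the buffer period):

```python
from datetime import datetime, timedelta, timezone

def window_is_ready(window_start: datetime, bucket: timedelta,
                    buffer: timedelta, now: datetime) -> bool:
    """One [window_start, window_start + bucket) window is processed at a
    time; it is skipped while the window end is still inside the buffer."""
    return window_start + bucket <= now - buffer

# With bucketTimePeriod=24h and bufferTimePeriod=2h (as in the task config):
now = datetime(2022, 9, 28, 12, 0, tzinfo=timezone.utc)
start = datetime(2022, 9, 26, 0, 0, tzinfo=timezone.utc)
print(window_is_ready(start, timedelta(hours=24), timedelta(hours=2), now))  # True
```

And even for an eligible window, the task additionally requires the window’s data to live in completed (not CONSUMING) segments, which is what blocks it here.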
x
hi Neha, I think what’s interesting here is that the CONSUMING segment is not committing as expected, and the table doesn’t have any completed segments yet.
how about we keep this knob but try 1000 (a small value to kick off some segments)?
"realtime.segment.flush.threshold.rows": "1000",
t
I tried updating this value but the segment metadata itself doesn’t seem to be changing. Does it make sense to just delete the segment?
I have also tried reloading the segment, but that still doesn’t update the segment metadata
I can see the ingestion state is unhealthy
Is another segment supposed to be created when the first one gets filled up? It seems like the first one is full, but it won’t flush to offline, generate a new segment, or ingest any new data
x
sry for the delay, was distracted. 1. yeah, it’d work to delete that old consuming segment to let a new one be created with the new configs (but recently new pause/resume APIs were added to solve this more easily). 2. yes, once a consuming segment is filled up and committed, a new consuming segment is created to continue ingestion. 3. unhealthy ingestion state: maybe check out the server-side logs to help identify the cause
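The pause/resume APIs mentioned here can be called the same way as the other controller endpoints; the exact endpoint names below are assumptions based on that feature, so verify them in your controller’s Swagger UI:

```python
import urllib.request

CONTROLLER = "http://localhost:9000"  # placeholder controller address

def consumption_request(table: str, action: str) -> urllib.request.Request:
    """action is 'pauseConsumption' or 'resumeConsumption' (assumed names)."""
    return urllib.request.Request(
        f"{CONTROLLER}/tables/{table}/{action}", method="POST"
    )

pause = consumption_request("uplinkpayloadevent", "pauseConsumption")
resume = consumption_request("uplinkpayloadevent", "resumeConsumption")
print(pause.full_url)
print(resume.full_url)
```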
t
I have deleted the segment; however, it never regenerated. I had to fully delete the whole realtime table to get a new segment generated. I tried the resume API as well as the reload, rebalance, and restart APIs; none of them were able to generate a new segment, the table just continued to sit with 0 segments. With the newly generated segments, I set the flush rate to 1000. Just like before, when the total number of events ingested reaches realtime.segment.flush.threshold.rows, it essentially freezes ingestion and doesn’t mark the segment as complete or generate a new segment to continue the realtime data
unable to ingest any more messages after realtime.segment.flush.threshold.rows is met, yet the segment is still in CONSUMING state and won’t be committed (automatically or via the force API)
x
hmm.. any error logs on the server side? I wonder if there are any issues when it tries to commit the segment.
t
I do see this exception
Using fixed length dictionary for column: app_tok, size: 220
Created dictionary for STRING column: app_tok with cardinality: 11, max length in bytes: 20, range: 082a91eec728d5ababc3 to null
Using fixed length dictionary for column: gatewayaddress, size: 208
Created dictionary for STRING column: gatewayaddress with cardinality: 8, max length in bytes: 26, range: $101$0-0-0-db94abef0 to $101$0-0-0000b82-7ebefd489
Using fixed length dictionary for column: message_str, size: 2639000
Created dictionary for STRING column: message_str with cardinality: 1000, max length in bytes: 2639, range: <DATA> to <data>
Using fixed length dictionary for column: key_hash, size: 24000
Created dictionary for STRING column: key_hash with cardinality: 1000, max length in bytes: 24, range: +/2wCQzSfKCnlIZARRZ5Mw== to zxja3VKp9AD/hdDktP+EMw==
Creating bloom filter with cardinality: 1000, fpp: 0.05
Using fixed length dictionary for column: net_tok, size: 16
Created dictionary for STRING column: net_tok with cardinality: 2, max length in bytes: 8, range: 4f50454e to null
Using fixed length dictionary for column: acctid, size: 20
Created dictionary for STRING column: acctid with cardinality: 5, max length in bytes: 4, range: 2 to null
Using fixed length dictionary for column: id, size: 36000
Created dictionary for STRING column: id with cardinality: 1000, max length in bytes: 36, range: 0003b2a5-6c0d-478a-8f34-96a4517e9955 to fffc7d7f-fac1-4e8c-b0c0-85b2e40f3580
Using fixed length dictionary for column: moduleaddress, size: 5070
Created dictionary for STRING column: moduleaddress with cardinality: 195, max length in bytes: 26, range: $101$0-0-0-db94abef0 to $501$0-0-0000ff2-aaae36fb4
Using fixed length dictionary for column: time_string, size: 23000
Created dictionary for STRING column: time_string with cardinality: 1000, max length in bytes: 23, range: 2022-09-26T14:44:38.480 to 2022-09-26T15:04:55.125
Using fixed length dictionary for column: key_range, size: 60000
Created dictionary for STRING column: key_range with cardinality: 1000, max length in bytes: 60, range: 2022-09-26T14:44:38.480_008ca3de-b00e-4478-bf84-ef371a545e73 to 2022-09-26T15:04:55.125_21c02faf-cdf1-41c8-a48d-fe6a879c23e3
Start building IndexCreator!
Finished records indexing in IndexCreator!
Could not build segment
java.lang.NumberFormatException: For input string: "2022-09-26T14:44:38.480"
	at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65) ~[?:?]
	at java.lang.Long.parseLong(Long.java:692) ~[?:?]
	at java.lang.Long.parseLong(Long.java:817) ~[?:?]
	at org.apache.pinot.segment.local.segment.creator.impl.SegmentColumnarIndexCreator.writeMetadata(SegmentColumnarIndexCreator.java:742) ~[pinot-all-0.11.0-jar-with-dependencies.jar:0.11.0-1b4d6b6b0a27422c1552ea1a936ad145056f7033]
	at org.apache.pinot.segment.local.segment.creator.impl.SegmentColumnarIndexCreator.seal(SegmentColumnarIndexCreator.java:694) ~[pinot-all-0.11.0-jar-with-dependencies.jar:0.11.0-1b4d6b6b0a27422c1552ea1a936ad145056f7033]
	at org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl.handlePostCreation(SegmentIndexCreationDriverImpl.java:276) ~[pinot-all-0.11.0-jar-with-dependencies.jar:0.11.0-1b4d6b6b0a27422c1552ea1a936ad145056f7033]
	at org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl.build(SegmentIndexCreationDriverImpl.java:248) ~[pinot-all-0.11.0-jar-with-dependencies.jar:0.11.0-1b4d6b6b0a27422c1552ea1a936ad145056f7033]
	at org.apache.pinot.segment.local.realtime.converter.RealtimeSegmentConverter.build(RealtimeSegmentConverter.java:123) ~[pinot-all-0.11.0-jar-with-dependencies.jar:0.11.0-1b4d6b6b0a27422c1552ea1a936ad145056f7033]
	at org.apache.pinot.core.data.manager.realtime.LLRealtimeSegmentDataManager.buildSegmentInternal(LLRealtimeSegmentDataManager.java:873) [pinot-all-0.11.0-jar-with-dependencies.jar:0.11.0-1b4d6b6b0a27422c1552ea1a936ad145056f7033]
	at org.apache.pinot.core.data.manager.realtime.LLRealtimeSegmentDataManager.buildSegmentForCommit(LLRealtimeSegmentDataManager.java:800) [pinot-all-0.11.0-jar-with-dependencies.jar:0.11.0-1b4d6b6b0a27422c1552ea1a936ad145056f7033]
	at org.apache.pinot.core.data.manager.realtime.LLRealtimeSegmentDataManager$PartitionConsumer.run(LLRealtimeSegmentDataManager.java:699) [pinot-all-0.11.0-jar-with-dependencies.jar:0.11.0-1b4d6b6b0a27422c1552ea1a936ad145056f7033]
	at java.lang.Thread.run(Thread.java:829) [?:?]
Could not build segment for uplinkpayloadevent__0__0__20220928T1958Z
I can’t tell what is parsing this string as a long from this message
java.lang.NumberFormatException: For input string: "2022-09-26T14:44:38.480"
but my time column is configured to be a string in the datetime format
x
got it. this exception explains why the segment fails to commit. what’s your table schema? likely the time column is misconfigured: epoch vs. date 🤔
t
"dateTimeFieldSpecs": [
        {
          "name": "time_string",
          "dataType": "STRING",
          "format": "1:MILLISECONDS:EPOCH",
          "granularity": "1:MILLISECONDS"
        }
      ]
Should this format be “1:MILLISECONDS:DATE”? I haven’t seen that in the docs yet
x
this might work for you:
"dateTimeFieldSpecs": [
        {
          "name": "time_string",
          "dataType": "STRING",
          "format": "1:MILLISECONDS:SIMPLE_DATE_FORMAT:yyyy-MM-dd'T'HH:mm:ss.SSS",
          "granularity": "1:MILLISECONDS"
        }
      ]
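To illustrate why the original spec failed: with format 1:MILLISECONDS:EPOCH, Pinot parses the column value as a long (hence the NumberFormatException in the stack trace), whereas the SIMPLE_DATE_FORMAT pattern parses it as a timestamp. A rough Python analogy, with strptime’s %f standing in for Java’s SSS:

```python
from datetime import datetime, timezone

ts = "2022-09-26T14:44:38.480"

# EPOCH format: the value must be a plain integer, so an ISO string blows up,
# just like Long.parseLong did in the server stack trace.
try:
    int(ts)
except ValueError as e:
    print("EPOCH-style parse failed:", e)

# SIMPLE_DATE_FORMAT yyyy-MM-dd'T'HH:mm:ss.SSS corresponds to this pattern.
dt = datetime.strptime(ts, "%Y-%m-%dT%H:%M:%S.%f")
epoch_millis = round(dt.replace(tzinfo=timezone.utc).timestamp() * 1000)
print(dt, epoch_millis)
```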
t
oh wow, I must have missed that! will give it another shot!
👌 1
that definitely fixed my commit issue! I have hundreds of segments now lol, and it seems like it’s saving them to deep storage as well, which it wasn’t doing before either!
x
cool. then maybe fine-tune the flush threshold so that you don’t end up with too many small segments.
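As a sketch, a size-based flush in streamConfigs could look like the fragment below; the key names here should be double-checked against the docs for your Pinot version (note the threshold spelling):

```json
"streamConfigs": {
  "realtime.segment.flush.threshold.rows": "0",
  "realtime.segment.flush.threshold.time": "6h",
  "realtime.segment.flush.threshold.segment.size": "100M"
}
```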
t
yes, I will set it back to 100k now that it is able to commit and create new segments. It seems like the job is also executing as expected now! thank you all for your support! 😄
👍 1
x
btw, watch out for the …flush.segment.size config, I think it should be called …flush.threshold.segment.size