# general
v
Hi Team, I am trying to push data from realtime to offline using the minion job "RealtimeToOfflineSegmentsTask". I'm getting the logs below:
Start generating task configs for table: events2_REALTIME for task: RealtimeToOfflineSegmentsTask
No realtime-completed segments found for table: events2_REALTIME, skipping task generation: RealtimeToOfflineSegmentsTask
Finished CronJob: table - events2_REALTIME, task - RealtimeToOfflineSegmentsTask, next runtime is 2022-11-04T07:04:00.000+0000
I've pushed a huge amount of data and it is creating multiple segments, but they are not being converted from the realtime to the offline table. Thanks
s
Does the REALTIME table events2_REALTIME have any segments which are not IN_PROGRESS? In PROPERTYSTORE?
v
Can you please help me with how I can find that out?
Where can I check it?
s
Controller UI -> Zookeeper browser -> PROPERTYSTORE -> SEGMENTS -> events2_REALTIME, and look if any segments have
"segment.realtime.status": "DONE",
Example: see the sketch below.
Basically, the R2O task will only push completed segments from the realtime table to the offline one, not consuming ones.
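As a rough sketch of what such an entry looks like (the segment name and values here are made up for illustration; the field names match the metadata shared later in this thread), you'd see something like:
{
  "id": "events2__0__0__20221104T0600Z",
  "simpleFields": {
    "segment.realtime.status": "DONE",
    "segment.start.time": "1667541600000",
    "segment.end.time": "1667545200000",
    "segment.time.unit": "MILLISECONDS",
    "segment.total.docs": "54547"
  },
  "mapFields": {},
  "listFields": {}
}
A segment that is still consuming would show "segment.realtime.status": "IN_PROGRESS" instead.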
v
Thank you @saurabh dubey, I can see that 12 segments are completed.
But I am not able to see any data in the offline table.
s
Is this the only log you are seeing?
No realtime-completed segments found for table: events2_REALTIME, skipping task generation: RealtimeToOfflineSegmentsTask
? You should be checking logs on both the controller and the minions. Also, what Pinot version are you on?
v
Yeah, I am checking both logs.
There are no logs in the minion because the controller is not able to create the minion job.
{
  "tableName": "events3",
  "tableType": "REALTIME",
  "segmentsConfig": {
    "timeColumnName": "ts",
    "schemaName": "events3",
    "replication": "1",
    "replicasPerPartition": "1",
    "retentionTimeUnit": "DAYS",
    "retentionTimeValue": "1"
  },
  "task": {
    "taskTypeConfigsMap": {
      "RealtimeToOfflineSegmentsTask": {
        "bufferTimePeriod": "1m",
        "bucketTimePeriod": "5m",
        "roundBucketTimePeriod": "1m",
        "schedule": "0 * * * * ?",
        "mergeType": "rollup",
        "count.aggregationType": "max",
        "maxNumRecordsPerSegment": "1000"
      }
    }
  },
  "tableIndexConfig": {
    "invertedIndexColumns": [],
    "noDictionaryColumns": [],
    "streamConfigs": {
      "streamType": "kafka",
      "stream.kafka.topic.name": "test",
      "stream.kafka.broker.list": "SERVERS",
      "stream.kafka.consumer.type": "lowlevel",
      "stream.kafka.consumer.prop.auto.offset.reset": "largest",
      "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
      "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
      "realtime.segment.flush.threshold.rows": "0",
      "realtime.segment.flush.threshold.time": "1h",
      "realtime.segment.flush.segment.size": "1M"
    },
    "createInvertedIndexDuringSegmentGeneration": false,
    "rangeIndexColumns": [],
    "rangeIndexVersion": 2,
    "autoGeneratedInvertedIndex": false,
    "sortedColumn": [],
    "bloomFilterColumns": [],
    "loadMode": "MMAP",
    "onHeapDictionaryColumns": [],
    "varLengthDictionaryColumns": [],
    "enableDefaultStarTree": false,
    "enableDynamicStarTreeCreation": false,
    "aggregateMetrics": false,
    "nullHandlingEnabled": false
  },
  "tenants": {},
  "metadata": {}
}
This is my realtime table config.
s
The table is called events3? The log seems to be for events2_REALTIME?
v
I just now created the events3 table, so you can ignore that; it is not an issue.
s
I see. If there are completed segments in ZK, we should see tasks getting generated. This could also be a ZK connectivity issue (controller -> ZK). Do you see any logs that suggest that? BTW, you can trigger a task on demand using the /tasks/schedule API. Maybe tail the logs on the controller and trigger it using the API to get the latest logs?
v
Can we even create a task through the API?
I can't see any Zookeeper logs; both of the files are empty.
s
Yes, you can trigger the task on demand using the /tasks/schedule API on the controller, passing the table name and task type.
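For example, something like this against the controller (a sketch, assuming the controller is reachable at localhost:9000; adjust the host, port, and table name for your setup):
curl -X POST "http://localhost:9000/tasks/schedule?taskType=RealtimeToOfflineSegmentsTask&tableName=events3_REALTIME"
If the generator still decides to skip, the reason should show up in the controller logs right after the call.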
v
That I know, man, but the problem is that the task should be created first, and the controller is not able to do it.
s
Correct, that's certainly weird. I was suggesting you could try to generate a fresh task using
/tasks/schedule
and verify whether the controller still shows the same logs, just in case the logs you shared were from a time when the segments were still in CONSUMING state. But if that's still the case, it might need more investigation.
v
Okay, let me try it. Thanks for the help, man.
Start generating task configs for table: events3_REALTIME for task: RealtimeToOfflineSegmentsTask
No realtime-completed segments found for table: events3_REALTIME, skipping task generation: RealtimeToOfflineSegmentsTask
Even with the API I'm seeing the same log.
Do we need to create the realtime table before the offline table?
Or is any order fine?
s
I don't think that should matter. Ensuring there are completed segments for the realtime table is what's necessary. Your log seems to suggest there are no completed segments for table events3_REALTIME.
v
Okay, let me share a recent log along with the Zookeeper logs.
Start generating task configs for table: events3_REALTIME for task: RealtimeToOfflineSegmentsTask
Window data overflows into CONSUMING segments for partition of segment: events3__0__2__20221104T0918Z. Skipping task generation: RealtimeToOfflineSegmentsTask
Finished CronJob: table - events3_REALTIME, task - RealtimeToOfflineSegmentsTask, next runtime is 2022-11-04T12:13:00.000+0000
These are the logs.
{
  "id": "events3__0__2__20221104T0918Z",
  "simpleFields": {
    "segment.crc": "4243541523",
    "segment.creation.time": "1667553508578",
    "segment.download.url": "URL",
    "segment.end.time": "1667374708937",
    "segment.flush.threshold.size": "54547",
    "segment.index.version": "v3",
    "segment.realtime.endOffset": "762871",
    "segment.realtime.numReplicas": "1",
    "segment.realtime.startOffset": "708324",
    "segment.realtime.status": "DONE",
    "segment.start.time": "1667374708937",
    "segment.time.unit": "MILLISECONDS",
    "segment.total.docs": "54547"
  },
  "mapFields": {},
  "listFields": {}
}
This is the completed segment's metadata.
s
Ok so this is progress. It did identify the completed segment this time. But it's saying the completed segment isn't yet eligible for offline push because the current window has some overlap with a consuming segment. I'd suggest you wait for some more completed segments to be created before you attempt this again.
Or change your
"bufferTimePeriod": "1m",
"bucketTimePeriod": "5m",
settings. These don't look like production settings to me anyway. @Neha Pawar and @Mark Needham for more?
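As an illustrative sketch only (not a recommendation for your workload), a more typical shape for those settings, using the same keys as your table config, would be something like:
"task": {
  "taskTypeConfigsMap": {
    "RealtimeToOfflineSegmentsTask": {
      "bufferTimePeriod": "2h",
      "bucketTimePeriod": "1h",
      "roundBucketTimePeriod": "15m",
      "schedule": "0 0 * * * ?",
      "mergeType": "rollup",
      "count.aggregationType": "max",
      "maxNumRecordsPerSegment": "1000000"
    }
  }
}
With a 1h bucket and 2h buffer, a window is only picked up once it is at least 2h old, which gives segments time to complete before the window is processed.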
n
What are the values like in the column ts? Can you do a
select min(ts), max(ts), $segmentName from table group by $segmentName
? The entire window of [min timestamp, min timestamp + 5m] should fall in the completed segments. None of the consuming segments should have any values from that range. Only then will the code think it's safe to process that window. That's because consuming segments aren't persisted, so we cannot start using that data to move to offline.
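As a concrete sketch of that check for this table (using Pinot's built-in $segmentName virtual column; ordering added just to make the ranges easier to scan):
select $segmentName, min(ts), max(ts)
from events3
group by $segmentName
order by min(ts)
Then compare the per-segment ranges against the task window: the whole [window start, window start + 5m] range must fall inside DONE segments, with no CONSUMING segment holding rows in it.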
v
Hi @Neha Pawar, here ts is just a time column. Below is the configuration:
"dateTimeFieldSpecs": [{
      "name": "ts",
      "dataType": "TIMESTAMP",
      "format" : "1:MILLISECONDS:EPOCH",
      "granularity": "1:MILLISECONDS"
    }]
select min(ts), max(ts), $segmentName from events3 group by $segmentName
I am getting output, but it's still not converting.
All the configs:
Schema:

{
    "schemaName": "events3",
    "dimensionFieldSpecs": [
      {
        "name": "uuid",
        "dataType": "STRING"
      }
    ],
    "metricFieldSpecs": [
      {
        "name": "count",
        "dataType": "INT"
      }
    ],
    "dateTimeFieldSpecs": [{
      "name": "ts",
      "dataType": "TIMESTAMP",
      "format" : "1:MILLISECONDS:EPOCH",
      "granularity": "1:MILLISECONDS"
    }]
  }
Offline:

{
    "tableName": "events3",
    "tableType": "OFFLINE",
    "segmentsConfig": {
      "timeColumnName": "ts",
      "schemaName": "events3",
      "replication": "1",
      "replicasPerPartition": "1"
    },
    "ingestionConfig": {
      "batchIngestionConfig": {
        "segmentIngestionType": "APPEND",
        "segmentIngestionFrequency": "HOURLY"
      }
    },
    "tableIndexConfig": {
      "loadMode": "MMAP"
    },
    "tenants": {},
    "metadata": {}
  }
Realtime:

{
    "tableName": "events3",
    "tableType": "REALTIME",
    "segmentsConfig": {
        "timeColumnName": "ts",
        "schemaName": "events3",
        "replication": "1",
        "replicasPerPartition": "1",
        "retentionTimeUnit": "DAYS",
        "retentionTimeValue": "10m"
    },
    "task": {
        "taskTypeConfigsMap": {
            "RealtimeToOfflineSegmentsTask": {
                "bufferTimePeriod": "1m",
                "bucketTimePeriod": "5m",
                "roundBucketTimePeriod": "1m",
                "schedule": "0 * * * * ?",
                "mergeType": "rollup",
                "count.aggregationType": "max",
                "maxNumRecordsPerSegment": "1000"
            }
        }
    },
    "tableIndexConfig": {
        "invertedIndexColumns": [],
        "noDictionaryColumns": [],
        "streamConfigs": {
            "streamType": "kafka",
            "stream.kafka.topic.name": "pinot_test",
            "stream.kafka.broker.list": "SERVER-LIST",
            "stream.kafka.consumer.type": "lowlevel",
            "stream.kafka.consumer.prop.auto.offset.reset": "largest",
            "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
            "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
            "realtime.segment.flush.threshold.rows": "0",
            "realtime.segment.flush.threshold.time": "1h",
            "realtime.segment.flush.segment.size": "1M"
        },
        "createInvertedIndexDuringSegmentGeneration": false,
        "rangeIndexColumns": [],
        "rangeIndexVersion": 2,
        "autoGeneratedInvertedIndex": false,
        "sortedColumn": [],
        "bloomFilterColumns": [],
        "loadMode": "MMAP",
        "onHeapDictionaryColumns": [],
        "varLengthDictionaryColumns": [],
        "enableDefaultStarTree": false,
        "enableDynamicStarTreeCreation": false,
        "aggregateMetrics": false,
        "nullHandlingEnabled": false
    },
    "tenants": {},
    "metadata": {}
}
Hi @Neha Pawar @saurabh dubey, I've tried this in a local minikube env and it worked on the first go, but when I created a 2nd table and tried the same thing, it's not working anymore. The same thing happened in our dev env as well.