# general
a
Hello everyone, I’m playing with ingestion of events from Kafka to Pinot. Initially I was blocked on getting any events into Pinot at all. Though that’s working now, we’re noticing that ingestion arbitrarily stops after some time. Our cluster health looks good, and the Pinot UI also shows everything green. (We inferred that ingestion has stopped because the query console gets stuck at the same number of docs, and Kafka also shows that consumption has stopped.) Is there some limit to the number of events that we can push to Pinot, or could it be a configuration issue? I’m not sure how to go about debugging this.
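(One way we thought of narrowing it down: ask the controller for the state of the consuming segments when it stalls. A minimal sketch below; the controller address, default port 9000, and the consumingSegmentsInfo endpoint are assumptions and may differ by Pinot version.)

# Minimal sketch: check whether the realtime table still has healthy consuming
# segments. Controller host/port and endpoint availability are assumptions.
import json
import requests

CONTROLLER = "http://localhost:9000"   # assumption: default controller port
TABLE = "user_job_impression_v2"

# Per-partition consumer state (available on recent Pinot versions).
resp = requests.get(f"{CONTROLLER}/tables/{TABLE}/consumingSegmentsInfo", timeout=10)
print(json.dumps(resp.json(), indent=2))

# Segments stuck in ERROR/OFFLINE in the external view usually explain a silent stall.
ev = requests.get(f"{CONTROLLER}/tables/{TABLE}/externalview", timeout=10)
print(json.dumps(ev.json(), indent=2))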
k
anything in the logs?
it’s mostly because of the time column not being configured correctly in the table config/schema
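(A quick way to sanity-check that is to peek at a few raw messages and confirm the time column really holds epoch millis. A minimal sketch using kafka-python; the brokers and SASL credentials below are placeholders, not the real values.)

# Minimal sketch: read a few messages from the topic and check that 'updated_at'
# carries epoch-millisecond values. Brokers/credentials are placeholders.
import json
from kafka import KafkaConsumer  # pip install kafka-python

consumer = KafkaConsumer(
    "bq-kafka-poc-v2",
    bootstrap_servers="xxx",        # placeholder
    security_protocol="SASL_SSL",
    sasl_mechanism="PLAIN",
    sasl_plain_username="xxx",      # placeholder
    sasl_plain_password="xxx",      # placeholder
    auto_offset_reset="latest",
    consumer_timeout_ms=10000,
    value_deserializer=lambda v: json.loads(v.decode("utf-8")),
)

for msg in consumer:
    ts = msg.value.get("updated_at")
    # Epoch millis for recent dates are roughly 1.6e12-1.8e12; seconds or string
    # timestamps here would point to a mismatch with the schema's time format.
    print(ts, type(ts))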
a
No new log messages after the pause happened.
we did configure the time column. I’m sharing my config and schema. config -
{
  "tableName": "user_job_impression_v2",
  "tableType": "REALTIME",
  "segmentsConfig": {
    "schemaName": "user_job_impression_v2",
    "replication": "1",
    "timeColumnName": "updated_at",
    "allowNullTimeValue": false,
    "replicasPerPartition": "1"
  },
  "tenants": {
    "broker": "DefaultTenant",
    "server": "DefaultTenant",
    "tagOverrideConfig": {}
  },
  "tableIndexConfig": {
    "loadMode": "MMAP",
    "streamConfigs": {
      "streamType": "kafka",
      "stream.kafka.topic.name": "bq-kafka-poc-v2",
      "stream.kafka.broker.list": "xxx",
      "stream.kafka.consumer.type": "lowlevel",
      "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
      "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
      "realtime.segment.flush.threshold.rows": "50000",
      "realtime.segment.flush.threshold.time": "3600000",
      "stream.kafka.bootstrap.servers": "xxx",
      "security.protocol": "SASL_SSL",
      "sasl.mechanism": "PLAIN",
      "sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required xxx;",
      "stream.kafka.hlc.bootstrap.server": "xxx"
    },
    "onHeapDictionaryColumns": [],
    "varLengthDictionaryColumns": [],
    "enableDefaultStarTree": false,
    "enableDynamicStarTreeCreation": false,
    "aggregateMetrics": false,
    "nullHandlingEnabled": false
  },
  "metadata": {},
  "quota": {},
  "routing": {},
  "query": {},
  "ingestionConfig": {},
  "isDimTable": false
}
schema -
{
  "schemaName": "user_job_impression_v2",
  "dimensionFieldSpecs": [
    {
      "name": "user_id",
      "dataType": "INT"
    },
    {
      "name": "job_id",
      "dataType": "INT"
    },
    {
      "name": "job_title",
      "dataType": "STRING"
    },
    {
      "name": "event_type",
      "dataType": "STRING"
    },
    {
      "name": "source",
      "dataType": "STRING"
    },
    {
      "name": "is_deposit",
      "dataType": "BOOLEAN"
    }
  ],
  "metricFieldSpecs": [
    {
      "name": "horizontal_position",
      "dataType": "FLOAT"
    }
  ],
  "dateTimeFieldSpecs": [
    {
      "name": "created_at",
      "dataType": "LONG",
      "format": "1:MILLISECONDS:EPOCH",
      "granularity": "1:MILLISECONDS"
    },
    {
      "name": "updated_at",
      "dataType": "LONG",
      "format": "1:MILLISECONDS:EPOCH",
      "granularity": "1:MILLISECONDS"
    }
  ]
}
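(Another data point worth grabbing the next time it stalls: the newest updated_at value Pinot has ingested, via the broker’s SQL endpoint, to see exactly when consumption stopped. A minimal sketch, assuming the broker listens on the default port 8099.)

# Minimal sketch: ask the broker for the max updated_at Pinot has ingested.
# Broker host/port are assumptions.
import datetime
import requests

BROKER = "http://localhost:8099"   # assumption: default broker query port
sql = "SELECT MAX(updated_at) FROM user_job_impression_v2"

resp = requests.post(f"{BROKER}/query/sql", json={"sql": sql}, timeout=10)
rows = resp.json()["resultTable"]["rows"]
latest_ms = int(rows[0][0])
print("last ingested event:", datetime.datetime.utcfromtimestamp(latest_ms / 1000))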