# getting-started
s
Hi Team, I am trying to ingest Kafka messages into a Pinot table in real time. We are using an Avro schema and the Confluent Schema Registry, but I am not seeing any data inserted into the table. Can you please check?
My table config (JSON):
```json
{
  "REALTIME": {
    "tableName": "stocks_order_REALTIME",
    "tableType": "REALTIME",
    "segmentsConfig": {
      "timeColumnName": "ts_ms",
      "schemaName": "stocks_order",
      "replication": "1",
      "replicasPerPartition": "1"
    },
    "tenants": {
      "broker": "DefaultTenant",
      "server": "DefaultTenant",
      "tagOverrideConfig": {}
    },
    "tableIndexConfig": {
      "bloomFilterColumns": [],
      "invertedIndexColumns": [],
      "rangeIndexColumns": [],
      "autoGeneratedInvertedIndex": false,
      "createInvertedIndexDuringSegmentGeneration": false,
      "sortedColumn": [],
      "loadMode": "MMAP",
      "streamConfigs": {
        "streamType": "kafka",
        "stream.kafka.topic.name": "gw-stocks-replica.groww_stocks.groww_stocks.stocks_order",
        "stream.kafka.broker.list": "10.100.255.74:9094",
        "stream.kafka.consumer.type": "lowlevel",
        "stream.kafka.consumer.prop.auto.offset.reset": "smallest",
        "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.inputformat.avro.confluent.KafkaConfluentSchemaRegistryAvroMessageDecoder",
        "realtime.segment.flush.threshold.rows": "0",
        "realtime.segment.flush.threshold.time": "24h",
        "realtime.segment.flush.segment.size": "100M",
        "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
        "stream.kafka.decoder.prop.schema.registry.rest.url": "<https://dp-schema-registry.growwinfra.in>"
      },
      "noDictionaryColumns": [],
      "onHeapDictionaryColumns": [],
      "varLengthDictionaryColumns": [],
      "enableDefaultStarTree": false,
      "enableDynamicStarTreeCreation": false,
      "aggregateMetrics": false,
      "nullHandlingEnabled": false
    },
    "metadata": {},
    "quota": {},
    "routing": {},
    "query": {},
    "ingestionConfig": {},
    "isDimTable": false
  }
}
```
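Since the decoder configured above expects Confluent wire format (a 0x00 magic byte, a 4-byte big-endian schema ID, then the Avro payload), a quick spot check of the topic can rule out plain-Avro or JSON messages. A minimal sketch, assuming the broker list and topic from the config above and only a plain kafka-clients dependency:

```java
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class WireFormatCheck {
  public static void main(String[] args) {
    Properties props = new Properties();
    // Broker and topic taken from the table config above.
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.100.255.74:9094");
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "wire-format-check");
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());

    try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
      consumer.subscribe(List.of("gw-stocks-replica.groww_stocks.groww_stocks.stocks_order"));
      ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofSeconds(10));
      for (ConsumerRecord<byte[], byte[]> record : records) {
        byte[] value = record.value();
        if (value == null || value.length < 5 || value[0] != 0) {
          // No 0x00 magic byte: the message is not Confluent-serialized Avro.
          System.out.println("NOT Confluent wire format at offset " + record.offset());
        } else {
          // Bytes 1-4 hold the schema ID registered in the schema registry.
          int schemaId = ByteBuffer.wrap(value, 1, 4).getInt();
          System.out.println("offset " + record.offset() + " -> schema id " + schemaId);
        }
        break; // one record is enough for a spot check
      }
    }
  }
}
```

If the first byte is not 0, the KafkaConfluentSchemaRegistryAvroMessageDecoder will typically fail to decode the records and the table will stay empty.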
m
Do you see any errors in the server logs?
I am also trying to do this, so I'm curious if we are seeing the same problems.
s
Hi Mark, I am getting this error: java.io.CharConversionException: Invalid UTF-32 character
can we huddle once?
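One possible cause of that CharConversionException (not confirmed in this thread) is Jackson inside the decoder trying to parse a schema-registry response that is not JSON, for example an HTML error page from a proxy or a mistyped registry URL. A minimal sketch to sanity-check the registry endpoint from the table config, using only the JDK HTTP client (Java 11+); the /subjects path is the standard Confluent listing endpoint:

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class RegistryCheck {
  public static void main(String[] args) throws Exception {
    // URL taken from the table config above; /subjects lists registered subjects.
    String url = "https://dp-schema-registry.growwinfra.in/subjects";
    HttpResponse<String> resp = HttpClient.newHttpClient()
        .send(HttpRequest.newBuilder(URI.create(url)).GET().build(),
              HttpResponse.BodyHandlers.ofString());
    System.out.println(resp.statusCode());
    System.out.println(resp.body());
  }
}
```

A healthy registry answers with HTTP 200 and a JSON array of subjects; anything else at that URL would make the decoder's registry client fail while parsing the response.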