#getting-started

Mohit Singh

05/23/2021, 2:42 PM
Hello everyone. I am trying to ingest data from a Kafka topic into Apache Pinot, but I don't see any data being loaded. Am I missing anything in the config related to Avro?
Schema:
{
  "schemaName": "test_schema",
  "dimensionFieldSpecs": [
    {
      "name": "client_id",
      "dataType": "STRING"
    },
    {
      "name": "master_property_id",
      "dataType": "INT"
    },
    {
      "name": "business_unit",
      "dataType": "STRING"
    },
    {
      "name": "error_info_str",
      "dataType": "STRING"
    }
  ],
  "dateTimeFieldSpecs": [
    {
      "name": "timestamp",
      "dataType": "LONG",
      "format": "1:MILLISECONDS:EPOCH",
      "granularity": "1:MILLISECONDS"
    }
  ]
}
Table:
{
  "REALTIME": {
    "tableName": "test_schema_REALTIME",
    "tableType": "REALTIME",
    "segmentsConfig": {
      "schemaName": "test_schema",
      "replication": "1",
      "replicasPerPartition": "1",
      "timeColumnName": "timestamp"
    },
    "tenants": {
      "broker": "DefaultTenant",
      "server": "DefaultTenant",
      "tagOverrideConfig": {}
    },
    "tableIndexConfig": {
      "bloomFilterColumns": [],
      "noDictionaryColumns": [],
      "onHeapDictionaryColumns": [],
      "varLengthDictionaryColumns": [],
      "enableDefaultStarTree": false,
      "enableDynamicStarTreeCreation": false,
      "aggregateMetrics": false,
      "nullHandlingEnabled": false,
      "invertedIndexColumns": [],
      "rangeIndexColumns": [],
      "autoGeneratedInvertedIndex": false,
      "createInvertedIndexDuringSegmentGeneration": false,
      "sortedColumn": [],
      "loadMode": "MMAP",
      "streamConfigs": {
        "streamType": "kafka",
        "stream.kafka.topic.name": "TestTopic",
        "stream.kafka.broker.list": "localhost:9092",
        "stream.kafka.consumer.type": "lowlevel",
        "stream.kafka.consumer.prop.auto.offset.reset": "smallest",
        "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.inputformat.avro.confluent.KafkaConfluentSchemaRegistryAvroMessageDecoder",
        "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
        "schema.registry.url": "http://localhost:8081",
        "realtime.segment.flush.threshold.rows": "0",
        "realtime.segment.flush.threshold.time": "24h",
        "realtime.segment.flush.segment.size": "100M"
      }
    },
    "metadata": {},
    "quota": {},
    "routing": {},
    "query": {},
    "ingestionConfig": {
      "transformConfigs": [
        {
          "columnName": "error_info_str",
          "transformFunction": "json_format(error_info)"
        }
      ]
    },
    "isDimTable": false
  }
}
Kafka Avro Schema:
{
  "type": "record",
  "name": "TestRecord",
  "namespace": "com.test.ns",
  "fields": [
    {
      "name": "client_id",
      "type": [
        "null",
        "string"
      ]
    },
    {
      "name": "master_property_id",
      "type": "int"
    },
    {
      "name": "business_unit",
      "type": [
        "null",
        "string"
      ]
    },
    {
      "name": "error_info",
      "type": {
        "type": "record",
        "name": "ErrorInfo",
        "fields": [
          {
            "name": "code",
            "type": [
              "null",
              "string"
            ]
          },
          {
            "name": "description",
            "type": [
              "null",
              "string"
            ]
          }
        ]
      }
    },
    {
      "name": "timestamp",
      "type": [
        "null",
        "long"
      ],
      "default": null
    }
  ]
}
Caught exception while decoding row, discarding row. Payload is9899�$UNIT-1621781307861E-12345,Some error description�����^
java.io.CharConversionException: Invalid UTF-32 character 0x2010839 (above 0x0010ffff) at char #1, byte #7)
	at shaded.com.fasterxml.jackson.core.io.UTF32Reader.reportInvalid(UTF32Reader.java:195) ~[pinot-all-0.7.1-jar-with-dependencies.jar:0.7.1-e22be7c3a39e840321d3658e7505f21768b228d6]
	at shaded.com.fasterxml.jackson.core.io.UTF32Reader.read(UTF32Reader.java:158) ~[pinot-all-0.7.1-jar-with-dependencies.jar:0.7.1-e22be7c3a39e840321d3658e7505f21768b228d6]
	at shaded.com.fasterxml.jackson.core.json.ReaderBasedJsonParser._loadMore(ReaderBasedJsonParser.java:258) ~[pinot-all-0.7.1-jar-with-dependencies.jar:0.7.1-e22be7c3a39e840321d3658e7505f21768b228d6]
	at shaded.com.fasterxml.jackson.core.json.ReaderBasedJsonParser._skipWSOrEnd(ReaderBasedJsonParser.java:2353) ~[pinot-all-0.7.1-jar-with-dependencies.jar:0.7.1-e22be7c3a39e840321d3658e7505f21768b228d6]
I saw this error on the server. It looks like there is a problem decoding messages from the Kafka topic.
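For context on why this decoder depends on a schema registry: messages written with Confluent's Avro serializer start with a 5-byte header (a zero magic byte, then a 4-byte big-endian schema ID) followed by the Avro-encoded body. A minimal Python sketch of reading that header is below. The function name `split_confluent_frame` and the sample bytes are made up for illustration and are not taken from the log above.

```python
import struct

def split_confluent_frame(payload: bytes):
    """Split a Confluent-framed Kafka message into (schema_id, avro_body)."""
    if len(payload) < 5:
        raise ValueError("payload too short for a Confluent header")
    # ">bI" = big-endian, one signed byte (magic) + one unsigned 32-bit int (schema ID).
    magic, schema_id = struct.unpack(">bI", payload[:5])
    if magic != 0:
        raise ValueError(f"unexpected magic byte {magic}")
    return schema_id, payload[5:]

# Hypothetical sample: schema ID 42 followed by three opaque body bytes.
sample = b"\x00" + (42).to_bytes(4, "big") + b"\x02\x04\x06"
schema_id, body = split_confluent_frame(sample)
print(schema_id, body)
```

The schema ID is only a reference, so a decoder has to look up the writer schema in the registry before it can read the body. That is consistent with rows being discarded when the registry URL is not configured where the decoder expects it.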

Kishore G

05/23/2021, 3:31 PM
is there anything else in the log?

Mohit Singh

05/23/2021, 4:47 PM
It's working now. I missed one configuration:
"stream.kafka.decoder.prop.schema.registry.rest.url":

Kishore G

05/23/2021, 4:47 PM
Cool
Can you make a fix to the decoder to warn about missing properties?
👍 1
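The kind of check suggested here could look roughly like the sketch below. It is written in Python rather than against Pinot's decoder code, and the helper name `missing_stream_props` and the required-key tuple are assumptions for illustration. Only the property name itself comes from this thread.

```python
import warnings

# Property this thread identified as needed by the Confluent Avro decoder.
REQUIRED_DECODER_PROPS = (
    "stream.kafka.decoder.prop.schema.registry.rest.url",
)

def missing_stream_props(stream_configs: dict, required=REQUIRED_DECODER_PROPS):
    """Return required keys that are absent or empty, warning about each one."""
    missing = [key for key in required if not stream_configs.get(key)]
    for key in missing:
        warnings.warn(f"streamConfigs is missing required property: {key}")
    return missing

# The streamConfigs from the table config above, trimmed to the relevant keys.
stream_configs = {
    "streamType": "kafka",
    "stream.kafka.topic.name": "TestTopic",
    "stream.kafka.decoder.class.name": (
        "org.apache.pinot.plugin.inputformat.avro.confluent."
        "KafkaConfluentSchemaRegistryAvroMessageDecoder"
    ),
    "schema.registry.url": "http://localhost:8081",
}
print(missing_stream_props(stream_configs))
```

Run against the original config, this reports the decoder property as missing even though `schema.registry.url` is set, which matches the behaviour described in the thread.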

Mohit Singh

05/23/2021, 5:30 PM
@User just a quick point, correct me if I am wrong: we have to add a timestamp field to the schema when creating a realtime table, and that timestamp will be used for data retention etc.? In my case, let's say I don't have any timestamp in the Kafka Avro schema, but I added a field named `timestamp` to the Apache Pinot schema. Will it pick a default value automatically?

Kishore G

05/23/2021, 5:32 PM
No. You can probably use a UDF to set it to now().

Mohit Singh

05/23/2021, 5:38 PM
I was testing the same scenario and realised it takes a negative value. I'm not sure what it indicates for the timestamp:
-9223372036854776000
Do I need to use now() in the transform function?
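On the negative value: -9223372036854776000 looks like the smallest 64-bit signed integer, -9223372036854775808 (Java's `Long.MIN_VALUE`), after being rounded for display by a UI that treats numbers as doubles. Pinot appears to use that minimum as the default null value for LONG dimension and date-time columns, which would mean no timestamp was ingested. Treat that as an assumption to verify against your Pinot version. A small Python check of the arithmetic:

```python
LONG_MIN = -2**63  # same value as Java's Long.MIN_VALUE

displayed = "-9223372036854776000"  # value quoted in the message above

# A 64-bit double cannot represent every integer near 2**63, so the rounded
# display string and the exact minimum map to the same double.
same_double = float(displayed) == float(LONG_MIN)
print(LONG_MIN, same_double)
```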

Kishore G

05/23/2021, 5:41 PM
Yes

Mohit Singh

05/23/2021, 6:26 PM
It's throwing an error if I add now() in the transform function for timestamp: Invalid table config: test_schema_now_REALTIME. Invalid transform function 'now()' for column '`timestamp`'