# getting-started
s
Hi, I'm new to Pinot and experimenting with it. I have a Pinot container running, as well as Kafka and Zookeeper. I have created a schema and a table to ingest from my Kafka topic in real time, but no data is making it through from Kafka to Pinot. I've tested the Pinot container and it can see the Kafka container (no connectivity issues). I'm not sure how to troubleshoot this. There are no logs in PinotBroker.log, PinotServer.log, etc. Is there a step I'm missing, or is there something wrong with my config? I have copied these two files to /opt/pinot and the table is created successfully, but it just doesn't ingest anything.
This is the error I'm seeing in the container logs:
2022/01/18 18:22:29.532 ERROR [LLRealtimeSegmentDataManager_poc__0__0__20220118T1820Z] [poc__0__0__20220118T1820Z] Caught exception while indexing the record: {
  "fieldToValueMap" : {
    "payload" : {
      "op" : "u",
      "before" : {
        "open_date" : 19010,
        "description" : "test sahar pinot 2",
        "created_at" : 1642530148000,
        "billable" : 1,
        "client_id" : 347359,
        "number" : 41,
        "account_id" : 347321,
        "updated_at" : 1642530148000,
        "user_id" : 347321,
        "group_id" : 347321,
        "display_number" : "00041-Keebler",
        "id" : 347401,
        "status" : 1
      },
      "after" : {
        "open_date" : 19010,
        "description" : "test sahar pinot 2",
        "created_at" : 1642530148000,
        "billable" : 1,
        "client_id" : 347359,
        "number" : 41,
        "account_id" : 347321,
        "updated_at" : 1642530148000,
        "user_id" : 347321,
        "group_id" : 347323,
        "display_number" : "00041-Keebler",
        "id" : 347401,
        "status" : 1
      },
      "source" : {
        "thread" : 30,
        "server_id" : 1,
        "version" : "1.0.0.Final",
        "file" : "docker-1-bin-log.000030",
        "connector" : "mysql",
        "pos" : 2155,
        "name" : "debezium_dev",
        "gtid" : "614c4ede-5f4e-11ec-a055-0242c0a89009:23744",
        "row" : 0,
        "ts_ms" : 1642530148000,
        "snapshot" : "false",
        "db" : "themis_development_1",
        "table" : "matters"
      },
      "ts_ms" : 1642530149107
    },
    "full_payload" : "{\"op\":\"u\",\"before\":{\"open_date\":19010,\"description\":\"test sahar pinot 2\",\"created_at\":1642530148000,\"billable\":1,\"client_id\":347359,\"number\":41,\"account_id\":347321,\"updated_at\":1642530148000,\"user_id\":347321,\"group_id\":347321,\"display_number\":\"00041-Keebler\",\"id\":347401,\"status\":1},\"after\":{\"open_date\":19010,\"description\":\"test sahar pinot 2\",\"created_at\":1642530148000,\"billable\":1,\"client_id\":347359,\"number\":41,\"account_id\":347321,\"updated_at\":1642530148000,\"user_id\":347321,\"group_id\":347323,\"display_numbe",
    "ts_ms" : null
  },
  "nullValueFields" : [ ]
}
java.lang.NullPointerException: null
at org.apache.pinot.segment.local.realtime.impl.dictionary.LongOnHeapMutableDictionary.index(LongOnHeapMutableDictionary.java:37) ~[pinot-all-0.10.0-SNAPSHOT-jar-with-dependencies.jar:0.10.0-SNAPSHOT-7ec47c420be9c6aee6c8e95644266fe9b7fe7a2b]
at org.apache.pinot.segment.local.indexsegment.mutable.MutableSegmentImpl.updateDictionary(MutableSegmentImpl.java:532) ~[pinot-all-0.10.0-SNAPSHOT-jar-with-dependencies.jar:0.10.0-SNAPSHOT-7ec47c420be9c6aee6c8e95644266fe9b7fe7a2b]
at org.apache.pinot.segment.local.indexsegment.mutable.MutableSegmentImpl.index(MutableSegmentImpl.java:485) ~[pinot-all-0.10.0-SNAPSHOT-jar-with-dependencies.jar:0.10.0-SNAPSHOT-7ec47c420be9c6aee6c8e95644266fe9b7fe7a2b]
at org.apache.pinot.core.data.manager.realtime.LLRealtimeSegmentDataManager.processStreamEvents(LLRealtimeSegmentDataManager.java:545) [pinot-all-0.10.0-SNAPSHOT-jar-with-dependencies.jar:0.10.0-SNAPSHOT-7ec47c420be9c6aee6c8e95644266fe9b7fe7a2b]
at org.apache.pinot.core.data.manager.realtime.LLRealtimeSegmentDataManager.consumeLoop(LLRealtimeSegmentDataManager.java:419) [pinot-all-0.10.0-SNAPSHOT-jar-with-dependencies.jar:0.10.0-SNAPSHOT-7ec47c420be9c6aee6c8e95644266fe9b7fe7a2b]
at org.apache.pinot.core.data.manager.realtime.LLRealtimeSegmentDataManager$PartitionConsumer.run(LLRealtimeSegmentDataManager.java:593) [pinot-all-0.10.0-SNAPSHOT-jar-with-dependencies.jar:0.10.0-SNAPSHOT-7ec47c420be9c6aee6c8e95644266fe9b7fe7a2b]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_312]
somehow it thinks ts_ms is null when it isn't null in the payload
k
Hi @User. It looks like you're using Debezium CDC to capture database updates and ingest into Pinot using Kafka. The good news is, based on that exception, Pinot is successfully connecting to Kafka. Let me review the real-time table config and find out what's going on with the indexing.
Okay, so the issue is that you have listed `ts_ms` as a field in your schema, but it is nested as a field in your JSON payload coming from Debezium. There are a few ways to make this work, but I recommend changing your Debezium configuration to use the outbox pattern and extracting your field values from your payload string so that Pinot doesn't have to parse through the nested JSON.
If this isn't an option, you can create a generated field for your `ts_ms` that extracts the value from the nested JSON. Looking at your payload in the exception, it looks like that value is null anyway.
s
Thanks @User. I edited my table config to include
"ingestionConfig": {
      "transformConfigs": [
        {
          "columnName": "full_payload",
          "transformFunction": "jsonFormat(payload)"
        },
        {
          "columnName": "ts_ms",
          "transformFunction": "jsonPathArray(payload, '$.[*].ts_ms')"
        }
      ]
    },
so that it extracts the ts_ms out of the payload. Does this look right to you? I'm still getting the same exception. I only need to edit the config and it should pick it up, right? I don't need to restart anything after a table config change?
k
If you set a default value for the schema column definition then you should be all set.
Also your transform needs to use the right data type for extracting the value.
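For reference, a minimal sketch of what a default value on the schema column could look like. The actual schema was not posted in this thread, so the field spec below is hypothetical: it assumes `ts_ms` is declared as a LONG date-time column holding epoch milliseconds, and the default of 0 is only a placeholder.

```json
{
  "dateTimeFieldSpecs": [
    {
      "name": "ts_ms",
      "dataType": "LONG",
      "format": "1:MILLISECONDS:EPOCH",
      "granularity": "1:MILLISECONDS",
      "defaultNullValue": 0
    }
  ]
}
```

With `defaultNullValue` set, a record whose `ts_ms` resolves to null should be indexed with the default instead of hitting the NullPointerException in the dictionary. Whether 0 is a sensible default depends on how the column is queried.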
s
Thank you for your help, data is making it into Pinot now.
k
Also, you will need to target the column “full_payload” in the transform.
Cool, glad to help. Let me know if you run into anything else.
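Putting the two suggestions together (a scalar extraction with the right data type, reading from `full_payload`), the transform config might look roughly like the sketch below. The config that finally worked was not posted in the thread, so this is an assumption: it uses Pinot's `jsonPathLong` function and the path `$.ts_ms`, which matches the top-level `ts_ms` of the payload shown in the exception above.

```json
{
  "ingestionConfig": {
    "transformConfigs": [
      {
        "columnName": "full_payload",
        "transformFunction": "jsonFormat(payload)"
      },
      {
        "columnName": "ts_ms",
        "transformFunction": "jsonPathLong(full_payload, '$.ts_ms')"
      }
    ]
  }
}
```

The original `jsonPathArray(payload, '$.[*].ts_ms')` returns an array rather than a single long, which does not fit a single-value LONG column. A scalar function such as `jsonPathLong` keeps the extracted value consistent with the column's data type.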