# troubleshooting
l
Hello everyone 👋 I'm seeing a problem where Pinot is not able to ingest a JSON object, it just shows up as null in the table... Will post more details in thread
Here is sample data from kafka
{
  "objectId": "00000000-0000-0000-0000-000000000000",
  "jsonObject": {
    "values": [
      {
        "id": "bob",
        "names": [
          "a",
          "b",
          "c",
          "d",
          "e"
        ]
      }
    ]
  }
}
And schema..
{
  "schemaName": "myObjects",
  "dimensionFieldSpecs": [
    {
      "name": "objectId",
      "dataType": "STRING"
    },
    {
      "name": "jsonObject",
      "dataType": "JSON"
    }
  ],
  "dateTimeFieldSpecs": [
    {
      "name": "lastModified",
      "dataType": "LONG",
      "format": "1:MILLISECONDS:EPOCH",
      "granularity": "1:DAYS"
    }
  ]
}
I've got jsonObject in noDictionaryColumns and in jsonIndexColumns
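i.e. the relevant bit of my tableIndexConfig looks roughly like this:
"tableIndexConfig": {
  "jsonIndexColumns": [
    "jsonObject"
  ],
  "noDictionaryColumns": [
    "lastModified",
    "jsonObject"
  ]
}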
I'm using Pinot 0.10.0, any ideas what's wrong?
s
Hi @User. The json object does not seem to have a "lastModified" field. Have you intentionally truncated the object? Or is it actually not part of the objects you're trying to ingest?
l
I truncated it, yes
It's all there
All the other data in my schema is showing up
s
Could you share your table config @User? With the following config:
{
  "tableName": "myObject",
  "tableType": "REALTIME",
  "segmentsConfig": {
    "timeColumnName": "lastModified",
    "timeType": "MILLISECONDS",
    "schemaName": "myObjects",
    "replicasPerPartition": "1"
  },
  "tenants": {},
  "tableIndexConfig": {
    "loadMode": "MMAP",
    "streamConfigs": {
      "streamType": "kafka",
      "stream.kafka.consumer.type": "lowlevel",
      "stream.kafka.topic.name": "object-topic",
      "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
      "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
      "stream.kafka.broker.list": "localhost:9876",
      "realtime.segment.flush.threshold.time": "5000",
      "realtime.segment.flush.threshold.rows": "1",
      "stream.kafka.consumer.prop.auto.offset.reset": "smallest"
    }
  },
  "metadata": {
    "customConfigs": {}
  }
}
I was able to successfully ingest the data
l
Are you using 0.10.0 or master?
Here's my table config:
{
  "REALTIME": {
    "tableName": "myObjects",
    "tableType": "REALTIME",
    "segmentsConfig": {
      "timeType": "MILLISECONDS",
      "schemaName": "myObjects",
      "retentionTimeUnit": "DAYS",
      "retentionTimeValue": "365",
      "timeColumnName": "lastModified",
      "allowNullTimeValue": false,
      "replicasPerPartition": "2"
    },
    "tenants": {
      "broker": "DefaultTenant",
      "server": "DefaultTenant"
    },
    "tableIndexConfig": {
      "rangeIndexVersion": 2,
      "jsonIndexColumns": [
        "jsonObject"
      ],
      "autoGeneratedInvertedIndex": false,
      "createInvertedIndexDuringSegmentGeneration": false,
      "loadMode": "MMAP",
      "noDictionaryColumns": [
        "lastModified",
        "jsonObject"
      ],
      "enableDefaultStarTree": false,
      "enableDynamicStarTreeCreation": false,
      "segmentPartitionConfig": {
        "columnPartitionMap": {
          "objectId": {
            "functionName": "Murmur",
            "numPartitions": 2
          }
        }
      },
      "aggregateMetrics": false,
      "nullHandlingEnabled": false
    },
    "metadata": {
      "customConfigs": {}
    },
    "routing": {
      "segmentPrunerTypes": [
        "partition"
      ],
      "instanceSelectorType": "replicaGroup"
    },
    "instanceAssignmentConfigMap": {
      "CONSUMING": {
        "tagPoolConfig": {
          "tag": "DefaultTenant",
          "poolBased": false,
          "numPools": 0
        },
        "replicaGroupPartitionConfig": {
          "replicaGroupBased": true,
          "numInstances": 0,
          "numReplicaGroups": 2,
          "numInstancesPerReplicaGroup": 8,
          "numPartitions": 0,
          "numInstancesPerPartition": 0
        }
      }
    },
    "upsertConfig": {
      "mode": "NONE",
      "hashFunction": "NONE"
    },
    "ingestionConfig": {
      "streamIngestionConfig": {
        "streamConfigMaps": [
          {
            "streamType": "kafka",
            "stream.kafka.consumer.type": "lowlevel",
            "stream.kafka.topic.name": "my_objects",
            "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.inputformat.json.JSONMessageDecoder",
            "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
            "stream.kafka.broker.list": ".....",
            "realtime.segment.flush.threshold.rows": "0",
            "realtime.segment.flush.threshold.time": "24h",
            "realtime.segment.flush.threshold.segment.size": "200M",
            "realtime.segment.flush.autotune.initialRows": "2000000",
            "stream.kafka.consumer.prop.auto.offset.reset": "smallest"
          }
        ]
      },
      "transformConfigs": [],
      "complexTypeConfig": {}
    },
    "isDimTable": false
  }
}
Any idea what's wrong here @User?
s
I was able to ingest the data even with this table config @User. Can you share details about your kafka producer client? Are you able to consume these messages using a regular kafka client from your kafka topic? I'd let others from the community also have a look at this @User @User
l
I've tried that but to no avail. I can consume the data off of kafka with no problems
With a different client
Did you use 0.10.0 for your test @User?
s
@User that's actually my bad. I had tried it on current master, with both of the table schemas
l
👍 Does it fail on 0.10.0 then?
k
can you share your updated schema and table config?
l
Schema:
{
  "schemaName": "myObjects",
  "dimensionFieldSpecs": [
    {
      "name": "objectId",
      "dataType": "STRING"
    },
    {
      "name": "jsonObject",
      "dataType": "JSON"
    }
  ],
  "dateTimeFieldSpecs": [
    {
      "name": "lastModified",
      "dataType": "LONG",
      "format": "1:MILLISECONDS:EPOCH",
      "granularity": "1:DAYS"
    }
  ]
}
Table config:
{
  "REALTIME": {
    "tableName": "myObjects",
    "tableType": "REALTIME",
    "segmentsConfig": {
      "timeType": "MILLISECONDS",
      "schemaName": "myObjects",
      "retentionTimeUnit": "DAYS",
      "retentionTimeValue": "365",
      "timeColumnName": "lastModified",
      "allowNullTimeValue": false,
      "replicasPerPartition": "2"
    },
    "tenants": {
      "broker": "DefaultTenant",
      "server": "DefaultTenant"
    },
    "tableIndexConfig": {
      "rangeIndexVersion": 2,
      "jsonIndexColumns": [
        "jsonObject"
      ],
      "autoGeneratedInvertedIndex": false,
      "createInvertedIndexDuringSegmentGeneration": false,
      "loadMode": "MMAP",
      "noDictionaryColumns": [
        "lastModified",
        "jsonObject"
      ],
      "enableDefaultStarTree": false,
      "enableDynamicStarTreeCreation": false,
      "segmentPartitionConfig": {
        "columnPartitionMap": {
          "objectId": {
            "functionName": "Murmur",
            "numPartitions": 2
          }
        }
      },
      "aggregateMetrics": false,
      "nullHandlingEnabled": false
    },
    "metadata": {
      "customConfigs": {}
    },
    "routing": {
      "segmentPrunerTypes": [
        "partition"
      ],
      "instanceSelectorType": "replicaGroup"
    },
    "instanceAssignmentConfigMap": {
      "CONSUMING": {
        "tagPoolConfig": {
          "tag": "DefaultTenant",
          "poolBased": false,
          "numPools": 0
        },
        "replicaGroupPartitionConfig": {
          "replicaGroupBased": true,
          "numInstances": 0,
          "numReplicaGroups": 2,
          "numInstancesPerReplicaGroup": 8,
          "numPartitions": 0,
          "numInstancesPerPartition": 0
        }
      }
    },
    "upsertConfig": {
      "mode": "NONE",
      "hashFunction": "NONE"
    },
    "ingestionConfig": {
      "streamIngestionConfig": {
        "streamConfigMaps": [
          {
            "streamType": "kafka",
            "stream.kafka.consumer.type": "lowlevel",
            "stream.kafka.topic.name": "my_objects",
            "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.inputformat.json.JSONMessageDecoder",
            "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
            "stream.kafka.broker.list": ".....",
            "realtime.segment.flush.threshold.rows": "0",
            "realtime.segment.flush.threshold.time": "24h",
            "realtime.segment.flush.threshold.segment.size": "200M",
            "realtime.segment.flush.autotune.initialRows": "2000000",
            "stream.kafka.consumer.prop.auto.offset.reset": "smallest"
          }
        ]
      },
      "transformConfigs": [],
      "complexTypeConfig": {}
    },
    "isDimTable": false
  }
}
k
this seems to be the one already posted in the chat?
l
Because it is, I haven't changed anything else. I tried setting transformConfigs yesterday if that's what you're asking
k
ok. yeah, transformConfigs with the field in the schema as STRING instead of JSON, roughly like this:
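(A rough sketch of what I mean; jsonObjectStr is just an example column name, and you'd likely want to bump maxLength, since STRING dimensions default to 512 characters.)
In the schema, instead of the JSON column:
{
  "name": "jsonObjectStr",
  "dataType": "STRING",
  "maxLength": 2147483647
}
And in the table config's ingestionConfig:
"transformConfigs": [
  {
    "columnName": "jsonObjectStr",
    "transformFunction": "jsonFormat(jsonObject)"
  }
]
Then point jsonIndexColumns and noDictionaryColumns at jsonObjectStr instead of jsonObject.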
l
Yeah that's what I tried, it still shows up as null unfortunately
n
@User I think one issue is the "complexTypeConfig": {}. I was able to reproduce your issue (with the jsonObjectStr as STRING + jsonFormat), and on debugging saw that having "complexTypeConfig": {} in the table config makes it take a different branch of the code, which causes the problem. Can you try without it?
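i.e. an ingestionConfig shaped like this, with the empty complexTypeConfig block dropped entirely (streamConfigMaps elided here):
"ingestionConfig": {
  "streamIngestionConfig": {
    "streamConfigMaps": [
      ...
    ]
  },
  "transformConfigs": []
}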
Tried with JSON as dataType for jsonObject. That one does indeed fail in 0.10.0, but has been fixed on current master.
l
Thanks Neha. I tried that again by removing complexTypeConfig, but it didn't do the trick unfortunately. Perhaps it's because I'd have to recreate the table and remove the "affected" JSON one first.
Are we planning on releasing a hotfix for this bug?
n
yes, please try with a clean setup. Sharing my table config and schema too; this one works with 0.10.0
isn't there a possibility for you to use the latest docker image? if not, we could consider making a hotfix. cc @User
x
we do a nightly build around 6pm PDT
l
I can't use latest because we enforce version pinning. A hotfix would be really helpful