
Harold Lim

01/27/2021, 10:23 PM
Hi. I'm trying to follow the steps here: https://docs.pinot.apache.org/basics/data-import/pinot-stream-ingestion/import-from-apache-kafka Does the Pinot schema need to exactly match the structure of the messages in the corresponding Kafka topic? Does Pinot support flattening the data? Currently, we have JSON-formatted messages in Kafka, and I'm looking at setting up Pinot to ingest data from this topic. The dimensions are currently nested inside a "labels" dictionary of the Kafka message.

Will Briggs

01/27/2021, 10:27 PM
I’m not aware of any generic support for flattening the data, but you have a few options - one is to just stick with the high level schema w/ your "labels" dictionary, and use a JSON Index (https://docs.pinot.apache.org/basics/indexing/json-index) to speed up querying against it. That obviously isn’t ideal if you also want other types of indices.
Another option would be to use an ingestion transformation to extract the fields you want out of the dictionary into top-level columns: https://docs.pinot.apache.org/developers/advanced/ingestion-level-transformations
As you can see, the documentation for Flattening implies that extracting attributes from complex objects is ‘TBD’, and 1:Many requires implementing a custom Decoder: https://docs.pinot.apache.org/developers/advanced/ingestion-level-transformations#flattening
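
(For illustration, an ingestion transform along the lines Will describes might look roughly like the sketch below. The table name, the "region" label, and the default value are placeholders, not anything from this thread, and the exact config layout should be checked against the ingestion-transformation docs linked above.)
```json
{
  "tableName": "events_REALTIME",
  "ingestionConfig": {
    "transformConfigs": [
      {
        "columnName": "region",
        "transformFunction": "jsonPathString(labels, '$.region', 'unknown')"
      }
    ]
  }
}
```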

Harold Lim

01/27/2021, 10:33 PM
👍Thanks. Let me read up on the docs.

Will Briggs

01/27/2021, 10:36 PM
You could also use a custom StreamMessageDecoder to simplify what could be a painful set of individual ingestion transformations if you have a lot of columns: https://docs.pinot.apache.org/developers/plugin-architecture/write-custom-plugins/record-reader#stream-decoder-plugin
Lastly, you could use a custom Kafka Streams job or Kafka Connect pipeline to transform and push the data into another topic, in a format more friendly to ingestion, but obviously that puts more load on Kafka and adds complexity.
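
(A minimal sketch of the Kafka Streams route, assuming string-serialized JSON messages; the topic names, application id, and the "labels" field are placeholders rather than anything confirmed in this thread:)
```java
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;

import java.util.Properties;

public class FlattenLabelsJob {
  private static final ObjectMapper MAPPER = new ObjectMapper();

  public static void main(String[] args) {
    // Read raw JSON from a hypothetical "events" topic, flatten, and write to a new topic.
    StreamsBuilder builder = new StreamsBuilder();
    builder.stream("events", Consumed.with(Serdes.String(), Serdes.String()))
        .mapValues(FlattenLabelsJob::flatten)
        .to("events-flattened", Produced.with(Serdes.String(), Serdes.String()));

    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "flatten-labels");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    new KafkaStreams(builder.build(), props).start();
  }

  // Hoist every entry of the nested "labels" object up to a top-level field.
  private static String flatten(String json) {
    try {
      ObjectNode root = (ObjectNode) MAPPER.readTree(json);
      ObjectNode labels = (ObjectNode) root.remove("labels");
      if (labels != null) {
        labels.fields().forEachRemaining(e -> root.set(e.getKey(), e.getValue()));
      }
      return MAPPER.writeValueAsString(root);
    } catch (Exception e) {
      return json; // pass problem records through unchanged
    }
  }
}
```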

Kishore G

01/28/2021, 3:27 AM
Yes, flattening is supported @Neha Pawar ^^

Neha Pawar

01/28/2021, 4:06 AM
Will has pretty much covered all the options:
1. Ingest the JSON as-is, using the json_format(complex_json_column) transform function, then use the jsonExtractScalar() function during query.
2. Extract and flatten while ingesting, using the jsonPathString() function.
3. Use JSON indexing, and don’t do anything else.
4. Implement your own custom StreamMessageDecoder that flattens the records. Pinot will handle it if the decoder publishes multiple records.
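
(To make option 1 concrete: assuming the whole labels object is stored as a string column, here called labels, via the json_format transform, a query could look roughly like this; the table name and label path are placeholders:)
```sql
SELECT count(*)
FROM events
WHERE JSON_EXTRACT_SCALAR(labels, '$.someLabel', 'STRING') = 'someValue'
```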

Harold Lim

01/28/2021, 7:15 AM
Hi Neha, Just to clarify, do we need to do 1. to be able to do 3.?

Neha Pawar

01/28/2021, 4:55 PM
Partly. You do need to use the jsonFormat UDF to store the nested JSON as a string in both 1 and 3, and in 3 you additionally apply the index config on that column.
Which version of Pinot are you using? The JSON index is available only in the very latest, so you might have to build from master.
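
(Putting those two pieces together, the relevant bits of the table config might look roughly like the sketch below; "labels_json" is a made-up destination column name, and the structure should be double-checked against the JSON index docs linked earlier:)
```json
{
  "ingestionConfig": {
    "transformConfigs": [
      {
        "columnName": "labels_json",
        "transformFunction": "jsonFormat(labels)"
      }
    ]
  },
  "tableIndexConfig": {
    "jsonIndexColumns": ["labels_json"]
  }
}
```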

Harold Lim

01/28/2021, 5:08 PM
It should be the latest one (I deployed it using Helm). One other thing: the labels JSON dict does not have all fields populated in all records. For example, one record has {'x':'a', 'y':'b'}, and another record only has {'y':'c'}, so the x field is missing from the second. It seems that this causes an exception when I run a query with WHERE JSON_EXTRACT_SCALAR(labels, '$.x', 'STRING') = 'a'. Is there a way around this? Looking at the docs (haven't tried it yet), for 2. above we can add a default value for the JSON_PATH_STRING function, but this does not seem to be the case for JSON_EXTRACT_SCALAR.

Kishore G

01/28/2021, 5:33 PM
That seems to be a bug. @Xiang Fu ^^ Maybe use a case statement or Groovy to replace it with a default value for now.

Xiang Fu

01/28/2021, 5:38 PM
I feel the right behavior is to add a filter to the WHERE clause that checks that this jsonPath exists, because json_extract_scalar is a transform function being applied to a row that has no value at that path. What we need is to filter that row out.

Kishore G

01/28/2021, 6:14 PM
I think we need both

Xiang Fu

01/28/2021, 6:19 PM
We can add a default value for this function as an optional parameter. The ultimate fix might be to rewrite the query to add the existence check to the predicate as well.
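
(In other words, the optional default parameter being discussed would presumably let the earlier query look something like the sketch below; this reflects the proposal in this thread, not behavior of the release being used here:)
```sql
SELECT count(*)
FROM events
WHERE JSON_EXTRACT_SCALAR(labels, '$.x', 'STRING', 'missing') = 'a'
```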