{
  "schema": {
    "type": "struct",
    "fields": [
      {"type": "string", "optional": true, "field": "name"},
      {"type": "string", "optional": true, "field": "vhnumber"},
      {"type": "string", "optional": true, "field": "phnnumber"},
      {"type": "int32", "optional": false, "field": "id"},
      {"type": "string", "optional": true, "field": "password"},
      {"type": "string", "optional": true, "field": "vehicleType"},
      {"type": "int32", "optional": true, "field": "status_id"}
    ],
    "optional": false,
    "name": "driver"
  },
  "payload": {"name": "ss", "vhnumber": "123", "phnnumber": "123", "id": 17, "password": "2060", "vehicleType": "ppol", "status_id": 10}
}
This is the Kafka event in the consumer. How do I convert this to a Pinot schema? Only the "payload" attribute is needed. How do I write a custom decoder for it?
Neha Pawar
12/18/2020, 5:42 PM
Few options:
1. You can just have the whole payload string as a field using the json_format UDF (https://docs.pinot.apache.org/developers/advanced/ingestion-level-transformations#json-functions) and then read it at query time using json_extract_scalar.
2. Alternatively, you can pick the fields you want from the payload and map them to individual dimensions, using JSON functions or a Groovy UDF. @Xiang Fu could you point him to the doc for the jsonpath functions you added recently?
3. You can use a Flink job to flatten this out.
4. A custom decoder can be written by implementing your own StreamMessageDecoder, but that is unnecessary, I think.
5. @Kishore G is JSON indexing ready to use?
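As a rough sketch of options 1 and 2, the ingestion transforms might look like this in the table config. This is an illustration only: the column names (`payload_json` etc.) are made up, and `jsonFormat`/`jsonPathString`/`jsonPathLong` are the transform functions described in the docs linked above.

```json
{
  "ingestionConfig": {
    "transformConfigs": [
      {"columnName": "payload_json", "transformFunction": "jsonFormat(payload)"},
      {"columnName": "name", "transformFunction": "jsonPathString(payload, '$.name')"},
      {"columnName": "id", "transformFunction": "jsonPathLong(payload, '$.id')"},
      {"columnName": "status_id", "transformFunction": "jsonPathLong(payload, '$.status_id')"}
    ]
  }
}
```

The first transform keeps the whole payload as a JSON string (option 1); the others map individual payload fields to their own columns (option 2).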
Kishore G
12/18/2020, 5:48 PM
not yet. PR is still in review
Taran Rishit
12/18/2020, 6:06 PM
We are streaming MySQL data to Kafka, and we would like to stream that same output (the data above) to a Pinot table. Can we do that? Pinot accepts only one-level (flat) JSON.
Neha Pawar
12/18/2020, 6:10 PM
in that case you can use option 1 or 2
Xiang Fu
12/18/2020, 6:25 PM
I think json scalars in transform functions should work
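If option 1 is used, a query against the stored payload string might look like the following. The column name `payload_json` and table name `driver` are assumptions for illustration; `json_extract_scalar(column, jsonPath, resultType)` is the query-time function mentioned above.

```sql
SELECT json_extract_scalar(payload_json, '$.name', 'STRING') AS name,
       json_extract_scalar(payload_json, '$.id', 'INT') AS id
FROM driver
WHERE json_extract_scalar(payload_json, '$.status_id', 'INT') = 10
```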