
Tanmay Movva

03/22/2021, 2:46 AM
Hello, I am facing difficulties in flattening and transforming JSON records from Kafka. This is the structure of the JSON record:
{
  "event_name": "abcd",
  "event_type": "",
  "version": "v1",
  "write_key": "",
  "properties": {
    "status_code": "some_code",
    "status": "some_status",
    "mode": "some_mode"
  },
  "event_timestamp": 1616157914,
  "mode": "live"
}
And my schema looks like this:
"mode": "STRING",
"request_failure": "INT"
I want to define the mode column as $.properties.mode, which is just simple JSON flattening. From the docs, I was not able to understand the correct syntax for jsonPathString to use in the tableConfig.
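For reference, a minimal sketch of how this flattening could be expressed, assuming it goes under ingestionConfig.transformConfigs in the tableConfig (the exact transformFunction syntax is confirmed later in this thread):

"ingestionConfig": {
  "transformConfigs": [
    {
      "columnName": "mode",
      "transformFunction": "jsonPathString(properties, '$.mode')"
    }
  ]
}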
And the request_failure column is a derived column based on $.properties.status. I got to know after reading the docs that chaining transformations isn't supported in Pinot, so I can't define a column status = $.properties.status_code and then use it to define the other column as request_failure = if(status == 'created', 1, 0). So I think I need to write a Groovy script to extract the value from the nested JSON and apply the if/else logic. But to extract a value from nested JSON, I would have to import JsonSlurper in Groovy (not so familiar with Groovy, but this is what I found on Stack Overflow/the internet for parsing JSON in Groovy). So my question here is: does Pinot support import statements in the Groovy script? If not, how can I achieve this transformation in Pinot?
To give context around the use case: it is currently onboarded to Druid, and alerts are set up in ThirdEye on top of the datasource in Druid. We were facing issues with query response time because of one column, so we are trying to migrate the datasource to Pinot and try out star-tree indexing, with the hope that we would see much better performance. As of now, we are facing issues in migrating the datasource from Druid to Pinot (all the required transformations are supported in Druid).
Do let me know if more info is required.
@Neha Pawar
We are running the image with the latest tag.

Neha Pawar

03/22/2021, 4:41 AM
Chaining is supported now

Tanmay Movva

03/22/2021, 4:42 AM
Great!! Then it would be straightforward, I guess. Can you help me with the syntax here: https://apache-pinot.slack.com/archives/C011C9JHN7R/p1616381314085700?thread_ts=1616381204.085500&cid=C011C9JHN7R
Also, I am trying to apply filterConfig, but it isn't working. Here is my tableConfig:
{
  "tableName": "x_fts_merchant_events_REALTIME",
  "tableType": "REALTIME",
  "segmentsConfig": {
    "schemaName": "x_fts_merchant_events_dimensions",
    "timeColumnName": "event_timestamp",
    "timeType": "MILLISECONDS",
    "replicasPerPartition": "1",
    "retentionTimeValue": "1",
    "retentionTimeUnit": "DAYS",
    "segmentAssignmentStrategy": "BalanceNumSegmentAssignmentStrategy"
  },
  "tenants": {
    "broker": "DefaultTenant",
    "server": "DefaultTenant"
  },
  "tableIndexConfig": {
    "streamConfigs": {
      "streamType": "kafka",
      "stream.kafka.consumer.type": "LowLevel",
      "stream.kafka.topic.name": "x-fts-events-kafka",
      "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
      "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
      "stream.kafka.broker.list": "kafka-kafka-bootstrap.kafka.svc.cluster.local:9092",
      "stream.kafka.consumer.prop.auto.offset.reset": "smallest"
    },
    "loadMode": "MMAP"
  },
  "metadata": {},
  "ingestionConfig": {
    "filterConfig": {
      "filterFunction": "Groovy({event_name == 'abc'}, event_name)"
    }
  }
}

Neha Pawar

03/22/2021, 4:45 AM
"transformFunction": "jsonPathString(properties, '$.mode')"
this demo also uses it:

https://youtu.be/L5b_OJVOJKo?t=2514

The filter config looks right, it should work.
Why do you think it's not working? Are you still seeing event_name 'abc' in the ingested data?

Tanmay Movva

03/22/2021, 4:49 AM
Yes. I created two tables, with and without filterConfig. In the table without the filter config I am able to see the record, but not in the table with the filter config.
"Are you still seeing event_name 'abc' in the ingested data?"
Not able to see this. Ideally, I should be able to.

Neha Pawar

03/22/2021, 4:50 AM
The way you've added the filter config, "abc" will be filtered out. That means it won't be included in the data.
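In other words, filterFunction drops the records for which the expression evaluates to true. So to keep only the 'abc' events and drop everything else, the condition would be inverted; a sketch:

"filterConfig": {
  "filterFunction": "Groovy({event_name != 'abc'}, event_name)"
}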

Tanmay Movva

03/22/2021, 4:51 AM
Oh! I thought if the condition is satisfied, then the record would not be skipped (this was the case in Druid, so...). My bad. Will update the filter and try it again.
"Chaining is supported now"
Just to confirm: I first extract the field from the JSON using the inbuilt function, and then I can use Groovy to derive a field from the extracted column, correct?

Neha Pawar

03/22/2021, 4:54 AM
Correct, should be supported in the latest. Was added about a month back

Tanmay Movva

03/22/2021, 4:55 AM
I think the image was pulled 5 days ago. Will try these and update the thread. Thanks Neha!

Neha Pawar

03/22/2021, 4:57 AM
Cool 👍

Tanmay Movva

03/22/2021, 5:20 AM
One more question: when chaining transformations, do the intermediate columns have to be present in the schema? Is it possible to just use them in other transformations and exclude them from the schema?

Neha Pawar

03/22/2021, 5:21 AM
Hmm, they will have to be in the schema.
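So, continuing the sketch above, the intermediate status column would need its own field spec alongside the final columns; for example, assuming all three are plain dimensions (request_failure could equally be declared as a metricFieldSpec):

"dimensionFieldSpecs": [
  { "name": "mode", "dataType": "STRING" },
  { "name": "status", "dataType": "STRING" },
  { "name": "request_failure", "dataType": "INT" }
]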

Kishore G

03/22/2021, 5:24 AM
Chaining transform function is supported

Tanmay Movva

03/22/2021, 6:22 AM
All the suggestions worked. Thanks!!!

Neha Pawar

03/22/2021, 3:13 PM
Great! :)