# troubleshooting
t
Hello, I am facing difficulties in flattening and transforming JSON records from Kafka. This is the structure of the JSON record:
{
  "event_name": "abcd",
  "event_type": "",
  "version": "v1",
  "write_key": "",
  "properties": {
    "status_code": "some_code",
    "status": "some_status",
    "mode": "some_mode"
  },
  "event_timestamp": 1616157914,
  "mode": "live"
}
And my schema looks like this:
"mode": "string",
"request_failure": "INT"
I want to define the `mode` column as `$.properties.mode`, which is just simple JSON flattening. From the docs, I was not able to understand the correct syntax for `jsonPathString` to use in the tableConfig.
And the `request_failure` column is a derived column based on `$.properties.status`. I got to know after reading the docs that chaining transformations isn't supported in Pinot, so I can't define a column `status = $.properties.status_code` and then use it to define the other column as `request_failure = if(status == 'created', 1, 0)`. So I think I need to write a Groovy script to extract the value from the nested JSON and apply the if/else logic. But to extract a value from nested JSON, I would have to import JsonSlurper in Groovy (not so familiar with Groovy, but this is what I found on Stack Overflow/the internet for parsing JSON in Groovy). So my question here is: does Pinot support import statements in the Groovy script? If not, how can I achieve this transformation in Pinot?
To give context around the use case: it is currently onboarded to Druid, with alerts set up in ThirdEye on top of the Druid datasource. We were facing issues with query response time because of one column, so we are trying to migrate the datasource to Pinot and try out star-tree indexing, hoping to see much better performance. As of now, we are facing issues migrating the datasource from Druid to Pinot (all the required transformations are supported in Druid).
Do let me know if more info is required.
@Neha Pawar
We are running the image with the `latest` tag.
n
Chaining is supported now
t
Great!! Then it would be straightforward, I guess. Can you help me with the syntax here: https://apache-pinot.slack.com/archives/C011C9JHN7R/p1616381314085700?thread_ts=1616381204.085500&cid=C011C9JHN7R
And also I am trying to apply `filterConfig`, but it isn't working. Here is my tableConfig:
{
  "tableName": "x_fts_merchant_events_REALTIME",
  "tableType": "REALTIME",
  "segmentsConfig": {
    "schemaName": "x_fts_merchant_events_dimensions",
    "timeColumnName": "event_timestamp",
    "timeType": "MILLISECONDS",
    "replicasPerPartition": "1",
    "retentionTimeValue": "1",
    "retentionTimeUnit": "DAYS",
    "segmentAssignmentStrategy": "BalanceNumSegmentAssignmentStrategy"
  },
  "tenants": {
    "broker": "DefaultTenant",
    "server": "DefaultTenant"
  },
  "tableIndexConfig": {
    "streamConfigs": {
      "streamType": "kafka",
      "stream.kafka.consumer.type": "LowLevel",
      "stream.kafka.topic.name": "x-fts-events-kafka",
      "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
      "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
      "stream.kafka.broker.list": "kafka-kafka-bootstrap.kafka.svc.cluster.local:9092",
      "stream.kafka.consumer.prop.auto.offset.reset": "smallest"
    },
    "loadMode": "MMAP"
  },
  "metadata": {},
  "ingestionConfig": {
    "filterConfig": {
      "filterFunction": "Groovy({event_name == 'abc'}, event_name)"
    }
  }
}
n
"transformFunction": "jsonPathString(properties, '$.mode')"
this demo also uses it:

https://youtu.be/L5b_OJVOJKo?t=2514
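For reference, a minimal sketch of how this might sit in the table config, assuming a build where transforms live under ingestionConfig.transformConfigs (column name taken from the question):

"ingestionConfig": {
  "transformConfigs": [
    {
      "columnName": "mode",
      "transformFunction": "jsonPathString(properties, '$.mode')"
    }
  ]
}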

filter config looks right, it should work
why do you think it’s not working? are you still seeing event_name ‘abc’ in the ingested data?
t
Yes. I created two tables, with and without filterConfig. In the table without filterConfig I am able to see the record, but not in the table with filterConfig.
> are you still seeing event_name 'abc' in the ingested data?
Not able to see this. Ideally, I should be able to.
n
The way you’ve added the filter config, “abc” will be filtered out. That means it won’t be included in the data.
t
Oh! I thought if the condition is satisfied, then the record would not be skipped (this was the case in Druid, so..). My bad. Will update the filter and try it again.
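For reference, keeping only event_name 'abc' would mean inverting the condition, since records matching the filter function get dropped; a sketch:

"filterConfig": {
  "filterFunction": "Groovy({event_name != 'abc'}, event_name)"
}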
> Chaining is supported now
Just to confirm: I first extract the field from the JSON using an inbuilt function, then I can use Groovy to derive a field from the extracted column, correct?
n
Correct, should be supported in the latest. Was added about a month back
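A sketch of what the chained transforms might look like for this use case, with the column names and the 'created' check taken from the question (assumes ingestionConfig.transformConfigs in a recent build):

"ingestionConfig": {
  "transformConfigs": [
    {
      "columnName": "status",
      "transformFunction": "jsonPathString(properties, '$.status_code')"
    },
    {
      "columnName": "request_failure",
      "transformFunction": "Groovy({status == 'created' ? 1 : 0}, status)"
    }
  ]
}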
t
I think the image was pulled 5 days ago. Will try these and update the thread. Thanks Neha!
n
Cool 👍
t
One more question: when chaining transformations, do the intermediate columns have to be present in the schema? Is it possible to just use them in other transformations and exclude them from the schema?
n
Hmm, they will have to be in the schema.
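So for the example above, both the intermediate and the derived columns would appear in the schema; a hypothetical sketch matching the types from the question:

"dimensionFieldSpecs": [
  { "name": "status", "dataType": "STRING" }
],
"metricFieldSpecs": [
  { "name": "request_failure", "dataType": "INT" }
]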
k
Chaining transform functions is supported.
t
All the suggestions worked. Thanks!!!
n
Great! :)