
Arun Vasudevan

05/10/2021, 11:26 PM
Hi, could you guys help me with the Avro -> Pinot schema conversion?
Avro schema:
{
  "type": "record",
  "name": "Clickstream",
  "namespace": "com.acme.event.clickstream.business",
  "fields": [
    {
      "name": "event_header",
      "type": {
        "type": "record",
        "name": "EventHeader",
        "namespace": "com.acme.event",
        "fields": [
          {
            "name": "event_uuid",
            "type": {
              "type": "string",
              "avro.java.string": "String",
              "logicalType": "uuid"
            },
            "doc": "Universally Unique Identifier for this event "
          },
          {
            "name": "published_timestamp",
            "type": {
              "type": "long",
              "logicalType": "timestamp-millis"
            },
            "doc": "Timestamp in milliseconds since the epoch that the event occurred on its producing device. e.g. <code>System.currentTimeMillis()</code>."
          }
        ]
      }
    }
  ]
}
The corresponding Pinot schema I have is:
{
  "schemaName": "user_clickstream_v1",
  "dimensionFieldSpecs": [
    {
      "name": "event_header.event_uuid",
      "dataType": "STRING"
    }
  ],
  "dateTimeFieldSpecs": [
    {
      "name": "event_header.published_timestamp",
      "dataType": "LONG",
      "format": "1:MILLISECONDS:EPOCH",
      "granularity": "1:MILLISECONDS"
    }
  ]
}
In the created Pinot table I see all the values as null… I suspect the issue is in the schema. Any idea?

Neha Pawar

05/10/2021, 11:41 PM
Did you set transform functions in the table config?
This demo explains which transform functions you'd have to set:

https://youtu.be/L5b_OJVOJKo?t=2292

Treat the Avro hierarchy the same way as the JSON shown in the demo.
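For the schema above, the idea would be an `ingestionConfig` along these lines (a sketch only; the flattened column names are assumptions derived from the Avro schema shown, and `jsonPathLong` is used here for the long-typed timestamp column, with `jsonPathString` as the string-typed counterpart):

```json
{
  "ingestionConfig": {
    "transformConfigs": [
      {
        "columnName": "event_uuid",
        "transformFunction": "jsonPathString(event_header, '$.event_uuid')"
      },
      {
        "columnName": "published_timestamp",
        "transformFunction": "jsonPathLong(event_header, '$.published_timestamp')"
      }
    ]
  }
}
```

Whatever naming is chosen, each `columnName` has to match a column declared in the Pinot schema.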

Arun Vasudevan

05/11/2021, 6:01 PM
Thanks, this helped me move forward. I created a Pinot table with transform functions through the UI and got the message
Table xxx successfully added
but I don't see the table, nor am I able to query it. I also see the success message in the controller… Is this a known issue? Any idea how to find the root cause? Here is the table config (with transform functions):
{
  "tableName": "user_clickstream_v1",
  "tableType": "REALTIME",
  "tenants": {
    "broker": "DefaultTenant",
    "server": "DefaultTenant",
    "tagOverrideConfig": {}
  },
  "segmentsConfig": {
    "schemaName": "user_clickstream_v1",
    "timeColumnName": "published_timestamp",
    "replication": "1",
    "replicasPerPartition": "1",
    "retentionTimeUnit": null,
    "retentionTimeValue": null,
    "completionConfig": null,
    "crypterClassName": null,
    "peerSegmentDownloadScheme": null
  },
  "tableIndexConfig": {
    "loadMode": "MMAP",
    "invertedIndexColumns": [],
    "createInvertedIndexDuringSegmentGeneration": false,
    "rangeIndexColumns": [],
    "sortedColumn": [],
    "bloomFilterColumns": [],
    "bloomFilterConfigs": null,
    "noDictionaryColumns": [],
    "onHeapDictionaryColumns": [],
    "varLengthDictionaryColumns": [],
    "enableDefaultStarTree": false,
    "starTreeIndexConfigs": null,
    "enableDynamicStarTreeCreation": false,
    "segmentPartitionConfig": null,
    "columnMinMaxValueGeneratorMode": null,
    "aggregateMetrics": false,
    "nullHandlingEnabled": false,
    "streamConfigs": {
      "streamType": "kafka",
      "stream.kafka.topic.name": "user_clickstream_v1",
      "stream.kafka.broker.list": "xxxxxxx:9092",
      "stream.kafka.consumer.type": "lowlevel",
      "stream.kafka.consumer.prop.auto.offset.reset": "largest",
      "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
      "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.inputformat.avro.confluent.KafkaConfluentSchemaRegistryAvroMessageDecoder",
      "realtime.segment.flush.threshold.rows": "0",
      "realtime.segment.flush.threshold.time": "24h",
      "realtime.segment.flush.segment.size": "100M",
      "stream.kafka.decoder.prop.schema.registry.rest.url": "<http://xxxxxx:8081>",
      "schema.registry.url": "<http://xxxxx:8081>"
    }
  },
  "metadata": {},
  "ingestionConfig": {
    "filterConfig": null,
    "transformConfigs": [
      {
        "columnName": "event_uuid",
        "transformFunction": "jsonPathString(event_header, '$.event_uuid')"
      },
      {
        "columnName": "event_type",
        "transformFunction": "jsonPathString(event, '$.event_type')"
      },
      {
        "columnName": "event_category",
        "transformFunction": "jsonPathString(event, '$.event_category')"
      },
      {
        "columnName": "event_name",
        "transformFunction": "jsonPathString(event, '$.event_name')"
      },
      {
        "columnName": "page_name",
        "transformFunction": "jsonPathString(event, '$.page_name')"
      },
      {
        "columnName": "device_classification_score",
        "transformFunction": "jsonPathString(bot_classification, '$.device_classification_score')"
      },
      {
        "columnName": "published_timestamp",
        "transformFunction": "jsonPathString(event_header, '$.published_timestamp')"
      }
    ]
  },
  "quota": {
    "storage": null,
    "maxQueriesPerSecond": null
  },
  "task": null,
  "routing": {
    "segmentPrunerTypes": null,
    "instanceSelectorType": null
  },
  "query": {
    "timeoutMs": null
  },
  "fieldConfigList": null,
  "upsertConfig": null,
  "tierConfigs": null
}
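One thing worth double-checking in the config above: `published_timestamp` is declared as a LONG dateTime column in the schema, but `jsonPathString` extracts it as a string. If type issues come up, a long-typed variant might be safer (a sketch; `jsonPathLong` is the long-typed extraction function):

```json
{
  "columnName": "published_timestamp",
  "transformFunction": "jsonPathLong(event_header, '$.published_timestamp')"
}
```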

Neha Pawar

05/11/2021, 6:28 PM
Not a known issue, very strange. Can you see it via the Zookeeper browser? And any logs in the controller? My hunch is it couldn't connect to Kafka, which it tries to do as part of realtime table creation.

Arun Vasudevan

05/11/2021, 9:54 PM
It worked after restarting all the components. Might have been a stale connection… Thanks for helping out!