# troubleshooting
Hello everyone, I found an error with the transform function jsonPathString: I cannot use the word order as its first argument. "transformFunction": "jsonPathString(order,'$.channel')" does not work. As a test I modified the JSON, replacing order with hello, and "transformFunction": "jsonPathString(hello,'$.channel')" works. Why can't I use "order"? My real JSON messages use "order". Please help.
Invalid transform function 'jsonPathString(order,'$.channel')' for column 'channel' exception: Invalid transform function 'jsonPathString(order,'$.channel')' for column 'channel' Handled request from POST, content-type application/json status code 400 Bad Request
Without looking into it any further yet, it might be because 'order' is a reserved word in SQL and this is likely going through a SQL parser. Maybe you can quote the word order and see if that works:
"transformFunction": "jsonPathString(\"order\",'$.channel')"
Oh, it works. Thank you very much @User
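For reference, the quoting fix can be sketched as a table config fragment. This is only an illustration built from the column names and functions used in this thread; the surrounding table config is omitted. The reserved word is wrapped in escaped double quotes, while the JSON path stays in single quotes:

```json
{
  "ingestionConfig": {
    "transformConfigs": [
      {
        "columnName": "channel",
        "transformFunction": "jsonPathString(\"order\", '$.channel')"
      },
      {
        "columnName": "orderId",
        "transformFunction": "jsonPathString(\"order\", '$.orderId')"
      }
    ]
  }
}
```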
@User "submissionDate" : "2022-03-15T17:31:44.540+0700" How can I use a date format with this? I tried the following, but it does not work: "dateTimeFieldSpecs": [ { "name": "submissionDate", "dataType": "TIMESTAMP", "format": "1:MILLISECONDS:SIMPLE_DATE_FORMAT:yyyy-MM-dd'T'HH:mm:ss.SSSZ", "granularity": "1:MILLISECONDS" } ]
you'll need to use a transformation function for that as well. You can see an example here - https://dev.startree.ai/docs/pinot/recipes/datetime-string-to-timestamp#add-schema-and-table
thank you.
you'll have to use a different column name than what is in the source data btw
or you'll get an error
In my JSON the data type is a string: "submissionDate" : "2022-03-15T17:31:44.540+0700". I tested creating the table. It is created, but I don't see any data. The schema I tested is: { "schemaName": "omx_order_20", "dimensionFieldSpecs": [ { "name": "channel", "dataType": "STRING" }, { "name": "orderId", "dataType": "STRING" } ], "dateTimeFieldSpecs": [ { "name": "submissionDate", "dataType": "STRING", "format": "1:MILLISECONDS:SIMPLE_DATE_FORMAT:yyyy-MM-dd'T'HH:mm:ss.SSSZ", "granularity": "1:MILLISECONDS" } ] }
I will check your link and test. Thank you for the answer about the reserved word.
oh as a string it should work. But if it's a string I don't think you'd be able to do proper data operations (in case you wanted to do that)
Normally, for a realtime table, does the date/time come from the system or from inside the JSON message?
unless you have a datetime column in your schema/table there won't be one, so you do need to specify the data yourself (e.g. via JSON message)
I tried to create the table. It is created, but no data shows up. { "tableName": "omx_order_20", "tableType": "REALTIME", "tenants": { "broker": "DefaultTenant", "server": "DefaultTenant", "tagOverrideConfig": {} }, "segmentsConfig": { "schemaName": "omx_order_20", "timeColumnName": "submissionDate", "replication": "1", "replicasPerPartition": "1", "retentionTimeUnit": null, "retentionTimeValue": null, "completionConfig": null, "crypterClassName": null, "peerSegmentDownloadScheme": null }, "tableIndexConfig": { "loadMode": "MMAP", "invertedIndexColumns": [], "createInvertedIndexDuringSegmentGeneration": false, "rangeIndexColumns": [], "sortedColumn": [], "bloomFilterColumns": [], "bloomFilterConfigs": null, "noDictionaryColumns": [], "onHeapDictionaryColumns": [], "varLengthDictionaryColumns": [], "enableDefaultStarTree": false, "starTreeIndexConfigs": null, "enableDynamicStarTreeCreation": false, "segmentPartitionConfig": null, "columnMinMaxValueGeneratorMode": null, "aggregateMetrics": false, "nullHandlingEnabled": false, "streamConfigs": { "streamType": "kafka", "stream.kafka.topic.name": "omx_order20", "stream.kafka.broker.list": "", "stream.kafka.consumer.type": "lowlevel", "stream.kafka.consumer.prop.auto.offset.reset": "smallest", "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory", "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder", "realtime.segment.flush.threshold.rows": "0", "realtime.segment.flush.threshold.time": "24h", "realtime.segment.flush.segment.size": "100M" } }, "metadata": {}, "ingestionConfig": { "filterConfig": null, "transformConfigs": [ { "columnName": "channel", "transformFunction": "jsonPathString(\"order\",'$.channel')" }, { "columnName": "orderId", "transformFunction": "jsonPathString(\"order\",'$.orderId')" }, { "columnName": "submissionDate", "transformFunction": "FromDateTime('$.submissionDate','YYYY-MM-dd''T''HH:mm:ss.SSSZ')" } ] }, 
"quota": { "storage": null, "maxQueriesPerSecond": null }, "task": null, "routing": { "segmentPrunerTypes": null, "instanceSelectorType": null }, "query": { "timeoutMs": null }, "fieldConfigList": null, "upsertConfig": null, "tierConfigs": null }
I used "columnName": "submissionDate", "transformFunction": "FromDateTime('$.submissionDate','YYYY-MM-dd''T''HH:mm:ss.SSSZ')"
My JSON looks like this --> {"order" : { "channel" : "ABC", "orderId" : "22031500DRS020020017"}, "submissionDate" : "2022-03-15T17:31:44.540+0700"}
Any recommendation for the datetime format conversion? @User
You can't use the JSON path syntax inside
- it doesn't know what it means
also I think you can only run one function at a time
so you'll need to do it in two steps
I just want to create a table that consumes from Kafka. I tested with EPOCH in the JSON and it needed nothing extra. But for this --> "submissionDate" : "2022-03-15T17:31:44.540+0700" do we need to convert it?
epoch you don't need any time conversion
it'll handle that automatically
it's only cause of it being a date string that you have to do some conversion
How can I do it, bro?
  "columnName": "submissionTs",
  "transformFunction": "FromDateTime(submissionDate,'YYYY-MM-dd''T''HH:mm:ss.SSSZ')"
rename the column to something else
and then like this should work
OK, I will change the column name to another name.
yeh - if you use a transform fn it can't transform a value to the same name
it's a bit annoying, but that's how it is!
Oh, it works.
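Put together, the table config side of this fix might look like the fragment below. It is a sketch, not a full config: the transform is the one suggested above, and pointing "timeColumnName" at the renamed column is an assumption (the original config used submissionDate there, and the new column would also have to be declared in the schema).

```json
{
  "segmentsConfig": {
    "timeColumnName": "submissionTs"
  },
  "ingestionConfig": {
    "transformConfigs": [
      {
        "columnName": "submissionTs",
        "transformFunction": "FromDateTime(submissionDate, 'YYYY-MM-dd''T''HH:mm:ss.SSSZ')"
      }
    ]
  }
}
```

The source field submissionDate is referenced by its plain name, not by a JSON path, and the output column has a different name from the input field.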
I have a question about the JSON message.
What if my JSON has many objects? {"order": {"channel": "SFF","orderId": "22031500DRS020020016"},"customer": "570809","omxtrackingID": "99-d173c048-8bf2-4261-a440-36d1045c63e2","submissionDate": "2022-03-15T17:31:44.540+0700"}
But I only need the columns channel, orderId and submissionDate. Can I specify and select just the objects I want to use, or do I need to create columns for everything in that JSON? @User
You are a Pinot expert, bro. Thank you so much for your answers, they help me a lot. @User. I have just begun a Pinot POC and plan to use it with Trino for a realtime data platform.
you only need to create the columns that you need - don't need to create columns for every JSON property
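As an illustration, a schema for the message above could declare only the three columns of interest and ignore customer and omxtrackingID. The schema name and column names come from this thread; the LONG data type and the EPOCH format for submissionTs are assumptions, chosen because FromDateTime produces epoch milliseconds. The channel and orderId columns would still be filled by the jsonPathString transforms in the table config.

```json
{
  "schemaName": "omx_order_20",
  "dimensionFieldSpecs": [
    { "name": "channel", "dataType": "STRING" },
    { "name": "orderId", "dataType": "STRING" }
  ],
  "dateTimeFieldSpecs": [
    {
      "name": "submissionTs",
      "dataType": "LONG",
      "format": "1:MILLISECONDS:EPOCH",
      "granularity": "1:MILLISECONDS"
    }
  ]
}
```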
Thank you, bro. You helped me so much. Tomorrow I will select some columns to test the POC with Trino.
Do you know how to see more logs? I start Pinot via systemctl, and the log is sparse and does not help much. I want to see more logs from Pinot. I added this to the systemd unit --> Environment="JAVA_OPTS=-Xms6G -Xmx8G -Dlog4j2.logLevel=DEBUG" but the logs are still sparse...
logs are under
on each component
will be much more in there
thank you bro.
Now I can create a realtime table that consumes a real topic on my production Kafka.
But I don't see any data stored on my Pinot server. I need to plan storage space for Pinot data. Where does Pinot store the data? @User I don't see any data on pinot-controller (NFS share) or on the Pinot server. How does Pinot keep data for a realtime table?
it stores it under
which will be a tmp dir unless you explicitly set it
How can I use a separate Kerberos setup in the realtime table config? Do you have an example config for a separate KDC? This is my stream config. Right now I can connect to only one KDC, the default in /etc/krb5.conf, but we have multiple Kafka clusters and KDCs to connect to... "streamConfigs": { "streamType": "kafka", "stream.kafka.topic.name": "PROD-json-MB-Postpaid-Sales-Online", "stream.kafka.broker.list": "tykbpr01:9092,tykbpr02:9092,tykbpr01:9092", "stream.kafka.consumer.type": "lowlevel", "stream.kafka.consumer.prop.auto.offset.reset": "smallest", "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory", "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder", "realtime.segment.flush.threshold.rows": "0", "realtime.segment.flush.threshold.time": "1h", "realtime.segment.flush.segment.size": "100m", "stream.kafka.consumer.group.id": "rdp_lookup", "security.protocol": "SASL_SSL", "sasl.mechanism": "GSSAPI", "sasl.kerberos.service.name": "kafka", "ssl.truststore.location": "/data/apache-pinot/keytab/tykbpr.client.truststore.jks", "ssl.truststore.password": "xxxx", "sasl.jaas.config": "com.sun.security.auth.module.Krb5LoginModule required useTicketCache=true useKeyTab=true storeKey=true keyTab=\"/data/apache-pinot/keytab/U-SVC-RDP.keytab\" principal=\"U-SVC-RDP@TRUE.TH\" doNotPrompt=false;" } @User
it stores it under
which will be a tmp dir unless you explicitly set it --> Does it flush data only after the threshold? I have consumed 180000 offsets and still don't see data being written on the server. "realtime.segment.flush.threshold.rows": "0", "realtime.segment.flush.threshold.time": "24h", "realtime.segment.flush.segment.size": "100m", @User
https://docs.pinot.apache.org/basics/components/deep-store take a look at this and then the S3 example as well
thank you.
Hi, it's me again, bro. @User How can I use multiple Kerberos servers (multiple Kafka clusters with separate KDCs)? How can I specify krb5.conf in the realtime table config? I tried to configure a realtime table against a different KDC, but it only works with the default realm...