https://pinot.apache.org/ logo
#general
Title
# general
g

Gabriel Lucano

04/15/2021, 11:17 PM
Hello guys, what is the correct value for "stream.kafka.decoder.class.name" when decoding an avro message from Schema Registry?
depends on your kafka message encoding type
g

Gabriel Lucano

04/15/2021, 11:44 PM
my key/message is avro-encoded, because im using kafka-connect with these parameters KEY_CONVERTER: 'io.confluent.connect.avro.AvroConverter' VALUE_CONVERTER: 'io.confluent.connect.avro.AvroConverter'
Is this the correct value? org.apache.pinot.plugin.inputformat.avro.confluent.KafkaConfluentSchemaRegistryAvroMessageDecoder
x

Xiang Fu

04/15/2021, 11:47 PM
then you can use confluent decoder and set the correct schema registry information. @User might have more inputs
e

Elon

04/15/2021, 11:54 PM
Hi @User, you can try the following:
Copy code
"stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
        "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.inputformat.avro.confluent.KafkaConfluentSchemaRegistryAvroMessageDecoder",
        "stream.kafka.decoder.prop.schema.registry.rest.url": "<http://confluent-schema-registry.schema-registry:8081>"
substitute the schema registry url for the one you use
this is part of the "streamConfigs"
Does that work for you?
g

Gabriel Lucano

04/15/2021, 11:56 PM
Aditionally do I need a json_format(message) on transformConfigs?
Im getting this error
Failed to read table size for comercios_REALTIME java.lang.IllegalStateException: Ideal state does not exist for table: comercios_REALTIME
Along, with this query result. While I'm able to get the ts_ms, my message value is null
e

Elon

04/16/2021, 3:09 AM
Does the pinot column name match up with the name in the GenericRecord from kafka? What is the avro type?
g

Gabriel Lucano

04/16/2021, 3:11 AM
Heres my pinot schema, pinot table and a sample message from the topic.
e

Elon

04/16/2021, 3:17 AM
What's the avro schema for the sample_avro_key_value_topic?
g

Gabriel Lucano

04/16/2021, 3:25 AM
e

Elon

04/16/2021, 3:27 AM
Can you check the server logs, maybe there is an ingestion error.
g

Gabriel Lucano

04/16/2021, 3:31 AM
No errors, aside from then one stated previously.
e

Elon

04/16/2021, 3:35 AM
nothing before that? Also, table config looks fine, not sure why it says "table not found" - is replication factor (you have it at 1) less than # of servers? Maybe try increasing it?
Was wondering if there was an error on ingestion, i.e. maybe the json transform function?
g

Gabriel Lucano

04/16/2021, 3:45 AM
I'm not seeing anything from a transform function error. Maybe you can help me?
e

Elon

04/16/2021, 4:12 AM
how many servers do you have?
g

Gabriel Lucano

04/16/2021, 1:22 PM
Just 1
e

Elon

04/19/2021, 8:42 AM
Not sure why it would say "table not found", I don't even see it having any ingestion errors.
Maybe enable debug logging and try? Seems like the error isn't being logged.
g

Gabriel Lucano

04/19/2021, 3:29 PM
Hi, I just found the solution my schema was mapped incorrectly. The argument in the json_format transformation must match your column from the schema registry as well. Also, the column in the schema used for the json_format trans. can be any name, Thanks @User for helping me!
🙌 1
"ingestionConfig": {       "transformConfigs": [         {           "columnName": "before",           "transformFunction": "json_format(bef)"         },         {           "columnName": "aft",           "transformFunction": "json_format(aft)"          "ingestionConfig": {       "transformConfigs": [         {           "columnName": "bef",           "transformFunction": "json_format(before)”         },         {           "columnName": "aft ",           "transformFunction": "json_format(after)”         }       ]     },}       ]     },
d

David Manukian

05/02/2021, 1:05 PM
Hi @User I'm new in Pinot, and I was wondering if you could help me. Is there a way for consuming records coded/decoded by Hortonworks serialization/deserialization and its schema registry? We store all schemas in Hortonworks schema registry, since it's not the most popular schema-registry it's difficult to find some ways. Thanks in advance.
e

Elon

05/02/2021, 5:37 PM
Hi @User, is the horton works schema registry api compatible with schema registry? Similar to the alternative offered by redhat?
d

David Manukian

05/02/2021, 5:47 PM
ohh I don't think they're compatible but I'll check it locally. What if it turns out they're not compatible? (Hortonworks & Confluent)
b

Brandon Stanley

09/26/2023, 3:50 AM
@Gabriel Lucano Were you able to apply the same transformation to the key of each Kafka record? I am trying to figure out the correct configurations for deserializing Kafka record keys using the same decoder:
org.apache.pinot.plugin.inputformat.avro.confluent.KafkaConfluentSchemaRegistryAvroMessageDecoder
.