Hello guys, what is the correct value for "stream....
# general
g
Hello guys, what is the correct value for "stream.kafka.decoder.class.name" when decoding an avro message from Schema Registry?
Depends on your Kafka message encoding type.
g
My key/message is Avro-encoded, because I'm using Kafka Connect with these parameters: KEY_CONVERTER: 'io.confluent.connect.avro.AvroConverter' VALUE_CONVERTER: 'io.confluent.connect.avro.AvroConverter'
Is this the correct value? org.apache.pinot.plugin.inputformat.avro.confluent.KafkaConfluentSchemaRegistryAvroMessageDecoder
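For context, a minimal sketch of how those converters typically appear in a Kafka Connect connector config; the connector name here is a hypothetical placeholder, and the registry URL should be your own:
    {
      "name": "my-avro-connector",
      "config": {
        "key.converter": "io.confluent.connect.avro.AvroConverter",
        "key.converter.schema.registry.url": "http://confluent-schema-registry.schema-registry:8081",
        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "value.converter.schema.registry.url": "http://confluent-schema-registry.schema-registry:8081"
      }
    }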
x
Then you can use the Confluent decoder and set the correct schema registry information. @User might have more inputs
e
Hi @User, you can try the following:
"stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
        "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.inputformat.avro.confluent.KafkaConfluentSchemaRegistryAvroMessageDecoder",
        "stream.kafka.decoder.prop.schema.registry.rest.url": "<http://confluent-schema-registry.schema-registry:8081>"
substitute the schema registry url for the one you use
this is part of the "streamConfigs"
Does that work for you?
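For anyone reading later: these properties live alongside the rest of the stream settings in the table config's streamConfigs block. A minimal sketch, with the topic name, broker list, and consumer type as illustrative assumptions:
    "tableIndexConfig": {
      "streamConfigs": {
        "streamType": "kafka",
        "stream.kafka.consumer.type": "lowlevel",
        "stream.kafka.topic.name": "sample_avro_key_value_topic",
        "stream.kafka.broker.list": "kafka:9092",
        "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
        "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.inputformat.avro.confluent.KafkaConfluentSchemaRegistryAvroMessageDecoder",
        "stream.kafka.decoder.prop.schema.registry.rest.url": "http://confluent-schema-registry.schema-registry:8081"
      }
    }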
g
Additionally, do I need a json_format(message) on transformConfigs?
I'm getting this error:
Failed to read table size for comercios_REALTIME java.lang.IllegalStateException: Ideal state does not exist for table: comercios_REALTIME
Along with this query result: while I'm able to get the ts_ms, my message value is null.
e
Does the pinot column name match up with the name in the GenericRecord from kafka? What is the avro type?
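To illustrate the name matching (the field and column here are hypothetical examples): if the Avro value schema declares a field
    {"name": "ts_ms", "type": "long"}
then direct ingestion needs a Pinot column with the same name, e.g.
    "dateTimeFieldSpecs": [
      {
        "name": "ts_ms",
        "dataType": "LONG",
        "format": "1:MILLISECONDS:EPOCH",
        "granularity": "1:MILLISECONDS"
      }
    ]
otherwise a transformConfig has to map the source field onto the column.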
g
Here's my Pinot schema, Pinot table, and a sample message from the topic.
e
What's the avro schema for the sample_avro_key_value_topic?
g
e
Can you check the server logs, maybe there is an ingestion error.
g
No errors, aside from the one stated previously.
e
Nothing before that? Also, the table config looks fine; not sure why it says "table not found". Is the replication factor (you have it at 1) less than the number of servers? Maybe try increasing it?
Was wondering if there was an error on ingestion, i.e. maybe the json transform function?
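For reference, the replication factor mentioned above is set in the table config's segmentsConfig; a minimal sketch, assuming a low-level consumer realtime table where replicasPerPartition applies:
    "segmentsConfig": {
      "replicasPerPartition": "1"
    }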
g
I'm not seeing anything from a transform function error. Maybe you can help me?
e
how many servers do you have?
g
Just 1
e
Not sure why it would say "table not found"; I don't even see any ingestion errors.
Maybe enable debug logging and try? Seems like the error isn't being logged.
g
Hi, I just found the solution: my schema was mapped incorrectly. The argument to the json_format transformation must match the column from the schema registry as well, while the column in the Pinot schema used for the json_format transformation can be any name. Thanks @User for helping me!
🙌 1
"ingestionConfig": {       "transformConfigs": [         {           "columnName": "before",           "transformFunction": "json_format(bef)"         },         {           "columnName": "aft",           "transformFunction": "json_format(aft)"          "ingestionConfig": {       "transformConfigs": [         {           "columnName": "bef",           "transformFunction": "json_format(before)”         },         {           "columnName": "aft ",           "transformFunction": "json_format(after)”         }       ]     },}       ]     },
d
Hi @User, I'm new to Pinot, and I was wondering if you could help me. Is there a way to consume records encoded/decoded with Hortonworks serialization/deserialization and its schema registry? We store all our schemas in the Hortonworks schema registry, and since it's not the most popular schema registry, it's difficult to find guidance. Thanks in advance.
e
Hi @User, is the Hortonworks schema registry API-compatible with the Confluent Schema Registry, similar to the alternative offered by Red Hat?
d
Ohh, I don't think they're compatible, but I'll check locally. What if it turns out they're not compatible (Hortonworks & Confluent)?
b
@Gabriel Lucano Were you able to apply the same transformation to the key of each Kafka record? I am trying to figure out the correct configurations for deserializing Kafka record keys using the same decoder:
org.apache.pinot.plugin.inputformat.avro.confluent.KafkaConfluentSchemaRegistryAvroMessageDecoder
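As far as I know, that decoder is applied to the record value, not the key. One hedged workaround, assuming a Pinot version that supports stream-level metadata: populate the raw key as a column, e.g.
    "stream.kafka.metadata.populate": "true"
which exposes the record key as the __key column (undecoded), rather than running it through the Avro value decoder.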