Hello guys, what is the correct value for "stream.kafka.decoder.class.name" when decoding an avro message from Schema Registry?
depends on your kafka message encoding type
My key/message is Avro-encoded, because I'm using kafka-connect with these parameters: KEY_CONVERTER: 'io.confluent.connect.avro.AvroConverter' VALUE_CONVERTER: 'io.confluent.connect.avro.AvroConverter'
Is this the correct value? org.apache.pinot.plugin.inputformat.avro.confluent.KafkaConfluentSchemaRegistryAvroMessageDecoder
then you can use confluent decoder and set the correct schema registry information. @User might have more inputs
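A note on why the Confluent decoder is needed here: messages written by the Confluent AvroConverter are not bare Avro. They carry a 5-byte header (a zero "magic byte" followed by a 4-byte big-endian schema ID) ahead of the Avro payload, and the decoder uses that ID to fetch the schema from the registry. A minimal Python sketch of that framing, where the function name and the sample bytes are made up for illustration:

```python
import struct
from typing import Tuple

MAGIC_BYTE = 0  # first byte of a Confluent-framed message


def split_confluent_frame(payload: bytes) -> Tuple[int, bytes]:
    """Return (schema_id, avro_body) for a Confluent-framed message."""
    if len(payload) < 5:
        raise ValueError("payload too short for a Confluent header")
    # ">BI" = big-endian: one unsigned byte, then one unsigned 32-bit int.
    magic, schema_id = struct.unpack(">BI", payload[:5])
    if magic != MAGIC_BYTE:
        raise ValueError(f"unexpected magic byte {magic}")
    return schema_id, payload[5:]


# Hypothetical frame: magic byte, schema ID 42, then two bytes of Avro data.
frame = bytes([0]) + (42).to_bytes(4, "big") + b"\x02\x06"
schema_id, body = split_confluent_frame(frame)
print(schema_id, body)
```

A plain Avro decoder would try to read those five header bytes as data, which is why the decoder class has to match the converter used on the producing side.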
Hi @User, you can try the following:
"stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
"stream.kafka.decoder.class.name": "org.apache.pinot.plugin.inputformat.avro.confluent.KafkaConfluentSchemaRegistryAvroMessageDecoder",
"stream.kafka.decoder.prop.schema.registry.rest.url": "http://confluent-schema-registry.schema-registry:8081"
Substitute the schema registry URL with the one you use.
this is part of the "streamConfigs"
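To show where those three keys sit, here is a rough Python sketch that assembles a "streamConfigs" map and prints it as JSON. The topic name, broker address and the helper name are placeholders, and the surrounding keys (streamType, topic, broker list, consumer type) and the nesting under "tableIndexConfig" are the usual Kafka settings as I understand them, so check them against your Pinot version:

```python
import json


def build_stream_configs(topic: str, brokers: str, registry_url: str) -> dict:
    """Assemble a streamConfigs map for a Confluent-Avro Kafka topic."""
    return {
        "streamType": "kafka",
        "stream.kafka.topic.name": topic,
        "stream.kafka.broker.list": brokers,
        "stream.kafka.consumer.type": "lowlevel",
        "stream.kafka.consumer.factory.class.name":
            "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
        "stream.kafka.decoder.class.name":
            "org.apache.pinot.plugin.inputformat.avro.confluent."
            "KafkaConfluentSchemaRegistryAvroMessageDecoder",
        "stream.kafka.decoder.prop.schema.registry.rest.url": registry_url,
    }


# "my_topic" and "kafka:9092" are hypothetical values.
stream_configs = build_stream_configs(
    "my_topic",
    "kafka:9092",
    "http://confluent-schema-registry.schema-registry:8081",
)
table_fragment = {"tableIndexConfig": {"streamConfigs": stream_configs}}
print(json.dumps(table_fragment, indent=2))
```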
Does that work for you?
Additionally, do I need a json_format(message) in transformConfigs?
I'm getting this error:
Failed to read table size for comercios_REALTIME java.lang.IllegalStateException: Ideal state does not exist for table: comercios_REALTIME
Along with this query result: while I'm able to get ts_ms, my message value is null.
Does the pinot column name match up with the name in the GenericRecord from kafka? What is the avro type?
Here's my Pinot schema, Pinot table and a sample message from the topic.
What's the avro schema for the sample_avro_key_value_topic?
Can you check the server logs, maybe there is an ingestion error.
No errors, aside from the one stated previously.
nothing before that? Also, table config looks fine, not sure why it says "table not found" - is replication factor (you have it at 1) less than # of servers? Maybe try increasing it?
Was wondering if there was an error on ingestion, i.e. maybe the json transform function?
I'm not seeing anything from a transform function error. Maybe you can help me?
how many servers do you have?
Just 1
Not sure why it would say "table not found", I don't even see it having any ingestion errors.
Maybe enable debug logging and try? Seems like the error isn't being logged.
Hi, I just found the solution: my schema was mapped incorrectly. The argument to the json_format transformation must match the field name in the schema registry, while the destination column in the Pinot schema can have any name. Thanks @User for helping me!
Incorrect:
"ingestionConfig": {
  "transformConfigs": [
    { "columnName": "before", "transformFunction": "json_format(bef)" },
    { "columnName": "aft", "transformFunction": "json_format(aft)" }
  ]
},
Corrected:
"ingestionConfig": {
  "transformConfigs": [
    { "columnName": "bef", "transformFunction": "json_format(before)" },
    { "columnName": "aft", "transformFunction": "json_format(after)" }
  ]
},
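To make the difference between the two mappings concrete, here is a small Python simulation. It is not Pinot code: `apply_transforms` is a hypothetical stand-in that only understands `json_format(<field>)`, the sample record is invented, and returning None for a missing source field merely approximates the null values seen in the query result above.

```python
import json
import re


def apply_transforms(record: dict, transform_configs: list) -> dict:
    """Mimic json_format(<field>) transforms on one decoded record.

    Returns {columnName: JSON string}, or None for a column whose
    source field is absent from the record.
    """
    row = {}
    for cfg in transform_configs:
        match = re.fullmatch(r"json_format\((\w+)\)", cfg["transformFunction"].strip())
        if match is None:
            raise ValueError(f"unsupported transform: {cfg['transformFunction']}")
        value = record.get(match.group(1))  # argument = field name in the record
        row[cfg["columnName"]] = None if value is None else json.dumps(value)
    return row


# Hypothetical decoded record; the field names come from the registry schema.
record = {
    "ts_ms": 1620000000000,
    "before": {"id": 1, "name": "old"},
    "after": {"id": 1, "name": "new"},
}

wrong = [
    {"columnName": "before", "transformFunction": "json_format(bef)"},
    {"columnName": "aft", "transformFunction": "json_format(aft)"},
]
right = [
    {"columnName": "bef", "transformFunction": "json_format(before)"},
    {"columnName": "aft", "transformFunction": "json_format(after)"},
]

print(apply_transforms(record, wrong))  # both columns None: no field "bef" or "aft"
print(apply_transforms(record, right))  # both columns hold JSON strings
```

The destination names ("bef", "aft") are free to differ from the source fields; only the function argument has to exist in the incoming record.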
Hi @User, I'm new to Pinot and was wondering if you could help me. Is there a way to consume records serialized and deserialized with the Hortonworks serializer and its schema registry? We store all our schemas in the Hortonworks schema registry, and since it's not the most popular schema registry, it's difficult to find guidance. Thanks in advance.
Hi @User, is the Hortonworks schema registry API-compatible with the Confluent Schema Registry, similar to the alternative offered by Red Hat?
ohh I don't think they're compatible but I'll check it locally. What if it turns out they're not compatible? (Hortonworks & Confluent)
@Gabriel Lucano Were you able to apply the same transformation to the key of each Kafka record? I am trying to figure out the correct configurations for deserializing Kafka record keys using the same decoder: