# troubleshooting
I’m trying to relay a topic from one Kafka cluster to another. This is the original record in the source topic:
```
{
    "json": {
        "eventName": "event-ABC",
        ...
    }
}
```
The source is in JSON format and the sink is in Avro format with Confluent Schema Registry. Here is my code:
```
tableEnv.executeSql(
    "CREATE TABLE source_table (..) WITH ("
        + " 'connector' = 'kafka',"
        + " 'format' = 'json',"
        + " .."
        + ")");

tableEnv.executeSql(
    "CREATE TABLE sink_table WITH ("
        + " 'connector' = 'kafka',"
        + " 'format' = 'avro-confluent',"
        + " .."
        + ") AS SELECT * FROM source_table");
```
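For a fuller picture, this is roughly the whole job. The topic names, broker address, registry URL, and the single eventName column are placeholders standing in for my real setup, and the option names are the ones I see in recent Flink docs (1.17-ish), so they may differ on other versions:

```
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class RelayJob {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Source: JSON records shaped like {"json": {"eventName": ...}}.
        // Topic and broker names are placeholders for my real values.
        tableEnv.executeSql(
            "CREATE TABLE source_table (`json` ROW<eventName STRING>) WITH ("
                + " 'connector' = 'kafka',"
                + " 'topic' = 'source-topic',"
                + " 'properties.bootstrap.servers' = 'broker:9092',"
                + " 'scan.startup.mode' = 'earliest-offset',"
                + " 'format' = 'json'"
                + ")");

        // Sink: same rows, serialized as Avro against the schema registry.
        tableEnv.executeSql(
            "CREATE TABLE sink_table WITH ("
                + " 'connector' = 'kafka',"
                + " 'topic' = 'sink-topic',"
                + " 'properties.bootstrap.servers' = 'broker:9092',"
                + " 'format' = 'avro-confluent',"
                + " 'avro-confluent.url' = 'http://schema-registry:8081'"
                + ") AS SELECT * FROM source_table");
    }
}
```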
If I run this code without the ‘value.avro-confluent.subject’ option, the sink record looks something like this:
```
{
    "json": {
        "org.apache.flink.avro.generated.record_json": {
            "eventName": {
                "string": "event-ABC"
            },
            ..
        }
    }
}
```
I don’t understand why flink-avro inserts “org.apache.flink.avro.generated.record_json” between “json” and “eventName”. Also, “eventName” is not just “event-ABC” but {"string": "event-ABC"}. Is this a bug, or is it something I missed?
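To make the question concrete, below is the Avro schema I think flink-avro is deriving for the row. This is my reconstruction from the output above, not a schema I dumped from the registry; if I read the Avro spec right, the extra keys would then just be the Avro JSON encoding tagging each union value with its branch name (the record’s full name for the nested record, and string for eventName):

```
{
  "type": "record",
  "name": "record",
  "namespace": "org.apache.flink.avro.generated",
  "fields": [
    {
      "name": "json",
      "type": ["null", {
        "type": "record",
        "name": "record_json",
        "fields": [
          {"name": "eventName", "type": ["null", "string"], "default": null}
        ]
      }],
      "default": null
    }
  ]
}
```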
If I add the ‘value.avro-confluent.subject’ option, I get a schema-incompatibility exception:
```
Schema being registered is incompatible with an earlier schema for subject "myschema-value", details: [{errorType:'MISSING_UNION_BRANCH', description:'The new schema is missing a type inside a union field at path '/fields/0/type' in the old schema', additionalInfo:'reader union lacking writer type: RECORD'}
```
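For anyone who wants to compare, the earlier schema for that subject can be fetched from the Schema Registry REST API (the registry URL below is a placeholder for mine; the subject name is the one from the error):

```
# Fetch the latest schema registered for the sink subject.
curl http://schema-registry:8081/subjects/myschema-value/versions/latest
```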