Tan Kim
06/07/2023, 1:20 AM

"json": {
    "eventName": "event-ABC",
    ...
}
The source is in JSON format and the sink is in Avro format with Confluent Schema Registry.
Here is my code.
tableEnv.executeSql("CREATE TABLE source_table (..) WITH (
'connector'='kafka',
'format'='json',
)")
tableEnv.executeSql("CREATE TABLE sink_table WITH (
'connector'='kafka',
'format'='avro-confluent',
..
) AS SELECT * FROM source_table")
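In case it helps, the fuller version I'm running looks roughly like this (CTAS needs a fairly recent Flink; the topic names, broker address, and registry URL below are placeholders, not our real values):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

// Source: JSON records from Kafka; the row shape mirrors the sample payload above.
tableEnv.executeSql(
    "CREATE TABLE source_table (" +
    "  json ROW<eventName STRING>" +
    ") WITH (" +
    "  'connector' = 'kafka'," +
    "  'topic' = 'source-topic'," +                        // placeholder
    "  'properties.bootstrap.servers' = 'broker:9092'," +  // placeholder
    "  'scan.startup.mode' = 'earliest-offset'," +
    "  'format' = 'json'" +
    ")");

// Sink: the same rows written as Avro, with the schema registered
// in Confluent Schema Registry.
tableEnv.executeSql(
    "CREATE TABLE sink_table WITH (" +
    "  'connector' = 'kafka'," +
    "  'topic' = 'sink-topic'," +                          // placeholder
    "  'properties.bootstrap.servers' = 'broker:9092'," +  // placeholder
    "  'format' = 'avro-confluent'," +
    "  'avro-confluent.url' = 'http://registry:8081'" +    // placeholder
    ") AS SELECT * FROM source_table");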
If I run this code without the 'value.avro-confluent.subject' configuration, the sink record looks something like this:
{
    "json": {
        "org.apache.flink.avro.generated.record_json": {
            "eventName": {
                "string": "event-ABC"
            },
            ..
        }
    }
}
I don't understand why flink-avro inserts "org.apache.flink.avro.generated.record_json" between "json" and "eventName". Also, "eventName" is not plain "event-ABC" but wrapped as "string": "event-ABC".
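For what it's worth, I also dumped the schema that flink-avro derives for the row type (a quick sketch; the ROW shape is my guess from our payload, reduced to the one field shown above), and both the generated record name and a nullable-string union show up there:

import org.apache.avro.Schema;
import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.DataType;

// My guess at our row type, reduced to the one field shown above.
DataType rowType = DataTypes.ROW(
    DataTypes.FIELD("json", DataTypes.ROW(
        DataTypes.FIELD("eventName", DataTypes.STRING()))));

// flink-avro names nested records "org.apache.flink.avro.generated.record_<field>",
// and since Flink SQL columns are nullable by default, STRING maps to the Avro
// union ["null", "string"]. Avro's JSON encoding prints union values wrapped in
// their branch type, which is where {"string": "event-ABC"} comes from.
Schema avroSchema = AvroSchemaConverter.convertToSchema(rowType.getLogicalType());
System.out.println(avroSchema.toString(true));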
Is this a bug, or did I miss something?
Tan Kim
06/07/2023, 1:23 AM

Schema being registered is incompatible with an earlier schema for subject "myschema-value", details: [{errorType:'MISSING_UNION_BRANCH', description:'The new schema is missing a type inside a union field at path '/fields/0/type' in the old schema', additionalInfo:'reader union lacking writer type: RECORD'}
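For reference, the sink variant that hits this just pins the registry subject to our pre-existing one (since my DDL uses plain 'format', I believe the unprefixed 'avro-confluent.subject' option applies; the 'value.'-prefixed variant is for 'value.format'):

// Same sink as above, but registering under the subject named in the error.
tableEnv.executeSql(
    "CREATE TABLE sink_table WITH (" +
    "  'connector' = 'kafka'," +
    "  'topic' = 'sink-topic'," +                          // placeholder
    "  'properties.bootstrap.servers' = 'broker:9092'," +  // placeholder
    "  'format' = 'avro-confluent'," +
    "  'avro-confluent.url' = 'http://registry:8081'," +   // placeholder
    "  'avro-confluent.subject' = 'myschema-value'" +
    ") AS SELECT * FROM source_table");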