Running into an Avro-based Schema Evolution issue ...
# troubleshooting
r
Running into an Avro-based schema evolution issue in a Flink job that feels like it should be supported. I just pushed out a deployment using classes generated via the Maven Avro plugin, adding the following two new fields to the AVSC definition, which should be a compatible evolution:
```json
{
  "name": "facilities",
  "type": [
    "null",
    {
      "type": "array",
      "items": ["null", "string"]
    }
  ],
  "default": null
},
{
  "name": "sources",
  "type": [
    "null",
    {
      "type": "array",
      "items": ["null", "string"]
    }
  ],
  "default": null
}
```
However, when the job came back up, I began seeing serialization errors like the following:
```
Caused by: java.io.IOException: Serializer consumed more bytes than the record had. This indicates broken serialization. If you are using custom serialization types (Value or Writable), check their serialization methods. If you are using a Kryo-serialized type, check the corresponding Kryo serializer.
...
Caused by: java.lang.ArrayIndexOutOfBoundsException: Index 320 out of bounds for length 2
	at org.apache.avro.io.parsing.Symbol$Alternative.getSymbol(Symbol.java:460) ~[blob_p-fd9646d20c93adaaf511308efe31314274308aff-ea4a5c5c948a79e12b19a315adbd4a13:?]
	at org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:283) ~[blob_p-fd9646d20c93adaaf511308efe31314274308aff-ea4a5c5c948a79e12b19a315adbd4a13:?]
	at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:187) ~
...
2024-09-04 17:38:55,070 INFO  org.apache.flink.api.java.typeutils.TypeExtractor            [] - Field RiskVectorScore#facilities will be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance and schema evolution.
...
2024-09-04 17:38:55,070 INFO  org.apache.flink.api.java.typeutils.TypeExtractor            [] - Field RiskVectorScore#sources will be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance and schema evolution.
```
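Worth noting: those two TypeExtractor lines mean Flink is handling the new fields as GenericType, i.e. falling back to Kryo rather than Avro for them. As a rough sketch of the standard knobs here (nothing specific to this job; both are plain ExecutionConfig calls):

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SerializerConfig {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Fail fast at job submission instead of silently serializing
        // GenericType fields with Kryo.
        env.getConfig().disableGenericTypes();

        // Alternatively, force Avro for POJO-like types so schema evolution
        // is handled by Avro rather than by Kryo.
        env.getConfig().enableForceAvro();
    }
}
```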
Any recommendations for how to troubleshoot this? The errors aren't exactly intuitive about what's actually wrong.
It looks like there's a discrepancy between the reader and writer schemas, which I can see with DEBUG logging:
```
2024-09-04 17:38:55,090 DEBUG org.apache.avro.SchemaCompatibility                          [] - Checking compatibility of reader <new schema> with writer <old schema>
```
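One way to rule the schema change itself in or out is Avro's own compatibility checker, run directly against the two definitions; a minimal sketch (the file names are placeholders for the actual v1/v2 AVSC files):

```java
import org.apache.avro.Schema;
import org.apache.avro.SchemaCompatibility;
import org.apache.avro.SchemaCompatibility.SchemaPairCompatibility;

public class CompatCheck {
    public static void main(String[] args) throws Exception {
        // Placeholder paths: load the actual v1 (writer) and v2 (reader) schemas.
        Schema writer = new Schema.Parser().parse(new java.io.File("risk_vector_score_v1.avsc"));
        Schema reader = new Schema.Parser().parse(new java.io.File("risk_vector_score_v2.avsc"));

        // Can data written with the old schema be read with the new one?
        SchemaPairCompatibility result =
            SchemaCompatibility.checkReaderWriterCompatibility(reader, writer);
        System.out.println(result.getType());        // COMPATIBLE / INCOMPATIBLE
        System.out.println(result.getDescription()); // human-readable explanation
    }
}
```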
p
Are old records that were serialized with the v1 schema being deserialized with the v2 schema definition? That won't work, even if the schema change is backwards-compatible. Those old records need to be deserialized against the v1 writer schema: either by embedding the schema in each message (which Avro supports, but which can be very inefficient), or by using e.g. Confluent Schema Registry, so that each serialized message is tagged with a schema ID the consumer can look up at deserialization time.
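For context on the mechanics: Avro's schema resolution (filling defaults for new fields, etc.) only kicks in when the decoder is handed both schemas, roughly like this sketch (names are illustrative):

```java
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DecoderFactory;

import java.io.IOException;

public class ResolvingRead {
    /**
     * Decode bytes written with writerSchema into a record shaped by readerSchema.
     * Decoding v1 bytes with only the v2 schema (writer == reader) is the kind of
     * mismatch that produces errors like the ArrayIndexOutOfBoundsException above.
     */
    static GenericRecord readWithResolution(byte[] bytes, Schema writerSchema, Schema readerSchema)
            throws IOException {
        GenericDatumReader<GenericRecord> datumReader =
            new GenericDatumReader<>(writerSchema, readerSchema);
        BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(bytes, null);
        return datumReader.read(null, decoder);
    }
}
```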
r
In this case, it's using a schema registry to handle the lookup during deserialization. I'm not 100% sure the issue is the schema change itself, but I didn't see anything else in the logs pointing to a smoking gun.
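For reference, the registry lookup on the Flink side typically goes through the Confluent-registry deserialization schema, along these lines (RiskVectorScore is the generated class from the logs above; the registry URL is a placeholder):

```java
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema;

public class RegistryDeser {
    // Resolves each message's writer schema from the registry by the embedded
    // schema ID, then reads it into the generated SpecificRecord class.
    static DeserializationSchema<RiskVectorScore> create() {
        return ConfluentRegistryAvroDeserializationSchema.forSpecific(
            RiskVectorScore.class, "http://schema-registry:8081");
    }
}
```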
👍 1