This message was deleted.
# general
s
This message was deleted.
o
if it helps - here is the schema for the topic.
Copy code
{
  "version": 2,
  "schemaInfo": {
    "name": "demo.public.demo1",
    "schema": {
      "key": {
        "name": "KafkaAvro",
        "schema": {
          "type": "record",
          "name": "Key",
          "namespace": "demo.public.demo1",
          "fields": [
            {
              "name": "site_id",
              "type": "int"
            },
            {
              "name": "sublocation",
              "type": "string"
            },
            {
              "name": "measured_at",
              "type": {
                "type": "string",
                "connect.version": 1,
                "connect.name": "io.debezium.time.ZonedTimestamp"
              }
            },
            {
              "name": "depth",
              "type": "float"
            }
          ],
          "connect.name": "demo.public.demo1.Key"
        },
        "type": "AVRO",
        "timestamp": 0,
        "properties": {
          "__AVRO_READ_OFFSET__": "5"
        }
      },
      "value": {
        "name": "KafkaAvro",
        "schema": {
          "type": "record",
          "name": "Envelope",
          "namespace": "demo.public.demo1",
          "fields": [
            {
              "name": "before",
              "type": [
                "null",
                {
                  "type": "record",
                  "name": "Value",
                  "fields": [
                    {
                      "name": "site_id",
                      "type": "int"
                    },
                    {
                      "name": "sublocation",
                      "type": "string"
                    },
                    {
                      "name": "measured_at",
                      "type": {
                        "type": "string",
                        "connect.version": 1,
                        "connect.name": "io.debezium.time.ZonedTimestamp"
                      }
                    },
                    {
                      "name": "depth",
                      "type": "float"
                    },
                    {
                      "name": "value",
                      "type": [
                        "null",
                        "float"
                      ]
                    }
                  ],
                  "connect.name": "demo.public.demo1.Value"
                }
              ]
            },
            {
              "name": "after",
              "type": [
                "null",
                "Value"
              ]
            },
            {
              "name": "source",
              "type": {
                "type": "record",
                "name": "Source",
                "namespace": "io.debezium.connector.postgresql",
                "fields": [
                  {
                    "name": "version",
                    "type": "string"
                  },
                  {
                    "name": "connector",
                    "type": "string"
                  },
                  {
                    "name": "name",
                    "type": "string"
                  },
                  {
                    "name": "ts_ms",
                    "type": "long"
                  },
                  {
                    "name": "snapshot",
                    "type": [
                      {
                        "type": "string",
                        "connect.version": 1,
                        "connect.parameters": {
                          "allowed": "true,last,false"
                        },
                        "connect.default": "false",
                        "connect.name": "io.debezium.data.Enum"
                      },
                      "null"
                    ],
                    "default": "false"
                  },
                  {
                    "name": "db",
                    "type": "string"
                  },
                  {
                    "name": "sequence",
                    "type": [
                      "null",
                      "string"
                    ]
                  },
                  {
                    "name": "schema",
                    "type": "string"
                  },
                  {
                    "name": "table",
                    "type": "string"
                  },
                  {
                    "name": "txId",
                    "type": [
                      "null",
                      "long"
                    ]
                  },
                  {
                    "name": "lsn",
                    "type": [
                      "null",
                      "long"
                    ]
                  },
                  {
                    "name": "xmin",
                    "type": [
                      "null",
                      "long"
                    ]
                  }
                ],
                "connect.name": "io.debezium.connector.postgresql.Source"
              }
            },
            {
              "name": "op",
              "type": "string"
            },
            {
              "name": "ts_ms",
              "type": [
                "null",
                "long"
              ]
            },
            {
              "name": "transaction",
              "type": [
                "null",
                {
                  "type": "record",
                  "name": "ConnectDefault",
                  "namespace": "org.apache.pulsar.kafka.shade.io.confluent.connect.avro",
                  "fields": [
                    {
                      "name": "id",
                      "type": "string"
                    },
                    {
                      "name": "total_order",
                      "type": "long"
                    },
                    {
                      "name": "data_collection_order",
                      "type": "long"
                    }
                  ]
                }
              ]
            }
          ],
          "connect.name": "demo.public.demo1.Envelope"
        },
        "type": "AVRO",
        "timestamp": 0,
        "properties": {
          "__AVRO_READ_OFFSET__": "5"
        }
      }
    },
    "type": "KEY_VALUE",
    "timestamp": 1684286797003,
    "properties": {
      "key.schema.name": "KafkaAvro",
      "key.schema.properties": "{\"__AVRO_READ_OFFSET__\":\"5\"}",
      "key.schema.type": "AVRO",
      "kv.encoding.type": "SEPARATED",
      "value.schema.name": "KafkaAvro",
      "value.schema.properties": "{\"__AVRO_READ_OFFSET__\":\"5\"}",
      "value.schema.type": "AVRO"
    }
  }
}
For future reference: make sure to seek the bytes object to the offset given by
__AVRO_READ_OFFSET__
in the schema properties before decoding. Here is an outline of my workflow, if it helps anyone:
Copy code
# Outline: decode the Avro-encoded message key of a Pulsar KEY_VALUE topic.
# Consumer subscription, avro imports, etc. are elided ....
import base64
import io

# Number of leading bytes to skip before the Avro payload starts. This is the
# value of "__AVRO_READ_OFFSET__" in the key schema's properties ("5" here).
AVRO_READ_OFFSET = 5

msg = consumer.receive()

# The key arrives base64-encoded in the message's partition key.
key = base64.b64decode(msg.partition_key())

# Key schema, i.e. schemaInfo.schema.key.schema from the topic's schema definition.
key_schema_dict = {
    "type": "record",
    "name": "Key",
    "namespace": "demo.public.demo1",
    "fields": [
        {"name": "site_id", "type": "int"},
        {"name": "sublocation", "type": "string"},
        {
            "name": "measured_at",
            "type": {
                "type": "string",
                "connect.version": 1,
                "connect.name": "io.debezium.time.ZonedTimestamp",
            },
        },
        {"name": "depth", "type": "float"},
    ],
    "connect.name": "demo.public.demo1.Key",
}
key_reader = avro.io.DatumReader(avro.schema.make_avsc_object(key_schema_dict))

byte_obj = io.BytesIO(key)
# Skip the prefix; decoding from position 0 would misread the record.
byte_obj.seek(AVRO_READ_OFFSET)
bin_decode = avro.io.BinaryDecoder(byte_obj)
output = key_reader.read(bin_decode)