Slackbot
05/17/2023, 12:44 AM
Ollie Steiner
05/17/2023, 1:24 AM
{
"version": 2,
"schemaInfo": {
"name": "demo.public.demo1",
"schema": {
"key": {
"name": "KafkaAvro",
"schema": {
"type": "record",
"name": "Key",
"namespace": "demo.public.demo1",
"fields": [
{
"name": "site_id",
"type": "int"
},
{
"name": "sublocation",
"type": "string"
},
{
"name": "measured_at",
"type": {
"type": "string",
"connect.version": 1,
"connect.name": "io.debezium.time.ZonedTimestamp"
}
},
{
"name": "depth",
"type": "float"
}
],
"connect.name": "demo.public.demo1.Key"
},
"type": "AVRO",
"timestamp": 0,
"properties": {
"__AVRO_READ_OFFSET__": "5"
}
},
"value": {
"name": "KafkaAvro",
"schema": {
"type": "record",
"name": "Envelope",
"namespace": "demo.public.demo1",
"fields": [
{
"name": "before",
"type": [
"null",
{
"type": "record",
"name": "Value",
"fields": [
{
"name": "site_id",
"type": "int"
},
{
"name": "sublocation",
"type": "string"
},
{
"name": "measured_at",
"type": {
"type": "string",
"connect.version": 1,
"connect.name": "io.debezium.time.ZonedTimestamp"
}
},
{
"name": "depth",
"type": "float"
},
{
"name": "value",
"type": [
"null",
"float"
]
}
],
"connect.name": "demo.public.demo1.Value"
}
]
},
{
"name": "after",
"type": [
"null",
"Value"
]
},
{
"name": "source",
"type": {
"type": "record",
"name": "Source",
"namespace": "io.debezium.connector.postgresql",
"fields": [
{
"name": "version",
"type": "string"
},
{
"name": "connector",
"type": "string"
},
{
"name": "name",
"type": "string"
},
{
"name": "ts_ms",
"type": "long"
},
{
"name": "snapshot",
"type": [
{
"type": "string",
"connect.version": 1,
"connect.parameters": {
"allowed": "true,last,false"
},
"connect.default": "false",
"connect.name": "io.debezium.data.Enum"
},
"null"
],
"default": "false"
},
{
"name": "db",
"type": "string"
},
{
"name": "sequence",
"type": [
"null",
"string"
]
},
{
"name": "schema",
"type": "string"
},
{
"name": "table",
"type": "string"
},
{
"name": "txId",
"type": [
"null",
"long"
]
},
{
"name": "lsn",
"type": [
"null",
"long"
]
},
{
"name": "xmin",
"type": [
"null",
"long"
]
}
],
"connect.name": "io.debezium.connector.postgresql.Source"
}
},
{
"name": "op",
"type": "string"
},
{
"name": "ts_ms",
"type": [
"null",
"long"
]
},
{
"name": "transaction",
"type": [
"null",
{
"type": "record",
"name": "ConnectDefault",
"namespace": "org.apache.pulsar.kafka.shade.io.confluent.connect.avro",
"fields": [
{
"name": "id",
"type": "string"
},
{
"name": "total_order",
"type": "long"
},
{
"name": "data_collection_order",
"type": "long"
}
]
}
]
}
],
"connect.name": "demo.public.demo1.Envelope"
},
"type": "AVRO",
"timestamp": 0,
"properties": {
"__AVRO_READ_OFFSET__": "5"
}
}
},
"type": "KEY_VALUE",
"timestamp": 1684286797003,
"properties": {
"key.schema.name": "KafkaAvro",
"key.schema.properties": "{\"__AVRO_READ_OFFSET__\":\"5\"}",
"key.schema.type": "AVRO",
"kv.encoding.type": "SEPARATED",
"value.schema.name": "KafkaAvro",
"value.schema.properties": "{\"__AVRO_READ_OFFSET__\":\"5\"}",
"value.schema.type": "AVRO"
}
}
}
Ollie Steiner
05/17/2023, 3:56 PM
`__AVRO_READ_OFFSET__` in the schema. Here is an outline of my workflow, in case it helps anyone:
import base64
import io

import avro.io
import avro.schema

# Consumer subscription etc....
msg = consumer.receive()

# The message key is read from msg.partition_key() and base64-decoded to get the
# raw Avro-encoded key bytes (this topic uses a KEY_VALUE schema with
# kv.encoding.type = SEPARATED).
key = base64.b64decode(msg.partition_key())

# Key schema, copied from schemaInfo.schema.key.schema in the JSON dump above.
key_schema_dict = {
    "type": "record",
    "name": "Key",
    "namespace": "demo.public.demo1",
    "fields": [
        {"name": "site_id", "type": "int"},
        {"name": "sublocation", "type": "string"},
        {
            "name": "measured_at",
            "type": {
                "type": "string",
                "connect.version": 1,
                "connect.name": "io.debezium.time.ZonedTimestamp",
            },
        },
        {"name": "depth", "type": "float"},
    ],
    "connect.name": "demo.public.demo1.Key",
}

# Value of the "__AVRO_READ_OFFSET__" schema property: the number of leading bytes
# to skip before the Avro datum begins.
# NOTE(review): presumably the Confluent wire-format header (1 magic byte +
# 4-byte schema id) left by the Kafka Connect Avro converter — confirm.
AVRO_READ_OFFSET = 5

key_reader = avro.io.DatumReader(avro.schema.make_avsc_object(key_schema_dict))
with io.BytesIO(key) as byte_obj:
    byte_obj.seek(AVRO_READ_OFFSET)
    bin_decode = avro.io.BinaryDecoder(byte_obj)
    # Decoded key record as a dict keyed by the field names in key_schema_dict.
    output = key_reader.read(bin_decode)