Vincent Laurenzano
09/26/2023, 2:07 PM
b'\x01\x01\x01\x88\xe5{"before":{"id":"redacted"}, "after":{"id":"redacted"}}'
So far, the only thing that has worked for me has been to define the source table to read the message as raw bytes, and then use a custom function to strip out the leading bytes at query time:
# custom SQL function for stripping out the leading bytes
from pyflink.table import DataTypes
from pyflink.table.udf import udf

@udf(result_type=DataTypes.STRING())
def strip_schema(byte_array):
    stripped = byte_array[5:]
    decoded = stripped.decode('utf-8')
    return decoded
# source DDL for the Table API that reads the record in raw format
src_ddl = """
CREATE TABLE users (
    record BYTES
) WITH (
    'connector' = 'kafka',
    'value.format' = 'raw'
)"""
# a query that uses the custom function to read the JSON payload
sql = """
with records as (select strip_schema(record) as r from users limit 5)
select JSON_VALUE(r, '$.before.id') as id from records
"""
My question is: does anyone have recommendations for how best to handle these messages?
One thought is that there might be a way to register a custom deserializer for the DataStream API?
Another is that it might make more sense to use a ProcessFunction for these custom messages?
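For what it's worth, the core of either approach (custom deserializer or ProcessFunction) is the same byte-level step as the UDF above: drop the fixed-length header and parse the rest as JSON. A minimal sketch of that logic outside Flink, assuming the header is always exactly 5 bytes (`strip_and_parse` is an illustrative name, not a Flink API):

```python
import json

# assumption: a fixed 5-byte header precedes the JSON payload,
# as in the sample message at the top of the thread
MAGIC_PREFIX_LEN = 5

def strip_and_parse(raw: bytes) -> dict:
    """Drop the leading header bytes and parse the remaining JSON payload."""
    return json.loads(raw[MAGIC_PREFIX_LEN:].decode("utf-8"))

sample = b'\x01\x01\x01\x88\xe5{"before":{"id":"redacted"}, "after":{"id":"redacted"}}'
record = strip_and_parse(sample)
print(record["before"]["id"])  # -> redacted
```

Inside a ProcessFunction this would just be the body of the per-element method, emitting the parsed dict (or a string) downstream.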
I realize there is a confluent-avro library, but it doesn't seem to support JSON.

rmoff
09/26/2023, 2:17 PM

Vincent Laurenzano
09/26/2023, 2:20 PM

rmoff
09/26/2023, 2:20 PM