# troubleshooting
v
hey all, I'm trying to handle some confluent cdc json messages in pyflink and wondering what some recommended methods would be. Confluent attaches a 5-byte schema identifier to the front of each message, which breaks standard JSON deserialization. For instance, a message looks something like this:
```
b'\x01\x01\x01\x88\xe5{"before":{"id":"redacted"}, "after":{"id":"redacted"}}'
```
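(for reference, those first 5 bytes are Confluent's wire format: one magic byte followed by a 4-byte big-endian schema registry id. Just to illustrate the framing, here's a quick sketch of pulling it apart; `parse_confluent_header` is a made-up helper, not something from a library:)
```
import struct

def parse_confluent_header(raw):
    """split a Confluent-framed message into (magic byte, schema id, json payload)"""
    magic = raw[0]                                # magic byte
    schema_id = struct.unpack(">I", raw[1:5])[0]  # 4-byte big-endian schema registry id
    return magic, schema_id, raw[5:]              # the rest is the plain JSON document

magic, schema_id, payload = parse_confluent_header(
    b'\x01\x01\x01\x88\xe5{"before":{"id":"redacted"}, "after":{"id":"redacted"}}'
)
# payload.decode('utf-8') is now plain JSON that json.loads() can handle
```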
So far, the only thing that has worked for me has been to define the source table with the raw format and then use a custom function to strip out those leading bytes at query time:
```
from pyflink.table import DataTypes
from pyflink.table.udf import udf

# custom sql function for stripping out the leading bytes
@udf(result_type=DataTypes.STRING())
def strip_schema(byte_array):
    stripped = byte_array[5:]           # drop the 5-byte Confluent header
    decoded = stripped.decode('utf-8')  # the rest is plain JSON text
    return decoded

# src ddl for the table api that reads the record in raw format
# (topic / bootstrap.servers options omitted here)
src_ddl = """
    CREATE TABLE users (
        record BYTES
    ) WITH (
        'connector' = 'kafka',
        'value.format' = 'raw'
    )
"""

# a query that uses the custom function to read the stripped JSON
sql = """
    with records as (select strip_schema(record) as r from users limit 5)
    select JSON_VALUE(r, '$.before.id') as id from records
"""
```
My question is: does anyone have recommendations for how I might best handle these messages? One of my thoughts is that perhaps there's a way to register a custom deserializer for the DataStream API? Another is that perhaps it makes more sense to use a ProcessFunction for these custom messages? I realize there is a confluent-avro library but it doesn't seem to support JSON.
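to make the ProcessFunction idea concrete, the rough shape I had in mind is converting the raw-format table into a DataStream and doing the stripping there. This is only a sketch, `StripConfluentHeader` is a name I made up, and I haven't run it end to end:
```
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.functions import ProcessFunction
from pyflink.table import StreamTableEnvironment


class StripConfluentHeader(ProcessFunction):
    """drop the 5-byte Confluent prefix and emit the JSON payload as a string"""

    def process_element(self, value, ctx):
        raw = value[0]  # the single BYTES column from the raw-format table
        yield raw[5:].decode('utf-8')


env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(env)

t_env.execute_sql(src_ddl)  # same raw-format source table as above
ds = t_env.to_data_stream(t_env.from_path('users'))

# json strings with the header removed; could json.loads() these in a map,
# or hand the stream back to the table api
json_strings = ds.process(StripConfluentHeader(), output_type=Types.STRING())
```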
r
do you have the option of writing the data to the topic in plain JSON, not JSON Schema (which is why you're getting the magic bytes on the front of each message)?
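if the topic is being produced by Kafka Connect / Debezium, it's the value converter that decides this. roughly (untested, and the registry URL is a placeholder):
```
# plain JSON, no schema registry framing
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false

# vs. JSON Schema, which is what prepends the magic byte + schema id
# value.converter=io.confluent.connect.json.JsonSchemaConverter
# value.converter.schema.registry.url=http://schema-registry:8081
```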
v
not simply, no. I'd prefer to resolve this at read time since these are production topics that are already consumed elsewhere (although it could be an option for new topics)
r
that's fair enough