# troubleshoot
Hi! We deployed DataHub on the AWS ecosystem, with Kafka brokers on Amazon MSK. We want to consume from the topic PlatformEvent_v1 because we want to know when some dataset property has changed, so we used the S3 sink connector from Confluent to land the records in S3. We already connected it successfully, but the records are not written properly and contain some strange characters, as you can see:
```
{
  "_1": "\u0003\u0000*�?Y�&CZ�=9�+�%��݊��a\"entityChangeEvent�\u0005{\"auditStamp\":{\"actor\":\"urn:li:corpuser:datahub\",\"time\":1668075837259},\"entityUrn\":\"urn:li:domain:xxxxxxx\",\"entityType\":\"domain\",\"modifier\":\"urn:li:corpuser:datahub\",\"category\":\"OWNER\",\"operation\":\"ADD\",\"version\":0,\"parameters\":{\"ownerType\":\"TECHNICAL_OWNER\",\"ownerUrn\":\"urn:li:corpuser:datahub\"}} application/json"
}
```
These are the properties we used, with StringConverter as key.converter and value.converter:
```properties
connector.class=io.confluent.connect.s3.S3SinkConnector
behavior.on.null.values=ignore
s3.region=eu-west-1
flush.size=1
schema.compatibility=NONE
tasks.max=2
topics=PlatformEvent_v1
key.converter.schemas.enable=false
format.class=io.confluent.connect.s3.format.json.JsonFormat
partitioner.class=io.confluent.connect.storage.partitioner.DefaultPartitioner
value.converter.schemas.enable=false
value.converter=org.apache.kafka.connect.json.JsonConverter
storage.class=io.confluent.connect.s3.storage.S3Storage
s3.bucket.name=xxxxxxxxx
key.converter=org.apache.kafka.connect.json.JsonConverter
```
Then we tried changing to JsonConverter, but it throws the following error:
```
[Worker-073fad87bf643ddc8] Caused by: org.apache.kafka.common.errors.SerializationException: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'entityChangeEvent': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')
```
How can we consume the records properly? Thank you!
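For what it's worth, the unprintable prefix in the record above looks like Avro binary in the Confluent Schema Registry wire format rather than plain JSON, which would explain why StringConverter dumps raw bytes and JsonConverter fails to parse. A minimal sketch of converter properties that could decode it, assuming a schema registry is reachable from the Connect workers; the registry URL is a placeholder:

```properties
# Sketch only: decode the Avro wire format instead of treating the bytes as a string/JSON.
# If the message key is a plain string, keep StringConverter for the key instead.
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://<schema-registry-host>:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://<schema-registry-host>:8081
# JsonFormat can stay, so the decoded records still land in S3 as JSON for Athena.
format.class=io.confluent.connect.s3.format.json.JsonFormat
```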
I've just seen the Actions framework, and that is the use case we want: to be notified every time a dataset is changed. But I don't know if we can set up an action that leaves the records in S3... our final goal is to set up a table that saves all these events in Athena.
Does anyone know how the Kafka consumer is implemented in DataHub Actions?
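If the Actions framework route is preferred, a custom action could forward each event to S3, and Athena could then query the bucket prefix through an external table. The following is only a sketch under assumptions: the class name, bucket, and key layout are made up, and the event serialization call is guarded because it may differ between datahub-actions versions.

```python
# Hypothetical custom DataHub Action that writes each event to S3 as JSON.
# Class name, bucket, and key layout are illustrative, not part of DataHub.
import uuid

import boto3
from datahub_actions.action.action import Action
from datahub_actions.event.event_envelope import EventEnvelope
from datahub_actions.pipeline.pipeline_context import PipelineContext


class S3SinkAction(Action):
    @classmethod
    def create(cls, config_dict: dict, ctx: PipelineContext) -> "Action":
        return cls(config_dict or {}, ctx)

    def __init__(self, config: dict, ctx: PipelineContext):
        self.ctx = ctx
        self.bucket = config["bucket"]                      # target S3 bucket (assumed config key)
        self.prefix = config.get("prefix", "platform-events")
        self.s3 = boto3.client("s3")

    def act(self, event: EventEnvelope) -> None:
        # Serialization is assumed: fall back to str() if as_json() is not
        # available in the installed datahub-actions version.
        body = event.as_json() if hasattr(event, "as_json") else str(event)
        key = f"{self.prefix}/{uuid.uuid4()}.json"
        self.s3.put_object(Bucket=self.bucket, Key=key, Body=body.encode("utf-8"))

    def close(self) -> None:
        pass
```

The action would be referenced from the pipeline YAML by its module path under `action.type`, while the pipeline's `source` block points at the same Kafka brokers and schema registry DataHub already uses.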
On the format: when you check the raw messages on the topic, do they look like the persisted one, or does the "corruption" happen during the sink process?
like the persisted one
I found that DataHub Actions consumes it with an AvroDeserializer.
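That matches the behaviour above: the topic is Avro, so any consumer needs the schema registry to decode it. A minimal standalone sketch using confluent-kafka to verify that the records decode cleanly; the broker list, registry URL, and group id are placeholders:

```python
# Sketch: consume PlatformEvent_v1 and decode the Avro value via the schema registry.
from confluent_kafka import Consumer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroDeserializer
from confluent_kafka.serialization import MessageField, SerializationContext

schema_registry = SchemaRegistryClient({"url": "http://<schema-registry-host>:8081"})
deserialize_value = AvroDeserializer(schema_registry)

consumer = Consumer({
    "bootstrap.servers": "<msk-bootstrap-brokers>",
    "group.id": "platform-event-debug",
    "auto.offset.reset": "earliest",
})
consumer.subscribe(["PlatformEvent_v1"])

try:
    while True:
        msg = consumer.poll(1.0)
        if msg is None or msg.error():
            continue
        # The decoded record carries the entityChangeEvent payload seen above.
        value = deserialize_value(
            msg.value(), SerializationContext(msg.topic(), MessageField.VALUE)
        )
        print(value)
finally:
    consumer.close()
```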