# troubleshoot
millions-soccer-98440:
Hi, how do I create a CDC task with complete lineage (including the source and sink sides) like this demo: https://demo.datahubproject.io/dataset/urn:li:dataset:(urn:li:dataPlatform:kafka,cdc.UserAccount_ChangeEvent,PROD)?is_lineage_mode=true ? I tried the "kafka metadata" and "kafka connect" ingestion types, but lineage displays only the "source connector" side (first image); the "sink connector" side shows only the pipeline task (second image).
mammoth-bear-12532:
@millions-soccer-98440: the intermediate hop through the kafka-connect job is expected when you use kafka-connect ingestion. I'm curious how you got the direct edge from the MySQL table to the Kafka topic in your setup. Are you emitting a lineage edge manually?
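For context, the kafka-connect ingestion discussed here is driven by a recipe. A minimal sketch, assuming a Connect REST endpoint reachable at a hypothetical `connect_uri` and a hypothetical GMS address (neither host name is taken from this thread; check the keys against the kafka-connect source docs for your DataHub version):

```yaml
# run with: datahub ingest -c kafka_connect_recipe.yml
source:
  type: kafka-connect
  config:
    # hypothetical Connect REST endpoint; adjust to your cluster
    connect_uri: http://tspace-kafka-connect-api.kafka:8083
    cluster_name: tspace-kafka-connect
sink:
  type: datahub-rest
  config:
    server: http://datahub-gms:8080  # hypothetical GMS address
```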
millions-soccer-98440:
Hi @mammoth-bear-12532, I created 2 connector types (2 hops: 1. source -> Kafka topic, 2. Kafka topic -> sink). The "source connector" for CDC from MySQL to a Kafka topic uses `connector.class: io.debezium.connector.mysql.MySqlConnector`:
```yaml
# https://debezium.io/documentation/reference/1.3/connectors/mysql.html#mysql-property-time-precision-mode

apiVersion: kafka.strimzi.io/v1alpha1
kind: KafkaConnector
metadata:
  name: thestreet-db
  namespace: kafka
  labels:
    strimzi.io/cluster: tspace-kafka-connect
  annotations:
    strimzi.io/use-connector-resources: "true"
spec:
  class: io.debezium.connector.mysql.MySqlConnector
  tasksMax: 1
  config:
    name: thestreet-db
    connector.class: io.debezium.connector.mysql.MySqlConnector
    tasks.max: 1
    database.server.name: thestreet
    database.hostname: 127.0.0.1
    database.port: 3306
    database.user: user
    database.password: password
    database.history.kafka.bootstrap.servers: tspace-kafka-cluster-kafka-bootstrap.kafka:9092
    database.history.kafka.topic: history.thestreet
    database.history.store.only.monitored.tables.ddl: true
    value.converter: io.confluent.connect.avro.AvroConverter
    value.converter.schema.registry.url: http://cp-schema-registry.kafka-eco-system:8081
    time.precision.mode: connect
    snapshot.mode: "when_needed"
  type: source
```
The "sink connector", from the Kafka topic to any RDBMS, uses `connector.class: io.confluent.connect.jdbc.JdbcSinkConnector`:
```yaml
apiVersion: kafka.strimzi.io/v1alpha1
kind: KafkaConnector
metadata:
  name: thestreet-users
  namespace: kafka
  labels:
    strimzi.io/cluster: tspace-kafka-connect
  annotations:
    strimzi.io/use-connector-resources: "true"
spec:
  class: io.confluent.connect.jdbc.JdbcSinkConnector
  tasksMax: 1
  config:
    name: thestreet-users
    connector.class: io.confluent.connect.jdbc.JdbcSinkConnector
    topics: thestreet.thestreet.users
    tasks.max: 1
    connection.url: jdbc:postgresql://127.0.0.1:5432/thestreet
    connection.user: user
    connection.password: password
    auto.create: true
    auto.evolve: true
    insert.mode: upsert
    pk.fields: id
    pk.mode: record_key
    errors.tolerance: all
    value.converter: io.confluent.connect.avro.AvroConverter
    value.converter.schema.registry.url: http://cp-schema-registry.kafka-eco-system:8081
    errors.retry.delay.max.ms: 900000
    errors.retry.timeout: 10800000

    # transform
    transforms: unwrap
    transforms.unwrap.type: io.debezium.transforms.ExtractNewRecordState
    transforms.unwrap.drop.tombstones: false
    transforms.unwrap.delete.handling.mode: rewrite
  type: sink
```
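Worth noting: Debezium's default topic naming is `<database.server.name>.<databaseName>.<tableName>`, which is why the sink's `topics` value lines up with the source config above:

```yaml
# source side (Debezium): database.server.name: thestreet
# changes to the MySQL table thestreet.users are therefore
# written to the topic thestreet.thestreet.users
#
# sink side (JDBC sink) must subscribe to that same name:
topics: thestreet.thestreet.users
```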
mammoth-bear-12532:
Got it. And how did you get the direct edge between the MySQL table and the Kafka topic
in the screenshot you posted?
This edge. @millions-soccer-98440
millions-soccer-98440:
Hi @mammoth-bear-12532, this edge comes from the kafka-connect ingestion.
mammoth-bear-12532:
Oh, I didn't realize it produces both Dataset -> Job -> Dataset and Dataset -> Dataset edges. @square-greece-86505: is that right?
square-greece-86505:
Yes, correct @mammoth-bear-12532, it produces both.
mammoth-bear-12532:
I see... I think we moved away from that model in other connectors
and made the edges always go through the "pipeline".
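If only the pipeline-style edges get emitted, a direct Dataset -> Dataset edge can still be declared by hand with DataHub's file-based lineage source. A minimal sketch, assuming the MySQL table `thestreet.users` and the topic `thestreet.thestreet.users` from the configs above (verify the exact keys against the datahub-lineage-file docs for your version):

```yaml
# lineage.yml, ingested with a recipe whose source is:
#   source:
#     type: datahub-lineage-file
#     config:
#       file: ./lineage.yml
version: 1
lineage:
  - entity:
      name: thestreet.thestreet.users
      type: dataset
      env: PROD
      platform: kafka
    upstream:
      - entity:
          name: thestreet.users
          type: dataset
          env: PROD
          platform: mysql
```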