# flink-cdc
  • g

    George Leonard

    03/12/2025, 7:25 PM
    anyone...
  • j

    John MacKinnon

    03/14/2025, 2:28 AM
    Hi folks - I ran into the lack of Array deserialization support for flink-cdc sources today (link). Are there any specific reasons that this isn't supported, or is it just a lack of engineering bandwidth? I hacked together a quick proof-of-concept converter that constructs a GenericArrayData from the elements in a given input array. It only works on text-array inputs right now, but I feel a solution similar to the existing Row converter should cover all inputs. Are there any nuances that I'm missing?
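    A minimal sketch of such a converter, assuming the array column arrives from Debezium as a java.util.List of strings (the usual shape for a Postgres text[] column); the class and method names are hypothetical, and wiring it into the existing Row converter is left out:

    import java.util.List;

    import org.apache.flink.table.data.ArrayData;
    import org.apache.flink.table.data.GenericArrayData;
    import org.apache.flink.table.data.StringData;

    // Hypothetical helper: convert a Debezium-provided list of strings
    // (e.g. a Postgres text[] column) into Flink's internal ArrayData.
    public final class TextArrayConverter {

        public static ArrayData convert(Object dbzValue) {
            List<?> elements = (List<?>) dbzValue;
            StringData[] converted = new StringData[elements.size()];
            for (int i = 0; i < elements.size(); i++) {
                Object element = elements.get(i);
                // Keep SQL NULL elements as null; convert everything else to StringData.
                converted[i] = element == null ? null : StringData.fromString(element.toString());
            }
            return new GenericArrayData(converted);
        }
    }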
  • g

    George Leonard

    03/15/2025, 11:28 AM
    Guys... to sink a JSON payload from Kafka to Paimon, I can imagine a schema on Kafka being beneficial, as that immediately tells Paimon what schema it will need to take the data. If I did not have a schema... and I simply json.dumps the payload... which basically just stringifies it... would Paimon still extract a schema? I know it's not ideal, but I'm still stuck with the above and looking at how to make some progress with this.
  • g

    George Leonard

    03/19/2025, 4:25 PM
    Making some progress, but I need some advice with the following. I'm using Confluent Avro to serialise messages during the produce step onto a Confluent Kafka cluster/topic, and I want to use the Flink Action Framework/CDC engine to consume them and insert into a Paimon table. What value converter/library am I to use? The stack is on 0.9 atm... if "critically required" I can look at upgrading the stack to Paimon 1.0.
  • g

    George Leonard

    03/23/2025, 4:38 PM
    Problem solved... I had defined my Avro schema with a "ts" field that was to store the timestamp as an int; an int is not big enough, it needs to be a long. I discovered this by using kcat to try and consume the messages and got a proper error that hinted at the problem.
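    For reference, the fix amounts to declaring the ts field as a long, as in the schema shared later in this channel; one detail worth noting is that for Avro to honour the logical type, the logicalType annotation belongs inside the type object rather than at the field level:

    {
      "name": "ts",
      "type": {"type": "long", "logicalType": "timestamp-millis"},
      "doc": "UTC Timestamp"
    }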
  • s

    Sergei Morozov

    04/02/2025, 4:47 PM
    @Leonard Xu I'd love to get your input on apache/flink-cdc#3646. Thanks!
  • b

    Brad Murry

    04/02/2025, 8:27 PM
    Anyone here running Flink CDC and pushing changelog events to AWS Kinesis? Any gotchas you'd call out?
  • g

    George Leonard

    04/04/2025, 10:10 AM
    OK, so it seems we have all the libraries in place. Below is the schema registered on my Kafka topic, followed by the payload being posted, and then the Debezium CDC error... with the "after" tag problem. Avro schema:
    {
      "name": "factory_iot_value",
      "doc": "Factory Telemetry/IoT measurements",
      "namespace": "factory_iot.avro",
      "type": "record",
      "fields": [
        {
          "name": "ts",
          "type": "long",
          "doc": "UTC Timestamp",
          "logicalType": "timestamp-millis"
        },
        {
          "name": "metadata",
          "type": [
            {
              "type": "record",
              "name": "metadata",
              "fields": [
                {
                  "name": "siteId",
                  "type": "int",
                  "doc": "Factory/Site ID"
                },
                {
                  "name": "deviceId",
                  "type": "int",
                  "doc": "Device ID"
                },
                {
                  "name": "sensorId",
                  "type": "int",
                  "doc": "Sensor on Device ID"
                },
                {
                  "name": "unit",
                  "type": "string",
                  "doc": "Measurement units of sensor"
                },
                {
                  "name": "ts_human",
                  "type": [
                    "null",
                    "string"
                  ],
                  "default": null,
                  "doc": "Human readable Timestamp"
                },
                {
                  "name": "location",
                  "type": [
                    "null",
                    {
                      "name": "location",
                      "type": "record",
                      "fields": [
                        {
                          "name": "latitude",
                          "type": "double"
                        },
                        {
                          "name": "longitude",
                          "type": "double"
                        }
                      ]
                    }
                  ],
                  "default": null,
                  "doc": "GPS Coords of Factory/Site"
                },
                {
                  "name": "deviceType",
                  "type": [
                    "null",
                    "string"
                  ],
                  "default": null,
                  "doc": "Device Description"
                }
              ]
            }
          ]
        },
        {
          "name": "measurement",
          "type": "double",
          "doc": "Measurement retried at UTC timestamp for Sensor on Device located at Site"
        },
        {
          "name": "op",
          "type": "string",
          "default": "c"
        }
      ]
    }
    Payload posted onto the Kafka topic:
    {
      "timestamp": "2024-10-02T00:00:00.869Z",
      "metadata": {
        "siteId": 1009,
        "deviceId": 1042,
        "sensorId": 10180,
        "unit": "Psi",
        "ts_human": "2024-10-02T00:00:00.869Z",
        "location": {
          "latitude": -26.195246,
          "longitude": 28.034088
        },
        "deviceType": "Oil Pump"
      },
      "measurement": 1013.3997
    }
    Error
    org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Not a valid schema field: after
    	at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372)
    	at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
    	at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:108)
    	at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:1026)
    	at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:247)
    	at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1270)
    	at org.apache.flink.client.cli.CliFrontend.lambda$mainInternal$10(CliFrontend.java:1367)
    	at java.base/java.security.AccessController.doPrivileged(Native Method)
    	at java.base/javax.security.auth.Subject.doAs(Unknown Source)
    	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1878)
    	at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
    	at org.apache.flink.client.cli.CliFrontend.mainInternal(CliFrontend.java:1367)
    	at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1335)
    Caused by: org.apache.avro.AvroRuntimeException: Not a valid schema field: after
    	at org.apache.avro.generic.GenericData$Record.get(GenericData.java:282)
    	at org.apache.paimon.flink.action.cdc.format.debezium.DebeziumAvroRecordParser.getAndCheck(DebeziumAvroRecordParser.java:205)
    	at org.apache.paimon.flink.action.cdc.format.debezium.DebeziumAvroRecordParser.extractRecords(DebeziumAvroRecordParser.java:94)
    	at org.apache.paimon.flink.action.cdc.format.AbstractRecordParser.buildSchema(AbstractRecordParser.java:73)
    	at java.base/java.util.stream.ReferencePipeline$3$1.accept(Unknown Source)
    	at java.base/java.util.ArrayList$ArrayListSpliterator.tryAdvance(Unknown Source)
    	at java.base/java.util.stream.ReferencePipeline.forEachWithCancel(Unknown Source)
    	at java.base/java.util.stream.AbstractPipeline.copyIntoWithCancel(Unknown Source)
    	at java.base/java.util.stream.AbstractPipeline.copyInto(Unknown Source)
    	at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(Unknown Source)
    	at java.base/java.util.stream.FindOps$FindOp.evaluateSequential(Unknown Source)
    	at java.base/java.util.stream.AbstractPipeline.evaluate(Unknown Source)
    	at java.base/java.util.stream.ReferencePipeline.findFirst(Unknown Source)
    	at org.apache.paimon.flink.action.cdc.MessageQueueSchemaUtils.getSchema(MessageQueueSchemaUtils.java:70)
    	at org.apache.paimon.flink.action.cdc.MessageQueueSyncTableActionBase.retrieveSchema(MessageQueueSyncTableActionBase.java:67)
    	at org.apache.paimon.flink.action.cdc.SyncTableActionBase.beforeBuildingSourceSink(SyncTableActionBase.java:150)
    	at org.apache.paimon.flink.action.cdc.SynchronizationActionBase.build(SynchronizationActionBase.java:117)
    	at org.apache.paimon.flink.action.cdc.SynchronizationActionBase.run(SynchronizationActionBase.java:215)
    	at org.apache.paimon.flink.action.FlinkActions.main(FlinkActions.java:41)
    	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
    	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
    	at java.base/java.lang.reflect.Method.invoke(Unknown Source)
    	at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
    	... 12 more
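    For context on the error: DebeziumAvroRecordParser reads Debezium change-event envelopes, so it expects each Kafka record value to contain before/after/op fields rather than a bare row, which is why it fails when it looks up the after field. A sketch of the envelope shape it is looking for (the source block and exact values here are illustrative, not taken from this setup):

    {
      "before": null,
      "after": {
        "ts": 1727827200869,
        "metadata": { "siteId": 1009, "deviceId": 1042, "sensorId": 10180, "unit": "Psi" },
        "measurement": 1013.3997
      },
      "source": { "connector": "postgresql", "db": "factory", "table": "factory_iot" },
      "op": "c",
      "ts_ms": 1727827201000
    }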
  • t

    Thomas Peyric

    04/11/2025, 12:26 PM
    Hi all ;-) I am using the Flink CDC PostgreSQL connector and I want to use the Debezium property snapshot.select.statement.overrides to run a custom SQL query against some tables. When I create my PostgresSourceBuilder.PostgresIncrementalSource.builder() I add all the needed parameters into debeziumProperties, but my custom queries are never called... even if I set .startupOptions(StartupOptions.initial()) or the Debezium property "snapshot.mode=initial". When inspecting the source code of the Postgres CDC connector I see this:
    // file: PostgresSourceConfigFactory.java
    // release: 3.3.0
    // lines: 103..105
    // url: https://github.com/apache/flink-cdc/blob/release-3.3.0/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceConfigFactory.java#L103C1-L105C53
    
    ...
    ...
     // override the user-defined debezium properties
    if (dbzProperties != null) {
        props.putAll(dbzProperties);
    }
    
    // The PostgresSource will do snapshot according to its StartupMode.      <--
    // Do not need debezium to do the snapshot work.                          <--
    props.setProperty("snapshot.mode", "never");                              <--  HERE
    
    Configuration dbzConfiguration = Configuration.from(props);
    ...
    ...
    It seems that snapshot.mode is forced to never, so the Debezium snapshot feature is never used, which means snapshot.select.statement.overrides will never be applied to run a custom SQL query. Is there a way to do a custom SQL query using the Postgres CDC connector? thx
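    For reference, a sketch of the setup being described, assuming the 3.x PostgresIncrementalSource builder API; the connection details, slot name, and table names are placeholders, and as quoted above the connector currently forces snapshot.mode=never, so the override properties end up being ignored:

    import java.util.Properties;

    import org.apache.flink.cdc.connectors.base.options.StartupOptions;
    import org.apache.flink.cdc.connectors.postgres.source.PostgresSourceBuilder;
    import org.apache.flink.cdc.connectors.postgres.source.PostgresSourceBuilder.PostgresIncrementalSource;
    import org.apache.flink.cdc.debezium.JsonDebeziumDeserializationSchema;

    public class PostgresSnapshotOverrideSketch {
        public static void main(String[] args) {
            // Debezium properties carrying the per-table snapshot query override.
            Properties dbzProps = new Properties();
            dbzProps.setProperty("snapshot.select.statement.overrides", "public.orders");
            dbzProps.setProperty(
                    "snapshot.select.statement.overrides.public.orders",
                    "SELECT * FROM public.orders WHERE created_at > now() - interval '30 days'");

            PostgresIncrementalSource<String> source =
                    PostgresSourceBuilder.PostgresIncrementalSource.<String>builder()
                            .hostname("localhost")
                            .port(5432)
                            .database("mydb")
                            .schemaList("public")
                            .tableList("public.orders")
                            .username("user")
                            .password("secret")
                            .slotName("flink_cdc_slot")
                            .decodingPluginName("pgoutput")
                            .deserializer(new JsonDebeziumDeserializationSchema())
                            .startupOptions(StartupOptions.initial())
                            .debeziumProperties(dbzProps)
                            .build();

            // The source would then be attached with env.fromSource(...); omitted here.
        }
    }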
  • s

    Sohan Honavar

    04/15/2025, 11:10 AM
    New to Flink CDC; I was going through the documentation and I am confused about pipeline connectors vs. CDC source connectors. How are they related to each other, and how do they differ?
  • s

    Sergei Morozov

    04/15/2025, 6:52 PM
    @yux, could you take another look at apache/flink-cdc#3977? I updated it as you requested. /cc @Leonard Xu
    👌 1
    👌🏼 1
  • s

    Sergei Morozov

    04/17/2025, 10:58 PM
    Hi there, I'm building a custom pipeline source connector and would like to have a better understanding of the contract that a source must fulfill to work correctly as part of a Flink CDC pipeline. Earlier, after analyzing the Flink CDC 3.2.0 codebase, I arrived at the following understanding of the contract (note: by "schema operator" below I mean the SchemaOperator class and the other objects in the org.apache.flink.cdc.runtime.operators.schema package):
    1. The source doesn't have to contain table schemas as part of its state. Its only job is to track its position in the source and emit all events downstream. This is effectively the same requirement as for regular Flink sources.
    2. The schema operator is stateful, and this state includes table schemas. Specifically, SchemaRegistry#checkpointCoordinator() serializes SchemaManager#tableSchemas. This makes sense, since the logic of the operator depends on the schemas, and after a job is restarted, the schema operator should be in the same state as before.
    3. The sink doesn't have to contain table schemas as part of its state. Before emitting the first event for each table, DataSinkWriterOperator#emitLatestSchema() will obtain the table schema from the schema operator and emit a CreateTableEvent, from which the sink can initialize the table schema.
    It looks like my understanding of 1 and 2 was wrong. When the schema operator restarts, even though the table schemas are available in the state, it doesn't use them. Instead, it expects the source to re-emit `CreateTableEvent`s for all tables (see MySqlPipelineRecordEmitter#showCreateTable() and MySqlPipelineRecordEmitter#processElement() for example). That raises the following questions:
    1. Does it mean that upon restart, every source must be able to re-emit the "create table" events that represent the schema corresponding to the checkpoint it's restored from?
    2. Where should the source store the schema?
    3. Why does the operator not use the schemas from its own checkpointed state?
    4. If the source is responsible for emitting `CreateTableEvent`s, why does the DataSinkWriterOperator also have to do that?
    Another observation is that the MySQL source doesn't fulfill this contract in full. It does emit the events with the schemas, but they are unrelated to the checkpoint. They are the current schemas in the external source (see SHOW CREATE TABLE), so the state of the job after the restart may differ from the state before. Depending on how far behind the source the job is, this difference may be more or less significant and lead to correctness issues.
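    To make the contract under discussion concrete, here is a rough sketch of the kind of CreateTableEvent a pipeline source emits (or re-emits on restart) before the first DataChangeEvent for a table, assuming the org.apache.flink.cdc.common API from Flink CDC 3.x; the table and columns are made up:

    import org.apache.flink.cdc.common.event.CreateTableEvent;
    import org.apache.flink.cdc.common.event.TableId;
    import org.apache.flink.cdc.common.schema.Schema;
    import org.apache.flink.cdc.common.types.DataTypes;

    // Hypothetical example: the event a source emits so that downstream operators
    // (schema operator, sink writer) learn the table's schema before any data changes.
    public class CreateTableEventSketch {
        public static CreateTableEvent forOrdersTable() {
            Schema schema =
                    Schema.newBuilder()
                            .physicalColumn("id", DataTypes.INT())
                            .physicalColumn("name", DataTypes.STRING())
                            .primaryKey("id")
                            .build();
            return new CreateTableEvent(TableId.tableId("mydb", "orders"), schema);
        }
    }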
  • r

    Ravi raj

    05/10/2025, 7:31 PM
    Hello, I am trying to build a streaming pipeline from MongoDB to Kafka, but when using .startupOptions(StartupOptions.latest()), I can see the splits are not happening and all the events are going into one subtask, even with .setParallelism(4). Any idea how to scale this behaviour?
  • b

    Brad Murry

    05/12/2025, 8:55 PM
    Anyone using Flink CDC for Snowflake? Hoping there are some custom connectors out in the OSS wild I can look at before going it alone.
  • s

    Sergei Morozov

    05/13/2025, 8:00 PM
    Hi, can I get a review on apache/flink-cdc#4014, please?
    👍 2
  • m

    Mathieu Jambou

    05/15/2025, 10:12 PM
    Hi, using Apache Flink I am trying to replicate data from my source DB to my target DB via CDC, and for some tables I do some manipulation. On the initial load, my task heap always reaches 3 GB/3 GB (I have 20 tables with 500k records per table). Wondering if I have the correct approach.
  • d

    dz902

    05/20/2025, 8:04 AM
    Hi, Oracle resume from SCN is not supported? I don't see an option even in the latest version. LogMiner has the capability to start from a given SCN, so I believe this should be made an option. Or maybe I'm missing something? If Oracle CDC is interrupted, having to start over with a snapshot seems unreasonable. Thanks.
  • t

    Tiansu Yu

    06/05/2025, 8:13 AM
    Hi team. I have a slowly changing dimension table that only receives one update per day, which I want to use to enrich another event table. While consuming this table via the Flink Postgres CDC connector, it complains with the following:
    java.lang.NullPointerException
    	at org.apache.flink.cdc.debezium.table.RowDataDebeziumDeserializeSchema.extractAfterRow(RowDataDebeziumDeserializeSchema.java:151)
    	at org.apache.flink.cdc.debezium.table.RowDataDebeziumDeserializeSchema.deserialize(RowDataDebeziumDeserializeSchema.java:143)
    	at org.apache.flink.cdc.debezium.internal.DebeziumChangeFetcher.handleBatch(DebeziumChangeFetcher.java:245)
    	at org.apache.flink.cdc.debezium.internal.DebeziumChangeFetcher.runFetchLoop(DebeziumChangeFetcher.java:167)
    	at org.apache.flink.cdc.debezium.DebeziumSourceFunction.run(DebeziumSourceFunction.java:447)
    	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:113)
    	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:71)
    	at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:338)
    (If I look at the publication on my Postgres table I can see there are no entries yet in the publication, since 24h has not passed since I set this up.) I wonder if the Postgres CDC connector should handle a replication publication with no entries gracefully (proper null handling) and only take the initial snapshot.
  • a

    André Anastácio

    06/18/2025, 5:47 PM
    Hello everyone, I want to use Flink CDC with PostgreSQL 17, but the documentation says that support is only up to PG 14. I've seen some messages from people here saying they're using it with PG 16. So to be able to work with more recent versions, do I need to make some changes, or would it work out of the box?
  • b

    Bentzi Mor

    06/20/2025, 2:37 PM
    Hi team, I tried to create a Flink SQL CDC job from MySQL to Iceberg using the Hive metastore but got errors. Can someone share the required jars and an example SQL file? Thanks
  • d

    dontu balu

    06/27/2025, 1:15 AM
    Hi team, I am working on a Flink CDC solution from MySQL to Iceberg. The user input is a database and table; the user will not specify any schema, so I do not know the schema of the MySQL table. Is it possible for Flink CDC to automatically infer the schema when using Flink SQL? Or is it always mandatory to specify the schema when creating the source table in Flink SQL?
  • d

    dontu balu

    06/28/2025, 1:12 AM
    Hi Team, can someone help confirm whether the Flink CDC pipeline connectors (YAML based) support any sources apart from MySQL? I want to use a Postgres source as well for my use case. Looking here, I only see the MySQL source supported - https://nightlies.apache.org/flink/flink-cdc-docs-release-3.4/docs/connectors/pipeline-connectors/overview/.
  • d

    dontu balu

    07/01/2025, 4:57 PM
    Hi team, is there anyone from the community who can help with the above request? And if it's not supported, what is the current schedule for the release?
  • s

    Sergei Morozov

    07/08/2025, 12:13 AM
    Hello, can I get a review on apache/flink-cdc#4048, please?
    👍 1
  • g

    George Leonard

    07/08/2025, 3:45 PM
    Question: with connector=jdbc... does it only show records inserted after the start of the select, or do I need to use CDC (don't really want to...)?
  • g

    George Leonard

    07/08/2025, 3:46 PM
    I want my select to be able to return all records from the source table... it's to be used in a join... but then I also think CDC will allow me to discover any new records too... and push the base set + the growth via an insert into X select * from Y;
  • l

    LC

    07/09/2025, 1:39 AM
    Hello, is anyone else facing issues with the Table API's op_ts for MySQL?
    • op_ts captures the MySQL binlog header timestamp, which is only seconds precision.
    • When we generate a CDC changelog via append, there can be entries with identical op_ts seconds, making it impossible to dedup without other meta fields.
  • d

    dontu balu

    07/09/2025, 5:20 AM
    We want to use the Flink CDC pipeline connector from MySQL to Iceberg. What sort of monitoring (in Grafana) can we have for this specific application, apart from Flink application monitoring? Any thoughts on how everyone does it in production? cc @Leonard Xu
  • l

    Levani Kokhreidze

    07/09/2025, 2:12 PM
    Hi team, wondering if you could give me some pointers. We have a Flink MySQL connector that was reading data from a specific table (let's call it first_table). As advertised, that table went through the snapshot phase and started reading from the binlog afterwards. Since then, we have added a new table (let's call it second_table) to replicate. This new table finished snapshotting, but the CDC stream never started for it. We took a savepoint and recovered from that savepoint when releasing the new table for replication. We have the following options enabled:
    .scanNewlyAddedTableEnabled(true)
    .assignUnboundedChunkFirst(true)
    .skipSnapshotBackfill(true)
    What's weird is that while the new table was going through the snapshot phase, in the logs I could still see:
    Binlog offset for tables [db.first_table, db.second_table] on checkpoint 17677: {transaction_id=null, ts_sec=1751552114, file=mysql-bin-changelog.058255, pos=27664541, kind=SPECIFIC, row=1, event=0, server_id=304919814}
    After the snapshot finished for the old table, I can see the following in the logs:
    Handling split change SplitAddition:[[MySqlBinlogSplit{splitId='binlog-split', tables=[db.first_table, db.second_table], offset={transaction_id=null, ts_sec=1751551987, file=mysql-bin-changelog.058252, pos=31325777, kind=SPECIFIC, row=1, event=0, server_id=304919814}, endOffset={ts_sec=0, file=, pos=-9223372036854775808, kind=NON_STOPPING, row=0, event=0}, isSuspended=false}]]
    But the SplitAddition references an older MySQL binlog file than the one logged during the checkpoint. Any pointers on how I can debug this?
  • d

    dontu balu

    07/11/2025, 11:14 PM
    Hi team, I have 3 questions on Flink CDC pipelines:
    1. Do we have support for validation rules in Flink CDC pipelines, e.g. column1 > 0 and column2 is not empty, before writing to the destination table? This is different from a filter, as we want to check such cases before writing the data, as a basic data-validity check.
    2. Is there support for saying that MySQL CDC delete operations should not be applied to the destination? I.e. if a row is deleted, I don't want that change applied to the destination.
    3. Would it be possible, in a single Flink CDC pipeline YAML definition with a MySQL source, to apply transformation1 and write to destination1, and apply transformation2 and write to destination2? cc @rmoff @Sergei Morozov @Leonard Xu @yux