George Leonard
03/12/2025, 7:25 PM
John MacKinnon
03/14/2025, 2:28 AM
Array deserialization support for flink-cdc sources today (link): are there any specific reasons that this isn’t supported, or is it just a lack of engineering bandwidth to add it? I hacked together a quick proof-of-concept converter that constructs a GenericArrayData
from the elements in a given input array. It only works on text-array inputs right now, but I feel a solution similar to the existing Row-converter should cover all inputs. Are there any nuances that I’m missing?
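(Illustration, not from the thread: a minimal standalone sketch of the text-array case described above. The class and method names are made up; this is not the actual proof-of-concept.)

import org.apache.flink.table.data.ArrayData;
import org.apache.flink.table.data.GenericArrayData;
import org.apache.flink.table.data.StringData;

import java.util.List;

/** Hypothetical converter: wraps the elements of a text array into Flink's internal ArrayData. */
public final class TextArrayConverter {

    public static ArrayData toArrayData(List<String> elements) {
        // Convert each element to StringData (null elements stay null),
        // then wrap the result in a GenericArrayData.
        StringData[] converted = new StringData[elements.size()];
        for (int i = 0; i < elements.size(); i++) {
            String element = elements.get(i);
            converted[i] = element == null ? null : StringData.fromString(element);
        }
        return new GenericArrayData(converted);
    }
}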
George Leonard
03/15/2025, 11:28 AM
George Leonard
03/19/2025, 4:25 PM
George Leonard
03/23/2025, 4:38 PM
Sergei Morozov
04/02/2025, 4:47 PM
Brad Murry
04/02/2025, 8:27 PM
George Leonard
04/04/2025, 10:10 AM
{
  "name": "factory_iot_value",
  "doc": "Factory Telemetry/IoT measurements",
  "namespace": "factory_iot.avro",
  "type": "record",
  "fields": [
    {
      "name": "ts",
      "type": "long",
      "doc": "UTC Timestamp",
      "logicalType": "timestamp-millis"
    },
    {
      "name": "metadata",
      "type": [
        {
          "type": "record",
          "name": "metadata",
          "fields": [
            {
              "name": "siteId",
              "type": "int",
              "doc": "Factory/Site ID"
            },
            {
              "name": "deviceId",
              "type": "int",
              "doc": "Device ID"
            },
            {
              "name": "sensorId",
              "type": "int",
              "doc": "Sensor on Device ID"
            },
            {
              "name": "unit",
              "type": "string",
              "doc": "Measurement units of sensor"
            },
            {
              "name": "ts_human",
              "type": [
                "null",
                "string"
              ],
              "default": null,
              "doc": "Human readable Timestamp"
            },
            {
              "name": "location",
              "type": [
                "null",
                {
                  "name": "location",
                  "type": "record",
                  "fields": [
                    {
                      "name": "latitude",
                      "type": "double"
                    },
                    {
                      "name": "longitude",
                      "type": "double"
                    }
                  ]
                }
              ],
              "default": null,
              "doc": "GPS Coords of Factory/Site"
            },
            {
              "name": "deviceType",
              "type": [
                "null",
                "string"
              ],
              "default": null,
              "doc": "Device Description"
            }
          ]
        }
      ]
    },
    {
      "name": "measurement",
      "type": "double",
      "doc": "Measurement retrieved at UTC timestamp for Sensor on Device located at Site"
    },
    {
      "name": "op",
      "type": "string",
      "default": "c"
    }
  ]
}
Payload posted onto the Kafka topic:
{
  "timestamp": "2024-10-02T00:00:00.869Z",
  "metadata": {
    "siteId": 1009,
    "deviceId": 1042,
    "sensorId": 10180,
    "unit": "Psi",
    "ts_human": "2024-10-02T00:00:00.869Z",
    "location": {
      "latitude": -26.195246,
      "longitude": 28.034088
    },
    "deviceType": "Oil Pump"
  },
  "measurement": 1013.3997
}
Error
org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Not a valid schema field: after
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:108)
at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:1026)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:247)
at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1270)
at org.apache.flink.client.cli.CliFrontend.lambda$mainInternal$10(CliFrontend.java:1367)
at java.base/java.security.AccessController.doPrivileged(Native Method)
at java.base/javax.security.auth.Subject.doAs(Unknown Source)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1878)
at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.flink.client.cli.CliFrontend.mainInternal(CliFrontend.java:1367)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1335)
Caused by: org.apache.avro.AvroRuntimeException: Not a valid schema field: after
at org.apache.avro.generic.GenericData$Record.get(GenericData.java:282)
at org.apache.paimon.flink.action.cdc.format.debezium.DebeziumAvroRecordParser.getAndCheck(DebeziumAvroRecordParser.java:205)
at org.apache.paimon.flink.action.cdc.format.debezium.DebeziumAvroRecordParser.extractRecords(DebeziumAvroRecordParser.java:94)
at org.apache.paimon.flink.action.cdc.format.AbstractRecordParser.buildSchema(AbstractRecordParser.java:73)
at java.base/java.util.stream.ReferencePipeline$3$1.accept(Unknown Source)
at java.base/java.util.ArrayList$ArrayListSpliterator.tryAdvance(Unknown Source)
at java.base/java.util.stream.ReferencePipeline.forEachWithCancel(Unknown Source)
at java.base/java.util.stream.AbstractPipeline.copyIntoWithCancel(Unknown Source)
at java.base/java.util.stream.AbstractPipeline.copyInto(Unknown Source)
at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(Unknown Source)
at java.base/java.util.stream.FindOps$FindOp.evaluateSequential(Unknown Source)
at java.base/java.util.stream.AbstractPipeline.evaluate(Unknown Source)
at java.base/java.util.stream.ReferencePipeline.findFirst(Unknown Source)
at org.apache.paimon.flink.action.cdc.MessageQueueSchemaUtils.getSchema(MessageQueueSchemaUtils.java:70)
at org.apache.paimon.flink.action.cdc.MessageQueueSyncTableActionBase.retrieveSchema(MessageQueueSyncTableActionBase.java:67)
at org.apache.paimon.flink.action.cdc.SyncTableActionBase.beforeBuildingSourceSink(SyncTableActionBase.java:150)
at org.apache.paimon.flink.action.cdc.SynchronizationActionBase.build(SynchronizationActionBase.java:117)
at org.apache.paimon.flink.action.cdc.SynchronizationActionBase.run(SynchronizationActionBase.java:215)
at org.apache.paimon.flink.action.FlinkActions.main(FlinkActions.java:41)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
at java.base/java.lang.reflect.Method.invoke(Unknown Source)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
... 12 more
Thomas Peyric
04/11/2025, 12:26 PM
snapshot.select.statement.overrides for using a custom SQL query on some tables.
When I create my PostgresSourceBuilder.PostgresIncrementalSource.builder() and add all the needed parameters into debeziumProperties, my custom queries are never called ... even if I set .startupOptions(StartupOptions.initial()) or the debezium property "snapshot.mode=initial".
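(Illustration, not from the thread: roughly the kind of setup being described, as a sketch. The connection details, table name, and override query below are placeholders.)

import org.apache.flink.cdc.connectors.base.options.StartupOptions;
import org.apache.flink.cdc.connectors.postgres.source.PostgresSourceBuilder;
import org.apache.flink.cdc.connectors.postgres.source.PostgresSourceBuilder.PostgresIncrementalSource;
import org.apache.flink.cdc.debezium.JsonDebeziumDeserializationSchema;

import java.util.Properties;

public class CustomSnapshotQuerySetup {

    public static PostgresIncrementalSource<String> buildSource() {
        // Debezium properties carrying the per-table snapshot override query.
        Properties dbzProps = new Properties();
        dbzProps.setProperty("snapshot.select.statement.overrides", "public.orders");
        dbzProps.setProperty(
                "snapshot.select.statement.overrides.public.orders",
                "SELECT * FROM public.orders WHERE status <> 'archived'");

        return PostgresSourceBuilder.PostgresIncrementalSource.<String>builder()
                .hostname("localhost")
                .port(5432)
                .database("postgres")
                .schemaList("public")
                .tableList("public.orders")
                .username("postgres")
                .password("postgres")
                .slotName("flink_slot")
                .decodingPluginName("pgoutput")
                .deserializer(new JsonDebeziumDeserializationSchema())
                .debeziumProperties(dbzProps)
                .startupOptions(StartupOptions.initial())
                .build();
    }
}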
When inspecting the source code of the Postgres CDC connector, I see this:
// file: PostgresSourceConfigFactory.java
// release: 3.3.0
// lines: 103..105
// url: https://github.com/apache/flink-cdc/blob/release-3.3.0/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceConfigFactory.java#L103C1-L105C53
...
...
// override the user-defined debezium properties
if (dbzProperties != null) {
    props.putAll(dbzProperties);
}
// The PostgresSource will do snapshot according to its StartupMode. <--
// Do not need debezium to do the snapshot work. <--
props.setProperty("snapshot.mode", "never"); <-- HERE
Configuration dbzConfiguration = Configuration.from(props);
...
...
It seems that snapshot.mode is forced to never and the Debezium snapshot feature is never used, so it seems that snapshot.select.statement.overrides will never be used for a custom SQL query.
Is there a way to do a custom SQL query using the Postgres CDC connector?
thx
Sohan Honavar
04/15/2025, 11:10 AM
Sergei Morozov
04/15/2025, 6:52 PM
Sergei Morozov
04/17/2025, 10:58 PM
SchemaOperator class and other objects in the org.apache.flink.cdc.runtime.operators.schema package:
1. The source doesn't have to contain table schemas as part of its state. Its only job is to track its position in the source and emit all events downstream. This is effectively the same requirement as for regular Flink sources.
2. The schema operator is stateful. This state includes table schemas. Specifically, SchemaRegistry#checkpointCoordinator() serializes SchemaManager#tableSchemas. This makes sense, since the logic of the operator depends on the schemas, and after a job is restarted, the schema operator should be in the same state as before.
3. The sink doesn't have to contain table schemas as part of its state. Before emitting the first event for each table, DataSinkWriterOperator#emitLatestSchema() will obtain the table schema from the schema operator and emit a CreateTableEvent, from which the sink can initialize the table schema.
It looks like my understanding of 1 and 2 was wrong. When the schema operator restarts, even though the table schemas are available in the state, it doesn't use them. Instead, it expects that the source will re-emit `CreateTableEvent`s for all tables (see MySqlPipelineRecordEmitter#showCreateTable(), MySqlPipelineRecordEmitter#processElement() for example).
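(For reference: a CreateTableEvent is essentially a table ID plus a schema. A minimal sketch of what a source re-emits; the table and columns are placeholders.)

import org.apache.flink.cdc.common.event.CreateTableEvent;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.types.DataTypes;

public class CreateTableEventSketch {

    public static CreateTableEvent forOrdersTable() {
        // The schema downstream operators would use to (re)initialize the table.
        Schema schema =
                Schema.newBuilder()
                        .physicalColumn("id", DataTypes.INT())
                        .physicalColumn("name", DataTypes.STRING())
                        .primaryKey("id")
                        .build();
        return new CreateTableEvent(TableId.tableId("db", "orders"), schema);
    }
}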
That raises the following questions:
1. Does it mean that upon restart, every source must be able to re-emit the "create table" events that represent the schema corresponding to the checkpoint it's restored from?
2. Where should the source store the schema?
3. Why does the operator not use the schemas from its own checkpointed state?
4. If the source is responsible for emitting `CreateTableEvent`s, why does the DataSinkWriterOperator also have to do that?
Another observation is that the MySQL source doesn't fulfill this contract in full. It does emit the events with the schemas, but they are unrelated to the checkpoint. They are the current schemas in the external source (see SHOW CREATE TABLE), so the state of the job after the restart may differ from the state before. Depending on how far behind the source the job is, this difference may be more or less significant and lead to correctness issues.
Ravi raj
05/10/2025, 7:31 PM
With .startupOptions(StartupOptions.latest()), I can see that the splits are not happening and all the events are going into one subtask, even with .setParallelism(4).
Any idea how to scale this behaviour?
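(Illustration, not from the thread: the connector isn't named above, so a MySQL incremental source is assumed here purely for the sketch; connection details are placeholders.)

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.cdc.connectors.mysql.source.MySqlSource;
import org.apache.flink.cdc.connectors.mysql.table.StartupOptions;
import org.apache.flink.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class LatestStartupParallelism {

    public static void main(String[] args) throws Exception {
        MySqlSource<String> source =
                MySqlSource.<String>builder()
                        .hostname("localhost")
                        .port(3306)
                        .databaseList("db")
                        .tableList("db.orders")
                        .username("flink")
                        .password("***")
                        .deserializer(new JsonDebeziumDeserializationSchema())
                        .startupOptions(StartupOptions.latest())
                        .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Parallelism 4 is requested on the source, as in the message above.
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "CDC Source")
                .setParallelism(4)
                .print();
        env.execute("cdc-latest-startup");
    }
}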
Brad Murry
05/12/2025, 8:55 PM
Sergei Morozov
05/13/2025, 8:00 PM
Mathieu Jambou
05/15/2025, 10:12 PM
dz902
05/20/2025, 8:04 AM
Tiansu Yu
06/05/2025, 8:13 AM
java.lang.NullPointerException
at org.apache.flink.cdc.debezium.table.RowDataDebeziumDeserializeSchema.extractAfterRow(RowDataDebeziumDeserializeSchema.java:151)
at org.apache.flink.cdc.debezium.table.RowDataDebeziumDeserializeSchema.deserialize(RowDataDebeziumDeserializeSchema.java:143)
at org.apache.flink.cdc.debezium.internal.DebeziumChangeFetcher.handleBatch(DebeziumChangeFetcher.java:245)
at org.apache.flink.cdc.debezium.internal.DebeziumChangeFetcher.runFetchLoop(DebeziumChangeFetcher.java:167)
at org.apache.flink.cdc.debezium.DebeziumSourceFunction.run(DebeziumSourceFunction.java:447)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:113)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:71)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:338)
(If I look at the publication on my Postgres table, I can see there are no entries yet in the publication, since 24h has not passed since I set this up.)
I wonder if the Postgres CDC connector should handle a replication publication with no entries gracefully (proper null handling) and only take the initial snapshot.
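(Illustration only: a standalone sketch of the kind of graceful handling being suggested, using the Kafka Connect Struct API; this is not the actual RowDataDebeziumDeserializeSchema code.)

import org.apache.kafka.connect.data.Struct;

final class AfterImageGuard {

    /** Returns the record's "after" struct, or null when the event carries none. */
    static Struct afterOrNull(Struct value) {
        if (value == null) {
            return null; // no payload at all: callers should skip instead of throwing
        }
        // The field may be absent or null (e.g. nothing published yet); both map to null.
        return value.schema().field("after") == null ? null : value.getStruct("after");
    }
}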
André Anastácio
06/18/2025, 5:47 PM
Bentzi Mor
06/20/2025, 2:37 PM
dontu balu
06/27/2025, 1:15 AM
dontu balu
06/28/2025, 1:12 AM
dontu balu
07/01/2025, 4:57 PM
Sergei Morozov
07/08/2025, 12:13 AM
George Leonard
07/08/2025, 3:45 PM
George Leonard
07/08/2025, 3:46 PM
LC
07/09/2025, 1:39 AM
op_ts for MySQL?
• op_ts captures the MySQL binlog header timestamp, which has only seconds precision
• When we generate a CDC changelog via append, there can be entries with identical op_ts seconds, making it impossible to dedup without other meta fields
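(For reference: if this is the op_ts metadata column of the MySQL CDC table connector, a minimal sketch of how it is typically declared; the table, columns, and connection options are placeholders.)

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class OpTsMetadataColumn {

    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        // op_ts surfaces the time the change was made in the database; for binlog
        // records it comes from the binlog event header, hence the precision concern above.
        tEnv.executeSql(
                "CREATE TABLE orders ("
                        + "  id INT,"
                        + "  op_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,"
                        + "  PRIMARY KEY (id) NOT ENFORCED"
                        + ") WITH ("
                        + "  'connector' = 'mysql-cdc',"
                        + "  'hostname' = 'localhost',"
                        + "  'port' = '3306',"
                        + "  'username' = 'flink',"
                        + "  'password' = '***',"
                        + "  'database-name' = 'db',"
                        + "  'table-name' = 'orders'"
                        + ")");
    }
}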
dontu balu
07/09/2025, 5:20 AM
Levani Kokhreidze
07/09/2025, 2:12 PM
first_table). As advertised, that table went through the snapshotting phase and started reading from the binlog afterwards.
Since then, we have added a new table (let’s call it second_table) to replicate. This new table finished the snapshotting, but the CDC stream never started for the new table. We took a savepoint and recovered from the savepoint when releasing the new table for replication.
We have the following options enabled:
.scanNewlyAddedTableEnabled(true).assignUnboundedChunkFirst(true).skipSnapshotBackfill(true)
What’s weird is that while the new table was going through the snapshotting phase, I could still see the following in the logs:
Binlog offset for tables [db.first_table, db.second_table] on checkpoint 17677: {transaction_id=null, ts_sec=1751552114, file=mysql-bin-changelog.058255, pos=27664541, kind=SPECIFIC, row=1, event=0, server_id=304919814}
After the snapshot finished for the old table, I can see the following in the logs:
Handling split change SplitAddition:[[MySqlBinlogSplit{splitId='binlog-split', tables=[db.first_table, db.second_table], offset={transaction_id=null, ts_sec=1751551987, file=mysql-bin-changelog.058252, pos=31325777, kind=SPECIFIC, row=1, event=0, server_id=304919814}, endOffset={ts_sec=0, file=, pos=-9223372036854775808, kind=NON_STOPPING, row=0, event=0}, isSuspended=false}]]
But the SplitAddition is for an older MySQL binlog file compared to what’s logged during the checkpoint. Any pointers on how I can debug this?
dontu balu
07/11/2025, 11:14 PM
1. column1 > 0 and column2 is not empty etc. before writing to the destination table? This is different from a filter, as we want to check such cases before writing the data, for basic data validity.
2. Also, is there support for saying that MySQL CDC delete operations should not be applied to the destination? Like, if a row is deleted, I don't want that change to be applied to the destination.
3. Would it be possible, in a single Flink CDC pipeline YAML definition for a MySQL source, to apply transformation1 and write to destination1, and apply transformation2 and write to destination2?
cc @rmoff @Sergei Morozov @Leonard Xu @yux