George Leonard
09/08/2024, 12:34 PMCREATE TABLE c_hive.db01.t_f_msqlcdc_salespayments (
`invoiceNumber` STRING,
`payDateTime_Ltz` STRING,
`payTimestamp_Epoc` STRING,
`paid` DOUBLE,
`finTransactionId` STRING,
`created_at` TIMESTAMP,
PRIMARY KEY(`invoiceNumber`) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = 'mysqlcdc',
'port' = '3306',
'username' = 'flinkcdc',
'password' = 'flinkpw',
'database-name' = 'sales',
'table-name' = 'salespayments',
'scan.startup.mode' = 'earliest-offset', -- <https://nightlies.apache.org/flink/flink-cdc-docs-release-3.1/docs/connectors/flink-sources/mysql-cdc/#startup-reading-position>
'server-id' = '184054'
);
I can select from above and see records arrive.
CREATE OR REPLACE TABLE c_hive.db01.t_f_avro_salespayments (
`invoiceNumber` STRING,
`payDateTime_Ltz` STRING,
`payTimestamp_Epoc` STRING,
`paid` DOUBLE,
`finTransactionId` STRING,
`created_at` TIMESTAMP(3),
`payTimestamp_WM` AS TO_TIMESTAMP(FROM_UNIXTIME(CAST(`payTimestamp_Epoc` AS BIGINT) / 1000)),
WATERMARK FOR `payTimestamp_WM` AS `payTimestamp_WM`,
PRIMARY KEY (`invoiceNumber`) NOT ENFORCED
) WITH (
'connector' = 'upsert-kafka'
,'topic' = 'avro_salespayments'
,'properties.bootstrap.servers' = 'broker:29092'
,'value.fields-include' = 'ALL'
,'value.format' = 'avro-confluent'
,'value.avro-confluent.schema-registry.url'= '<http://schema-registry:9081>'
,'key.format' = 'avro-confluent'
,'key.avro-confluent.schema-registry.url' = '<http://schema-registry:9081>'
,'properties.group.id' = 'mysqlcdcsourced'
);
Using below to insert into above.
INSERT INTO c_hive.db01.t_f_avro_salespayments (
`invoiceNumber`,
`payDateTime_Ltz`,
`payTimestamp_Epoc`,
`paid`,
`finTransactionId`,
`created_at`
) SELECT
invoiceNumber,
payDateTime_Ltz,
payTimestamp_Epoc,
paid,
finTransactionId,
created_at
FROM
c_hive.db01.t_f_msqlcdc_salespayments;
At this point I can see the records arrive in Kafka topic via kcat and via Control-Center
If I run:
select * from c_hive.db01.t_f_avro_salespayments;
Results in:
.19.0.19:43253-40fb4d @ devlab-flink-taskmanager-2.devlab (dataPort=43123).
flink-jobmanager | java.lang.NoSuchMethodError: 'void org.apache.flink.connector.base.source.reader.fetcher.SingleThreadFetcherManager.<init>(org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue, java.util.function.Supplier, org.apache.flink.configuration.Configuration, java.util.function.Consumer)'
flink-jobmanager | at org.apache.flink.connector.kafka.source.reader.fetcher.KafkaSourceFetcherManager.<init>(KafkaSourceFetcherManager.java:72) ~[flink-sql-connector-kafka-3.2.0-1.18.jar:3.2.0-1.18]
flink-jobmanager | at org.apache.flink.connector.kafka.source.KafkaSource.createReader(KafkaSource.java:176) ~[flink-sql-connector-kafka-3.2.0-1.18.jar:3.2.0-1.18]
flink-jobmanager | at org.apache.flink.connector.kafka.source.KafkaSource.createReader(KafkaSource.java:141) ~[flink-sql-connector-kafka-3.2.0-1.18.jar:3.2.0-1.18]
flink-jobmanager | at org.apache.flink.streaming.api.operators.SourceOperator.initReader(SourceOperator.java:314) ~[flink-dist-1.18.1.jar:1.18.1]
flink-jobmanager | at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask.init(SourceOperatorStreamTask.java:93) ~[flink-dist-1.18.1.jar:1.18.1]
flink-jobmanager | at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:718) ~[flink-dist-1.18.1.jar:1.18.1]
flink-jobmanager | at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:693) ~[flink-dist-1.18.1.jar:1.18.1]
flink-jobmanager | at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:953) ~[flink-dist-1.18.1.jar:1.18.1]
flink-jobmanager | at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:922) ~[flink-dist-1.18.1.jar:1.18.1]
flink-jobmanager | at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:746) ~[flink-dist-1.18.1.jar:1.18.1]
flink-jobmanager | at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562) ~[flink-dist-1.18.1.jar:1.18.1]
flink-jobmanager | at java.lang.Thread.run(Unknown Source) ~[?:?]
Flink 1.18.1
my flink image is build using the attached below.