# troubleshooting
Hi all. I have messages going into a MySQL table and I'm using Flink CDC to pull them into a Flink table. The CDC table in Flink:
```sql
CREATE TABLE c_hive.db01.t_f_msqlcdc_salespayments (
        `invoiceNumber`         STRING,
        `payDateTime_Ltz`       STRING,
        `payTimestamp_Epoc`     STRING,
        `paid`                  DOUBLE,
        `finTransactionId`      STRING,
        `created_at`            TIMESTAMP,
        PRIMARY KEY(`invoiceNumber`) NOT ENFORCED
) WITH (
        'connector'             = 'mysql-cdc',
        'hostname'              = 'mysqlcdc',
        'port'                  = '3306',
        'username'              = 'flinkcdc',
        'password'              = 'flinkpw',
        'database-name'         = 'sales',
        'table-name'            = 'salespayments',
        'scan.startup.mode'     = 'earliest-offset',  -- https://nightlies.apache.org/flink/flink-cdc-docs-release-3.1/docs/connectors/flink-sources/mysql-cdc/#startup-reading-position
        'server-id'             = '184054'
);
```
I can SELECT from the table above and see records arriving.
```sql
CREATE OR REPLACE TABLE c_hive.db01.t_f_avro_salespayments (
    `invoiceNumber`     STRING,
    `payDateTime_Ltz`   STRING,
    `payTimestamp_Epoc` STRING,
    `paid`              DOUBLE,
    `finTransactionId`  STRING,
    `created_at`        TIMESTAMP(3),
    `payTimestamp_WM` AS TO_TIMESTAMP(FROM_UNIXTIME(CAST(`payTimestamp_Epoc` AS BIGINT) / 1000)),
    WATERMARK FOR `payTimestamp_WM` AS `payTimestamp_WM`,
    PRIMARY KEY (`invoiceNumber`) NOT ENFORCED
) WITH (
     'connector'                               = 'upsert-kafka'
    ,'topic'                                   = 'avro_salespayments'
    ,'properties.bootstrap.servers'            = 'broker:29092'
    ,'value.fields-include'                    = 'ALL'
    ,'value.format'                            = 'avro-confluent'
    ,'value.avro-confluent.schema-registry.url'= 'http://schema-registry:9081'
    ,'key.format'                              = 'avro-confluent'
    ,'key.avro-confluent.schema-registry.url'  = 'http://schema-registry:9081'
    ,'properties.group.id'                     = 'mysqlcdcsourced'
);
```
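For reference, the computed column `payTimestamp_WM` divides the epoch value by 1000 before converting, so it assumes `payTimestamp_Epoc` holds milliseconds. A minimal Python sketch of the same conversion (the sample value is illustrative only; note Flink's `FROM_UNIXTIME` uses the session time zone, while this sketch pins UTC):

```python
from datetime import datetime, timezone

def epoch_millis_to_timestamp(epoch_str: str) -> datetime:
    """Mirror the SQL expression: CAST(epoch AS BIGINT) / 1000, then convert to a timestamp."""
    seconds = int(epoch_str) // 1000  # millis -> seconds, truncating like integer division
    return datetime.fromtimestamp(seconds, tz=timezone.utc)

# e.g. epoch_millis_to_timestamp("1700000000000") -> 2023-11-14 22:13:20+00:00
```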
I use the following INSERT to populate it:
```sql
INSERT INTO c_hive.db01.t_f_avro_salespayments (
    `invoiceNumber`,
    `payDateTime_Ltz`,
    `payTimestamp_Epoc`,
    `paid`,
    `finTransactionId`,
    `created_at`
  ) SELECT
    invoiceNumber,
    payDateTime_Ltz,
    payTimestamp_Epoc,
    paid,
    finTransactionId,
    created_at
  FROM
    c_hive.db01.t_f_msqlcdc_salespayments;
```
At this point I can see the records arrive in the Kafka topic via kcat and via Control Center. But if I run:
```sql
select * from c_hive.db01.t_f_avro_salespayments;
```
it fails with:
```
.19.0.19:43253-40fb4d @ devlab-flink-taskmanager-2.devlab (dataPort=43123).
flink-jobmanager     | java.lang.NoSuchMethodError: 'void org.apache.flink.connector.base.source.reader.fetcher.SingleThreadFetcherManager.<init>(org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue, java.util.function.Supplier, org.apache.flink.configuration.Configuration, java.util.function.Consumer)'
flink-jobmanager     | 	at org.apache.flink.connector.kafka.source.reader.fetcher.KafkaSourceFetcherManager.<init>(KafkaSourceFetcherManager.java:72) ~[flink-sql-connector-kafka-3.2.0-1.18.jar:3.2.0-1.18]
flink-jobmanager     | 	at org.apache.flink.connector.kafka.source.KafkaSource.createReader(KafkaSource.java:176) ~[flink-sql-connector-kafka-3.2.0-1.18.jar:3.2.0-1.18]
flink-jobmanager     | 	at org.apache.flink.connector.kafka.source.KafkaSource.createReader(KafkaSource.java:141) ~[flink-sql-connector-kafka-3.2.0-1.18.jar:3.2.0-1.18]
flink-jobmanager     | 	at org.apache.flink.streaming.api.operators.SourceOperator.initReader(SourceOperator.java:314) ~[flink-dist-1.18.1.jar:1.18.1]
flink-jobmanager     | 	at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask.init(SourceOperatorStreamTask.java:93) ~[flink-dist-1.18.1.jar:1.18.1]
flink-jobmanager     | 	at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:718) ~[flink-dist-1.18.1.jar:1.18.1]
flink-jobmanager     | 	at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:693) ~[flink-dist-1.18.1.jar:1.18.1]
flink-jobmanager     | 	at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:953) ~[flink-dist-1.18.1.jar:1.18.1]
flink-jobmanager     | 	at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:922) ~[flink-dist-1.18.1.jar:1.18.1]
flink-jobmanager     | 	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:746) ~[flink-dist-1.18.1.jar:1.18.1]
flink-jobmanager     | 	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562) ~[flink-dist-1.18.1.jar:1.18.1]
flink-jobmanager     | 	at java.lang.Thread.run(Unknown Source) ~[?:?]
```
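A `NoSuchMethodError` on a `flink-connector-base` constructor like this usually means two jars on the classpath carry different versions of that class (connector fat jars such as the CDC and Kafka SQL connectors each bundle a copy). One way to check which jars ship `SingleThreadFetcherManager`, sketched with stdlib Python; the `/opt/flink/lib` path is an assumption, point it at the lib directory inside your image:

```python
import zipfile
from pathlib import Path

def jars_containing(lib_dir: str, class_path: str) -> list:
    """Return the names of jars under lib_dir that bundle the given class file.

    More than one hit (or a single hit from an unexpected jar) suggests
    conflicting bundled copies of flink-connector-base.
    """
    hits = []
    for jar in sorted(Path(lib_dir).glob("*.jar")):
        with zipfile.ZipFile(jar) as zf:
            if class_path in zf.namelist():
                hits.append(jar.name)
    return hits

# Example (hypothetical path -- adjust for your setup):
# jars_containing(
#     "/opt/flink/lib",
#     "org/apache/flink/connector/base/source/reader/fetcher/SingleThreadFetcherManager.class",
# )
```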
This is Flink 1.18.1; my Flink image is built using the attachment below.