# troubleshooting
z
Hi all, I'm using Flink CDC to sync data from SQL Server to Doris, following this guide: https://doris.apache.org/docs/ecosystem/flink-doris-connector/. I can submit the job to Flink, but it always throws the exception below. How do I solve it? Thanks, all.
```
java.lang.ClassCastException: class com.ververica.cdc.debezium.internal.FlinkOffsetBackingStore cannot be cast to class org.apache.flink.cdc.connectors.shaded.org.apache.kafka.connect.storage.OffsetBackingStore (com.ververica.cdc.debezium.internal.FlinkOffsetBackingStore and org.apache.flink.cdc.connectors.shaded.org.apache.kafka.connect.storage.OffsetBackingStore are in unnamed module of loader 'app')
	at io.debezium.embedded.EmbeddedEngine.run(EmbeddedEngine.java:681)
	at io.debezium.embedded.ConvertingEngineBuilder$2.run(ConvertingEngineBuilder.java:192)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:834)
```
d
What versions of Flink, Doris, and the Flink-Doris connector are you using?
You should check the Flink, Doris, and Flink-Doris-Connector release notes to make sure the versions you are using are compatible.
It looks like there might be a shading issue, with two incompatible versions of the same class on the classpath:
```
org.apache.flink.cdc.connectors.shaded.org.apache.kafka.connect.storage.OffsetBackingStore
```
You can try running
```
mvn dependency:tree
```
to check for duplicates. After that it may be necessary to exclude one library or the other.
But the first thing to do is check the compatibility of all the libraries in use against the documentation.
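Since the job is submitted straight from the CLI rather than built with Maven, another way to hunt for the duplicate is to scan the jars on Flink's classpath directly. A minimal sketch, assuming `unzip` is available; the `find_jars_with_class` helper and the `/opt/flink` default path are my own invention for illustration, not part of any official tooling:

```shell
# find_jars_with_class <class-name-substring> <directory-of-jars>
# Prints each jar in the directory whose contents mention the given class.
find_jars_with_class() {
  local needle=$1 dir=$2
  for jar in "$dir"/*.jar; do
    [ -f "$jar" ] || continue          # skip if the glob matched nothing
    if unzip -l "$jar" 2>/dev/null | grep -q "$needle"; then
      echo "$jar"
    fi
  done
}

# Example: scan the Flink lib directory for the conflicting class.
# If more than one jar is printed, those jars are likely the culprits.
find_jars_with_class 'OffsetBackingStore' "${FLINK_HOME:-/opt/flink}/lib"
```

Worth running against both Flink's `lib/` directory and any jar you pass to `bin/flink run`; if two different jars both bundle an `OffsetBackingStore`, removing or excluding one of them is the usual fix for this kind of `ClassCastException`.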
z
I submit it from the Linux CLI, so there is no jar of my own. The connector works fine when I don't use CDC, and I can also run CDC for a single table with flink-sql.sh. But now I need to sync the whole database without creating the tables myself.
Maybe there is another way to do this?
d
What versions of Flink, the Flink-Doris connector, and Doris are you using?
And how are you currently creating and submitting the job itself?
z
Flink 1.18, flink-doris-connector-1.18-1.5.2.jar, Doris 2.14, and I submit the Flink job from the Linux CLI with this command:
```shell
bin/flink run \
     -Dexecution.checkpointing.interval=10s \
     -Dparallelism.default=1 \
     -c org.apache.doris.flink.tools.cdc.CdcTools \
     ./lib/flink-doris-connector-1.18-1.5.2.jar \
     sqlserver-sync-database \
     --database SQLCDC \
     --sqlserver-conf hostname=172.16.160.XXX \
     --sqlserver-conf port=1433 \
     --sqlserver-conf username=sa \
     --sqlserver-conf password='XXX' \
     --sqlserver-conf database-name=DATA \
     --sqlserver-conf schema-name=dbo \
     --including-tables "t_data_dwd_tbms" \
     --sink-conf fenodes=172.16.19.XXXX \
     --sink-conf username=root \
     --sink-conf password='XXXXX' \
     --sink-conf jdbc-url=jdbc:mysql://172.16.19.XXXX \
     --sink-conf sink.label-prefix=label \
     --table-conf replication_num=1
```