Sachin
07/04/2025, 5:37 AM
Dheemanth Gowda
07/07/2025, 2:52 PM
CREATE TABLE daily_err_agg_stats (
...
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:postgresql://...',
'table-name' = 'daily_err_agg_batch_v3_testing',
'sink.buffer-flush.max-rows' = '5000',
'sink.buffer-flush.interval' = '10s',
'sink.max-retries' = '3'
-- NO PRIMARY KEY (should be INSERT mode, not UPSERT)
);
Expected:
• 603,234 ÷ 30 = ~20,107 rows per subtask
• 20,107 ÷ 5,000 = 4 batches per subtask + remainder
• Total: ~120 batch INSERT operations
• Tried different buffer sizes (1000, 5000) and intervals (2s, 10s)
Actual:
Appears to be doing individual row inserts instead of batches
Environment:
• Flink 1.17
• PostgreSQL sink
• Batch execution mode (RuntimeExecutionMode.BATCH)
• No primary key constraint (should avoid UPSERT mode)
Questions:
1. Does buffer-flush-max-rows work per subtask or globally?
2. Are there any other settings needed to force batch mode?
3. How can I verify if batching is actually happening?
Any insights appreciated! 🙏
STATS from PG admin:
INSERT INTO daily_err_agg_batch_v3_testing(projectId, orgId, consumptionId, senderNumber, gupshupTemplateId, templateName, templateLanguage, accountType, failureCode, operatorErrorCode, failureReason, channelType, destinationCountry, sourceSystem, conversationCategoryType, pricingCategory, windowStart, windowEnd, failedCount, isDayLevel, modifiedTimestamp, derivedPricingCategory, pricingType, pricingModel, isbillable) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21, $22, $23, $24, $25)
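For reference, a minimal sketch of the same buffering knobs expressed through the DataStream JDBC sink, under two assumptions worth stating: the buffer-flush settings should apply per parallel sink subtask (not globally), and the parameterized INSERT shown in the PG stats looks the same whether rows were sent individually or as a JDBC batch, so it does not by itself prove row-by-row execution. Enabling the PostgreSQL JDBC driver flag reWriteBatchedInserts makes real batches show up as multi-row INSERTs on the server. The class name, SQL text, and Row layout below are placeholders, not the real schema.

import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.types.Row;

public class BatchedJdbcSinkSketch {
    static SinkFunction<Row> build() {
        return JdbcSink.sink(
                // placeholder SQL: full column list elided, one '?' per column
                "INSERT INTO daily_err_agg_batch_v3_testing (projectId, orgId) VALUES (?, ?)",
                (ps, row) -> {                        // JdbcStatementBuilder<Row>
                    ps.setString(1, (String) row.getField(0));
                    ps.setString(2, (String) row.getField(1));
                },
                JdbcExecutionOptions.builder()
                        .withBatchSize(5000)          // rows buffered per parallel sink subtask
                        .withBatchIntervalMs(10_000L) // flush interval in milliseconds
                        .withMaxRetries(3)
                        .build(),
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        // reWriteBatchedInserts is a PostgreSQL JDBC driver option, not a Flink one:
                        // it rewrites a JDBC batch into multi-row INSERTs visible on the server side
                        .withUrl("jdbc:postgresql://...?reWriteBatchedInserts=true")
                        .withDriverName("org.postgresql.Driver")
                        .withUsername("...")
                        .withPassword("...")
                        .build());
    }
}

The same reWriteBatchedInserts flag can just as well be appended to the 'url' option of the Table API DDL above; another low-effort check is to enable PostgreSQL statement logging briefly and count the executions per connection.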
Nitay
07/08/2025, 9:43 AM
Xinyuan Liang
07/08/2025, 11:10 AM
Mahesh Sambharam
07/08/2025, 3:28 PM
Greg Reese
07/08/2025, 4:06 PM
Sink: Data Output (1/1)#16 (b1b559975d59b20b33fddd6c6e1399a8_cbc357ccb763df2852fee8c4fc7d55f2_0_16) switched from RUNNING to FAILED with failure cause:
org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:830) ~[flink-java-1.0.0.jar:?]
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:665) ~[flink-java-1.0.0.jar:?]
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:646) ~[flink-java-1.0.0.jar:?]
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:626) ~[flink-java-1.0.0.jar:?]
at org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader.<init>(KafkaPartitionSplitReader.java:97) ~[flink-java-1.0.0.jar:?]
at org.apache.flink.connector.kafka.source.KafkaSource.lambda$createReader$1(KafkaSource.java:173) ~[flink-java-1.0.0.jar:?]
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.createSplitFetcher(SplitFetcherManager.java:259) ~[flink-connector-files-1.19.1.jar:1.19.1]
at org.apache.flink.connector.base.source.reader.fetcher.SingleThreadFetcherManager.addSplits(SingleThreadFetcherManager.java:148) ~[flink-connector-files-1.19.1.jar:1.19.1]
at org.apache.flink.connector.base.source.reader.SourceReaderBase.addSplits(SourceReaderBase.java:315) ~[flink-connector-files-1.19.1.jar:1.19.1]
at org.apache.flink.streaming.api.operators.SourceOperator.handleAddSplitsEvent(SourceOperator.java:601) ~[flink-java-1.0.0.jar:?]
at org.apache.flink.streaming.api.operators.SourceOperator.handleOperatorEvent(SourceOperator.java:570) ~[flink-java-1.0.0.jar:?]
at org.apache.flink.streaming.runtime.tasks.OperatorEventDispatcherImpl.dispatchEventToHandlers(OperatorEventDispatcherImpl.java:72) ~[flink-java-1.0.0.jar:?]
at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.dispatchOperatorEvent(RegularOperatorChain.java:80) ~[flink-java-1.0.0.jar:?]
at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$dispatchOperatorEvent$22(StreamTask.java:1540) ~[flink-java-1.0.0.jar:?]
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50) ~[flink-java-1.0.0.jar:?]
at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90) ~[flink-java-1.0.0.jar:?]
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMail(MailboxProcessor.java:398) ~[flink-java-1.0.0.jar:?]
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:367) ~[flink-java-1.0.0.jar:?]
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:352) ~[flink-java-1.0.0.jar:?]
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:229) ~[flink-java-1.0.0.jar:?]
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:909) ~[flink-java-1.0.0.jar:?]
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:858) ~[flink-java-1.0.0.jar:?]
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958) ~[flink-java-1.0.0.jar:?]
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937) ~[flink-java-1.0.0.jar:?]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:751) ~[flink-java-1.0.0.jar:?]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566) ~[flink-java-1.0.0.jar:?]
at java.lang.Thread.run(Unknown Source) ~[?:?]
Caused by: org.apache.kafka.common.KafkaException: class org.apache.kafka.common.serialization.ByteArrayDeserializer is not an instance of org.apache.kafka.common.serialization.Deserializer
at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:405) ~[flink-java-1.0.0.jar:?]
at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:436) ~[flink-java-1.0.0.jar:?]
at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:421) ~[flink-java-1.0.0.jar:?]
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:709) ~[flink-java-1.0.0.jar:?]
... 26 more
Jeremie Doehla
07/08/2025, 6:29 PM
I have a JdbcStatementBuilder that I'm working with and am running into issues. If I have
val insertOperation: JdbcStatementBuilder[DataTracking] = (ps: PreparedStatement, dt: DataTracking) => {
ps.setString(1, dt.file_name)
ps.setBoolean(2, dt.processed_file)
ps.setString(3, dt.data_origin)
ps.setString(4, dt.data_type)
ps.setTimestamp(5, Timestamp.from(dt.received_time))
}
and use this statement everything is working correctly and processes fine. However if I update the table to add a new column and update the insert sql to also insert into this new column such that I now have this statement:
val insertOperation: JdbcStatementBuilder[DataTracking] = (ps: PreparedStatement, dt: DataTracking) => {
ps.setString(1, dt.file_name)
ps.setBoolean(2, dt.processed_file)
ps.setString(3, dt.data_origin)
ps.setString(4, dt.data_type)
ps.setObject(5, dt.data_location.orNull, Types.VARCHAR)
ps.setTimestamp(6, Timestamp.from(dt.received_time))
}
this results in an error. I've also used
dt.data_location match {
case Some(value) => ps.setString(5, value)
case None => ps.setNull(5, Types.VARCHAR)
}
rather than the setObject
but see the same error either way. Any ideas on what to do here? I can provide the error message -- I'll place that in the thread.
Mahesh Sambharam
07/09/2025, 9:49 AM
patricia lee
07/09/2025, 11:16 AM
env.setRegisterPojoType(Class<?> type)
this does not exist. Any feedback is appreciated. Thanks
StreamExecutionEnvironment env = new StreamExcecutionEnvironment();
env.setRegistrTypes(MyClass.class);
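For what it's worth, a small sketch of the registration calls that do exist in the Java API, assuming Flink 1.x: type registration hangs off ExecutionConfig (via env.getConfig()), and the environment itself is normally obtained with getExecutionEnvironment() rather than constructed with new.

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class TypeRegistrationSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // register the class with Flink's POJO serializer
        env.getConfig().registerPojoType(MyClass.class);
        // or, if the class cannot be analyzed as a POJO, register it with Kryo instead
        env.getConfig().registerKryoType(MyClass.class);
    }

    // stand-in for the MyClass mentioned above: public no-arg constructor + public fields
    public static class MyClass {
        public String name;
        public MyClass() {}
    }
}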
Boopathy Raja
07/09/2025, 6:29 PM
Jeremy DeGroot
07/10/2025, 7:52 PM
java.lang.AbstractMethodError: Receiver class org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher$$Lambda$1361/0x0000000800e04040 does not define or inherit an implementation of the resolved method 'abstract org.apache.flink.util.clock.RelativeClock getInputActivityClock()' of interface org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier$Context.
Has anyone ever seen an error like this? I'm using the Kafka Connector at version 3.3.0-1.19 and running on the AWS Managed Flink 1.19 runtime
umar farooq
07/11/2025, 8:51 AM
מייקי בר יעקב
07/11/2025, 3:37 PM
Mahesh Sambharam
07/11/2025, 6:37 PM
id DECIMAL(20, 0) NOT NULL], splitStart=null, splitEnd=null, highWatermark=null} error due to org.apache.flink.util.FlinkRuntimeException: Cannot read the binlog filename and position via 'SHOW MASTER STATUS'. Make sure your server is correctly configured.
at org.apache.flink.cdc.connectors.mysql.debezium.reader.SnapshotSplitReader.checkReadException(SnapshotSplitReader.java:387)
at org.apache.flink.cdc.connectors.mysql.debezium.reader.SnapshotSplitReader.pollWithBuffer(SnapshotSplitReader.java:323)
at org.apache.flink.cdc.connectors.mysql.debezium.reader.SnapshotSplitReader.pollSplitRecords(SnapshotSplitReader.java:288)
at org.apache.flink.cdc.connectors.mysql.source.reader.MySqlSplitReader.pollSplitRecords(MySqlSplitReader.java:122)
at org.apache.flink.cdc.connectors.mysql.source.reader.MySqlSplitReader.fetch(MySqlSplitReader.java:84)
at org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58)
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:165)
... 6 more
Caused by: io.debezium.DebeziumException: org.apache.flink.util.FlinkRuntimeException: Cannot read the binlog filename and position via 'SHOW MASTER STATUS'. Make sure your server is correctly configured
at org.apache.flink.cdc.connectors.mysql.debezium.task.MySqlSnapshotSplitReadTask.execute(MySqlSnapshotSplitReadTask.java:134)
at org.apache.flink.cdc.connectors.mysql.debezium.reader.SnapshotSplitReader.snapshot(SnapshotSplitReader.java:176)
at org.apache.flink.cdc.connectors.mysql.debezium.reader.SnapshotSplitReader.lambda$submitSplit$1(SnapshotSplitReader.java:161)
... 3 more
Caused by: org.apache.flink.util.FlinkRuntimeException: Cannot read the binlog filename and position via 'SHOW MASTER STATUS'. Make sure your server is correctly configured
at org.apache.flink.cdc.connectors.mysql.debezium.DebeziumUtils.lambda$currentBinlogOffset$0(DebeziumUtils.java:136)
at io.debezium.jdbc.JdbcConnection.queryAndMap(JdbcConnection.java:642)
at io.debezium.jdbc.JdbcConnection.queryAndMap(JdbcConnection.java:510)
at org.apache.flink.cdc.connectors.mysql.debezium.DebeziumUtils.currentBinlogOffset(DebeziumUtils.java:123)
at org.apache.flink.cdc.connectors.mysql.debezium.task.MySqlSnapshotSplitReadTask.doExecute(MySqlSnapshotSplitReadTask.java:156)
at org.apache.flink.cdc.connectors.mysql.debezium.task.MySqlSnapshotSplitReadTask.execute(MySqlSnapshotSplitReadTask.java:129)
... 5 more
[INFO]
[INFO] Results:
[INFO]
[ERROR] Errors:
[ERROR]   MySqlFullTypesITCase.testMysql57CommonDataTypes:106->testCommonDataTypes:476 » Runtime
[ERROR]   MySqlFullTypesITCase.testMysql57JsonDataTypes:113->testJsonDataType:524 » Runtime
[ERROR]   MySqlFullTypesITCase.testMysql57JsonDataTypesWithUseLegacyJsonFormat:120->testJsonDataType:524 » Runtime
[ERROR]   MySqlFullTypesITCase.testMysql57PrecisionTypes:256->testMysqlPrecisionTypes:366 » Runtime
[ERROR]   MySqlFullTypesITCase.testMysql57TimeDataTypes:191->testTimeDataTypes:583 » Runtime
[INFO]
[ERROR] Tests run: 10, Failures: 0, Errors: 5, Skipped: 0
[INFO]
[INFO] ------------------------------------------------------------------------
[INFO] BUILD FAILURE
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 01:16 min
[INFO] Finished at: 2025-07-11T18:27:27Z
[INFO] ------------------------------------------------------------------------
[ERROR] Failed to execute goal org.apache.maven.plugins:maven-surefire-plugin:3.0.0-M5:test (default-test) on project flink-cdc-pipeline-connector-mysql: There are test failures.
[ERROR]
[ERROR] Please refer to /mnt/d/Tech/flink-cdc/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/target/surefire-reports for the individual test results.
[ERROR] Please refer to dump files (if any exist) [date].dump, [date]-jvmRun[N].dump and [date].dumpstream.
[ERROR] -> [Help 1]
[ERROR]
[ERROR] To see the full stack trace of the errors, re-run Maven with the -e switch.
[ERROR] Re-run Maven using the -X switch to enable full debug logging.
[ERROR]
[ERROR] For more information about the errors and possible solutions, please read the following articles:
[ERROR] [Help 1] http://cwiki.apache.org/confluence/display/MAVEN/MojoFailureException
dontu balu
07/12/2025, 12:49 AM
mvn clean install -DskipTests -Pfast
[INFO] Reactor Summary for flink-cdc-parent 3.4-SNAPSHOT:
[INFO]
[INFO] flink-cdc-parent ................................... SUCCESS [ 2.244 s]
[INFO] flink-cdc-common ................................... SUCCESS [ 3.585 s]
[INFO] flink-cdc-pipeline-udf-examples .................... FAILURE [ 1.203 s]
[INFO] flink-cdc-pipeline-model ........................... SKIPPED
[INFO] flink-cdc-runtime .................................. SKIPPED
[INFO] flink-cdc-connect .................................. SKIPPED
[INFO] flink-cdc-pipeline-connectors ...................... SKIPPED
[INFO] flink-cdc-pipeline-connector-values ................ SKIPPED
[INFO] flink-cdc-composer ................................. SKIPPED
[INFO] flink-cdc-cli ...................................... SKIPPED
[INFO] flink-cdc-dist ..................................... SKIPPED
[INFO] flink-cdc-source-connectors ........................ SKIPPED
[INFO] flink-connector-debezium ........................... SKIPPED
[INFO] flink-connector-test-util .......................... SKIPPED
[INFO] flink-cdc-base ..................................... SKIPPED
[INFO] flink-connector-db2-cdc ............................ SKIPPED
[INFO] flink-connector-mongodb-cdc ........................ SKIPPED
[INFO] flink-connector-mysql-cdc .......................... SKIPPED
[INFO] flink-connector-oceanbase-cdc ...................... SKIPPED
[INFO] flink-connector-oracle-cdc ......................... SKIPPED
[INFO] flink-connector-postgres-cdc ....................... SKIPPED
[INFO] flink-connector-sqlserver-cdc ...................... SKIPPED
[INFO] flink-connector-tidb-cdc ........................... SKIPPED
[INFO] flink-connector-vitess-cdc ......................... SKIPPED
[INFO] flink-sql-connector-db2-cdc ........................ SKIPPED
[INFO] flink-sql-connector-mongodb-cdc .................... SKIPPED
[INFO] flink-sql-connector-mysql-cdc ...................... SKIPPED
[INFO] flink-sql-connector-oceanbase-cdc .................. SKIPPED
[INFO] flink-sql-connector-oracle-cdc ..................... SKIPPED
[INFO] flink-sql-connector-postgres-cdc ................... SKIPPED
[INFO] flink-sql-connector-sqlserver-cdc .................. SKIPPED
[INFO] flink-sql-connector-tidb-cdc ....................... SKIPPED
[INFO] flink-sql-connector-vitess-cdc ..................... SKIPPED
[INFO] flink-cdc-pipeline-connector-mysql ................. SKIPPED
[INFO] flink-cdc-pipeline-connector-doris ................. SKIPPED
[INFO] flink-cdc-pipeline-connector-starrocks ............. SKIPPED
[INFO] flink-cdc-pipeline-connector-kafka ................. SKIPPED
[INFO] flink-cdc-pipeline-connector-paimon ................ SKIPPED
[INFO] flink-cdc-pipeline-connector-elasticsearch ......... SKIPPED
[INFO] flink-cdc-pipeline-connector-oceanbase ............. SKIPPED
[INFO] flink-cdc-pipeline-connector-maxcompute ............ SKIPPED
[INFO] flink-cdc-pipeline-connector-iceberg ............... SKIPPED
[INFO] flink-cdc-e2e-tests ................................ SKIPPED
[INFO] flink-cdc-e2e-utils ................................ SKIPPED
[INFO] flink-cdc-source-e2e-tests ......................... SKIPPED
[INFO] flink-cdc-pipeline-e2e-tests ....................... SKIPPED
[INFO] ------------------------------------------------------------------------
[INFO] BUILD FAILURE
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 7.254 s
[INFO] Finished at: 2025-07-11T17:44:27-07:00
[INFO] ------------------------------------------------------------------------
[INFO] Compiler bridge file: /Users/vdontu/.sbt/1.0/zinc/org.scala-sbt/org.scala-sbt-compiler-bridge_2.12-1.8.0-bin_2.12.16__68.0-1.8.0_20221110T195421.jar
[INFO] Compiler bridge file is not installed yet
error:
bad constant pool index: 0 at pos: 49416
while compiling: <no file>
during phase: globalPhase=<no phase>, enteringPhase=<some phase>
library version: version 2.12.16
compiler version: version 2.12.16
reconstructed args: -bootclasspath /Users/vdontu/.m2/repository/org/scala-lang/scala-library/2.12.16/scala-library-2.12.16.jar -classpath /Users/vdontu/.m2/repository/org/scala-sbt/util-interface/1.8.0/util-interface-1.8.0.jar:/Users/vdontu/.m2/repository/org/scala-lang/scala-compiler/2.12.16/scala-compiler-2.12.16.jar:/Users/vdontu/.m2/repository/org/scala-lang/modules/scala-xml_2.12/1.0.6/scala-xml_2.12-1.0.6.jar:/Users/vdontu/.m2/repository/jline/jline/2.14.6/jline-2.14.6.jar:/Users/vdontu/.m2/repository/org/scala-lang/scala-reflect/2.12.16/scala-reflect-2.12.16.jar:/Users/vdontu/.m2/repository/org/scala-sbt/compiler-interface/1.8.0/compiler-interface-1.8.0.jar:/Users/vdontu/.m2/repository/org/scala-lang/modules/scala-xml_2.12/1.0.6/scala-xml_2.12-1.0.6.jar:/Users/vdontu/.m2/repository/org/scala-lang/scala-reflect/2.12.16/scala-reflect-2.12.16.jar:/Users/vdontu/.m2/repository/jline/jline/2.14.6/jline-2.14.6.jar:/Users/vdontu/.m2/repository/org/scala-lang/scala-compiler/2.12.16/scala-compiler-2.12.16.jar:/Users/vdontu/.m2/repository/org/scala-lang/scala-library/2.12.16/scala-library-2.12.16.jar -d /var/folders/2n/bls6r2l513dflwnvtx4yp8k00000gq/T/scala-maven-plugin-compiler-bridge-classes12287687447114078348
last tree to typer: EmptyTree
tree position: <unknown>
tree tpe: <notype>
symbol: null
call site: <none> in <none>
== Source file context for tree position ==
error: scala.reflect.internal.FatalError:
bad constant pool index: 0 at pos: 49416
while compiling: <no file>
during phase: globalPhase=<no phase>, enteringPhase=<some phase>
library version: version 2.12.16
compiler version: version 2.12.16
reconstructed args: -bootclasspath /Users/vdontu/.m2/repository/org/scala-lang/scala-library/2.12.16/scala-library-2.12.16.jar -classpath /Users/vdontu/.m2/repository/org/scala-sbt/util-interface/1.8.0/util-interface-1.8.0.jar:/Users/vdontu/.m2/repository/org/scala-lang/scala-compiler/2.12.16/scala-compiler-2.12.16.jar:/Users/vdontu/.m2/repository/org/scala-lang/modules/scala-xml_2.12/1.0.6/scala-xml_2.12-1.0.6.jar:/Users/vdontu/.m2/repository/jline/jline/2.14.6/jline-2.14.6.jar:/Users/vdontu/.m2/repository/org/scala-lang/scala-reflect/2.12.16/scala-reflect-2.12.16.jar:/Users/vdontu/.m2/repository/org/scala-sbt/compiler-interface/1.8.0/compiler-interface-1.8.0.jar:/Users/vdontu/.m2/repository/org/scala-lang/modules/scala-xml_2.12/1.0.6/scala-xml_2.12-1.0.6.jar:/Users/vdontu/.m2/repository/org/scala-lang/scala-reflect/2.12.16/scala-reflect-2.12.16.jar:/Users/vdontu/.m2/repository/jline/jline/2.14.6/jline-2.14.6.jar:/Users/vdontu/.m2/repository/org/scala-lang/scala-compiler/2.12.16/scala-compiler-2.12.16.jar:/Users/vdontu/.m2/repository/org/scala-lang/scala-library/2.12.16/scala-library-2.12.16.jar -d /var/folders/2n/bls6r2l513dflwnvtx4yp8k00000gq/T/scala-maven-plugin-compiler-bridge-classes12287687447114078348
last tree to typer: EmptyTree
tree position: <unknown>
tree tpe: <notype>
symbol: null
call site: <none> in <none>
== Source file context for tree position ==
at scala.reflect.internal.Reporting.abort(Reporting.scala:69)
at scala.reflect.internal.Reporting.abort$(Reporting.scala:65)
at scala.reflect.internal.SymbolTable.abort(SymbolTable.scala:28)
at scala.tools.nsc.symtab.classfile.ClassfileParser$ConstantPool.errorBadIndex(ClassfileParser.scala:385)
at scala.tools.nsc.symtab.classfile.ClassfileParser$ConstantPool.getExternalName(ClassfileParser.scala:249)
at scala.tools.nsc.symtab.classfile.ClassfileParser.readParamNames$1(ClassfileParser.scala:828)
at scala.tools.nsc.symtab.classfile.ClassfileParser.parseAttribute$1(ClassfileParser.scala:834)
at scala.tools.nsc.symtab.classfile.ClassfileParser.$anonfun$parseAttributes$7(ClassfileParser.scala:908)
at scala.tools.nsc.symtab.classfile.ClassfileParser.parseAttributes(ClassfileParser.scala:908)
at scala.tools.nsc.symtab.classfile.ClassfileParser.parseMethod(ClassfileParser.scala:611)
at scala.tools.nsc.symtab.classfile.ClassfileParser.$anonfun$parseClass$4(ClassfileParser.scala:534)
at scala.tools.nsc.symtab.classfile.ClassfileParser.parseClass(ClassfileParser.scala:534)
at scala.tools.nsc.symtab.classfile.ClassfileParser.$anonfun$parse$2(ClassfileParser.scala:160)
at scala.tools.nsc.symtab.classfile.ClassfileParser.$anonfun$parse$1(ClassfileParser.scala:146)
at scala.tools.nsc.symtab.classfile.ClassfileParser.parse(ClassfileParser.scala:129)
at scala.tools.nsc.symtab.SymbolLoaders$ClassfileLoader.doComplete(SymbolLoaders.scala:343)
at scala.tools.nsc.symtab.SymbolLoaders$SymbolLoader.complete(SymbolLoaders.scala:250)
at scala.reflect.internal.Symbols$Symbol.completeInfo(Symbols.scala:1542)
at scala.reflect.internal.Symbols$Symbol.info(Symbols.scala:1514)
at scala.reflect.internal.Definitions.scala$reflect$internal$Definitions$$enterNewMethod(Definitions.scala:49)
at scala.reflect.internal.Definitions$DefinitionsClass.String_$plus$lzycompute(Definitions.scala:1134)
at scala.reflect.internal.Definitions$DefinitionsClass.String_$plus(Definitions.scala:1134)
at scala.reflect.internal.Definitions$DefinitionsClass.syntheticCoreMethods$lzycompute(Definitions.scala:1438)
at scala.reflect.internal.Definitions$DefinitionsClass.syntheticCoreMethods(Definitions.scala:1420)
at scala.reflect.internal.Definitions$DefinitionsClass.symbolsNotPresentInBytecode$lzycompute(Definitions.scala:1450)
at scala.reflect.internal.Definitions$DefinitionsClass.symbolsNotPresentInBytecode(Definitions.scala:1450)
at scala.reflect.internal.Definitions$DefinitionsClass.init(Definitions.scala:1506)
at scala.tools.nsc.Global$Run.<init>(Global.scala:1214)
at scala.tools.nsc.Driver.doCompile(Driver.scala:46)
at scala.tools.nsc.MainClass.doCompile(Main.scala:32)
at scala.tools.nsc.Driver.process(Driver.scala:67)
at scala.tools.nsc.Main.process(Main.scala)
at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:104)
at java.base/java.lang.reflect.Method.invoke(Method.java:565)
at sbt.internal.inc.RawCompiler.getReporter$1(RawCompiler.scala:56)
at sbt.internal.inc.RawCompiler.apply(RawCompiler.scala:77)
at sbt_inc.CompilerBridgeFactory.getScala2CompilerBridgeJar(CompilerBridgeFactory.java:177)
at sbt_inc.CompilerBridgeFactory.getCompiledBridgeJar(CompilerBridgeFactory.java:60)
at sbt_inc.SbtIncrementalCompilers.make(SbtIncrementalCompilers.java:51)
at scala_maven.ScalaCompilerSupport.incrementalCompile(ScalaCompilerSupport.java:305)
at scala_maven.ScalaCompilerSupport.compile(ScalaCompilerSupport.java:119)
at scala_maven.ScalaCompilerSupport.doExecute(ScalaCompilerSupport.java:86)
Can someone please help with this?
Mahesh Sambharam
07/12/2025, 10:57 AM
Ganesh
07/13/2025, 5:05 PM
Ganesh
07/13/2025, 5:05 PM
Rushikesh Gulve
07/14/2025, 7:15 AM
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
name: data-processing-flink
namespace: flink
spec:
image: 624083781302.dkr.ecr.ap-south-1.amazonaws.com/product/data-streaming-flink:latest
flinkVersion: "v1_20"
serviceAccount: flink
flinkConfiguration:
rest.port: "8081"
jobmanager.rpc.address: "data-processing-flink"
jobmanager.rpc.port: "6123"
#taskmanager.numberOfTaskSlots: "2"
# High Availability
high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
high-availability.storageDir: file:///flink-data/ha
# Checkpoints and Savepoints
state.checkpoints.dir: file:///flink-data/checkpoints
state.savepoints.dir: file:///flink-data/savepoints
state.backend.type: rocksdb
state.backend.rocksdb.memory.managed: "true" # Enable Flink's memory management for RocksDB
state.backend.rocksdb.memory.write-buffer-size: "64mb" # Limit write buffer size
state.backend.rocksdb.memory.max-file-size-to-compact: "2mb" # Increase compaction file size
state.backend.rocksdb.memory.min-files-to-compact: "3" # Reduce compaction frequency
state.backend.rocksdb.memory.max-files-to-compact: "10"
state.backend.rocksdb.memory.max-output-file-size: "128mb"
state.backend.rocksdb.memory.cache-capacity: "256mb" # Reduce cache size
state.backend.rocksdb.memory.managed: true
state.backend.rocksdb.memory.write-buffer-ratio: "0.5"
state.backend.rocksdb.memory.high-prio-pool-ratio: "0.1"
python.fn-execution.bundle.size: "100"
python.fn-execution.memory.managed: "true"
python.fn-execution.buffer.memory.size: "64mb"
#execution.checkpointing.alignment-timeout: "0" # ms, after which checkpoint switches to unaligned mode dynamically
#execution.checkpointing.unaligned: "true" # Enable unaligned checkpoints
#rest.profiling.enabled: "true"
#env.java.opts.taskmanager: "-Dorg.apache.beam.sdk.fn=DEBUG -Dorg.apache.beam.runners.fnexecution.control.FnApiControlClient=DEBUG"
podTemplate:
spec:
imagePullSecrets:
- name: ecrscr-credentials
containers:
- name: flink-main-container
imagePullPolicy: Always
envFrom:
- secretRef:
name: redis-secret-main
env:
- name: _PYTHON_WORKER_MEMORY_LIMIT
value: "350000000"
- name: KAFKA_BROKER
value: "kafka.flink.svc.cluster.local:9092"
- name: BOOTSTRAP_SERVERS
value: "kafka.flink.svc.cluster.local:9092"
- name: MINIO_SERVER
value: "<http://myminio-hl.flink.svc.cluster.local:9000>"
- name: MINIO_USER
value: "minio"
- name: MINIO_PASSWORD
value: "minio123"
- name: MAX_PARALLELISM
value: "128"
volumeMounts:
- mountPath: /flink-data
name: flink-pvc
volumes:
- name: flink-pvc
persistentVolumeClaim:
claimName: flink-data-pvc
securityContext:
runAsUser: 9999
runAsGroup: 9999
jobManager:
resource:
memory: "3072m"
cpu: 3
taskManager:
resource:
memory: "4096m"
cpu: 1
replicas: 32
job:
entryClass: "org.apache.flink.client.python.PythonDriver"
args: ["-pyclientexec", "/usr/bin/python3", "-py", "/opt/flink/usrlib/data_processing_v9/stream_processing/process_dependency/create_env.py"]
upgradeMode: stateless
I have disabled checkpointing purposely.
The issue is that I have kept parallelism at 32 and have multiple processes. I am seeing in all the task manager logs that the process completes simultaneously, but in the Kafka partitions the data is received one after another with a lot of delay. My output is only a single data point sunk to Kafka, which is linked to an S3 bucket. Why am I not seeing all the data in the Kafka partitions simultaneously if the data is processed simultaneously? (The parallelism is utilized by producing different keys to different partitions.)
KafkaSink.builder()
.set_bootstrap_servers(BOOTSTRAP_SERVERS)
.set_record_serializer(
KafkaRecordSerializationSchema.builder()
.set_topic(topic)
.set_value_serialization_schema(
JsonRowSerializationSchema.builder()
.with_type_info(type_info)
.build()
)
.build()
)
.set_delivery_guarantee(DeliveryGuarantee.AT_LEAST_ONCE)
.build()
this is my kafka sink.
I am using flink 1.20
Any help is greatly appreciated. Thank you.
dontu balu
07/14/2025, 10:55 PM
I have configured server-id 6543 for client replication; however, when I check the process list on the MySQL server, I see the ID as 371575.
mysql> SHOW PROCESSLIST;
371575 | cdcuser | xx.xx.xx.xx:41180 | NULL | Binlog Dump GTID | 11926 | Source has sent all binlog to replica; waiting for more updates | NULL
How can I check all the client replication server IDs in use on the MySQL instance, so that when I create another Flink app I can specify a unique server-id that is not already used?
Chris Harm
07/15/2025, 4:53 PM
Leonardo Nascimento
07/15/2025, 10:19 PM
flink-kubernetes-operator
is unable to create the Ingress component due to an issue in fabric8 (https://github.com/fabric8io/kubernetes-client/issues/7037).
I found this JIRA issue (https://issues.apache.org/jira/browse/FLINK-38093) from two days ago proposing an upgrade to Fabric8 version 7.3.1. However, this is already the version used in the main branch (https://github.com/apache/flink-kubernetes-operator/blob/main/pom.xml#L81).
Is there any timeline for a new release? I couldn't find it.
The stack trace from the error in flink-kubernetes-operator
logs:
2025-07-15 21:26:43,949 i.j.o.p.e.EventProcessor [ERROR][datapipe-data/stream-consumer] Error during event processing ExecutionScope{ resource id: ResourceID{name='stream-consumer', namespace='datapipe-data'}, version: 2281422965}
org.apache.flink.kubernetes.operator.exception.ReconciliationException: io.fabric8.kubernetes.client.KubernetesClientException: An error has occurred.
at org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController.reconcile(FlinkDeploymentController.java:167)
at org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController.reconcile(FlinkDeploymentController.java:63)
at io.javaoperatorsdk.operator.processing.Controller$1.execute(Controller.java:153)
at io.javaoperatorsdk.operator.processing.Controller$1.execute(Controller.java:111)
at org.apache.flink.kubernetes.operator.metrics.OperatorJosdkMetrics.timeControllerExecution(OperatorJosdkMetrics.java:80)
at io.javaoperatorsdk.operator.processing.Controller.reconcile(Controller.java:110)
at io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.reconcileExecution(ReconciliationDispatcher.java:136)
at io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleReconcile(ReconciliationDispatcher.java:117)
at io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleDispatch(ReconciliationDispatcher.java:91)
at io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleExecution(ReconciliationDispatcher.java:64)
at io.javaoperatorsdk.operator.processing.event.EventProcessor$ReconcilerExecutor.run(EventProcessor.java:452)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.base/java.lang.Thread.run(Unknown Source)
Caused by: io.fabric8.kubernetes.client.KubernetesClientException: An error has occurred.
at io.fabric8.kubernetes.client.KubernetesClientException.copyAsCause(KubernetesClientException.java:238)
at io.fabric8.kubernetes.client.dsl.internal.OperationSupport.waitForResult(OperationSupport.java:507)
at io.fabric8.kubernetes.client.dsl.internal.OperationSupport.handleResponse(OperationSupport.java:524)
at io.fabric8.kubernetes.client.dsl.internal.OperationSupport.restCall(OperationSupport.java:711)
at io.fabric8.kubernetes.client.impl.BaseClient.getVersionInfo(BaseClient.java:323)
at io.fabric8.kubernetes.client.impl.KubernetesClientImpl.getKubernetesVersion(KubernetesClientImpl.java:641)
at org.apache.flink.kubernetes.operator.utils.IngressUtils.ingressInNetworkingV1(IngressUtils.java:272)
at org.apache.flink.kubernetes.operator.utils.IngressUtils.getIngress(IngressUtils.java:96)
at org.apache.flink.kubernetes.operator.utils.IngressUtils.updateIngressRules(IngressUtils.java:72)
at org.apache.flink.kubernetes.operator.reconciler.deployment.ApplicationReconciler.deploy(ApplicationReconciler.java:188)
at org.apache.flink.kubernetes.operator.reconciler.deployment.ApplicationReconciler.deploy(ApplicationReconciler.java:65)
at org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractJobReconciler.restoreJob(AbstractJobReconciler.java:393)
at org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractJobReconciler.reconcileSpecChange(AbstractJobReconciler.java:200)
at org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractFlinkResourceReconciler.reconcile(AbstractFlinkResourceReconciler.java:164)
at org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController.reconcile(FlinkDeploymentController.java:156)
... 13 more
Caused by: io.fabric8.kubernetes.client.KubernetesClientException: An error has occurred.
at io.fabric8.kubernetes.client.KubernetesClientException.launderThrowable(KubernetesClientException.java:129)
at io.fabric8.kubernetes.client.KubernetesClientException.launderThrowable(KubernetesClientException.java:122)
at io.fabric8.kubernetes.client.utils.KubernetesSerialization.unmarshal(KubernetesSerialization.java:261)
at io.fabric8.kubernetes.client.dsl.internal.OperationSupport.lambda$handleResponse$0(OperationSupport.java:551)
at java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(Unknown Source)
at java.base/java.util.concurrent.CompletableFuture.postComplete(Unknown Source)
at java.base/java.util.concurrent.CompletableFuture.complete(Unknown Source)
at io.fabric8.kubernetes.client.http.StandardHttpClient.lambda$completeOrCancel$10(StandardHttpClient.java:142)
at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(Unknown Source)
at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(Unknown Source)
at java.base/java.util.concurrent.CompletableFuture.postComplete(Unknown Source)
at java.base/java.util.concurrent.CompletableFuture.complete(Unknown Source)
at io.fabric8.kubernetes.client.http.ByteArrayBodyHandler.onBodyDone(ByteArrayBodyHandler.java:51)
at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(Unknown Source)
at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(Unknown Source)
at java.base/java.util.concurrent.CompletableFuture.postComplete(Unknown Source)
at java.base/java.util.concurrent.CompletableFuture.complete(Unknown Source)
at io.fabric8.kubernetes.client.okhttp.OkHttpClientImpl$OkHttpAsyncBody.doConsume(OkHttpClientImpl.java:136)
... 3 more
Caused by: com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException: Unrecognized field "emulationMajor" (class io.fabric8.kubernetes.client.VersionInfo), not marked as ignorable (9 known properties: "goVersion", "gitTreeState", "platform", "minor", "gitVersion", "gitCommit", "buildDate", "compiler", "major"])
at [Source: (BufferedInputStream); line: 4, column: 22] (through reference chain: io.fabric8.kubernetes.client.VersionInfo["emulationMajor"])
at com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException.from(UnrecognizedPropertyException.java:61)
at com.fasterxml.jackson.databind.DeserializationContext.handleUnknownProperty(DeserializationContext.java:1138)
at com.fasterxml.jackson.databind.deser.std.StdDeserializer.handleUnknownProperty(StdDeserializer.java:2224)
at com.fasterxml.jackson.databind.deser.BeanDeserializerBase.handleUnknownProperty(BeanDeserializerBase.java:1709)
at com.fasterxml.jackson.databind.deser.BeanDeserializerBase.handleUnknownVanilla(BeanDeserializerBase.java:1687)
at com.fasterxml.jackson.databind.deser.BeanDeserializer.vanillaDeserialize(BeanDeserializer.java:320)
at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:177)
at com.fasterxml.jackson.databind.deser.DefaultDeserializationContext.readRootValue(DefaultDeserializationContext.java:323)
at com.fasterxml.jackson.databind.ObjectReader._bindAndClose(ObjectReader.java:2105)
at com.fasterxml.jackson.databind.ObjectReader.readValue(ObjectReader.java:1481)
at io.fabric8.kubernetes.client.utils.KubernetesSerialization.unmarshal(KubernetesSerialization.java:257)
... 18 more
Max Schmidt
07/16/2025, 7:08 AM
java.lang.IllegalArgumentException: Failed to initialize class org.apache.hadoop.fs.azurebfs.oauth2.WorkloadIdentityTokenProvider
Same thing for this person:
https://lists.apache.org/thread/6b841lqjxryxqyv4737bo46y41z6zgct
Avinesh Sachan
07/16/2025, 11:30 AM
Ravisangar Vijayan
07/17/2025, 6:00 AM
Caused by: java.util.concurrent.ExecutionException: java.util.concurrent.TimeoutException: Invocation of [LocalRpcInvocation(RestfulGateway.requestJobStatus(JobID, Duration))] at recipient [pekko.tcp://flink@gvhvhgvhgcggh] timed out. This is usually caused by: 1) Pekko failed sending the message silently, due to problems like oversized payload or serialization failures. In that case, you should find detailed error information in the logs. 2) The recipient needs more time for responding, due to problems like slow machines or network jitters. In that case, you can try to increase pekko.ask.timeout.
Fabrizzio Chavez
07/17/2025, 8:33 AM
Jan Kevin Dick
07/17/2025, 1:07 PM
Amit Peshwani
07/17/2025, 5:01 PM
public class FirehoseSinkFactory {
public static KinesisFirehoseSink<OutputRecord> createFirehoseSink(final Properties firehoseSinkConfigProperties) {
final String DELIVERY_STREAM_NAME = "deliveryStreamName";
return KinesisFirehoseSink.<OutputRecord>builder()
.setFirehoseClientProperties(firehoseSinkConfigProperties)
.setSerializationSchema(new FirehoseSerializationSchema())
.setDeliveryStreamName(firehoseSinkConfigProperties.getProperty(DELIVERY_STREAM_NAME))
.build();
}
}
Could someone guide/help on how to make sure the connection does not time out while using it?
property_group_id = "FirehoseSinkConfigProperties"
property_map = {
"aws.region" = "aws-region-name"
"deliveryStreamName" = "stream-name"
}
Pragya Agarwal
07/17/2025, 5:21 PM
jq l
07/18/2025, 6:29 AM
Flink SQL> CREATE TABLE oracle_prod_testQWE(
COLUMN1 BIGINT,
COLUMN2 STRING,
PRIMARY KEY (COLUMN1) NOT ENFORCED
) WITH (
'connector' = 'oracle-cdc',
'hostname' = '10.xxx',
'port' = '1521',
'username' = 'c##xxx',
'password' = 'xxx',
'database-name' = 'orclCdb',
'schema-name' = 'APP_USER',
'table-name' = 'TABLEZXC',
'debezium.database.pdb.name' = 'db1cdb',
'debezium.log.mining.strategy' = 'online_catalog',
'debezium.snapshot.mode' = 'initial'
);
Flink SQL> select * from oracle_prod_testQWE;
My Flink job always reports this error:
xxxxxxxxxx
Caused by: java.io.IOException: io.debezium.DebeziumException: The db history topic or its content is fully or partially missing. Please check database history topic configuration and re-execute the snapshot.
at org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceSplitReader.fetch(IncrementalSourceSplitReader.java:101)
at org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58)
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:165)
... 6 more
Caused by: io.debezium.DebeziumException: The db history topic or its content is fully or partially missing. Please check database history topic configuration and re-execute the snapshot.
at io.debezium.relational.HistorizedRelationalDatabaseSchema.recover(HistorizedRelationalDatabaseSchema.java:59)
at org.apache.flink.cdc.connectors.oracle.source.reader.fetch.OracleSourceFetchTaskContext.validateAndLoadDatabaseHistory(OracleSourceFetchTaskContext.java:275)
at org.apache.flink.cdc.connectors.oracle.source.reader.fetch.OracleSourceFetchTaskContext.configure(OracleSourceFetchTaskContext.java:118)
at org.apache.flink.cdc.connectors.base.source.reader.external.IncrementalSourceStreamFetcher.submitTask(IncrementalSourceStreamFetcher.java:84)
at org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceSplitReader.submitStreamSplit(IncrementalSourceSplitReader.java:261)
at org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceSplitReader.pollSplitRecords(IncrementalSourceSplitReader.java:153)
at org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceSplitReader.fetch(IncrementalSourceSplitReader.java:98)
Sure, I'd be happy to help! Could you please provide more details about the error message? What specific error are you encountering, and what steps have you taken so far to troubleshoot the issue? The more information you can give me, the better I can assist you.😭😭😭