# troubleshooting

    Sachin

    07/04/2025, 5:37 AM
Can someone please share their approach if they have disaster recovery support for a Flink cluster in production, and how exactly-once semantics are managed in such setups?

    Dheemanth Gowda

    07/07/2025, 2:52 PM
Hi Everyone, JDBC Sink Batch Insert Not Triggering - Need Help (Flink SQL). Problem: a JDBC sink with 603,234 rows, parallelism=30, and buffer-flush-max-rows=5000 is not doing batch inserts. All rows seem to be inserted individually.
    Copy code
    CREATE TABLE daily_err_agg_stats (
        ...
    ) WITH (
        'connector' = 'jdbc',
        'url' = 'jdbc:postgresql://...',
        'table-name' = 'daily_err_agg_batch_v3_testing',
        'sink.buffer-flush.max-rows' = '5000',
        'sink.buffer-flush.interval' = '10s',
        'sink.max-retries' = '3'
        -- NO PRIMARY KEY (should be INSERT mode, not UPSERT)
    );
Expected:
• 603,234 ÷ 30 = ~20,107 rows per subtask
• 20,107 ÷ 5,000 = 4 batches per subtask + remainder
• Total: ~120 batch INSERT operations
• Tried different buffer sizes (1000, 5000) and intervals (2s, 10s)
Actual: Appears to be doing individual row inserts instead of batches
Environment:
• Flink 1.17
• PostgreSQL sink
• Batch execution mode (RuntimeExecutionMode.BATCH)
• No primary key constraint (should avoid UPSERT mode)
Questions:
1. Does buffer-flush-max-rows work per subtask or globally?
2. Are there any other settings needed to force batch mode?
3. How can I verify if batching is actually happening?
Any insights appreciated! 🙏 STATS from PG admin:
    Copy code
    INSERT INTO daily_err_agg_batch_v3_testing(projectId, orgId, consumptionId, senderNumber, gupshupTemplateId, templateName, templateLanguage, accountType, failureCode, operatorErrorCode, failureReason, channelType, destinationCountry, sourceSystem, conversationCategoryType, pricingCategory, windowStart, windowEnd, failedCount, isDayLevel, modifiedTimestamp, derivedPricingCategory, pricingType, pricingModel, isbillable) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21, $22, $23, $24, $25)
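[Editor's note: the PG statement above is what you would see even when JDBC batching works, because the PostgreSQL driver executes a JDBC batch as individual INSERT ... VALUES ($1, ...) calls on the server unless statement rewriting is enabled, and 'sink.buffer-flush.max-rows' is counted per parallel subtask, not globally. A hedged sketch of the WITH clause, assuming the standard PostgreSQL JDBC driver; the reWriteBatchedInserts URL parameter is the key addition:]

    CREATE TABLE daily_err_agg_stats (
        ...
    ) WITH (
        'connector' = 'jdbc',
        -- reWriteBatchedInserts makes the PG driver collapse each JDBC batch
        -- into multi-row INSERT statements on the server side
        'url' = 'jdbc:postgresql://host:5432/db?reWriteBatchedInserts=true',
        'table-name' = 'daily_err_agg_batch_v3_testing',
        'sink.buffer-flush.max-rows' = '5000',   -- applies per subtask
        'sink.buffer-flush.interval' = '10s',
        'sink.max-retries' = '3'
    );

[With rewriting enabled, pg_stat_statements should show multi-row INSERTs rather than one execution per row, which is also one way to verify that batching is happening.]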

    Nitay

    07/08/2025, 9:43 AM
Hi guys, I'd like to get some help. I use RocksDB and sometimes get this error: java.nio.channels.ClosedChannelException in RocksDBStateDownloader. Does anyone know how I can fix this? Flink 1.16, Java 11.
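[Editor's note: hedged, since the full stack trace isn't shown: a ClosedChannelException inside RocksDBStateDownloader typically means the download of state files was interrupted mid-restore, e.g. by task cancellation or a failing checkpoint storage stream, and the restore then retries. If it correlates with load, the restore download parallelism can be tuned; a flink-conf.yaml sketch for Flink 1.16:]

    # Number of threads RocksDBStateDownloader uses to fetch state files
    # during restore (default 4); lowering it can reduce pressure on the
    # checkpoint storage, raising it can shorten the restore window.
    state.backend.rocksdb.checkpoint.transfer.thread.num: 4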

    Xinyuan Liang

    07/08/2025, 11:10 AM
Hi folks, 👋 We're using Flink Operator v1.12 with Flink 2.0, and looking for some help understanding and debugging a mismatch between the reconciled upgradeMode and what we've configured in our FlinkDeployment spec. Any guidance on this would be greatly appreciated 🙏🏻 Thanks!

    Mahesh Sambharam

    07/08/2025, 3:28 PM
Hi Everyone, I'm trying to check out Flink using git over SSH, and I got the below error:

    Greg Reese

    07/08/2025, 4:06 PM
Hi all! I am having an issue with Flink and the SolaceSourceConnector, does anybody have any ideas? (Strimzi Kafka 4.0, Solace with pubsubplus connector 3.2.0 deployed to AKS, flink-connector-kafka 3.2.0-1.19)
    Copy code
    Sink: Data Output (1/1)#16 (b1b559975d59b20b33fddd6c6e1399a8_cbc357ccb763df2852fee8c4fc7d55f2_0_16) switched from RUNNING to FAILED with failure cause:
    org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
            at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:830) ~[flink-java-1.0.0.jar:?]
            at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:665) ~[flink-java-1.0.0.jar:?]
            at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:646) ~[flink-java-1.0.0.jar:?]
            at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:626) ~[flink-java-1.0.0.jar:?]
            at org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader.<init>(KafkaPartitionSplitReader.java:97) ~[flink-java-1.0.0.jar:?]
            at org.apache.flink.connector.kafka.source.KafkaSource.lambda$createReader$1(KafkaSource.java:173) ~[flink-java-1.0.0.jar:?]
            at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.createSplitFetcher(SplitFetcherManager.java:259) ~[flink-connector-files-1.19.1.jar:1.19.1]
            at org.apache.flink.connector.base.source.reader.fetcher.SingleThreadFetcherManager.addSplits(SingleThreadFetcherManager.java:148) ~[flink-connector-files-1.19.1.jar:1.19.1]
            at org.apache.flink.connector.base.source.reader.SourceReaderBase.addSplits(SourceReaderBase.java:315) ~[flink-connector-files-1.19.1.jar:1.19.1]
            at org.apache.flink.streaming.api.operators.SourceOperator.handleAddSplitsEvent(SourceOperator.java:601) ~[flink-java-1.0.0.jar:?]
            at org.apache.flink.streaming.api.operators.SourceOperator.handleOperatorEvent(SourceOperator.java:570) ~[flink-java-1.0.0.jar:?]
            at org.apache.flink.streaming.runtime.tasks.OperatorEventDispatcherImpl.dispatchEventToHandlers(OperatorEventDispatcherImpl.java:72) ~[flink-java-1.0.0.jar:?]
            at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.dispatchOperatorEvent(RegularOperatorChain.java:80) ~[flink-java-1.0.0.jar:?]
            at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$dispatchOperatorEvent$22(StreamTask.java:1540) ~[flink-java-1.0.0.jar:?]
            at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50) ~[flink-java-1.0.0.jar:?]
            at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90) ~[flink-java-1.0.0.jar:?]
            at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMail(MailboxProcessor.java:398) ~[flink-java-1.0.0.jar:?]
            at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:367) ~[flink-java-1.0.0.jar:?]
            at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:352) ~[flink-java-1.0.0.jar:?]
            at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:229) ~[flink-java-1.0.0.jar:?]
            at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:909) ~[flink-java-1.0.0.jar:?]
            at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:858) ~[flink-java-1.0.0.jar:?]
            at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958) ~[flink-java-1.0.0.jar:?]
            at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937) ~[flink-java-1.0.0.jar:?]
            at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:751) ~[flink-java-1.0.0.jar:?]
            at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566) ~[flink-java-1.0.0.jar:?]
            at java.lang.Thread.run(Unknown Source) ~[?:?]
    Caused by: org.apache.kafka.common.KafkaException: class org.apache.kafka.common.serialization.ByteArrayDeserializer is not an instance of org.apache.kafka.common.serialization.Deserializer
            at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:405) ~[flink-java-1.0.0.jar:?]
            at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:436) ~[flink-java-1.0.0.jar:?]
            at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:421) ~[flink-java-1.0.0.jar:?]
            at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:709) ~[flink-java-1.0.0.jar:?]
            ... 26 more
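[Editor's note: the root cause line ("ByteArrayDeserializer is not an instance of Deserializer") is the classic sign of two copies of kafka-clients on different classloaders, which matches the mixed jars in the trace (flink-java-1.0.0.jar vs flink-connector-files-1.19.1.jar). Besides deduplicating kafka-clients between the fat jar and the Solace connector jar, a commonly suggested workaround, assuming kafka-clients is also on the cluster classpath, is to load Kafka classes parent-first:]

    # flink-conf.yaml -- force org.apache.kafka classes to come from a single
    # classloader; only works if kafka-clients is present in the cluster's lib/
    classloader.parent-first-patterns.additional: org.apache.kafka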

    Jeremie Doehla

    07/08/2025, 6:29 PM
    Hello -- I'm hoping someone might be able to point out what I'm missing here. I've got a
    JdbcStatementBuilder
    that I'm working with and running into issues. If I have
    Copy code
    val insertOperation: JdbcStatementBuilder[DataTracking] = (ps: PreparedStatement, dt: DataTracking) => {
        ps.setString(1, dt.file_name)
        ps.setBoolean(2, dt.processed_file)
        ps.setString(3, dt.data_origin)
        ps.setString(4, dt.data_type)
        ps.setTimestamp(5, Timestamp.from(dt.received_time))
      }
    and use this statement everything is working correctly and processes fine. However if I update the table to add a new column and update the insert sql to also insert into this new column such that I now have this statement:
    Copy code
    val insertOperation: JdbcStatementBuilder[DataTracking] = (ps: PreparedStatement, dt: DataTracking) => {
        ps.setString(1, dt.file_name)
        ps.setBoolean(2, dt.processed_file)
        ps.setString(3, dt.data_origin)
        ps.setString(4, dt.data_type)
        ps.setObject(5, dt.data_location.orNull, Types.VARCHAR)
        ps.setTimestamp(6, Timestamp.from(dt.received_time))
      }
    this results in an error. I've also used
    Copy code
    dt.data_location match {
          case Some(value) => ps.setString(5, value)
          case None => ps.setNull(5, Types.VARCHAR)
        }
    rather than the
    setObject
    but see the same error either way. Any ideas on what to do here? I can provide the error message -- I'll place that in the thread.
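[Editor's note: since the actual error message lives in the thread, a hedged guess at the usual culprit: the INSERT statement handed to the sink must gain a sixth placeholder along with the new setter, and the parameter indexes must match it exactly. A sketch in the spirit of the snippet above; the table and column names are hypothetical, DataTracking is the type from the original code:]

    import java.sql.{PreparedStatement, Timestamp, Types}
    import org.apache.flink.connector.jdbc.JdbcStatementBuilder

    // The SQL must have 6 placeholders to match the 6 setters below.
    val insertSql =
      """INSERT INTO data_tracking
        |  (file_name, processed_file, data_origin, data_type, data_location, received_time)
        |  VALUES (?, ?, ?, ?, ?, ?)""".stripMargin

    val insertOperation: JdbcStatementBuilder[DataTracking] =
      (ps: PreparedStatement, dt: DataTracking) => {
        ps.setString(1, dt.file_name)
        ps.setBoolean(2, dt.processed_file)
        ps.setString(3, dt.data_origin)
        ps.setString(4, dt.data_type)
        dt.data_location match {              // explicit NULL for the Option column
          case Some(v) => ps.setString(5, v)
          case None    => ps.setNull(5, Types.VARCHAR)
        }
        ps.setTimestamp(6, Timestamp.from(dt.received_time))
      }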

    Mahesh Sambharam

    07/09/2025, 9:49 AM
Hello Everyone, I am trying to build Flink from source using mvn clean install -DskipTests, but I got the below error:

    patricia lee

    07/09/2025, 11:16 AM
Hi, we are migrating from Flink 1.20 to Flink 2.0. We have calls to the type registration API (registerType / registerPojoType), which has been removed in Flink 2.0. Is it correct that, even if we use classes nested 2-3 levels deep, as long as they are Java classes with primitive types and collections, it is no longer required to register POJO types in Flink? Also, even if I try to call
env.getConfig().registerPojoType(Class<?> type)
it no longer exists. Any feedback is appreciated. Thanks
    Copy code
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().registerPojoType(MyClass.class);  // this registration API was removed in Flink 2.0
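[Editor's note: a minimal Flink 2.0 sketch of the expected behavior, assuming MyClass and Inner are proper POJOs (public no-arg constructor plus public fields or getters/setters): the PojoSerializer analyzes them recursively, including nested classes and collections, so no registration call is needed or available:]

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class PojoNoRegistration {
        // Flink-style POJOs: public no-arg constructor + public fields;
        // nested POJOs are analyzed recursively, no registration required.
        public static class Inner { public int count; public Inner() {} }
        public static class MyClass { public String name; public Inner inner; public MyClass() {} }

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.fromData(new MyClass()).print();   // serialized via the PojoSerializer
            env.execute("pojo-no-registration");
        }
    }

[Types that don't satisfy the POJO rules fall back to Kryo, which in Flink 2.0 is configured through pipeline.serialization-config rather than registration methods.]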

    Boopathy Raja

    07/09/2025, 6:29 PM
Hi, I want to set up Postgres as a source in Flink. The Postgres DB is only reachable through SSH tunnelling, and I couldn't find any documentation about a PostgreSQL source with SSH tunnelling. Can someone help me here?
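[Editor's note: to my knowledge neither the jdbc nor the postgres-cdc connector has built-in SSH tunnel support, so the usual pattern is to open the tunnel outside Flink and point the connector at the local end; every TaskManager that runs the source needs the tunnel (e.g. as a sidecar). A hedged sketch with placeholder hosts:]

    # Forward local port 5433 to the Postgres host reachable from the bastion
    ssh -N -L 5433:postgres.internal:5432 user@bastion.example.com

    # Then point the connector at the tunnel endpoint, e.g.
    #   'hostname' = 'localhost', 'port' = '5433'         (postgres-cdc)
    #   'url' = 'jdbc:postgresql://localhost:5433/mydb'   (jdbc)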

    Jeremy DeGroot

    07/10/2025, 7:52 PM
    Copy code
    java.lang.AbstractMethodError: Receiver class org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher$$Lambda$1361/0x0000000800e04040 does not define or inherit an implementation of the resolved method 'abstract org.apache.flink.util.clock.RelativeClock getInputActivityClock()' of interface org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier$Context.
Has anyone ever seen an error like this? I'm using the Kafka Connector at version 3.3.0-1.19 and running on the AWS Managed Flink 1.19 runtime.

    umar farooq

    07/11/2025, 8:51 AM
Hi All, is there any hard rule on the service account name, i.e. that it must be "flink" for a FlinkDeployment using the Flink operator?
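[Editor's note: there is no hard rule; "flink" is just the name used in the operator quickstart. spec.serviceAccount is configurable, as in this sketch with hypothetical names; the account only needs the RBAC described in the operator docs (e.g. permission to manage pods and ConfigMaps):]

    apiVersion: flink.apache.org/v1beta1
    kind: FlinkDeployment
    metadata:
      name: my-job
    spec:
      image: flink:1.20
      flinkVersion: v1_20
      serviceAccount: my-flink-sa   # any SA with the required RBAC works
      jobManager:
        resource: { memory: "2048m", cpu: 1 }
      taskManager:
        resource: { memory: "2048m", cpu: 1 }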

    מייקי בר יעקב

    07/11/2025, 3:37 PM
We are working only with Flink in our team, and we are having a lot of trouble creating really stable Flink jobs; all of our workloads restart or crash once a day or week. We did deep research and we know it's because of checkpoints or RocksDB (and mainly the combination of the two). We are using Flink 1.19 and Flink 1.16. Do you know if this is getting solved in newer versions?

    Mahesh Sambharam

    07/11/2025, 6:37 PM
Hi All, I am getting the below error when running tests in flink-cdc-pipeline-connectors for MySQL, as part of the project setup where I want to run the tests:
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    ... 1 more
Caused by: org.apache.flink.util.FlinkRuntimeException: Read split MySqlSnapshotSplit{tableId=column_type_test_vq84j0.common_types, splitId='column_type_test_vq84j0.common_types:0', splitKeyType=[`id` DECIMAL(20, 0) NOT NULL], splitStart=null, splitEnd=null, highWatermark=null} error due to org.apache.flink.util.FlinkRuntimeException: Cannot read the binlog filename and position via 'SHOW MASTER STATUS'. Make sure your server is correctly configured.
    at org.apache.flink.cdc.connectors.mysql.debezium.reader.SnapshotSplitReader.checkReadException(SnapshotSplitReader.java:387)
    at org.apache.flink.cdc.connectors.mysql.debezium.reader.SnapshotSplitReader.pollWithBuffer(SnapshotSplitReader.java:323)
    at org.apache.flink.cdc.connectors.mysql.debezium.reader.SnapshotSplitReader.pollSplitRecords(SnapshotSplitReader.java:288)
    at org.apache.flink.cdc.connectors.mysql.source.reader.MySqlSplitReader.pollSplitRecords(MySqlSplitReader.java:122)
    at org.apache.flink.cdc.connectors.mysql.source.reader.MySqlSplitReader.fetch(MySqlSplitReader.java:84)
    at org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58)
    at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:165)
    ... 6 more
Caused by: io.debezium.DebeziumException: org.apache.flink.util.FlinkRuntimeException: Cannot read the binlog filename and position via 'SHOW MASTER STATUS'. Make sure your server is correctly configured
    at org.apache.flink.cdc.connectors.mysql.debezium.task.MySqlSnapshotSplitReadTask.execute(MySqlSnapshotSplitReadTask.java:134)
    at org.apache.flink.cdc.connectors.mysql.debezium.reader.SnapshotSplitReader.snapshot(SnapshotSplitReader.java:176)
    at org.apache.flink.cdc.connectors.mysql.debezium.reader.SnapshotSplitReader.lambda$submitSplit$1(SnapshotSplitReader.java:161)
    ... 3 more
Caused by: org.apache.flink.util.FlinkRuntimeException: Cannot read the binlog filename and position via 'SHOW MASTER STATUS'. Make sure your server is correctly configured
    at org.apache.flink.cdc.connectors.mysql.debezium.DebeziumUtils.lambda$currentBinlogOffset$0(DebeziumUtils.java:136)
    at io.debezium.jdbc.JdbcConnection.queryAndMap(JdbcConnection.java:642)
    at io.debezium.jdbc.JdbcConnection.queryAndMap(JdbcConnection.java:510)
    at org.apache.flink.cdc.connectors.mysql.debezium.DebeziumUtils.currentBinlogOffset(DebeziumUtils.java:123)
    at org.apache.flink.cdc.connectors.mysql.debezium.task.MySqlSnapshotSplitReadTask.doExecute(MySqlSnapshotSplitReadTask.java:156)
    at org.apache.flink.cdc.connectors.mysql.debezium.task.MySqlSnapshotSplitReadTask.execute(MySqlSnapshotSplitReadTask.java:129)
    ... 5 more
[INFO] Results:
[ERROR] Errors:
[ERROR]   MySqlFullTypesITCase.testMysql57CommonDataTypes:106->testCommonDataTypes:476 » Runtime
[ERROR]   MySqlFullTypesITCase.testMysql57JsonDataTypes:113->testJsonDataType:524 » Runtime
[ERROR]   MySqlFullTypesITCase.testMysql57JsonDataTypesWithUseLegacyJsonFormat:120->testJsonDataType:524 » Runtime
[ERROR]   MySqlFullTypesITCase.testMysql57PrecisionTypes:256->testMysqlPrecisionTypes:366 » Runtime
[ERROR]   MySqlFullTypesITCase.testMysql57TimeDataTypes:191->testTimeDataTypes:583 » Runtime
[ERROR] Tests run: 10, Failures: 0, Errors: 5, Skipped: 0
[INFO] ------------------------------------------------------------------------
[INFO] BUILD FAILURE
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 01:16 min
[INFO] Finished at: 2025-07-11T18:27:27Z
[INFO] ------------------------------------------------------------------------
[ERROR] Failed to execute goal org.apache.maven.plugins:maven-surefire-plugin:3.0.0-M5:test (default-test) on project flink-cdc-pipeline-connector-mysql: There are test failures.
[ERROR] Please refer to /mnt/d/Tech/flink-cdc/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/target/surefire-reports for the individual test results.
[ERROR] Please refer to dump files (if any exist) [date].dump, [date]-jvmRun[N].dump and [date].dumpstream.
[ERROR] -> [Help 1]
[ERROR] To see the full stack trace of the errors, re-run Maven with the -e switch.
[ERROR] Re-run Maven using the -X switch to enable full debug logging.
[ERROR] For more information about the errors and possible solutions, please read the following articles:
[ERROR] [Help 1] http://cwiki.apache.org/confluence/display/MAVEN/MojoFailureException

    dontu balu

    07/12/2025, 12:49 AM
Hi Team, I am trying to build the Flink CDC release-3.4 branch locally and getting compilation errors. Below are the errors.
    Copy code
    mvn clean install -DskipTests -Pfast
    Copy code
    [INFO] Reactor Summary for flink-cdc-parent 3.4-SNAPSHOT:
    [INFO] 
    [INFO] flink-cdc-parent ................................... SUCCESS [  2.244 s]
    [INFO] flink-cdc-common ................................... SUCCESS [  3.585 s]
    [INFO] flink-cdc-pipeline-udf-examples .................... FAILURE [  1.203 s]
    [INFO] flink-cdc-pipeline-model ........................... SKIPPED
    [INFO] flink-cdc-runtime .................................. SKIPPED
    [INFO] flink-cdc-connect .................................. SKIPPED
    [INFO] flink-cdc-pipeline-connectors ...................... SKIPPED
    [INFO] flink-cdc-pipeline-connector-values ................ SKIPPED
    [INFO] flink-cdc-composer ................................. SKIPPED
    [INFO] flink-cdc-cli ...................................... SKIPPED
    [INFO] flink-cdc-dist ..................................... SKIPPED
    [INFO] flink-cdc-source-connectors ........................ SKIPPED
    [INFO] flink-connector-debezium ........................... SKIPPED
    [INFO] flink-connector-test-util .......................... SKIPPED
    [INFO] flink-cdc-base ..................................... SKIPPED
    [INFO] flink-connector-db2-cdc ............................ SKIPPED
    [INFO] flink-connector-mongodb-cdc ........................ SKIPPED
    [INFO] flink-connector-mysql-cdc .......................... SKIPPED
    [INFO] flink-connector-oceanbase-cdc ...................... SKIPPED
    [INFO] flink-connector-oracle-cdc ......................... SKIPPED
    [INFO] flink-connector-postgres-cdc ....................... SKIPPED
    [INFO] flink-connector-sqlserver-cdc ...................... SKIPPED
    [INFO] flink-connector-tidb-cdc ........................... SKIPPED
    [INFO] flink-connector-vitess-cdc ......................... SKIPPED
    [INFO] flink-sql-connector-db2-cdc ........................ SKIPPED
    [INFO] flink-sql-connector-mongodb-cdc .................... SKIPPED
    [INFO] flink-sql-connector-mysql-cdc ...................... SKIPPED
    [INFO] flink-sql-connector-oceanbase-cdc .................. SKIPPED
    [INFO] flink-sql-connector-oracle-cdc ..................... SKIPPED
    [INFO] flink-sql-connector-postgres-cdc ................... SKIPPED
    [INFO] flink-sql-connector-sqlserver-cdc .................. SKIPPED
    [INFO] flink-sql-connector-tidb-cdc ....................... SKIPPED
    [INFO] flink-sql-connector-vitess-cdc ..................... SKIPPED
    [INFO] flink-cdc-pipeline-connector-mysql ................. SKIPPED
    [INFO] flink-cdc-pipeline-connector-doris ................. SKIPPED
    [INFO] flink-cdc-pipeline-connector-starrocks ............. SKIPPED
    [INFO] flink-cdc-pipeline-connector-kafka ................. SKIPPED
    [INFO] flink-cdc-pipeline-connector-paimon ................ SKIPPED
    [INFO] flink-cdc-pipeline-connector-elasticsearch ......... SKIPPED
    [INFO] flink-cdc-pipeline-connector-oceanbase ............. SKIPPED
    [INFO] flink-cdc-pipeline-connector-maxcompute ............ SKIPPED
    [INFO] flink-cdc-pipeline-connector-iceberg ............... SKIPPED
    [INFO] flink-cdc-e2e-tests ................................ SKIPPED
    [INFO] flink-cdc-e2e-utils ................................ SKIPPED
    [INFO] flink-cdc-source-e2e-tests ......................... SKIPPED
    [INFO] flink-cdc-pipeline-e2e-tests ....................... SKIPPED
    [INFO] ------------------------------------------------------------------------
    [INFO] BUILD FAILURE
    [INFO] ------------------------------------------------------------------------
    [INFO] Total time:  7.254 s
    [INFO] Finished at: 2025-07-11T17:44:27-07:00
    [INFO] ------------------------------------------------------------------------
    Copy code
    [INFO] Compiler bridge file: /Users/vdontu/.sbt/1.0/zinc/org.scala-sbt/org.scala-sbt-compiler-bridge_2.12-1.8.0-bin_2.12.16__68.0-1.8.0_20221110T195421.jar
    [INFO] Compiler bridge file is not installed yet
    error:
      bad constant pool index: 0 at pos: 49416
         while compiling: <no file>
            during phase: globalPhase=<no phase>, enteringPhase=<some phase>
         library version: version 2.12.16
        compiler version: version 2.12.16
      reconstructed args: -bootclasspath /Users/vdontu/.m2/repository/org/scala-lang/scala-library/2.12.16/scala-library-2.12.16.jar -classpath /Users/vdontu/.m2/repository/org/scala-sbt/util-interface/1.8.0/util-interface-1.8.0.jar:/Users/vdontu/.m2/repository/org/scala-lang/scala-compiler/2.12.16/scala-compiler-2.12.16.jar:/Users/vdontu/.m2/repository/org/scala-lang/modules/scala-xml_2.12/1.0.6/scala-xml_2.12-1.0.6.jar:/Users/vdontu/.m2/repository/jline/jline/2.14.6/jline-2.14.6.jar:/Users/vdontu/.m2/repository/org/scala-lang/scala-reflect/2.12.16/scala-reflect-2.12.16.jar:/Users/vdontu/.m2/repository/org/scala-sbt/compiler-interface/1.8.0/compiler-interface-1.8.0.jar:/Users/vdontu/.m2/repository/org/scala-lang/modules/scala-xml_2.12/1.0.6/scala-xml_2.12-1.0.6.jar:/Users/vdontu/.m2/repository/org/scala-lang/scala-reflect/2.12.16/scala-reflect-2.12.16.jar:/Users/vdontu/.m2/repository/jline/jline/2.14.6/jline-2.14.6.jar:/Users/vdontu/.m2/repository/org/scala-lang/scala-compiler/2.12.16/scala-compiler-2.12.16.jar:/Users/vdontu/.m2/repository/org/scala-lang/scala-library/2.12.16/scala-library-2.12.16.jar -d /var/folders/2n/bls6r2l513dflwnvtx4yp8k00000gq/T/scala-maven-plugin-compiler-bridge-classes12287687447114078348
    
      last tree to typer: EmptyTree
           tree position: <unknown>
                tree tpe: <notype>
                  symbol: null
               call site: <none> in <none>
    
    == Source file context for tree position ==
    
    error: scala.reflect.internal.FatalError:
      bad constant pool index: 0 at pos: 49416
         while compiling: <no file>
            during phase: globalPhase=<no phase>, enteringPhase=<some phase>
         library version: version 2.12.16
        compiler version: version 2.12.16
      reconstructed args: -bootclasspath /Users/vdontu/.m2/repository/org/scala-lang/scala-library/2.12.16/scala-library-2.12.16.jar -classpath /Users/vdontu/.m2/repository/org/scala-sbt/util-interface/1.8.0/util-interface-1.8.0.jar:/Users/vdontu/.m2/repository/org/scala-lang/scala-compiler/2.12.16/scala-compiler-2.12.16.jar:/Users/vdontu/.m2/repository/org/scala-lang/modules/scala-xml_2.12/1.0.6/scala-xml_2.12-1.0.6.jar:/Users/vdontu/.m2/repository/jline/jline/2.14.6/jline-2.14.6.jar:/Users/vdontu/.m2/repository/org/scala-lang/scala-reflect/2.12.16/scala-reflect-2.12.16.jar:/Users/vdontu/.m2/repository/org/scala-sbt/compiler-interface/1.8.0/compiler-interface-1.8.0.jar:/Users/vdontu/.m2/repository/org/scala-lang/modules/scala-xml_2.12/1.0.6/scala-xml_2.12-1.0.6.jar:/Users/vdontu/.m2/repository/org/scala-lang/scala-reflect/2.12.16/scala-reflect-2.12.16.jar:/Users/vdontu/.m2/repository/jline/jline/2.14.6/jline-2.14.6.jar:/Users/vdontu/.m2/repository/org/scala-lang/scala-compiler/2.12.16/scala-compiler-2.12.16.jar:/Users/vdontu/.m2/repository/org/scala-lang/scala-library/2.12.16/scala-library-2.12.16.jar -d /var/folders/2n/bls6r2l513dflwnvtx4yp8k00000gq/T/scala-maven-plugin-compiler-bridge-classes12287687447114078348
    
      last tree to typer: EmptyTree
           tree position: <unknown>
                tree tpe: <notype>
                  symbol: null
               call site: <none> in <none>
    
    == Source file context for tree position ==
    
    
            at scala.reflect.internal.Reporting.abort(Reporting.scala:69)
            at scala.reflect.internal.Reporting.abort$(Reporting.scala:65)
            at scala.reflect.internal.SymbolTable.abort(SymbolTable.scala:28)
            at scala.tools.nsc.symtab.classfile.ClassfileParser$ConstantPool.errorBadIndex(ClassfileParser.scala:385)
            at scala.tools.nsc.symtab.classfile.ClassfileParser$ConstantPool.getExternalName(ClassfileParser.scala:249)
            at scala.tools.nsc.symtab.classfile.ClassfileParser.readParamNames$1(ClassfileParser.scala:828)
            at scala.tools.nsc.symtab.classfile.ClassfileParser.parseAttribute$1(ClassfileParser.scala:834)
            at scala.tools.nsc.symtab.classfile.ClassfileParser.$anonfun$parseAttributes$7(ClassfileParser.scala:908)
            at scala.tools.nsc.symtab.classfile.ClassfileParser.parseAttributes(ClassfileParser.scala:908)
            at scala.tools.nsc.symtab.classfile.ClassfileParser.parseMethod(ClassfileParser.scala:611)
            at scala.tools.nsc.symtab.classfile.ClassfileParser.$anonfun$parseClass$4(ClassfileParser.scala:534)
            at scala.tools.nsc.symtab.classfile.ClassfileParser.parseClass(ClassfileParser.scala:534)
            at scala.tools.nsc.symtab.classfile.ClassfileParser.$anonfun$parse$2(ClassfileParser.scala:160)
            at scala.tools.nsc.symtab.classfile.ClassfileParser.$anonfun$parse$1(ClassfileParser.scala:146)
            at scala.tools.nsc.symtab.classfile.ClassfileParser.parse(ClassfileParser.scala:129)
            at scala.tools.nsc.symtab.SymbolLoaders$ClassfileLoader.doComplete(SymbolLoaders.scala:343)
            at scala.tools.nsc.symtab.SymbolLoaders$SymbolLoader.complete(SymbolLoaders.scala:250)
            at scala.reflect.internal.Symbols$Symbol.completeInfo(Symbols.scala:1542)
        at scala.reflect.internal.Symbols$Symbol.info(Symbols.scala:1514)
            at scala.reflect.internal.Definitions.scala$reflect$internal$Definitions$$enterNewMethod(Definitions.scala:49)
            at scala.reflect.internal.Definitions$DefinitionsClass.String_$plus$lzycompute(Definitions.scala:1134)
            at scala.reflect.internal.Definitions$DefinitionsClass.String_$plus(Definitions.scala:1134)
            at scala.reflect.internal.Definitions$DefinitionsClass.syntheticCoreMethods$lzycompute(Definitions.scala:1438)
            at scala.reflect.internal.Definitions$DefinitionsClass.syntheticCoreMethods(Definitions.scala:1420)
            at scala.reflect.internal.Definitions$DefinitionsClass.symbolsNotPresentInBytecode$lzycompute(Definitions.scala:1450)
            at scala.reflect.internal.Definitions$DefinitionsClass.symbolsNotPresentInBytecode(Definitions.scala:1450)
            at scala.reflect.internal.Definitions$DefinitionsClass.init(Definitions.scala:1506)
            at scala.tools.nsc.Global$Run.<init>(Global.scala:1214)
            at scala.tools.nsc.Driver.doCompile(Driver.scala:46)
            at scala.tools.nsc.MainClass.doCompile(Main.scala:32)
            at scala.tools.nsc.Driver.process(Driver.scala:67)
            at scala.tools.nsc.Main.process(Main.scala)
            at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:104)
            at java.base/java.lang.reflect.Method.invoke(Method.java:565)
            at sbt.internal.inc.RawCompiler.getReporter$1(RawCompiler.scala:56)
            at sbt.internal.inc.RawCompiler.apply(RawCompiler.scala:77)
            at sbt_inc.CompilerBridgeFactory.getScala2CompilerBridgeJar(CompilerBridgeFactory.java:177)
            at sbt_inc.CompilerBridgeFactory.getCompiledBridgeJar(CompilerBridgeFactory.java:60)
            at sbt_inc.SbtIncrementalCompilers.make(SbtIncrementalCompilers.java:51)
            at scala_maven.ScalaCompilerSupport.incrementalCompile(ScalaCompilerSupport.java:305)
            at scala_maven.ScalaCompilerSupport.compile(ScalaCompilerSupport.java:119)
            at scala_maven.ScalaCompilerSupport.doExecute(ScalaCompilerSupport.java:86)
    Can someone please help with this?
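[Editor's note: hedged, but "bad constant pool index" while the scala-maven-plugin compiles the sbt compiler bridge is the classic symptom of running the build on a much newer JDK than Scala 2.12.16 can parse (the Method.java line numbers in the trace also suggest a recent JDK). The Flink CDC build assumes JDK 8 or 11; a sketch with a placeholder path:]

    # Point the build at a JDK the Scala 2.12 toolchain supports
    export JAVA_HOME=/path/to/jdk-11        # placeholder
    mvn clean install -DskipTests -Pfast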

    Mahesh Sambharam

    07/12/2025, 10:57 AM
Hello Everyone, when I'm trying to run the e2e tests for the fluss connector, I get the below error, even though I have rebuilt the project several times:
[ERROR] Tests run: 1, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 19.388 s <<< FAILURE! - in org.apache.flink.cdc.pipeline.tests.FlussE2eITCase
[ERROR] org.apache.flink.cdc.pipeline.tests.FlussE2eITCase.testMySqlToFluss  Time elapsed: 0.408 s  <<< ERROR!
java.lang.RuntimeException: java.io.FileNotFoundException: No resource file could be found that matches the pattern fluss-sql-connector.jar. This could mean that the test module must be rebuilt via maven.
    at org.apache.flink.cdc.common.test.utils.TestUtils.getResource(TestUtils.java:73)
    at org.apache.flink.cdc.pipeline.tests.utils.PipelineTestEnvironment.before(PipelineTestEnvironment.java:188)
    at org.apache.flink.cdc.pipeline.tests.FlussE2eITCase.before(FlussE2eITCase.java:133)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.util.ArrayList.forEach(ArrayList.java:1259)
    at java.util.ArrayList.forEach(ArrayList.java:1259)
Caused by: java.io.FileNotFoundException: No resource file could be found that matches the pattern fluss-sql-connector.jar. This could mean that the test module must be rebuilt via maven.
    ... 6 more

    Ganesh

    07/13/2025, 5:05 PM
Hey community, quick question. I'm thinking of a distributed Flink cluster setup in session mode, where the JobManager and TaskManagers run on dedicated VMs.

    Ganesh

    07/13/2025, 5:05 PM
Assuming I have multiple TaskManager VMs, is it possible to isolate the VMs, and when submitting a job to the JobManager VM, can we choose which TaskManager VM executes the job?

    Rushikesh Gulve

    07/14/2025, 7:15 AM
    Hi everyone, I am running a pyflink application in Kubernetes. This is my config file.
    Copy code
apiVersion: flink.apache.org/v1beta1
    kind: FlinkDeployment
    metadata:
      name: data-processing-flink
      namespace: flink
    spec:
  image: 624083781302.dkr.ecr.ap-south-1.amazonaws.com/product/data-streaming-flink:latest
      flinkVersion: "v1_20"
      serviceAccount: flink
      flinkConfiguration:
        rest.port: "8081"
        jobmanager.rpc.address: "data-processing-flink"
        jobmanager.rpc.port: "6123"
        #taskmanager.numberOfTaskSlots: "2"
    
        # High Availability
        high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
        high-availability.storageDir: file:///flink-data/ha
    
        # Checkpoints and Savepoints
        state.checkpoints.dir: file:///flink-data/checkpoints
        state.savepoints.dir: file:///flink-data/savepoints
        state.backend.type: rocksdb
    
        state.backend.rocksdb.memory.managed: "true"  # Enable Flink's memory management for RocksDB
        state.backend.rocksdb.memory.write-buffer-size: "64mb"  # Limit write buffer size
        state.backend.rocksdb.memory.max-file-size-to-compact: "2mb"  # Increase compaction file size
        state.backend.rocksdb.memory.min-files-to-compact: "3"  # Reduce compaction frequency
        state.backend.rocksdb.memory.max-files-to-compact: "10"
        state.backend.rocksdb.memory.max-output-file-size: "128mb"
        state.backend.rocksdb.memory.cache-capacity: "256mb"  # Reduce cache size
        state.backend.rocksdb.memory.managed: true
        state.backend.rocksdb.memory.write-buffer-ratio: "0.5"
        state.backend.rocksdb.memory.high-prio-pool-ratio: "0.1"
    
    
        python.fn-execution.bundle.size: "100"
        python.fn-execution.memory.managed: "true"
        python.fn-execution.buffer.memory.size: "64mb"
    
        #execution.checkpointing.alignment-timeout: "0"  # ms, after which checkpoint switches to unaligned mode dynamically
    
        #execution.checkpointing.unaligned: "true"                  # Enable unaligned checkpoints
    
        #rest.profiling.enabled: "true"
        #env.java.opts.taskmanager: "-Dorg.apache.beam.sdk.fn=DEBUG -Dorg.apache.beam.runners.fnexecution.control.FnApiControlClient=DEBUG"
    
      podTemplate:
        spec:
          imagePullSecrets:
            - name: ecrscr-credentials
          containers:
            - name: flink-main-container
              imagePullPolicy: Always
              envFrom:
                - secretRef:
                    name: redis-secret-main
              env:
                - name: _PYTHON_WORKER_MEMORY_LIMIT
                  value: "350000000"
                - name: KAFKA_BROKER
                  value: "kafka.flink.svc.cluster.local:9092"
                - name: BOOTSTRAP_SERVERS
                  value: "kafka.flink.svc.cluster.local:9092"
                - name: MINIO_SERVER
                  value: "<http://myminio-hl.flink.svc.cluster.local:9000>"
                - name: MINIO_USER
                  value: "minio"
                - name: MINIO_PASSWORD
                  value: "minio123"
                - name: MAX_PARALLELISM
                  value: "128"
              volumeMounts:
                - mountPath: /flink-data
                  name: flink-pvc
          volumes:
            - name: flink-pvc
              persistentVolumeClaim:
                claimName: flink-data-pvc
          securityContext:
            runAsUser: 9999
            runAsGroup: 9999
    
      jobManager:
        resource:
          memory: "3072m"
          cpu: 3
    
      taskManager:
        resource:
          memory: "4096m"
          cpu: 1
        replicas: 32
    
      job:
        entryClass: "org.apache.flink.client.python.PythonDriver"
        args: ["-pyclientexec", "/usr/bin/python3", "-py", "/opt/flink/usrlib/data_processing_v9/stream_processing/process_dependency/create_env.py"]
        upgradeMode: stateless
I have disabled checkpointing purposely. The issue is that I have set parallelism to 32 and have multiple processes. I can see in all the TaskManager logs that the processes complete simultaneously, but in the Kafka partitions the data is received one after another with a lot of delay. My output is only a single data point written to Kafka, which is linked to an S3 bucket. Why am I not seeing all the data in the Kafka partitions simultaneously if the data is processed simultaneously? (The parallelism is utilized by producing different keys to different partitions.)
    Copy code
    KafkaSink.builder()
            .set_bootstrap_servers(BOOTSTRAP_SERVERS)
            .set_record_serializer(
                KafkaRecordSerializationSchema.builder()
                .set_topic(topic)
                .set_value_serialization_schema(
                    JsonRowSerializationSchema.builder()
                    .with_type_info(type_info)
                    .build()
                )
                .build()
            )
            .set_delivery_guarantee(DeliveryGuarantee.AT_LEAST_ONCE)
            .build()
This is my Kafka sink. I am using Flink 1.20. Any help is greatly appreciated. Thank you.
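[Editor's note: one hedged thing to rule out: with checkpointing disabled there is no checkpoint-triggered flush in the sink, so record visibility depends purely on the Kafka producer's batching. A sketch of the same builder with explicit producer properties; DeliveryGuarantee.NONE removes any flush coupling while debugging, and the names otherwise follow the snippet above:]

    from pyflink.datastream.connectors.base import DeliveryGuarantee
    from pyflink.datastream.connectors.kafka import (
        KafkaRecordSerializationSchema, KafkaSink)
    from pyflink.datastream.formats.json import JsonRowSerializationSchema

    def build_sink(bootstrap_servers, topic, type_info):
        return (
            KafkaSink.builder()
            .set_bootstrap_servers(bootstrap_servers)
            .set_record_serializer(
                KafkaRecordSerializationSchema.builder()
                .set_topic(topic)
                .set_value_serialization_schema(
                    JsonRowSerializationSchema.builder()
                    .with_type_info(type_info)
                    .build())
                .build())
            .set_delivery_guarantee(DeliveryGuarantee.NONE)
            .set_property("linger.ms", "0")  # hand records to the broker immediately
            .build())

[If the latency persists with this, the gap is more likely upstream of the sink, e.g. one key's pipeline finishing while others are still draining.]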

    dontu balu

    07/14/2025, 10:55 PM
Hi team, for MySQL CDC I am setting the server id as 6543 for client replication; however, when I check the process list on the MySQL server, I see the ID as 371575.
    Copy code
    mysql> SHOW PROCESSLIST;
    371575 | cdcuser         | xx.xx.xx.xx:41180  | NULL   | Binlog Dump GTID |   11926 | Source has sent all binlog to replica; waiting for more updates | NULL
How can I check all the client replication server ids in use on the MySQL instance, so that when I create another Flink app I can specify a unique server id that is not already used?
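[Editor's note: the 371575 in SHOW PROCESSLIST is the connection Id column, not the replication server id, so it doesn't contradict server-id 6543. The binlog client used by MySQL CDC registers like a replica, so the ids in use should be visible with (syntax depends on MySQL version):]

    -- MySQL 8.0.22+
    SHOW REPLICAS;
    -- older versions
    SHOW SLAVE HOSTS;
    -- the Server_id column shows the id each Binlog Dump client registered with

[Hedged: replicas that don't report a host may not appear there; reserving an explicit range per Flink app, e.g. 'server-id' = '6543-6550', is the usual way to stay unique.]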

    Chris Harm

    07/15/2025, 4:53 PM
Hi everyone. I am attempting to test the new 1.19.3 release to take advantage of this enhancement:
• [FLINK-33977] - Adaptive scheduler may not minimize the number of TMs during downscaling
However, I'm not seeing a change in behavior. After a scale-down event, the number of tasks decreases, but task managers are not getting released like they should be. The number of Available Task Slots is much greater than taskmanager.numberOfTaskSlots. With Flink 1.19.3, I would have expected the slot assigner to assign the remaining tasks to slots on the task managers with the highest utilization, so that the under-utilized task managers could be released. This should cause the number of Available Task Slots to drop below the value of numberOfTaskSlots. I'm not seeing the Task Managers being released. Is there something that I'm missing?
• jobmanager.scheduler = adaptive
• jobmanager.adaptive-scheduler.prefer-minimal-taskmanagers = true
• taskmanager.numberOfTaskSlots = 4
• Task Managers = 54
• Total Task Slots = 216
• Available Task Slots = 55
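[Editor's note: hedged addition: even with prefer-minimal-taskmanagers, a TaskManager is only released after its slots sit idle past the idle timeouts, and only on deployments where Flink itself manages TMs (native Kubernetes/YARN or the operator's autoscaler); standalone TMs are never shut down by the JobManager. The relevant knobs, with their defaults:]

    slot.idle.timeout: 50000                    # ms a slot may stay idle before it is freed
    resourcemanager.taskmanager-timeout: 30000  # ms an idle TM is kept before release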

    Leonardo Nascimento

    07/15/2025, 10:19 PM
    Hi everyone. After upgrading my cluster to Kubernetes 1.33, the
    flink-kubernetes-operator
    is unable to create the Ingress component due to an issue in fabric8 (https://github.com/fabric8io/kubernetes-client/issues/7037). I found this JIRA issue (https://issues.apache.org/jira/browse/FLINK-38093) from two days ago proposing an upgrade to Fabric8 version 7.3.1. However, this is already the version used in the main branch (https://github.com/apache/flink-kubernetes-operator/blob/main/pom.xml#L81). Is there any timeline for a new release? I couldn't find it. The stack trace from the error in
    flink-kubernetes-operator
    logs:
    Copy code
    2025-07-15 21:26:43,949 i.j.o.p.e.EventProcessor       [ERROR][datapipe-data/stream-consumer] Error during event processing ExecutionScope{ resource id: ResourceID{name='stream-consumer', namespace='datapipe-data'}, version: 2281422965}
    org.apache.flink.kubernetes.operator.exception.ReconciliationException: io.fabric8.kubernetes.client.KubernetesClientException: An error has occurred.
    	at org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController.reconcile(FlinkDeploymentController.java:167)
    	at org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController.reconcile(FlinkDeploymentController.java:63)
    	at io.javaoperatorsdk.operator.processing.Controller$1.execute(Controller.java:153)
    	at io.javaoperatorsdk.operator.processing.Controller$1.execute(Controller.java:111)
    	at org.apache.flink.kubernetes.operator.metrics.OperatorJosdkMetrics.timeControllerExecution(OperatorJosdkMetrics.java:80)
    	at io.javaoperatorsdk.operator.processing.Controller.reconcile(Controller.java:110)
    	at io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.reconcileExecution(ReconciliationDispatcher.java:136)
    	at io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleReconcile(ReconciliationDispatcher.java:117)
    	at io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleDispatch(ReconciliationDispatcher.java:91)
    	at io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleExecution(ReconciliationDispatcher.java:64)
    	at io.javaoperatorsdk.operator.processing.event.EventProcessor$ReconcilerExecutor.run(EventProcessor.java:452)
    	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    	at java.base/java.lang.Thread.run(Unknown Source)
    Caused by: io.fabric8.kubernetes.client.KubernetesClientException: An error has occurred.
    	at io.fabric8.kubernetes.client.KubernetesClientException.copyAsCause(KubernetesClientException.java:238)
    	at io.fabric8.kubernetes.client.dsl.internal.OperationSupport.waitForResult(OperationSupport.java:507)
    	at io.fabric8.kubernetes.client.dsl.internal.OperationSupport.handleResponse(OperationSupport.java:524)
    	at io.fabric8.kubernetes.client.dsl.internal.OperationSupport.restCall(OperationSupport.java:711)
    	at io.fabric8.kubernetes.client.impl.BaseClient.getVersionInfo(BaseClient.java:323)
    	at io.fabric8.kubernetes.client.impl.KubernetesClientImpl.getKubernetesVersion(KubernetesClientImpl.java:641)
    	at org.apache.flink.kubernetes.operator.utils.IngressUtils.ingressInNetworkingV1(IngressUtils.java:272)
    	at org.apache.flink.kubernetes.operator.utils.IngressUtils.getIngress(IngressUtils.java:96)
    	at org.apache.flink.kubernetes.operator.utils.IngressUtils.updateIngressRules(IngressUtils.java:72)
    	at org.apache.flink.kubernetes.operator.reconciler.deployment.ApplicationReconciler.deploy(ApplicationReconciler.java:188)
    	at org.apache.flink.kubernetes.operator.reconciler.deployment.ApplicationReconciler.deploy(ApplicationReconciler.java:65)
    	at org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractJobReconciler.restoreJob(AbstractJobReconciler.java:393)
    	at org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractJobReconciler.reconcileSpecChange(AbstractJobReconciler.java:200)
    	at org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractFlinkResourceReconciler.reconcile(AbstractFlinkResourceReconciler.java:164)
    	at org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController.reconcile(FlinkDeploymentController.java:156)
    	... 13 more
    Caused by: io.fabric8.kubernetes.client.KubernetesClientException: An error has occurred.
    	at io.fabric8.kubernetes.client.KubernetesClientException.launderThrowable(KubernetesClientException.java:129)
    	at io.fabric8.kubernetes.client.KubernetesClientException.launderThrowable(KubernetesClientException.java:122)
    	at io.fabric8.kubernetes.client.utils.KubernetesSerialization.unmarshal(KubernetesSerialization.java:261)
    	at io.fabric8.kubernetes.client.dsl.internal.OperationSupport.lambda$handleResponse$0(OperationSupport.java:551)
    	at java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(Unknown Source)
    	at java.base/java.util.concurrent.CompletableFuture.postComplete(Unknown Source)
    	at java.base/java.util.concurrent.CompletableFuture.complete(Unknown Source)
    	at io.fabric8.kubernetes.client.http.StandardHttpClient.lambda$completeOrCancel$10(StandardHttpClient.java:142)
    	at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(Unknown Source)
    	at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(Unknown Source)
    	at java.base/java.util.concurrent.CompletableFuture.postComplete(Unknown Source)
    	at java.base/java.util.concurrent.CompletableFuture.complete(Unknown Source)
    	at io.fabric8.kubernetes.client.http.ByteArrayBodyHandler.onBodyDone(ByteArrayBodyHandler.java:51)
    	at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(Unknown Source)
    	at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(Unknown Source)
    	at java.base/java.util.concurrent.CompletableFuture.postComplete(Unknown Source)
    	at java.base/java.util.concurrent.CompletableFuture.complete(Unknown Source)
    	at io.fabric8.kubernetes.client.okhttp.OkHttpClientImpl$OkHttpAsyncBody.doConsume(OkHttpClientImpl.java:136)
    	... 3 more
    Caused by: com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException: Unrecognized field "emulationMajor" (class io.fabric8.kubernetes.client.VersionInfo), not marked as ignorable (9 known properties: "goVersion", "gitTreeState", "platform", "minor", "gitVersion", "gitCommit", "buildDate", "compiler", "major"])
     at [Source: (BufferedInputStream); line: 4, column: 22] (through reference chain: io.fabric8.kubernetes.client.VersionInfo["emulationMajor"])
    	at com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException.from(UnrecognizedPropertyException.java:61)
    	at com.fasterxml.jackson.databind.DeserializationContext.handleUnknownProperty(DeserializationContext.java:1138)
    	at com.fasterxml.jackson.databind.deser.std.StdDeserializer.handleUnknownProperty(StdDeserializer.java:2224)
    	at com.fasterxml.jackson.databind.deser.BeanDeserializerBase.handleUnknownProperty(BeanDeserializerBase.java:1709)
    	at com.fasterxml.jackson.databind.deser.BeanDeserializerBase.handleUnknownVanilla(BeanDeserializerBase.java:1687)
    	at com.fasterxml.jackson.databind.deser.BeanDeserializer.vanillaDeserialize(BeanDeserializer.java:320)
    	at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:177)
    	at com.fasterxml.jackson.databind.deser.DefaultDeserializationContext.readRootValue(DefaultDeserializationContext.java:323)
    	at com.fasterxml.jackson.databind.ObjectReader._bindAndClose(ObjectReader.java:2105)
    	at com.fasterxml.jackson.databind.ObjectReader.readValue(ObjectReader.java:1481)
    	at io.fabric8.kubernetes.client.utils.KubernetesSerialization.unmarshal(KubernetesSerialization.java:257)
    	... 18 more

    Max Schmidt

    07/16/2025, 7:08 AM
Was anyone already successful in getting Azure Workload Identity to work for accessing Storage Accounts in a Flink job? The flink-azure-fs-hadoop plugin only supports Managed Identity, but hadoop-azure itself has the authentication class for Workload Identity. We are not able to get it to work and just receive:
    Copy code
    java.lang.IllegalArgumentException: Failed to initialize class org.apache.hadoop.fs.azurebfs.oauth2.WorkloadIdentityTokenProvider
    Same thing for this person: https://lists.apache.org/thread/6b841lqjxryxqyv4737bo46y41z6zgct
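[Editor's note: heavily hedged, as this setup hasn't been verified here: "Failed to initialize class ... WorkloadIdentityTokenProvider" often means the hadoop-azure bundled into the flink-azure-fs-hadoop plugin predates HADOOP-18610 (which added that provider), so the class or its dependencies are missing on the plugin classloader. With a new enough hadoop-azure, the configuration would look roughly like this; keys per the hadoop-azure OAuth docs, values are placeholders:]

    fs.azure.account.auth.type: OAuth
    fs.azure.account.oauth.provider.type: org.apache.hadoop.fs.azurebfs.oauth2.WorkloadIdentityTokenProvider
    fs.azure.account.oauth2.client.id: <workload-identity-client-id>
    fs.azure.account.oauth2.msi.tenant: <tenant-id>
    fs.azure.account.oauth2.token.file: /var/run/secrets/azure/tokens/azure-identity-token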

    Avinesh Sachan

    07/16/2025, 11:30 AM
For a Flink job created using SQL, it is evident from the metrics of the processing operators that the operator names used in these metrics are auto-generated by Flink and do not appear to be configurable. Please suggest whether there is an existing solution or recommended approach in Flink to configure operator names (especially for processing operators) for SQL-based jobs, and whether there are any items in the Flink backlog to support configuring operator names for SQL-based jobs?
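[Editor's note: to my knowledge there is no per-operator name override for SQL jobs; the planner generates the names. The one related switch controls whether the long generated descriptions are simplified into the short, stable names that end up in metrics:]

    -- Flink SQL (default is already 'true'); with 'false' the metrics carry
    -- the verbose generated description instead of the short operator name
    SET 'table.exec.simplify-operator-name-enabled' = 'true';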

    Ravisangar Vijayan

    07/17/2025, 6:00 AM
Hello team, we are trying to migrate from Flink 1.20 to Flink 2.0. We have a use case that reads data from an S3 bucket, and we are seeing the issue below after migrating to Flink 2.0; we didn't face it with the 1.20 version. Could you help?
    Caused by: java.util.concurrent.ExecutionException: java.util.concurrent.TimeoutException: Invocation of [LocalRpcInvocation(RestfulGateway.requestJobStatus(JobID, Duration))] at recipient [<pekko.tcp://flink@gvhvhgvhgcggh>] timed out. This is usually caused by: 1) Pekko failed sending the message silently, due to problems like oversized payload or serialization failures. In that case, you should find detailed error information in the logs. 2) The recipient needs more time for responding, due to problems like slow machines or network jitters. In that case, you can try to increase pekko.ask.timeout.
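[Editor's note: the timeout is usually a symptom (an overloaded JobManager, GC pauses, or oversized RPC payloads) rather than the root cause, but the message's own suggestion is the one generic knob; a hedged sketch for the Flink 2.0 config.yaml:]

    # Give slow RPC endpoints more time to answer (default 10 s)
    pekko.ask.timeout: 60 s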

    Fabrizzio Chavez

    07/17/2025, 8:33 AM
Hello, is it possible to configure the Flink SQL Gateway with the Flink Kubernetes operator? How should it be configured for real production scenarios?
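[Editor's note: hedged sketch: the operator has no SQL Gateway CRD, so a common pattern is a session cluster (a FlinkDeployment without spec.job) plus an ordinary Deployment running the gateway against that cluster's REST service; all names and the image tag below are placeholders to adapt:]

    apiVersion: apps/v1
    kind: Deployment
    metadata:
      name: flink-sql-gateway
    spec:
      replicas: 1
      selector: { matchLabels: { app: flink-sql-gateway } }
      template:
        metadata: { labels: { app: flink-sql-gateway } }
        spec:
          containers:
            - name: sql-gateway
              image: flink:2.0.0                      # match the session cluster version
              command: ["bin/sql-gateway.sh", "start-foreground"]
              args:
                - -Dsql-gateway.endpoint.rest.address=0.0.0.0
                - -Drest.address=flink-session-rest   # the session cluster's REST service
                - -Drest.port=8081
              ports:
                - containerPort: 8083                 # gateway REST endpoint (default)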

    Jan Kevin Dick

    07/17/2025, 1:07 PM
Hello everyone, I have a short question regarding the Prometheus metrics exported by the Flink TaskManager. I have noticed that some Prometheus summaries are exported but are not complete. Normally a summary consists of a metric per quantile plus the _sum and _count metrics. The ones from the TaskManager only have the quantile metrics and _count. This causes, for example, my New Relic Prometheus scraper to crash, because it expects the _sum and therefore gets a nil pointer exception. I'm currently running Flink version 1.17.

    Amit Peshwani

    07/17/2025, 5:01 PM
    Hello team, we are using Flink (1.19.1) with AWS KDA and using Kinesis Firehose as a sink. Following is the setup:
    Copy code
    public class FirehoseSinkFactory {
        public static KinesisFirehoseSink<OutputRecord> createFirehoseSink(final Properties firehoseSinkConfigProperties) {
            final String DELIVERY_STREAM_NAME = "deliveryStreamName";
    
            return KinesisFirehoseSink.<OutputRecord>builder()
                    .setFirehoseClientProperties(firehoseSinkConfigProperties)
                    .setSerializationSchema(new FirehoseSerializationSchema())
                    .setDeliveryStreamName(firehoseSinkConfigProperties.getProperty(DELIVERY_STREAM_NAME))
                    .build();
        }
    }
Could someone guide/help on how to make sure the connection does not time out while using it?
    Copy code
    property_group_id = "FirehoseSinkConfigProperties"
          property_map = {
            "aws.region"         = "aws-region-name"
            "deliveryStreamName" = "stream-name"
          }
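[Editor's note: a hedged sketch of sink-side knobs that would drop into the factory above: KinesisFirehoseSink is an AsyncSink, so its base builder exposes buffering controls, and flushing on a time bound keeps the connection from idling between sparse batches. The values are illustrative:]

    return KinesisFirehoseSink.<OutputRecord>builder()
            .setFirehoseClientProperties(firehoseSinkConfigProperties)
            .setSerializationSchema(new FirehoseSerializationSchema())
            .setDeliveryStreamName(firehoseSinkConfigProperties.getProperty(DELIVERY_STREAM_NAME))
            .setMaxBatchSize(100)           // records per PutRecordBatch call
            .setMaxInFlightRequests(10)     // concurrent requests to Firehose
            .setMaxTimeInBufferMS(5000)     // flush at least every 5 s
            .build();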

    Pragya Agarwal

    07/17/2025, 5:21 PM
    Hello, I noticed that both our release-2.0 branch and master branch have flaky CI tests. If you go search the commit history, you will see a lot of CI builds failing with random unrelated tests and VM crashes. Does anyone know if we have a path forward for making the newer CI builds more stable?

    jq l

    07/18/2025, 6:29 AM
Hello, I have a question that has been bothering me for 3 days, and I still haven't managed to solve it. 😭😭😭 Flink version: 1.2, Flink CDC: 3.2.1, Oracle: 19c (CDB+PDB mode). I use the following statements:
    Copy code
Flink SQL> CREATE TABLE oracle_prod_testQWE (
      COLUMN1 BIGINT,
      COLUMN2 STRING,
      PRIMARY KEY (COLUMN1) NOT ENFORCED
    ) WITH (
      'connector' = 'oracle-cdc',
  'hostname' = '10.xxx',
      'port' = '1521',
      'username' = 'c##xxx',
      'password' = 'xxx',
      'database-name' = 'orclCdb', 
      'schema-name' = 'APP_USER',
      'table-name' = 'TABLEZXC',
      'debezium.database.pdb.name' = 'db1cdb',
      'debezium.log.mining.strategy' = 'online_catalog',
      'debezium.snapshot.mode' = 'initial'
       );
       
Flink SQL> select * from oracle_prod_testQWE;
My Flink job always reports this error:
Copy code
Caused by: java.io.IOException: io.debezium.DebeziumException: The db history topic or its content is fully or partially missing. Please check database history topic configuration and re-execute the snapshot.
    at org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceSplitReader.fetch(IncrementalSourceSplitReader.java:101)
    at org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58)
    at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:165)
    ... 6 more
Caused by: io.debezium.DebeziumException: The db history topic or its content is fully or partially missing. Please check database history topic configuration and re-execute the snapshot.
    at io.debezium.relational.HistorizedRelationalDatabaseSchema.recover(HistorizedRelationalDatabaseSchema.java:59)
    at org.apache.flink.cdc.connectors.oracle.source.reader.fetch.OracleSourceFetchTaskContext.validateAndLoadDatabaseHistory(OracleSourceFetchTaskContext.java:275)
    at org.apache.flink.cdc.connectors.oracle.source.reader.fetch.OracleSourceFetchTaskContext.configure(OracleSourceFetchTaskContext.java:118)
    at org.apache.flink.cdc.connectors.base.source.reader.external.IncrementalSourceStreamFetcher.submitTask(IncrementalSourceStreamFetcher.java:84)
    at org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceSplitReader.submitStreamSplit(IncrementalSourceSplitReader.java:261)
    at org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceSplitReader.pollSplitRecords(IncrementalSourceSplitReader.java:153)
    at org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceSplitReader.fetch(IncrementalSourceSplitReader.java:98)
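[Editor's note: hedged pointers for the error above: it is commonly reported when the job restores from state and Debezium cannot rebuild its schema history, and in CDB+PDB setups also when the container/PDB names don't line up; here 'database-name' = 'orclCdb' versus 'debezium.database.pdb.name' = 'db1cdb' looks inconsistent, and Oracle identifiers are normally uppercase. A sketch of the option pairing using the names from the Debezium docs' example; verify both against your instance:]

    WITH (
      'connector' = 'oracle-cdc',
      'database-name' = 'ORCLCDB',                -- the CDB, as registered in Oracle
      'debezium.database.pdb.name' = 'ORCLPDB1',  -- the PDB that contains APP_USER
      'debezium.log.mining.strategy' = 'online_catalog',
      'debezium.snapshot.mode' = 'initial'        -- a fresh snapshot rebuilds the history
    );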