# troubleshooting
  • Adesh DSilva

    04/18/2025, 10:28 AM
    Hi, I am looking at how I can solve a problem using Flink. I have two topics in Kafka, P1 and P2. I want Flink to consume from both P1 and P2, but backpressure and pause consuming from P2 while P1 has messages. When P1's messages are exhausted, P2 can resume, and if P1 never drains completely I don't mind if Kafka drops old messages in P2. I could probably do it using a custom Kafka source function, but I don't like the idea of not using the native connector, which could lead to edge cases during parallel processing of partitions. What is the best way to do priority-based message consumption in Flink?
  • מייקי בר יעקב

    04/19/2025, 4:59 PM
    Let's say I have a POJO for a house (id, street, etc.), and I have a file sink for that POJO. Is there a way to configure the name of each file to be {house.getId()}.json? Thanks!
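    As far as the standard FileSink goes, it doesn't seem to support naming individual files after a record field directly, but a rough sketch that gets close (assuming a House POJO with getId() whose toString()/encoder produces JSON; the output path is a placeholder) is to bucket by id and give part files a .json suffix:

    import org.apache.flink.api.common.serialization.SimpleStringEncoder;
    import org.apache.flink.connector.file.sink.FileSink;
    import org.apache.flink.core.fs.Path;
    import org.apache.flink.core.io.SimpleVersionedSerializer;
    import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
    import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
    import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;

    // One directory (bucket) per house id; the part files inside still follow Flink's
    // part-file naming scheme, but carry a ".json" suffix.
    FileSink<House> sink = FileSink
            .<House>forRowFormat(new Path("/output/houses"), new SimpleStringEncoder<House>("UTF-8"))
            .withBucketAssigner(new BucketAssigner<House, String>() {
                @Override
                public String getBucketId(House house, Context context) {
                    return String.valueOf(house.getId());   // directory named after the id
                }

                @Override
                public SimpleVersionedSerializer<String> getSerializer() {
                    return SimpleVersionedStringSerializer.INSTANCE;
                }
            })
            .withOutputFileConfig(OutputFileConfig.builder().withPartSuffix(".json").build())
            .build();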
  • Lee xu

    04/21/2025, 7:53 AM
    Hi! I am using Flink version 1.17 and batch mode to write ORC files through the file system connector. How can I control the size of the written ORC files? I found that only one ORC file is generated, and as the amount of data on the source side increases, the written file becomes larger, resulting in a Java heap memory overflow exception.
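    For reference, the knobs usually tried for this with the filesystem SQL connector are the sink parallelism and the rolling-policy file size; whether the rolling options take effect in batch mode should be double-checked against the 1.17 docs. A sketch (table name, path and sizes are placeholders):

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());

    // 'sink.parallelism' spreads the data across more (smaller) files;
    // 'sink.rolling-policy.file-size' caps the target part-file size.
    tEnv.executeSql(
        "CREATE TABLE orc_sink (" +
        "  id BIGINT," +
        "  name STRING" +
        ") WITH (" +
        "  'connector' = 'filesystem'," +
        "  'path' = 'file:///tmp/orc_out'," +
        "  'format' = 'orc'," +
        "  'sink.parallelism' = '8'," +
        "  'sink.rolling-policy.file-size' = '128MB'" +
        ")");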
  • Soumya Ghosh

    04/21/2025, 8:32 AM
    Hello, I am using Flink 1.16 and trying to use JSON_QUERY to extract data from a payload.
    Copy code
    Json document as string - '[{"key":"1","value":1},{"key":"2","value":2},{"key":"3","value":3}]'
    Copy code
    JSON_QUERY(<list_of_objects>, 'lax $[?(@.key=="2")]')
    -- getting output '[{"key":"2","value":2}]'
    Now I want to extract an object from the JSON list of objects dynamically, say based on a column value of the Flink table.
    Copy code
    JSON_QUERY(<list_of_objects>, CONCAT('lax $[?(@.key=="', col1, '")]'))
    -- CodeGenException: Unsupported call: JSON_QUERY(STRING, STRING, SYMBOL NOT NULL, SYMBOL NOT NULL, SYMBOL NOT NULL) If you think this function should be supported, you can create an issue and start a discussion for it.
    Also tried with the || concat operator, same error. Any way to circumvent this error?
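    One workaround, since the JSON path here apparently has to be a string literal, is a small scalar UDF that does the lookup itself. A rough sketch using Jackson (the class and function names are made up):

    import org.apache.flink.table.functions.ScalarFunction;
    import com.fasterxml.jackson.databind.JsonNode;
    import com.fasterxml.jackson.databind.ObjectMapper;

    // Returns the JSON object from a JSON array whose "key" field matches the given key,
    // or null if there is no match or the input cannot be parsed.
    public class JsonArrayLookup extends ScalarFunction {
        private static final ObjectMapper MAPPER = new ObjectMapper();

        public String eval(String jsonArray, String key) {
            if (jsonArray == null || key == null) {
                return null;
            }
            try {
                JsonNode root = MAPPER.readTree(jsonArray);
                for (JsonNode element : root) {
                    if (element.has("key") && key.equals(element.get("key").asText())) {
                        return element.toString();
                    }
                }
                return null;
            } catch (Exception e) {
                return null;
            }
        }
    }

    // Register and use it in SQL:
    // tableEnv.createTemporarySystemFunction("JSON_ARRAY_LOOKUP", JsonArrayLookup.class);
    // SELECT JSON_ARRAY_LOOKUP(payload, col1) FROM my_table;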
  • Michael Johnson

    04/21/2025, 3:22 PM
    Hi All! I am doing a bit of research and wanted to double-check something:
    • The GitHub repo for the Flink-Elasticsearch-Connector has support for ES8: https://github.com/apache/flink-connector-elasticsearch/tree/main/flink-connector-elasticsearch8
    • Looks like the documentation only has ES6 and ES7 though: https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/elasticsearch/#elasticsearch-connector
    Is ES8 support still a work in progress, or is it ready for use and just not documented yet?
  • Akash Patel

    04/23/2025, 6:04 AM
    Hi All, I'm running a Flink Session Mode cluster using the FlinkDeployment CRD and submitting jobs via the REST API. TaskManager pods are created dynamically as jobs are submitted. The issue is that we have over 100 jobs, and when the JobManager pod is restarted, it takes around 20 minutes to recover all the jobs due to the sequential nature of recovery, where it starts one TM pod at a time and fills all the available slots on that TM pod. Is there a better way to use a Flink Session cluster that allows for faster recovery after a JobManager pod restart? Thank you
  • Abhishek Joshi

    04/23/2025, 1:31 PM
    Hi all, I'm testing exactly-once semantics in a Flink job with Kafka as both the source and sink. While most of the setup works as expected, I'm noticing consistent data loss when I forcefully kill all TaskManagers during processing or when any failure happens. Test scenario:
    • Input: 3 million records pushed to the Kafka source topic.
    • During processing: all TaskManagers were deleted to simulate failure.
    • Validation: used a Kafka consumer with isolation.level=read_committed to read from the sink topic.
    • Observed: input records: 2,999,981; output records: 2,999,675; missing: 306 records.
    Given that I'm using exactly-once with checkpoints and a transactional sink, I expected Flink to recover and replay all uncommitted data. But it seems like some data is consistently lost in this failure scenario. Configurations used:
    Copy code
    //Kafka Source:
    .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST));
    source.kafka.enable.auto.commit = false;
    
    //Kafka Sink:
    .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE);
    .setTransactionalIdPrefix("event-flow-trigger-mapping");
    sink.kafka.isolation.level = read_committed;
    sink.kafka.enable.idempotence = true;
    sink.kafka.transaction.timeout.ms = 900000;
    
    //Checkpointing:
    app.checkpoint.interval.ms = 120000;
    app.checkpoint.min-pause = 60000;
    env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
    env.getCheckpointConfig().setCheckpointStorage(path);
    env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
    env.getCheckpointConfig().setExternalizedCheckpointCleanup(
        ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION
    );
    Has anyone experienced this kind of data loss even with exactly-once semantics enabled? Are there any known edge cases where this might happen during abrupt TaskManager shutdowns? Any pointers or insights would be really helpful. Thanks! cc: @Dheeraj Panangat
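    For comparison, a minimal sketch of the usual exactly-once KafkaSink + checkpointing wiring (broker address and topic are placeholders; the producer's transaction.timeout.ms has to stay at or below the broker's transaction.max.timeout.ms, and the validating consumer needs read_committed, as described above):

    import java.util.Properties;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.connector.base.DeliveryGuarantee;
    import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
    import org.apache.flink.connector.kafka.sink.KafkaSink;
    import org.apache.flink.streaming.api.CheckpointingMode;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.enableCheckpointing(120_000L, CheckpointingMode.EXACTLY_ONCE);
    env.getCheckpointConfig().setMinPauseBetweenCheckpoints(60_000L);
    env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

    Properties producerProps = new Properties();
    // Must be <= the broker's transaction.max.timeout.ms.
    producerProps.setProperty("transaction.timeout.ms", "900000");

    KafkaSink<String> sink = KafkaSink.<String>builder()
            .setBootstrapServers("broker:9092")                      // placeholder
            .setKafkaProducerConfig(producerProps)
            .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
            .setTransactionalIdPrefix("event-flow-trigger-mapping")
            .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                    .setTopic("sink-topic")                          // placeholder
                    .setValueSerializationSchema(new SimpleStringSchema())
                    .build())
            .build();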
  • Tudor Plugaru

    04/23/2025, 2:42 PM
    Hey team! Currently I am looking into optimizing a Flink job for low latency. The job is very simple: it reads from a topic with 4 partitions, does some mapping and sinks to another topic with 4 partitions. Nothing fancy... currently, I tweaked these configs on the consumer side:
    Copy code
    max.poll.records=100
    fetch.min.bytes=1
    fetch.max.wait.ms=5
    fetch.max.bytes=8388608
    max.partition.fetch.bytes=2097152
    and on the producer side:
    Copy code
    batch.size=16384
    max.request.size=2097152
    buffer.memory=33554432
    linger.ms=0
    Job parallelism is 4 and I already have 1 consumer per partition. So far this has worked and I managed to drop the latency, but looking at the consumer lag there are still records in the topic... My goal would be to have as few as possible 😅 Any advice is very much welcome! Thanks
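    On the Flink side, one more latency lever is the network buffer timeout; a small sketch of combining it with the consumer properties above (broker and topic names are placeholders):

    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.connector.kafka.source.KafkaSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // Flush network buffers after at most 5 ms instead of waiting for them to fill.
    env.setBufferTimeout(5);

    KafkaSource<String> source = KafkaSource.<String>builder()
            .setBootstrapServers("broker:9092")          // placeholder
            .setTopics("input-topic")                    // placeholder
            .setValueOnlyDeserializer(new SimpleStringSchema())
            // Consumer properties from the message above.
            .setProperty("max.poll.records", "100")
            .setProperty("fetch.min.bytes", "1")
            .setProperty("fetch.max.wait.ms", "5")
            .build();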
  • Bentzi Mor

    04/23/2025, 6:16 PM
    Hey team, we are using ProxySQL to connect to our MySQL servers. I created a CDC YAML file from MySQL to StarRocks and use:
    Copy code
    source:
      type: mysql
      hostname: proxysql-server
      port: 6033
      username: bentzi.m
      password: XXX
      tables: manager.orders
      server-id: YYY
      server-time-zone: UTC
      debezium.database.ssl.mode: REQUIRED
    I got this error:
    Copy code
    Cannot find any table by the option 'tables' = manager.orders
  • Maxime Menard

    04/23/2025, 9:06 PM
    Hi All, I'm trying to set up authentication for Hadoop's ABFS Azure Filesystem (https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/deployment/filesystems/azure/#abfs). Unfortunately, I can't find an example of storing secrets in a safe way, as I don't want to put properties like
    fs.azure.account.oauth2.client.secret
    directly under
    flinkConfiguration
    , which generates the ConfigMap related to
    flink-conf.yaml
    . I thought of doing this in the Java code, but it seems the configuration is not passed to the task managers; maybe I am doing something wrong?
    Copy code
    Configuration configuration = new Configuration();
    configuration.setString("fs.azure.account.auth.type", "OAuth");
    configuration.setString("fs.azure.account.oauth.provider.type","org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider");
    configuration.setString("fs.azure.account.oauth2.client.id", ...);
    configuration.setString("fs.azure.account.oauth2.client.secret", ...);
    configuration.setString("fs.azure.account.oauth2.client.endpoint", ...);
    FileSystem.initialize(configuration, PluginUtils.createPluginManagerFromRootFolder(configuration));
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(configuration);
    env.configure(configuration);
    Thanks for your help!
  • Tamir Sagi

    04/24/2025, 7:24 AM
    Hi all, we are running our clusters (1.19.1) on AWS EKS (Application mode); the state is stored on S3. The clusters are managed by the Flink Kubernetes operator (1.11.0). In addition, we are running Istio within the cluster, and we had to exclude the Pekko & JM RPC ports from being redirected to Envoy using the following annotation [1] [2]
    Copy code
    "traffic.sidecar.istio.io/excludeOutboundPorts"
    Everything was working fine and very stable, yet once Istio was updated from 1.22.x to 1.24.x, the communication between the TM and JM terminated with the following warning:
    [Warning] [{pekkoAddress=pekko.tcp://flink@10.233.24.57:6123, pekkoSource=pekko.tcp://flink@10.233.24.57:6123/system/endpointManager/reliableEndpointWriter-pekko.tcp%3A%2F%2Fflink%4010.233.90.141%3A6123-0, pekkoTimestamp=10:56:31.006UTC, pekkoUid=6633221187301835071, sourceActorSystem=flink, sourceThread=flink-pekko.remote.default-remote-dispatcher-6}] [o.a.p.r.ReliableDeliverySupervisor]: Association with remote system [pekko.tcp://flink@10.233.90.141:6123] has failed, address is now gated for [50] ms. Reason: [Association failed with [pekko.tcp://flink@10.233.90.141:6123]] Caused by: [The remote system explicitly disassociated (reason unknown).
    Once we restarted the Flink deployment the following warnings appeared
    [Warning] [{}] [o.a.f.r.r.a.ActiveResourceManager]: Discard registration from TaskExecutor dev-p2a2-taskmanager-1-1 at (pekko.tcp://flink@10.233.83.38:6122/user/rpc/taskmanager_0) because the framework did not recognize it
    then
    [Warning] [{}] [o.a.f.r.d.StandaloneDispatcher]: Ignoring JobGraph submission 'dev-p2a2' (5443fa441f7b39f55a48528d5665f17a) because the job already reached a globally-terminal state (i.e. FAILED, CANCELED, FINISHED) in a previous execution.
    We deleted the state folder on S3, but it did not help. The cluster ConfigMap which holds the last checkpoint (managed by the k8s operator) was not there either. The only solution was to re-create the cluster (obviously we cannot do that in production). I read there were many breaking changes between Istio 1.22.x and 1.24.x, yet after recreating the cluster it worked fine. Any suggestions on why the cluster reached a globally terminal state and could not recover? [1] https://doc.akka.io/libraries/akka-management/current/bootstrap/istio.html#istio [2] https://pekko.apache.org/docs/pekko/1.0/project/migration-guides.html#migration-to-apache-pekko
  • Bentzi Mor

    04/24/2025, 3:07 PM
    I am using flink-cdc-3.3.0 with flink-2.0.0 and set up a YAML pipeline from MySQL to StarRocks. I am getting the error:
    Copy code
    Exception in thread "main" java.lang.NoSuchFieldError: Class org.apache.flink.cdc.runtime.operators.schema.regular.SchemaOperator does not have member field 'org.apache.flink.streaming.api.operators.ChainingStrategy chainingStrategy'
    	at org.apache.flink.cdc.runtime.operators.schema.regular.SchemaOperator.<init>(SchemaOperator.java:114)
    	at org.apache.flink.cdc.runtime.operators.schema.regular.SchemaOperatorFactory.<init>(SchemaOperatorFactory.java:52)
    	at org.apache.flink.cdc.composer.flink.translator.SchemaOperatorTranslator.addRegularSchemaOperator(SchemaOperatorTranslator.java:94)
    	at org.apache.flink.cdc.composer.flink.translator.SchemaOperatorTranslator.translateRegular(SchemaOperatorTranslator.java:62)
    	at org.apache.flink.cdc.composer.flink.FlinkPipelineComposer.translate(FlinkPipelineComposer.java:186)
    	at org.apache.flink.cdc.composer.flink.FlinkPipelineComposer.compose(FlinkPipelineComposer.java:99)
    	at org.apache.flink.cdc.cli.CliExecutor.run(CliExecutor.java:89)
    	at org.apache.flink.cdc.cli.CliFrontend.main(CliFrontend.java:74)
  • rmoff

    04/24/2025, 5:20 PM
    Using the Apache Flink web UI I can see the current watermark for a job. Does anyone know if there is another user-accessible API way of getting this for a job in progress?
    CURRENT_WATERMARK
    in SQL isn't helping me. I found the
    http://localhost:8081/jobs/[…]/vertices/[…]/watermarks
    REST API, but wondered if there was anything else.
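    For what it's worth, that watermarks REST endpoint can also be hit programmatically; a tiny sketch (job and vertex IDs are placeholders, and the call throws IOException/InterruptedException):

    import java.net.URI;
    import java.net.http.HttpClient;
    import java.net.http.HttpRequest;
    import java.net.http.HttpResponse;

    String jobId = "JOB_ID";        // placeholder
    String vertexId = "VERTEX_ID";  // placeholder
    String url = "http://localhost:8081/jobs/" + jobId + "/vertices/" + vertexId + "/watermarks";

    HttpClient client = HttpClient.newHttpClient();
    HttpRequest request = HttpRequest.newBuilder(URI.create(url)).GET().build();
    HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
    System.out.println(response.body());   // JSON with the per-subtask watermark values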
  • Mike Ashley

    04/24/2025, 8:34 PM
    Having an issue where I get no task executors when running locally. I have not been able to find anything in the logs to hint at what the problem is. Any ideas where I should be looking?
  • מייקי בר יעקב

    04/24/2025, 8:48 PM
    I really need your help. I have a Flink task that throws a RemoteTransportException when it finishes. Do you know how to solve it? Thanks
  • Jeff McFarlane

    04/24/2025, 8:51 PM
    Hi all, We are working with an
    upsert-kafka
    table whose message value is split into two nested json sections. Similar to this example:
    Copy code
    {
      "header": {
        "event_time": 1745499600000,
        "source": "web-shop"
      },
      "main": {
        "order_id": "o456",
        "user_id": "u123",
        "amount": 100.5
      }
    }
    Is it possible to define a primary key for the upsert-kafka connector when your key is in nested json? The only way I have been able to get it to work is by "pre-flattening" the json so the key fields are top-level. Is that an actual requirement? In this example we might do something like:
    Copy code
    CREATE TABLE orders (
      header ROW<event_time BIGINT, source STRING>,
      main   ROW<user_id STRING, order_id STRING, amount DECIMAL(18, 4)>,
      PRIMARY KEY (main.order_id) NOT ENFORCED
    ) WITH (
      'connector' = 'upsert-kafka',
      'topic' = 'orders',
      'key.format' = 'json',
      'value.format' = 'json'
    );
    Flink will complain:
    Copy code
    SQL parse failed. Encountered "." at line X. Was expecting one of: ")", ",".
    If you alias the nested field such as:
    Copy code
    main.order_id AS order_id
    PRIMARY KEY (order_id) NOT ENFORCED
    Flink will throw the error:
    Copy code
    A PRIMARY KEY constraint must be declared on physical columns.
    Has anyone found a way to reference a nested field in the PK without flattening? (or can we use the values in the message's key) Thanks!
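    One pattern that seems to sidestep this (hedged, since it changes the declared schema): make the key field its own top-level physical column, let the upsert-kafka PRIMARY KEY read/write it via the message key, and keep the nested ROWs for the value only. A sketch (bootstrap servers are a placeholder, and it assumes the message key JSON contains order_id):

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

    // order_id is a top-level physical column; for upsert-kafka the PRIMARY KEY columns are
    // taken from the Kafka message key, and 'value.fields-include' = 'EXCEPT_KEY' keeps them
    // out of the value payload.
    tEnv.executeSql(
        "CREATE TABLE orders (" +
        "  order_id STRING," +
        "  header ROW<event_time BIGINT, source STRING>," +
        "  main   ROW<user_id STRING, amount DECIMAL(18, 4)>," +
        "  PRIMARY KEY (order_id) NOT ENFORCED" +
        ") WITH (" +
        "  'connector' = 'upsert-kafka'," +
        "  'topic' = 'orders'," +
        "  'properties.bootstrap.servers' = 'broker:9092'," +   // placeholder
        "  'key.format' = 'json'," +
        "  'value.format' = 'json'," +
        "  'value.fields-include' = 'EXCEPT_KEY'" +
        ")");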
  • Avinash Tripathy

    04/25/2025, 10:29 AM
    Hi Team, we are getting the below error after upgrading to Flink version 1.20.1.
    Copy code
    2025-04-25 10:13:06,041 WARN  org.apache.flink.runtime.taskmanager.Task                    [] - Source: MULTISINK_APP_KAFKA_SOURCE -> MULTISINK_APP_TRANSFORMER -> MLTISINK_APP_KAFKA_SINK: Writer -> MLTISINK_APP_KAFKA_SINK: Committer (2/2)#0 (57700502b5f830802eee110ba116985e_cbc357ccb763df2852fee8c4fc7d55f2_1_0) switched from RUNNING to FAILED with failure cause:
    java.lang.NoSuchMethodError: 'org.apache.avro.io.BinaryEncoder com.amazonaws.services.schemaregistry.flink.avro.GlueSchemaRegistryAvroSerializationSchema.getEncoder()'
            at com.amazonaws.services.schemaregistry.flink.avro.GlueSchemaRegistryAvroSerializationSchema.serialize(GlueSchemaRegistryAvroSerializationSchema.java:106) ~[blob_p-88b7106f6d8297375e9582fd78c865e1809df7f8-606393c8b81f3b4642913f94ef1a3dd4:?]
            at com.foo.bar.app.models.KafkaProducerRecordSchema.serialize(KafkaProducerRecordSchema.java:103) ~[blob_p-88b7106f6d8297375e9582fd78c865e1809df7f8-606393c8b81f3b4642913f94ef1a3dd4:?]
            at com.foo.bar.app.models.KafkaProducerRecordSchema.serialize(KafkaProducerRecordSchema.java:27) ~[blob_p-88b7106f6d8297375e9582fd78c865e1809df7f8-606393c8b81f3b4642913f94ef1a3dd4:?]
            at org.apache.flink.connector.kafka.sink.KafkaWriter.write(KafkaWriter.java:196) ~[blob_p-88b7106f6d8297375e9582fd78c865e1809df7f8-606393c8b81f3b4642913f94ef1a3dd4:?]
            at org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.processElement(SinkWriterOperator.java:205) ~[flink-dist-1.20.1.jar:1.20.1]
            at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:75) ~[flink-dist-1.20.1.jar:1.20.1]
            at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:50) ~[flink-dist-1.20.1.jar:1.20.1]
            at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29) ~[flink-dist-1.20.1.jar:1.20.1]
            at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:52) ~[flink-dist-1.20.1.jar:1.20.1]
            at com.foo.bar.app.tranformer.FlinkTransformerFunction.flatMap(FlinkTransformerFunction.java:102) ~[blob_p-88b7106f6d8297375e9582fd78c865e1809df7f8-606393c8b81f3b4642913f94ef1a3dd4:?]
            at com.foo.bar.app.tranformer.FlinkTransformerFunction.flatMap(FlinkTransformerFunction.java:32) ~[blob_p-88b7106f6d8297375e9582fd78c865e1809df7f8-606393c8b81f3b4642913f94ef1a3dd4:?]
            at org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:47) ~[flink-dist-1.20.1.jar:1.20.1]
            at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:75) ~[flink-dist-1.20.1.jar:1.20.1]
            at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:50) ~[flink-dist-1.20.1.jar:1.20.1]
            at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29) ~[flink-dist-1.20.1.jar:1.20.1]
            at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitRecord(SourceOperatorStreamTask.java:310) ~[flink-dist-1.20.1.jar:1.20.1]
  • Avinash Tripathy

    04/25/2025, 10:31 AM
    We do have the below Maven dependencies:
    Copy code
    <flink.version>1.20.1</flink.version>
    <mockito.version>3.0.0</mockito.version>
    <log4j.version>2.19.0</log4j.version>
    <awssdk.version>2.31.25</awssdk.version>
    <spring-test.version>7.0.0-M4</spring-test.version>
    <amazon-glue.version>1.1.23</amazon-glue.version>
    <dependencies>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-connector-base</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-connector-kafka</artifactId>
                <version>3.0.1-1.18</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-clients</artifactId>
                <version>${flink.version}</version>
                <scope>provided</scope>
            </dependency>
        <!-- https://mvnrepository.com/artifact/org.junit.jupiter/junit-jupiter-api -->
            <dependency>
                <groupId>org.junit.jupiter</groupId>
                <artifactId>junit-jupiter-api</artifactId>
                <version>5.10.2</version>
                <scope>test</scope>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-test-utils</artifactId>
                <version>${flink.version}</version>
                <scope>test</scope>
                <exclusions>
                    <exclusion>
                        <groupId>org.apache.avro</groupId>
                        <artifactId>avro</artifactId>
                    </exclusion>
                    <exclusion>
                        <groupId>org.apache.zookeeper</groupId>
                        <artifactId>zookeeper</artifactId>
                    </exclusion>
                </exclusions>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-runtime</artifactId>
                <version>${flink.version}</version>
                <scope>test</scope>
                <classifier>tests</classifier>
            </dependency>
            <dependency>
                <groupId>software.amazon.msk</groupId>
                <artifactId>aws-msk-iam-auth</artifactId>
                <version>2.0.3</version>
            </dependency>
            <dependency>
                <groupId>software.amazon.awssdk</groupId>
                <artifactId>sts</artifactId>
                <version>${awssdk.version}</version>
            </dependency>
            <dependency>
                <groupId>software.amazon.awssdk</groupId>
                <artifactId>iam</artifactId>
                <version>${awssdk.version}</version> <!-- Make sure to use the latest version -->
            </dependency>
            <dependency>
                <groupId>software.amazon.awssdk</groupId>
                <artifactId>secretsmanager</artifactId>
                <version>${awssdk.version}</version>
            </dependency>
            <dependency>
                <groupId>software.amazon.awssdk</groupId>
                <artifactId>auth</artifactId>
                <version>${awssdk.version}</version>
            </dependency>
            <dependency>
                <groupId>software.amazon.awssdk</groupId>
                <artifactId>core</artifactId>
                <version>${awssdk.version}</version>
                <type>pom</type>
            </dependency>
            <dependency>
                <groupId>redis.clients</groupId>
                <artifactId>jedis</artifactId>
                <version>3.5.0</version> <!-- Or any newer version -->
            </dependency>
            <!-- Flink redis connector-->
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-connector-redis_2.10</artifactId>
                <version>1.1.5</version>
                <exclusions>
                    <exclusion>
                        <groupId>redis.clients</groupId>
                        <artifactId>jedis</artifactId>
                    </exclusion>
                    <exclusion>
                        <groupId>log4j</groupId>
                        <artifactId>log4j</artifactId>
                    </exclusion>
                </exclusions>
            </dependency>
            <dependency>
                <groupId>org.apache.logging.log4j</groupId>
                <artifactId>log4j-1.2-api</artifactId>
                <version>2.21.0</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-core</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-metrics-prometheus</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-metrics-dropwizard</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>com.codahale.metrics</groupId>
                <artifactId>metrics-core</artifactId>
                <version>3.0.2</version>
            </dependency>
            <dependency>
                <groupId>org.projectlombok</groupId>
                <artifactId>lombok</artifactId>
                <version>1.18.26</version>
                <scope>provided</scope>
            </dependency>
        <!-- https://mvnrepository.com/artifact/org.junit.vintage/junit-vintage-engine -->
            <dependency>
                <groupId>org.junit.vintage</groupId>
                <artifactId>junit-vintage-engine</artifactId>
                <version>5.10.2</version>
                <scope>test</scope>
            </dependency>
            <dependency>
                <groupId>org.apache.logging.log4j</groupId>
                <artifactId>log4j-slf4j-impl</artifactId>
                <version>${log4j.version}</version>
                <scope>runtime</scope>
            </dependency>
            <dependency>
                <groupId>org.apache.logging.log4j</groupId>
                <artifactId>log4j-api</artifactId>
                <version>${log4j.version}</version>
                <scope>runtime</scope>
            </dependency>
            <dependency>
                <groupId>org.junit.jupiter</groupId>
                <artifactId>junit-jupiter-engine</artifactId>
                <version>5.10.2</version>
                <exclusions>
                    <exclusion>
                        <groupId>org.junit.vintage</groupId>
                        <artifactId>junit-vintage-engine</artifactId>
                    </exclusion>
                </exclusions>
                <scope>test</scope>
            </dependency>
            <dependency>
                <groupId>org.apache.logging.log4j</groupId>
                <artifactId>log4j-core</artifactId>
                <version>${log4j.version}</version>
                <scope>runtime</scope>
            </dependency>
            <dependency>
                <groupId>org.testcontainers</groupId>
                <artifactId>kafka</artifactId>
                <version>1.15.3</version>
                <scope>test</scope>
            </dependency>
            <dependency>
                <groupId>org.mockito</groupId>
                <artifactId>mockito-core</artifactId>
                <version>${mockito.version}</version>
                <scope>test</scope>
            </dependency>
            <dependency>
                <groupId>org.mockito</groupId>
                <artifactId>mockito-junit-jupiter</artifactId>
                <version>${mockito.version}</version>
                <scope>test</scope>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-streaming-java</artifactId>
                <version>${flink.version}</version>
                <scope>test</scope>
                <classifier>tests</classifier>
            </dependency>
            <dependency>
                <groupId>org.apache.commons</groupId>
                <artifactId>commons-text</artifactId>
                <version>1.10.0</version>
            </dependency>
            <dependency>
                <groupId>com.fasterxml.jackson.dataformat</groupId>
                <artifactId>jackson-dataformat-yaml</artifactId>
                <version>2.15.0-rc2</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-avro-confluent-registry</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.springframework</groupId>
                <artifactId>spring-test</artifactId>
                <version>${spring-test.version}</version>
                <scope>test</scope>
            </dependency>
            <dependency>
                <groupId>ca.uhn.hapi</groupId>
                <artifactId>hapi-base</artifactId>
                <version>2.3</version>
            </dependency>
            <dependency>
                <groupId>software.amazon.glue</groupId>
                <artifactId>schema-registry-flink-serde</artifactId>
                <version>${amazon-glue.version}</version>
                <exclusions>
                    <exclusion>
                        <groupId>org.apache.avro</groupId>
                        <artifactId>avro</artifactId>
                    </exclusion>
                    <exclusion>
                        <groupId>org.json</groupId>
                        <artifactId>json</artifactId>
                    </exclusion>
                    <exclusion>
                        <groupId>com.google.protobuf</groupId>
                        <artifactId>protobuf-java</artifactId>
                    </exclusion>
                    <exclusion>
                        <groupId>org.apache.flink</groupId>
                        <artifactId>flink-avro</artifactId>
                    </exclusion>
                    <exclusion>
                        <groupId>com.squareup.wire</groupId>
                        <artifactId>wire-runtime-jvm</artifactId>
                    </exclusion>
                </exclusions>
            </dependency>
            <dependency>
                <groupId>com.squareup.wire</groupId>
                <artifactId>wire-runtime-jvm</artifactId>
                <version>5.2.0</version>
            </dependency>
            <dependency>
                <groupId>com.google.protobuf</groupId>
                <artifactId>protobuf-java</artifactId>
                <version>4.28.2</version>
            </dependency>
            <dependency>
                <groupId>org.json</groupId>
                <artifactId>json</artifactId>
                <version>20231013</version>
            </dependency>
            <dependency>
                <groupId>com.google.guava</groupId>
                <artifactId>guava</artifactId>
                <version>32.1.2-jre</version>
            </dependency>
            <dependency>
                <groupId>software.amazon.glue</groupId>
                <artifactId>schema-registry-build-tools</artifactId>
                <version>1.1.16</version>
            </dependency>
            <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-clients</artifactId>
                <version>7.2.2-ccs</version>
                <scope>compile</scope>
            </dependency>
        </dependencies>
  • Pham Khoi

    04/25/2025, 11:57 AM
    Hi guys, I have a big problem with Flink on Kubernetes when writing to HDFS with Kerberos. First of all, I have prepared a krb5.conf file and a keytab. However, for jaas.conf I don't know how to apply it to the Kerberos authentication JAAS module, and as a result I get errors such as: "GSS initiate failed, Failed to find any Kerberos tgt". In more detail, I am following the YAML from a Stack Overflow answer, like:
    Copy code
    apiVersion: flink.apache.org/v1beta1
    kind: FlinkDeployment
    ...
    spec:
      image: ...
      flinkVersion: v1_15
      flinkConfiguration:
        security.kerberos.krb5-conf.path: /etc/krb5.conf
        security.kerberos.login.keytab: /opt/keytab
        security.kerberos.login.principal: ...
        security.kerberos.login.contexts: KafkaClient
      ...
      podTemplate:
        spec:
          containers:
          - name: flink-main-container
            volumeMounts:
            - name: kerberos-keytab
              mountPath: /opt/keytab
              subPath: keytab
    ...
    I don't know how to apply KafkaClient to the contexts. If you want more detail, see this link: https://stackoverflow.com/questions/73581288/specifying-keytab-for-flinkdeployment-of-flink-kubernetes-operator. Help me please, I would really appreciate it! Oh, I forgot another point: when I use hadoop-common with a version higher than 3.2.0 I get the error "Client cannot authenticate via (token, kerberos)", and if I change the version to 2.10.2 I get the error I described: "GSS initiate failed, Failed to find any Kerberos tgt".
  • Tiansu Yu

    04/25/2025, 12:00 PM
    Hi community, about Flink job configurations: how do I know which config params are scoped at the cluster level and which at the pipeline/job level? https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/deployment/config/
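    As a rough illustration of the split (the authoritative scoping is on that config page): options in the cluster's Flink configuration file apply cluster-wide, while pipeline/execution options can be passed per job, e.g.:

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.configuration.PipelineOptions;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    // Pipeline/job-scoped options can be set programmatically, per job...
    Configuration conf = new Configuration();
    conf.set(PipelineOptions.NAME, "my-job");
    conf.set(PipelineOptions.OBJECT_REUSE, true);
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);

    // ...while cluster-scoped options (e.g. taskmanager.memory.process.size, jobmanager.rpc.port)
    // only take effect in the cluster's flink-conf.yaml / config.yaml.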
  • Francis Altomare

    04/25/2025, 2:01 PM
    Hey all 👋, I'm diving into Flink 2 and have a question about using the new async state store APIs. From what I understand in order to use
    enableAsyncState()
    with a
    KeyedProcessFunction
    I would need to configure my job with something along the lines of:
    Copy code
    upstream
                .keyBy { keyFunction }
                .enableAsyncState()
                .transform(
                    "KeyedProcess",
                    TypeInformation.of(KeyProcessOutput::class.java),
                    AsyncKeyedProcessOperator(KeyedProcessFunction())
                )
                .sinkTo(downstream)
    This approach makes sense to me when dealing with `KeyedProcessFunction`s. However, I have a pipeline that uses a
    KeyedCoProcessFunction
    with a state store that could really benefit from
    enableAsyncState
    . Does anyone know of any analog to the
    AsyncKeyedProcessOperator
    that would work with a
    KeyedCoProcessFunction
    ? Is this something that would be possible or should I be looking for a different approach here? Thanks a lot!
  • Frantisek Hartman

    04/25/2025, 2:12 PM
    Hi, I have a data skew issue with the Kinesis source. The uniform shard assigner assigns shards uniformly across subtasks; most of the time it does a good job and I get subtasks with either N or N+1 shards assigned (e.g. with 10 shards and 3 subtasks I get 3, 3, 4 shards per subtask). Now the problem is that the subtasks are distributed to task managers randomly. Sometimes I get an even distribution, but sometimes one task manager gets all the subtasks with N+1 shards and another task manager gets all the subtasks with N shards. I tried to implement my own KinesisShardAssigner that takes into account the location in the ReaderInfo. This works as I want it to, but only when the job starts fresh; when the job is restored from a checkpoint, it doesn't re-assign the shards but restores them to the subtasks as they were in the last job.
  • Guruguha Marur Sreenivasa

    04/25/2025, 6:21 PM
    Having issues with a Flink session cluster deployment which used to work seamlessly before on another EKS cluster. I get this error and am unable to work out what it is pointing at. Below is the stack trace:
    Copy code
    2025-04-25 18:12:20,226 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint        [] - Shutting KubernetesSessionClusterEntrypoint down with application status FAILED. Diagnostics java.lang.SecurityException: java.io.IOException: Configuration Error:
        Line 4: expected [option key]
        at java.base/sun.security.provider.ConfigFile$Spi.<init>(Unknown Source)
        at java.base/sun.security.provider.ConfigFile.<init>(Unknown Source)
        at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
        at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(Unknown Source)
        at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(Unknown Source)
        at java.base/java.lang.reflect.Constructor.newInstance(Unknown Source)
        at java.base/java.lang.Class.newInstance(Unknown Source)
        at java.base/javax.security.auth.login.Configuration$2.run(Unknown Source)
        at java.base/javax.security.auth.login.Configuration$2.run(Unknown Source)
        at java.base/java.security.AccessController.doPrivileged(Native Method)
        at java.base/javax.security.auth.login.Configuration.getConfiguration(Unknown Source)
        at org.apache.flink.runtime.security.modules.JaasModule.install(JaasModule.java:98)
        at org.apache.flink.runtime.security.SecurityUtils.installModules(SecurityUtils.java:76)
        at org.apache.flink.runtime.security.SecurityUtils.install(SecurityUtils.java:57)
        at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.installSecurityContext(ClusterEntrypoint.java:279)
        at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:231)
        at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runClusterEntrypoint(ClusterEntrypoint.java:739)
        at org.apache.flink.kubernetes.entrypoint.KubernetesSessionClusterEntrypoint.main(KubernetesSessionClusterEntrypoint.java:61)
    Caused by: java.io.IOException: Configuration Error:
        Line 4: expected [option key]
        at java.base/sun.security.provider.ConfigFile$Spi.ioException(Unknown Source)
        at java.base/sun.security.provider.ConfigFile$Spi.match(Unknown Source)
        at java.base/sun.security.provider.ConfigFile$Spi.parseLoginEntry(Unknown Source)
        at java.base/sun.security.provider.ConfigFile$Spi.readConfig(Unknown Source)
        at java.base/sun.security.provider.ConfigFile$Spi.init(Unknown Source)
        at java.base/sun.security.provider.ConfigFile$Spi.init(Unknown Source)
        ... 18 more
  • Surya Narayana

    04/25/2025, 7:43 PM
    Am trying to sync a SQL Server table's data to an Iceberg table (filesystem) on an Ubuntu machine using the Flink JDBC connector. However, when I try to create the table using WITH properties, I am able to execute the code without any exceptions, but then there is no data flowing to the Iceberg table. Below is the code; can someone help to check what I am doing wrong?
    from pyflink.table import EnvironmentSettings, TableEnvironment

    # Set batch mode
    env_settings = EnvironmentSettings.in_batch_mode()
    table_env = TableEnvironment.create(env_settings)

    table_env.execute_sql("""
        CREATE CATALOG horizon_user_catalog WITH (
            'type' = 'iceberg',
            'catalog-type' = 'hadoop',
            'warehouse' = 'file:///data/icebergdwh/hrzn/',
            'property-version' = '1'
        )
    """)
    table_env.execute_sql("USE CATALOG horizon_user_catalog")
    table_env.execute_sql("CREATE DATABASE demo2")
    table_env.execute_sql("USE demo2")

    table_env.execute_sql("""
        CREATE TABLE test_jdbc_read2 (
            id BIGINT,
            uname STRING,
            PRIMARY KEY (id) NOT ENFORCED
        ) WITH (
            'connector' = 'jdbc',
            'url' = 'jdbc:sqlserver://abc:1234;databaseName=iceberg',
            'table-name' = 'dbo.usertable',
            'username' = 'dwh',
            'password' = 'xyzabc123789',
            'driver' = 'com.microsoft.sqlserver.jdbc.SQLServerDriver'
        )
    """)
    table_env.execute_sql("SHOW TABLES").print()
    table_env.execute_sql("SELECT * FROM horizon_user_catalog.demo2.test_jdbc_read2 limit 10").print()
    table_env.execute_sql("SELECT count(*) FROM test_jdbc_read2").print()
  • maja

    04/25/2025, 9:23 PM
    Curious if anyone is using Flink in batch mode in a k8s environment? Trying to better understand failure recovery, given there is no notion of checkpointing in batch mode.
  • Antti Kaikkonen

    04/28/2025, 7:20 AM
    Hi, I'm trying to implement a data source using the new Source API. The goal is to have each
    SourceReader
    output records with strictly increasing timestamps, but I can't figure out how to implement the
    addSplitsBack
    method in the
    SplitEnumerator
    when all source readers have already received splits with greater timestamps. The only way I can think of is to simply throw an error in the
    addSplitsBack
    to trigger a checkpoint recovery, but is this a valid way to do it?
  • Arif Khan

    04/28/2025, 6:12 PM
    Hello All, I am using flink kubernetes operator
    1.11.0
    and when I am trying to set up a session cluster in AWS EKS, I am getting this error:
    Copy code
    one or more objects failed to apply, reason: Internal error occurred: failed calling webhook "mutationwebhook.flink.apache.org": failed to call webhook: Post "https://flink-operator-webhook-service.flink-cluster.svc:443/mutate?timeout=10s": Address is not allowed
    I also tried this setting at session cluster level, but no luck
    kubernetes.hostnetwork.enabled: "true"
    I see a lot of folks have a session cluster set up in AWS EKS, and if you ran into this issue, please spare a minute to help me with this. I appreciate your help.
  • Nihar Rao

    04/28/2025, 8:33 PM
    Hi everyone! We are running into a weird issue with Apache Flink Kubernetes operator 1.10.0 and Apache Flink 1.19.1. We are running jobs using native Kubernetes mode and the FlinkDeployment CRD, with 24 TaskManagers and 1 JobManager replica with HA enabled. Below is the chronological summary of events: 1. The job was initially started with 24 task managers. 2. The JM pod was OOMkilled; this is confirmed by our KSM metrics, and
    kubectl describe pod <JM pod>
    shows the pod restarted due to OOM as well. 3. After the JM was OOMkilled, the JM was restarted and 24 new TaskManager pods were started, which is confirmed by the available task slots section in the Flink UI. 4. There was no impact on the job (it restarted successfully), but there are 48 TaskManagers running, of which 24 are standby. The expected behaviour after a JM OOM with HA enabled is that no new task managers are started. 5. I simulated the OOM on the JM pod for one of our other jobs, but no similar behaviour was observed. I have added a screenshot of the Flink UI showing the 24 extra TMs (48 task slots) and the kubectl output in the thread. Can you please help us figure out how to debug this, as the Kubernetes operator doesn't show any relevant information on why this happened? Thanks!
  • Pedro Mázala

    04/29/2025, 10:38 AM
    Hello there 👋 Is someone here using a Redis sink with Flink? I was checking the connector and the Redis one seems to be attached to Apache Bahir. I'm wondering if there is another possible way without much lift.
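    One low-lift alternative to the Bahir connector that people often use is a hand-rolled sink on top of Jedis; a minimal sketch for the Flink 1.x DataStream API (MyEvent, host and port are placeholders):

    import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
    import redis.clients.jedis.Jedis;
    import redis.clients.jedis.JedisPool;

    // Minimal Redis sink: one JedisPool per parallel subtask, SET key -> value per record.
    public class RedisSink extends RichSinkFunction<MyEvent> {
        private transient JedisPool pool;

        private JedisPool pool() {
            if (pool == null) {
                pool = new JedisPool("redis-host", 6379);   // placeholder host/port
            }
            return pool;
        }

        @Override
        public void invoke(MyEvent event, Context context) {
            try (Jedis jedis = pool().getResource()) {
                jedis.set(event.getKey(), event.getValue());   // MyEvent is a placeholder POJO
            }
        }

        @Override
        public void close() {
            if (pool != null) {
                pool.close();
            }
        }
    }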
  • Raj Kunwar Singh

    04/29/2025, 3:04 PM
    Hi team, I am facing a weird issue while using
    Flink v1.19.2
    . I am trying to read an s3 path using the
    FileSystem connector
    . If I try it the
    Table API Connector
    way, the read happens the first time, but subsequently, if new "partitions" are added, Flink is not aware of them even when I provide
    'source.monitor-interval'
    to make it unbounded.
    Copy code
    String createTableSql = """
        CREATE TABLE tbl (
            col1 STRING,
            part_col STRING
        ) PARTITIONED BY (part_col)
        WITH (
            'connector' = 'filesystem',
            'path' = 's3a:/bucket_name/path',
            'format' = 'parquet',
            'source.monitor-interval' = '10s'
        )
    """;
    However if I use the
    Datastream
    connector, which is low-level and not something that I want to use, it works when new partition folders are added. I don't see any error logs even with DEBUG logging enabled. Can someone help with this? I suspect that the partition discovery isn't working.