https://flink.apache.org/ logo
Join Slack
Powered by
# troubleshooting
  • m

    milk

    05/22/2023, 8:24 AM
    Hi All, Is there a way to add a fixed window end time for the session window? For example, after the session window lasts 48 hours, force close it.
    d
    • 2
    • 3
  • s

    Sumit Singh

    05/22/2023, 8:57 AM
    Hello , anyone using Flink on EC2 clusters ? I am facing issues with flink-conf.yaml , changed the job manager rpc to master host IP
    Copy code
    2023-05-22 06:32:10,019 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Could not resolve ResourceManager address <akka.tcp://flink@private_ip:6123/user/rpc/resourcemanager_*>, retrying in 10000 ms: Could not connect to rpc endpoint under address <akka.tcp://flink@private_ip:6123/user/rpc/resourcemanager_*>.
  • s

    sagar shinde

    05/22/2023, 10:23 AM
    Hello Team, I have setup flink cluster in kubernets cluster now i want test flink application in flink cluster. is it possible to do CICD using argocd to deploy the app in flink cluster
  • s

    sagar shinde

    05/22/2023, 11:24 AM
    Hello Team, Any one know how to configure Flink on kubernet cluster using minio as storage directory if have any document please shared
  • p

    Palani

    05/22/2023, 2:31 PM
    Hello Team.. Can someone help on this please. We are using AWS KDA and trying to set few Kafka related properties using properties..* But in KDA log we are seeing the warning as attached image. How to set those kafka related properties?
    j
    • 2
    • 14
  • j

    Jirawech Siwawut

    05/22/2023, 4:15 PM
    Can someone help review pr? https://github.com/apache/flink/pull/22400
  • a

    Amir Hossein Sharifzadeh

    05/22/2023, 4:34 PM
    Hi everybody. I got stuck in figuring out
    ValueState
    in my application. Does anybody have an experience working with
    ValueState
    ,
    ListState
    and
    EmbeddedRocksDBStateBackend
    ?
  • d

    Dylan Meissner

    05/22/2023, 11:42 PM
    With Flink operator plugin I want a custom resource validator to make a decision based on namespace name or some data that requires me to look outside the FlinkDeployment spec. For example:
    Copy code
    public class CustomValidator implements FlinkResourceValidator {
        @Override
        public Optional<String> validateDeployment(FlinkDeployment deployment) {
            var namespace = deployment.getMetadata().getNamespace();
            if (namespace == "foo") {
              //...
            }
            return Optional.empty();
        }
    }
    • 1
    • 1
  • b

    Bharathkrishna G M

    05/23/2023, 12:28 AM
    I'm using Flink 1.16.1 . Seeing an error:
    Copy code
    2023-05-23 00:26:48,863 WARN  org.apache.flink.runtime.webmonitor.handlers.JarRunHandler   [] - Configuring the job submission via query parameters is deprecated. Please migrate to submitting a JSON request instead.
    2023-05-23 00:26:48,931 INFO  org.apache.flink.client.ClientUtils                          [] - Starting program (detached: true)
    2023-05-23 00:26:49,101 ERROR org.apache.flink.runtime.webmonitor.handlers.JarRunHandler   [] - Exception occurred in REST handler: Could not execute application.
    I don't see any stack trace in the logs or in the UI logs . This error is seen in jobmanager log. Any idea how to find the root cause for this ?
    • 1
    • 3
  • s

    Samrat Deb

    05/23/2023, 3:04 AM
    Is there any way to add custom tags on the jobmanager/taskmanager pods while deploying flink in kubernetes?
  • s

    Sonika Singla

    05/23/2023, 5:38 AM
    Hi Team , I am attempting to scale my Flink application by utilizing Horizontal Pod Autoscaling (HPA). When the usage surpasses a predefined threshold, the task manager undergoes a restart. My job involves consuming records from Hudi, performing processing operations on them, and producing the results to a Kafka topic. However, when the job restarts, it inadvertently generates duplicate records in the sink. My question is as follows: If the Flink job restarts between two checkpoints, will it reprocess the records that were already processed after the last checkpoint? Furthermore, if a savepoint is utilized, which also includes a time interval, does that imply that the records will be reprocessed in the event of a savepoint?
    d
    m
    • 3
    • 4
  • s

    Slackbot

    05/23/2023, 5:44 AM
    This message was deleted.
    s
    • 2
    • 1
  • b

    behrooz razzaghi

    05/23/2023, 5:46 AM
    Hello, i deploy Flink on k8s and run basic-example it is working good, i want to know how does basic example connected to HDFS file system ?
  • m

    Mohan M

    05/23/2023, 5:55 AM
    Hi, i am new to flink i am using pyflink 1.17.0, Working on use case, where i need to read data from kafka and upsert to postgresql database but i am getting error below is my code and error CODE: from pyflink.common import Encoder from pyflink.datastream import StreamExecutionEnvironment from pyflink.datastream.connectors.file_system import FileSink from pyflink.table import StreamTableEnvironment, TableDescriptor, Schema, DataTypes from pyflink.datastream.connectors.kafka import KafkaSource from pyflink.common.serialization import SimpleStringSchema from pyflink.datastream.formats.json import JsonRowSerializationSchema, JsonRowDeserializationSchema from pyflink.common import Types from pyflink.datastream.connectors.kafka import FlinkKafkaProducer, FlinkKafkaConsumer from pyflink.datastream import StreamExecutionEnvironment from pyflink.table import EnvironmentSettings, StreamTableEnvironment from pyflink.table.catalog import JdbcCatalog from pyflink.table import EnvironmentSettings, TableEnvironment env = StreamExecutionEnvironment.get_execution_environment() t_env = StreamTableEnvironment.create(env) deserialization_schema = JsonRowDeserializationSchema.Builder() \ .type_info(Types.ROW([Types.INT(), Types.STRING(),Types.INT()])) \ .build() kafka_consumer = FlinkKafkaConsumer( topics='flinkevents', deserialization_schema=deserialization_schema, properties={'bootstrap.servers': 'localhost:9092'} ) kafka_consumer.set_start_from_earliest() name = "my_table" default_database = "postgres" # Replace with your PostgreSQL database name username = "postgres" password = "S@iling0844" base_url = "jdbc:postgresql://localhost:5432" # Replace with your PostgreSQL host and port catalog = JdbcCatalog(name, default_database, username, password, base_url) t_env.register_catalog("my_table", catalog) t_env.use_catalog("my_table") env.add_source(kafka_consumer).print() #t_env.create_temporary_table('kafka_source', kafka_consumer) t_env.execute_sql(''' CREATE TABLE sink_table ( id INT, name STRING, age INT ) WITH ( 'connector' = 'jdbc', 'table-name' = 'postgres_sink', 'sink.buffer-flush.max-rows' = '1' ) ''') t_env.execute_sql(''' INSERT INTO jdbc_sink SELECT id, name, age FROM kafka_source ''') env.execute('Kafka to POSTGRESQL') ERROR: File "/Users/Documents/Flink app/discriptors.py", line 44, in <module> t_env.execute_sql(''' File "/Users/Documents/Flink app/env/lib/python3.9/site-packages/pyflink/table/table_environment.py", line 837, in execute_sql return TableResult(self._j_tenv.executeSql(stmt)) File "/Users/Documents/Flink app/env/lib/python3.9/site-packages/py4j/java_gateway.py", line 1322, in call return_value = get_return_value( File "/Users/Documents/Flink app/env/lib/python3.9/site-packages/pyflink/util/exceptions.py", line 158, in deco raise java_exception pyflink.util.exceptions.TableException: org.apache.flink.table.api.TableException: Could not execute CreateTable in path
    my_table
    .
    postgres
    .
    sink_table
    at org.apache.flink.table.catalog.CatalogManager.execute(CatalogManager.java:915) at org.apache.flink.table.catalog.CatalogManager.createTable(CatalogManager.java:652) at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1000) at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:765) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.base/java.lang.reflect.Method.invoke(Method.java:566) at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374) at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282) at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79) at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238) at java.base/java.lang.Thread.run(Thread.java:829) Caused by: java.lang.UnsupportedOperationException at org.apache.flink.connector.jdbc.catalog.AbstractJdbcCatalog.createTable(AbstractJdbcCatalog.java:208) at org.apache.flink.table.catalog.CatalogManager.lambda$createTable$11(CatalogManager.java:663) at org.apache.flink.table.catalog.CatalogManager.execute(CatalogManager.java:909) ... 14 more Appreciate any help here Thanks
    m
    • 2
    • 1
  • d

    Dheeraj Panangat

    05/23/2023, 8:27 AM
    Hi Team, In Native Kubernetes Flink, is there a provision to mount secrets as volume. Our Use Case : Mount certificates as volume to the pods I understand that with Flink Kubernetes Operator we can achieve this by just adding the volumes, but does Native Flink Kubernetes provide any such option or flexibility ? Appreciate any inputs here. Thanks.
    g
    • 2
    • 7
  • o

    Oscar Perez

    05/23/2023, 11:18 AM
    Hi team, I would like to run flink in docker-compose with also hive metastore being the data backed up in s3. I have managed to configure the flink and s3 via localstack but is there any example of how to configure the docker-compose so that flink is able to connect to hive in order to create tables and catalogs? I could not find any information regarding this. Thanks!
    m
    p
    • 3
    • 2
  • s

    Shiv Desai

    05/23/2023, 2:42 PM
    Hii Team, I am working on creating a table using Table API from the data-stream of confluent-avro (from kafka topic). I am trying using the below code ( Flink version is 1.17) and it is creating table in such a way that
    resultTable
    column contains the complete json in one column of table. Can someone please help me in how to create table in such a way that each field of event is mapped to one column in the table.
    Copy code
    DataStream<GenericRecord> dataStream = ...;
    
    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
    Table attribution = tableEnv.fromDataStream(dataStream);
    
    tableEnv.createTemporaryView("attribution", attribution);
    Table resultTable = tableEnv.sqlQuery("SELECT f0 FROM attribution");
    tableEnv.createTemporaryView("resultTable", resultTable);
    tableEnv.sqlQuery("SELECT `field_name` from resultTable").execute().collect().forEachRemaining(System.out::println); // Throwing Exception.
  • m

    Madhu Sudhan

    05/23/2023, 3:12 PM
    apache-flink python package is not working at python3.9 version. Please let me know , how to install this package without downgrading the python version
    m
    • 2
    • 3
  • j

    Jirawech Siwawut

    05/23/2023, 3:30 PM
    Can anyone check? https://apache-flink.slack.com/archives/C03G7LJTS2G/p1684772119467239
  • k

    Kyle Godfrey

    05/23/2023, 5:09 PM
    Hi, I’m looking to find a way to push an event that causes a custom trigger to return
    FIRE_AND_PURGE
    to the next global window instead of the event being the last event in the global window that it’s closing. It looks like an evictor might be able to do this but the stipulation that an evictor requires all events to stay in state until the window is closed and the evictor run isn’t feasible (we’re aggregating data for thousands of assets, each with thousands of events for each asset so it’s just too much state to keep). Is there another way of pushing this event off to the next window?
  • s

    Shiv Desai

    05/23/2023, 8:17 PM
    Can someone please help me here?. I am facing issue in modelling table as the input event is highly nested and i am not able to access the nested element from the code snippet. https://apache-flink.slack.com/archives/C03G7LJTS2G/p1684852955178819
    m
    • 2
    • 7
  • a

    Alex Brekken

    05/24/2023, 2:22 AM
    Hi everyone, I’ve running the K8s operator and have a job deployed to my cluster (session mode) successfully. I’ve got prometheus metrics being scraped and am building a dashboard. Everything is working great, save for 1 issue: I can’t find a way to add labels to the FlinkSessionJob CRD so that I can distinguish the jobs apart from each other in prometheus. I’ve tried adding a label under
    metadata
    in the CRD, but it doesn’t seem to get discovered by prometheus. Example:
    Copy code
    apiVersion: <http://flink.apache.org/v1beta1|flink.apache.org/v1beta1>
    kind: FlinkSessionJob
    metadata:
      namespace: flink
      name: my-flink-job
      labels:
        customLabel: my-flink-application
    But “customLabel” does not show up in Prometheus. Maybe I’m going about this the wrong way? Basically I just need a way to distinguish between multiple jobs running on the same
    FlinkDeployment
    so I can toggle between them in a Grafana dashboard. Thanks for any help!
    g
    • 2
    • 3
  • o

    Or Keren

    05/24/2023, 10:13 AM
    Hey everyone! I'm trying to use the Hybrid Source in order to "warm up the state" with pre saved data. The issue is that I need the streaming source at the end of it to be multiple stream sources united together. Did anyone figure out a way to do it?
  • t

    Tilman Krokotsch

    05/24/2023, 11:22 AM
    Hey channel, I ran into a problem with converting an Iceberg table source to a datastream. I get an error message that looks like a bug in PyFlink, but I want to make sure that I did everything correct before I open a ticket. Reading the table with the Table API works without issue. Environment: • Flink 1.17 • Python 3.9 • Java 11 • macOS ARM64 Minimal example to reproduce:
    Copy code
    import os.path
    
    from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionMode
    from pyflink.table import StreamTableEnvironment
    
    JAR_PATH = "<Path to Fat Jar>"
    
    
    def example():
        env = StreamExecutionEnvironment.get_execution_environment()
        env.set_runtime_mode(RuntimeExecutionMode.BATCH)
        env.add_jars(JAR_PATH)
        t_env = StreamTableEnvironment.create(env)
    
        t_env.execute_sql(
            """
                CREATE CATALOG iceberg_catalog WITH (
                    'type'='iceberg',
                    'catalog-impl'='org.apache.iceberg.aws.glue.GlueCatalog',
                    'warehouse'='s3://<My Bucket>',
                    'io-impl'='org.apache.iceberg.aws.s3.S3FileIO',
                    'client.factory'='org.apache.iceberg.aws.AssumeRoleAwsClientFactory',
                    'client.assume-role.region'='<My Region>',
                    'client.assume-role.arn'='<My Role>'
                )
            """
        )
        source_table = t_env.from_path(
            "iceberg_catalog.<My Database>.<My Table>"
        )
        t_env.to_data_stream(source_table).print()
        env.execute()
    
    
    if __name__ == "__main__":
        example()
    This is the error message:
    Copy code
    File "<My venv>/lib/python3.9/site-packages/pyflink/fn_execution/beam/beam_coder_impl_slow.py", line 58, in decode_from_stream
        return self._value_coder.decode_from_stream(data_input_stream)
    TypeError: Argument 'input_stream' has incorrect type (expected pyflink.fn_execution.stream_fast.LengthPrefixInputStream, got BeamInputStream)
    Did anyone encounter something similar before? Thanks in advance.
    d
    • 2
    • 2
  • o

    Oscar Perez

    05/24/2023, 12:47 PM
    hei team! We are trying to setup a local environment using flink+hive and we have been following this repository as reference: https://github.com/fhueske/flink-sql-demo/blob/master/docker-compose.yml We have had to change it a bit since we wanted to use Hive 3.1.3 and Hadoop 3.3.3. We are now facing an issue when submitting a job via flink UI. This is the stack trace: https://gist.github.com/mostrovoi/fcaeaf34c0cccb02ca2d17ce43ed5397 As you can see there seems to be a mismatch of versions and we suspect of the library. flink-shaded-hadoop2-uber . How can we know what version of shaded hadoop works with hive 3.1.0 ? We have tried to google for information but we could not find much about the purpose of this library or the aforementioned error. Thanks in advance!
    m
    • 2
    • 2
  • o

    Oscar Perez

    05/24/2023, 1:58 PM
    hei, it is me again. We are trying to configure flink with hadoop as instructed here (https://nightlies.apache.org/flink/flink-docs-release-1.10/ops/deployment/hadoop.html) but we would like to use hadoop v3 but we do not see any shaded version for hadoop 3.x release. we want to use Hive 3.1.3 and Hadoop 3.3.3 and would like to use the shaded lib approach if possible. How can we do it for hadoop v3? Thanks!
    m
    • 2
    • 4
  • m

    Matthew Kerian

    05/24/2023, 6:47 PM
    Hi team qq. We've been having checkpoints fail intermittently and I'm trying to debug it. We had a checkpoint fail ~30 min. ago and the UI shows us
    Failed: 1
    . The checkpoint monitoring doc page says there should be a way to view more information about this but I can't find that option. Would appreciate a pointer, thanks
    m
    • 2
    • 3
  • z

    Zhong Chen

    05/24/2023, 10:56 PM
    I am facing the same issue as described in this stackflow post. Anyone knows what might cause this? Flink 1.17, Java 11, Scala 2.12.17 https://stackoverflow.com/questions/75940903/flink-kafkasink-takes-long-time-to-start
    m
    t
    • 3
    • 10
  • h

    HJK nomad

    05/25/2023, 1:18 AM
    somebody help 😭😭😭?? https://apache-flink.slack.com/archives/C03G7LJTS2G/p1684286487103919?thread_ts=1684217300.849039&amp;cid=C03G7LJTS2G
  • s

    Sumit Nekar

    05/25/2023, 7:03 AM
    Hello, We have deployed a flink job in application mode using flink operator. Sometimes we are seeing TM pods get terminated (error state) after multiple job restarts. I suspect TM pods may get terminated once the job fails. Can anyone help me to understand what action flink operator takes when the job is failed or restart strategy threshold is met.
    g
    • 2
    • 2
1...828384...98Latest