# troubleshooting
• Mohit Aggarwal

    01/03/2023, 9:57 AM
Hi, I wanted to try out the autoscaler mentioned here: https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/custom-resource/autoscaler/ It mentions using Flink 1.17 snapshot images, but the link seems to be broken. Where can I get these snapshot images?
• Liubov

    01/03/2023, 11:09 AM
Howdy all. I have a question about Kafka sinks and sources in Flink (we use version 1.14.6). Recently we noticed that if Kafka nodes fail, the Flink job may restart with a Kafka write timeout. So if the Flink sink was writing while some nodes were unresponsive, it will throw a TimeoutException, and this sink exception will cause a job restart. The same goes for sources. The question is: is there a way to handle these exceptions without restarting the job? Yes, I could override KafkaSink and KafkaWriter, but that doesn't look like a good solution.
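A mitigation rather than a true fix (the sink model surfaces write timeouts as task failures, so some restart is hard to avoid without a custom writer): an exponential-delay restart strategy at least keeps short broker outages from escalating. A minimal flink-conf.yaml sketch, with key names as documented for Flink 1.14:
restart-strategy: exponential-delay
restart-strategy.exponential-delay.initial-backoff: 1 s
restart-strategy.exponential-delay.max-backoff: 2 min
restart-strategy.exponential-delay.backoff-multiplier: 2.0
restart-strategy.exponential-delay.reset-backoff-threshold: 10 min
restart-strategy.exponential-delay.jitter-factor: 0.1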
• Jean-René Robin

    01/03/2023, 1:03 PM
Hello everyone. I'm trying to set up an HA architecture using ZooKeeper, with three nodes. Can I set up only master nodes? Or do I have to set up 1 master and 2 slaves?
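For context: a ZooKeeper ensemble consists of equal peers (typically three) that elect a leader among themselves, and on the Flink side every JobManager simply registers for leader election, so there is no fixed master/slave split to configure. A minimal flink-conf.yaml sketch, with hypothetical hostnames and storage path:
high-availability: zookeeper
high-availability.zookeeper.quorum: zk1:2181,zk2:2181,zk3:2181
high-availability.storageDir: hdfs:///flink/ha/
high-availability.cluster-id: /my-flink-cluster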
• Gaurav Miglani

    01/03/2023, 1:04 PM
I'm using Flink on Kubernetes with queryable state, enabled using the configs mentioned here https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/queryable_state/, but the operator is only exposing the 8081 service port and not 9069 (the proxy's RPC port). Shouldn't it create two services?
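Worth noting: the queryable state proxy runs on the TaskManagers, while the service the operator creates on 8081 fronts the JobManager REST endpoint, so 9069 would not appear there in any case. A sketch of a separate Service targeting the TaskManager pods; the selector labels below follow Flink's native Kubernetes integration and are assumptions to check against your deployment:
apiVersion: v1
kind: Service
metadata:
  name: flink-qs-proxy
spec:
  selector:
    app: my-flink-cluster        # cluster-id label (assumption)
    component: taskmanager
  ports:
    - name: qs-proxy
      port: 9069
      targetPort: 9069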
• Alex Cramer

    01/03/2023, 4:52 PM
Quick question about the OpenAPI spec for the REST API: I'm seeing different schemas between the embedded HTML and the OpenAPI spec for some of the response types in the docs, e.g. the GET /jobs/{jobid} endpoint (specifically the nested JobDetailsVertexInfo). The HTML version matches the JSON response I see when I hit the endpoint, so I think that's the right one. Can someone who knows more quickly sanity-check me? It's an experimental feature, but I wasn't sure if other people were using it without issues and whether I'm just misinterpreting something.
• René

    01/03/2023, 7:44 PM
Hi all, does anyone know if two compacted Kafka topics can be joined using an event-time temporal join? Both sides of the join would in this case be tables of connector type 'upsert-kafka' (neither of them is append-only). Thanks
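For reference, an event-time temporal join needs the build side to be a versioned table, which an upsert-kafka table with a primary key and a watermark satisfies; whether an updating (non-append-only) probe side is accepted varies by version and is worth testing. A sketch with hypothetical topics and columns:
CREATE TABLE rates (
  currency STRING,
  rate DECIMAL(10, 4),
  update_time TIMESTAMP(3),
  WATERMARK FOR update_time AS update_time - INTERVAL '5' SECOND,
  PRIMARY KEY (currency) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'rates',
  'properties.bootstrap.servers' = 'localhost:9092',
  'key.format' = 'raw',
  'value.format' = 'json'
);

-- orders defined analogously, with a watermark on order_time
SELECT o.order_id, o.amount * r.rate AS converted
FROM orders AS o
LEFT JOIN rates FOR SYSTEM_TIME AS OF o.order_time AS r
ON o.currency = r.currency;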
• Jason Politis

    01/03/2023, 10:11 PM
Hey everyone. How would you solve this problem? Let's say you are using Flink SQL pulling from Kafka and you have many tables, but one of them is used by many different insert queries (let's call him table X). The problem is, if you want to use group-offsets for scan.startup.mode, the first query that selects from that table will set the offset, and none of the other queries that select from it will receive the data that the first one got. My first inclination is to create a new source table X for every single query that pulls from it (meaning that if one query left-joins to table X twice, that's still one new source table for each reference). That's because I'm thinking we really need to treat each table sourced from Kafka (with group-offsets) as a single consumer: each time X is referenced in a query, that reference should be treated as a different consumer than the other references. Thoughts?
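One way to give each reference its own consumer without duplicating DDL, sketched with hypothetical table and sink names: override properties.group.id per query through dynamic table options (this requires table.dynamic-table-options.enabled, which defaults to true in recent Flink versions):
-- query A reads X under its own consumer group
INSERT INTO sink_a
SELECT * FROM table_x /*+ OPTIONS('properties.group.id' = 'query-a') */;

-- query B gets an independent group, hence independent committed offsets
INSERT INTO sink_b
SELECT * FROM table_x /*+ OPTIONS('properties.group.id' = 'query-b') */;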
• Chen-Che Huang

    01/04/2023, 1:54 AM
Hi all, we have a streaming application based on Flink 1.16.0 and deployed on our Kubernetes clusters via flink-kubernetes-operator 1.3.0. Several days ago, our Flink application suddenly went out of service because it could not find a file belonging to a savepoint. Our Flink app is configured to use incremental checkpoints and is expected to recover from checkpoints rather than savepoints. We have used Flink for several years, and this is probably the first time the application has failed owing to a missing savepoint file. Has anyone encountered the same issue? Any comment is appreciated. Our configuration and the exception log are in the thread.
• Suparn Lele

    01/04/2023, 5:32 AM
Hi, what happens when our operator parallelism is greater than the number of task slots? Say I have 8 slots in total but the parallelism is 16; how would Flink manage this?
• Soumya Ghosh

    01/04/2023, 5:53 AM
Hi all, we have a few pipelines in Flink 1.16.0 which look like below:
    Kafka Source --> 1 min tumble window --> Elasticsearch table connector sink
We are observing some intermittent exceptions (posted in the thread below); the Flink job restarts and then works fine until the exception is encountered again a few hours later. There are two types of exceptions logged, one or the other, never both at the same time. The root exception is:
Caused by: org.apache.flink.elasticsearch7.shaded.org.apache.http.ConnectionClosedException: Connection is closed
    Any thoughts on this?
• Rohan Kumar

    01/04/2023, 6:41 AM
Hi all, in the Table API, how can I call a UDF that takes the whole row and does some aggregation/mapping? I can pass individual column names, but I want to pass the whole row. It seems possible in the Python API, but I am not able to find any solution in Java.
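A sketch of one way to approximate this in the Java Table API, assuming it is acceptable to pack the columns into a ROW at the call site (the column names a and b are hypothetical):
import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.call;
import static org.apache.flink.table.api.Expressions.row;

import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.types.Row;

public class RowUdf extends ScalarFunction {
    // the whole row arrives as a single ROW-typed argument
    public String eval(@DataTypeHint("ROW<a INT, b STRING>") Row r) {
        Integer a = r.getFieldAs(0);
        String b = r.getFieldAs(1);
        return b + ":" + a;
    }
}

// usage, given some Table `table` with columns a and b:
// table.select(call(RowUdf.class, row($("a"), $("b"))));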
• Doğaç Eldenk

    01/04/2023, 7:47 AM
Hey everyone, one of my sink functions requires constant restarts (e.g. every time a checkpoint has completed). Is there a way to make sure my sinks are closed and re-opened after a period of time?
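A sketch of one option, assuming a SinkFunction-based sink: implement CheckpointListener and recycle the connection in notifyCheckpointComplete, so the sink is re-opened exactly once per completed checkpoint (the Client type below is a hypothetical stand-in for the real connection):
import org.apache.flink.api.common.state.CheckpointListener;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

public class RecyclingSink extends RichSinkFunction<String> implements CheckpointListener {

    /** Hypothetical stand-in for the real sink's client. */
    static class Client {
        static Client connect() { return new Client(); }
        void send(String v) { /* write somewhere */ }
        void close() { /* release resources */ }
    }

    private transient Client client;

    @Override
    public void open(Configuration parameters) {
        client = Client.connect();
    }

    @Override
    public void invoke(String value, Context context) {
        client.send(value);
    }

    @Override
    public void notifyCheckpointComplete(long checkpointId) {
        // tear down and re-open once the checkpoint is durable
        client.close();
        client = Client.connect();
    }

    @Override
    public void close() {
        if (client != null) {
            client.close();
        }
    }
}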
• Amenreet Singh Sodhi

    01/04/2023, 12:40 PM
Hi team, is there any good resource available where I can find Flink using sidecar containers for logging with fluentd? Thanks
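A minimal sketch of the usual pattern (image tags and paths are illustrative, and a real setup also needs a fluentd config with an output plugin): the Flink container writes its logs into a shared emptyDir volume that a fluentd sidecar tails:
apiVersion: v1
kind: Pod
metadata:
  name: flink-taskmanager
spec:
  containers:
    - name: taskmanager
      image: flink:1.16
      volumeMounts:
        - name: flink-logs
          mountPath: /opt/flink/log
    - name: fluentd
      image: fluent/fluentd:v1.16-1
      volumeMounts:
        - name: flink-logs
          mountPath: /fluentd/log/flink
          readOnly: true
  volumes:
    - name: flink-logs
      emptyDir: {}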
• Abdelhakim Bendjabeur

    01/04/2023, 1:04 PM
Hello, I am experimenting with the Flink SQL Client CLI and there is this weird behaviour:
    Flink SQL> SELECT  *, ROW_NUMBER() OVER (PARTITION BY id, accountId ORDER BY `timestamp` DESC) rn FROM ticket;
    [ERROR] Could not execute SQL statement. Reason:
    org.apache.flink.table.api.TableException: The window can only be ordered in ASCENDING mode.
    ...
    Flink SQL> SELECT  *, ROW_NUMBER() OVER (PARTITION BY id, accountId ORDER BY `timestamp` ASC) rn FROM ticket;
    [ERROR] Could not execute SQL statement. Reason:
    org.apache.flink.table.api.TableException: OVER windows' ordering in stream mode must be defined on a time attribute.
The field is a timestamp though! But using a CTE or a sub-query solved it.
    SELECT  *
      FROM (
        SELECT  *, ROW_NUMBER() OVER (PARTITION BY id, accountId ORDER BY `timestamp` DESC) rn FROM ticket
      ) s
  WHERE rn = 1;
    ---------------------------------------------------------------------------------------------------------
    WITH ticket_dedup AS (
      SELECT  *
      FROM (
        SELECT  *,
          ROW_NUMBER() OVER (PARTITION BY id, accountId ORDER BY `timestamp` DESC) rn
        FROM ticket
      ) s
      WHERE rn = 1)
     SELECT * FROM ticket_dedup;
    Is this a bug somehow or is there a reason behind it?
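Likely not a bug: a plain OVER aggregation in streaming mode must be ordered ascending on a time attribute, i.e. a column declared with a WATERMARK; a plain TIMESTAMP(3) column does not qualify, hence the second error. The DESC form only works inside the subquery/CTE because the planner recognizes that exact shape (ROW_NUMBER ... DESC with a rn = 1 filter on top) as its deduplication pattern. A sketch of a DDL that makes `timestamp` a real event-time attribute (connector options are placeholders):
CREATE TABLE ticket (
  id STRING,
  accountId STRING,
  `timestamp` TIMESTAMP(3),
  -- the watermark turns `timestamp` into an event-time attribute
  WATERMARK FOR `timestamp` AS `timestamp` - INTERVAL '5' SECOND
) WITH (
  'connector' = 'datagen'  -- placeholder; substitute your real Kafka options
);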
• bharat chaudhury

    01/04/2023, 2:07 PM
I am new to Flink. I installed PyFlink but it always points to Python 2.7 even though I have 3.6.8. Caused by: java.io.IOException: Failed to execute the command: python -c import pyflink;import os;print(os.path.join(os.path.abspath(os.path.dirname(pyflink.__file__)), 'bin')) output: Traceback (most recent call last): File "<string>", line 1, in <module> ImportError: No module named pyflink
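A likely fix, assuming a Python 3 interpreter is available at the path shown: point PyFlink's client at it explicitly, since it otherwise falls back to whatever python resolves to:
export PYFLINK_CLIENT_EXECUTABLE=/usr/bin/python3
# or, equivalently, in flink-conf.yaml:
# python.client.executable: /usr/bin/python3
# workers use a separate option:
# python.executable: /usr/bin/python3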
• Adriana Beltran

    01/04/2023, 3:41 PM
Hi team, I noticed the docs only cover adding Stateful Functions as a dependency with Maven for the Java SDK; does anyone have any idea how to add the Stateful Functions dependency with Gradle?
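The Maven coordinates translate directly to Gradle; a sketch for a Groovy-DSL build.gradle, assuming the Java SDK artifact at the 3.2.0 release:
dependencies {
    implementation 'org.apache.flink:statefun-sdk-java:3.2.0'
}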
• Sami Badawi

    01/04/2023, 6:18 PM
    Problem using JDBC:
obj_path = "public.model_counts"
catalog_path = ObjectPath("postgres", obj_path)
print(catalog_path)
table_exists = catalog.table_exists(catalog_path)
print(table_exists)
table: CatalogBaseTable = catalog.get_table(catalog_path)
print("table:", table)
table_schema: Schema = table.get_unresolved_schema()
print("table_schema:", table_schema)
I can get the database table, but only as a CatalogBaseTable, and I am not sure how to query the data in the table. Is there a simple JDBC example showing how to run the query?
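A sketch of querying through the registered catalog instead of the catalog object API, assuming the JdbcCatalog from the snippet above (PostgresCatalog exposes tables under a single "schema.table" name, so it needs quoting):
t_env.use_catalog("my_catalog")
t_env.use_database("postgres")
result = t_env.sql_query("SELECT * FROM `public.model_counts`")
result.execute().print()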
• Joris Basiglio

    01/04/2023, 6:26 PM
    Hey, does Flink support restoring state from an operator but dropping a single descriptor from within?
• Sami Badawi

    01/04/2023, 8:00 PM
Problem with the Postgres jsonb datatype: when I run a SQL query against the Postgres table event_files, which has a column of type jsonb:
sql = "SELECT entity_id FROM event_files"
table2 = t_env.sql_query(sql)
table2.execute().print()
I get this error despite not using the column that has the jsonb type:
    File "/Users/sami/miniconda3/envs/pystream39/lib/python3.9/site-packages/py4j/protocol.py", line 326, in get_return_value
        raise Py4JJavaError(
    py4j.protocol.Py4JJavaError: An error occurred while calling o8.sqlQuery.
    : org.apache.flink.table.api.ValidationException: SQL validation failed. Failed getting table postgres.event_files
	at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:186)
    	at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:113)
    	at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:261)
    	at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:106)
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:703)
	at org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:997)
    	at org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:975)
    	at org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:232)
    	at org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:952)
    	at org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:704)
	at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:182)
    	... 15 more
    Caused by: java.lang.UnsupportedOperationException: Doesn't support Postgres type 'jsonb' yet
    	at org.apache.flink.connector.jdbc.dialect.psql.PostgresTypeMapper.mapping(PostgresTypeMapper.java:173)
    	at org.apache.flink.connector.jdbc.catalog.PostgresCatalog.fromJDBCType(PostgresCatalog.java:147)
    	at org.apache.flink.connector.jdbc.catalog.AbstractJdbcCatalog.getTable(AbstractJdbcCatalog.java:271)
    	... 40 more
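Context for the last frame: the JDBC catalog maps every column of the physical table when it resolves it, so an unsupported jsonb column fails the lookup even if the query never selects it. A workaround sketch: bypass the catalog with an explicit DDL that declares only the columns you need (the BIGINT type for entity_id is an assumption):
CREATE TABLE event_files (
  entity_id BIGINT
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:postgresql://localhost:5432/postgres',
  'table-name' = 'event_files',
  'username' = 'postgres',
  'password' = 'postgres'
);

SELECT entity_id FROM event_files;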
• Emmanuel Leroy

    01/04/2023, 10:20 PM
When using a Kafka source, should I expect that parallelism = number of consumers in the consumer group = number of partitions? I'm reading from 3 partitions, but I'm not able to take advantage of the full throughput of the Kafka queue for some reason (i.e. pending messages are increasing, so reads are not keeping up with ingest, even though the read bandwidth is 2x the write bandwidth), and I'm not sure how to troubleshoot this.
• sap1ens

    01/04/2023, 11:27 PM
Hey everyone, I use Flink 1.15.2 and Flink Kubernetes Operator ~1.2.0. Today I was redeploying a bunch of jobs (with upgradeMode: last-state) and some of them failed with:
    java.lang.IllegalStateException: There is no operator for the state c0bfb6158e09ccdb03a634ce9b2009b0
    	at org.apache.flink.runtime.checkpoint.StateAssignmentOperation.checkStateMappingCompleteness(StateAssignmentOperation.java:733)
    	at org.apache.flink.runtime.checkpoint.StateAssignmentOperation.assignStates(StateAssignmentOperation.java:98)
    	at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreLatestCheckpointedStateInternal(CheckpointCoordinator.java:1670)
    	at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreInitialCheckpointIfPresent(CheckpointCoordinator.java:1598)
    	...
However, I hadn't changed anything Flink-related (I was updating some K8s configs); the jobs used exactly the same Docker image, JAR, etc. Is this a known issue (maybe addressed in later versions)?
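If the orphaned state turns out to be disposable, the operator can be told to drop it on restore; a FlinkDeployment excerpt (field name per the operator's CRD), with the caveat that this discards the unmatched operator state rather than explaining the mismatch:
spec:
  job:
    upgradeMode: last-state
    allowNonRestoredState: true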
• Deryl Rodrigues

    01/05/2023, 5:48 AM
Hi guys, following the example listed here https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/connectors/table/hive/hive_read_write/#temporal-join-the-latest-partition, I am trying to implement a temporal join on processing time between table A (Kafka source) and table B (defined in a Hive catalog).
SELECT
  a.column1,
  a.column2,
  b.column1
FROM tableA AS a
JOIN hive_catalog.tableB /*+ OPTIONS('streaming-source.enable'='true') */ FOR SYSTEM_TIME AS OF a.proctime AS b
ON a.column1 = b.column1
I encounter the following error:
    Caused by: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Temporal Table Join requires primary key in versioned table, but no primary key can be found. The physical plan is:
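One thing worth checking, sketched after the linked docs: the latest-partition example passes more options than streaming-source.enable alone; without streaming-source.partition.include = 'latest', the table is treated as a generic versioned table, which is what triggers the primary-key requirement:
SELECT a.column1, a.column2, b.column1
FROM tableA AS a
JOIN hive_catalog.tableB /*+ OPTIONS(
      'streaming-source.enable' = 'true',
      'streaming-source.partition.include' = 'latest',
      'streaming-source.monitor-interval' = '12 h',
      'streaming-source.partition-order' = 'partition-name') */
  FOR SYSTEM_TIME AS OF a.proctime AS b
ON a.column1 = b.column1;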
• Sami Badawi

    01/05/2023, 8:33 AM
I am trying to use CDC for Postgres from PyFlink, but it fails. Has anybody gotten this to work?
catalog: JdbcCatalog = JdbcCatalog(
    name, default_database, username, password, base_url)
t_env.register_catalog("my_catalog", catalog)
t_env.use_catalog("my_catalog")

ddl = """
CREATE TABLE shipments_cdc (
  shipment_id INT,
  order_id INT,
  origin STRING,
  destination STRING,
  is_arrived BOOLEAN
) WITH (
  'connector' = 'postgres-cdc',
  'hostname' = 'localhost',
  'port' = '5432',
  'username' = 'postgres',
  'password' = 'postgres',
  'database-name' = 'postgres',
  'schema-name' = 'public',
  'table-name' = 'shipments'
);
"""
t_env.execute_sql(ddl)
with this stacktrace:
    File "/Users/sami/miniconda3/envs/pystream39/lib/python3.9/site-packages/py4j/java_gateway.py", line 1321, in __call__
        return_value = get_return_value(
      File "/Users/sami/miniconda3/envs/pystream39/lib/python3.9/site-packages/pyflink/util/exceptions.py", line 158, in deco
        raise java_exception
    pyflink.util.exceptions.TableException: org.apache.flink.table.api.TableException: Could not execute CreateTable in path `my_catalog`.`postgres`.`shipments_cdc`
    	at org.apache.flink.table.catalog.CatalogManager.execute(CatalogManager.java:888)
    	at org.apache.flink.table.catalog.CatalogManager.createTable(CatalogManager.java:649)
    	at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:929)
    	at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:730)
    	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    	at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    	at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    	at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
    	at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    	at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
    	at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
    	at java.base/java.lang.Thread.run(Thread.java:829)
    Caused by: java.lang.UnsupportedOperationException
    	at org.apache.flink.connector.jdbc.catalog.AbstractJdbcCatalog.createTable(AbstractJdbcCatalog.java:310)
    	at org.apache.flink.table.catalog.CatalogManager.lambda$createTable$11(CatalogManager.java:660)
    	at org.apache.flink.table.catalog.CatalogManager.execute(CatalogManager.java:882)
    	... 14 more
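The last frame is the clue: AbstractJdbcCatalog.createTable throws UnsupportedOperationException, i.e. the JDBC catalog cannot hold new table definitions. A sketch of the workaround, assuming the postgres-cdc connector jar is on the classpath: create the CDC table in the default in-memory catalog instead:
t_env.use_catalog("default_catalog")
t_env.use_database("default_database")
t_env.execute_sql(ddl)  # the same postgres-cdc DDL as above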
• Ari Huttunen

    01/05/2023, 8:35 AM
    How can I fetch an environment variable's contents in an SQL script? This would be for passwords, etc.
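As far as I know, the SQL client has no built-in variable substitution; a common workaround sketch is to template the script and expand it before passing it via -i (file names and the variable are examples):
# init.sql.template contains e.g.  'password' = '${KAFKA_PASSWORD}'
export KAFKA_PASSWORD=secret
envsubst < init.sql.template > init.sql
./bin/sql-client.sh -i init.sql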
• jaiprasad

    01/05/2023, 9:16 AM
Flink upgrade from 1.15.1 to 1.16 - PrometheusReporter error. Hi, the exception below is thrown in the task manager after upgrading to 1.16; what could be the reason? This error was not present in 1.15.1.
    2023-01-05 09:07:47,566 ERROR org.apache.flink.runtime.metrics.ReporterSetup               [] - Could not instantiate metrics reporter prom. Metrics might not be exposed/reported.
    java.lang.ClassNotFoundException: org.apache.flink.metrics.prometheus.PrometheusReporter
            at jdk.internal.loader.BuiltinClassLoader.loadClass(Unknown Source) ~[?:?]
            at jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(Unknown Source) ~[?:?]
            at java.lang.ClassLoader.loadClass(Unknown Source) ~[?:?]
            at java.lang.Class.forName0(Native Method) ~[?:?]
            at java.lang.Class.forName(Unknown Source) ~[?:?]
            at org.apache.flink.runtime.metrics.ReporterSetup.loadViaReflection(ReporterSetup.java:456) ~[flink-dist-1.16.0.jar:1.16.0]
            at org.apache.flink.runtime.metrics.ReporterSetup.loadReporter(ReporterSetup.java:409) ~[flink-dist-1.16.0.jar:1.16.0]
            at org.apache.flink.runtime.metrics.ReporterSetup.setupReporters(ReporterSetup.java:328) ~[flink-dist-1.16.0.jar:1.16.0]
            at org.apache.flink.runtime.metrics.ReporterSetup.fromConfiguration(ReporterSetup.java:209) ~[flink-dist-1.16.0.jar:1.16.0]
            at org.apache.flink.runtime.taskexecutor.TaskManagerRunner.startTaskManagerRunnerServices(TaskManagerRunner.java:223) ~[flink-dist-1.16.0.jar:1.16.0]
            at org.apache.flink.runtime.taskexecutor.TaskManagerRunner.start(TaskManagerRunner.java:288) ~[flink-dist-1.16.0.jar:1.16.0]
            at org.apache.flink.runtime.taskexecutor.TaskManagerRunner.runTaskManager(TaskManagerRunner.java:481) ~[flink-dist-1.16.0.jar:1.16.0]
            at org.apache.flink.runtime.taskexecutor.TaskManagerRunner.lambda$runTaskManagerProcessSecurely$5(TaskManagerRunner.java:525) ~[flink-dist-1.16.0.jar:1.16.0]
            at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28) [flink-dist-1.16.0.jar:1.16.0]
            at org.apache.flink.runtime.taskexecutor.TaskManagerRunner.runTaskManagerProcessSecurely(TaskManagerRunner.java:525) [flink-dist-1.16.0.jar:1.16.0]
            at org.apache.flink.kubernetes.taskmanager.KubernetesTaskExecutorRunner.runTaskManagerSecurely(KubernetesTaskExecutorRunner.java:66) [flink-dist-1.16.0.jar:1.16.0]
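A likely cause, as a sketch: in Flink 1.16 metric reporters are expected to be configured via their factory and loaded as plugins, so the old metrics.reporter.prom.class setting plus a jar on the main classpath no longer resolves. Worth trying: place flink-metrics-prometheus-1.16.0.jar under plugins/metrics-prometheus/ (or lib/) and configure:
metrics.reporter.prom.factory.class: org.apache.flink.metrics.prometheus.PrometheusReporterFactory
metrics.reporter.prom.port: 9249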
• Ari Huttunen

    01/05/2023, 9:42 AM
I still have some problems with sql-client.sh. I have an init script that creates a table that reads streaming data from Kafka. If I try to select some rows like select * from the_table limit 10; I get nothing, just an empty result. The same SQL statement embedded in Python provides data just fine (in PyFlink I have a .wait()). Does the sql-client support streaming from Kafka, and how should I fix this?
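It does support streaming reads; two things worth checking, as a sketch: the client's result mode, and where the consumer starts reading (with group-offsets or latest-offset, an idle topic yields nothing, which would look like an empty result):
SET 'sql-client.execution.result-mode' = 'tableau';
SELECT * FROM the_table
  /*+ OPTIONS('scan.startup.mode' = 'earliest-offset') */
LIMIT 10;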
• Arun Lakra

    01/05/2023, 10:27 AM
Hi there, I have a question about the recommended deployment mode for Stateful Functions applications. As per the latest release, the recommended mode is to build a Docker image (https://nightlies.apache.org/flink/flink-statefun-docs-release-3.2/docs/deployment/overview/). What if I package my job as a Flink jar and submit it to a Flink cluster? The Flink jar approach is not mentioned in the latest release, but it was an option in v2.0 (https://nightlies.apache.org/flink/flink-statefun-docs-release-2.2/deployment-and-operations/packaging.html#flink-jar). I am curious about the implications of the Flink jar deployment mode, and why it isn't recommended in the latest release.
• Emmanuel Leroy

    01/05/2023, 9:08 PM
Hi, I'm streaming data from a Kafka source and dumping it to object storage with FileSink. Flink doesn't seem to be able to keep up with the Kafka flow rate, even when I increase parallelism. It's not clear to me whether it's the source reading that is limited or the sink. I don't see back pressure on the sink, but I already have 1 slot per Kafka partition, so increasing that won't help. What strategies should I look into to improve performance here?
• Emmanuel Leroy

    01/06/2023, 12:34 AM
Following up on debugging Kafka throughput: I am reading from 4 topics with 3 partitions each, with a 2 MB/s read limit per partition. I have 3 consumers (parallelism 3), as I have 3 partitions in each topic. On my local machine, I'm able to read at ~17 MB/s of raw data, although my Kafka logs say I'm reading at the max of 6 MB/s per topic (which should be 24 MB/s). However, with the same code running on a Flink session cluster on K8s, I only get 4 MB/s of raw data total (i.e. 4x less). Not sure what the problem could be here. Note that if I use parallelism = 1, I get the same throughput as with parallelism = 3.