# troubleshooting
  • Vincent canuel

    11/04/2022, 10:09 AM
    Hi everyone. We want to switch to statefun. Could we use Java 17 in a statefun project (I know Flink only supports Java 11)? Locally everything seems to work fine…
  • Emmanuel Leroy

    11/04/2022, 4:50 PM
    Hi, I'm trying to run a job with the Flink operator after uninstalling and re-installing the operator in a different namespace, and the JobManager does not get started. Any idea what could be wrong and how to fix this?
  • Prasaanth Neelakandan

    11/04/2022, 9:07 PM
    Hi folks, is it possible to architect Flink jobs so that they share a JM (i.e. they are listed as multiple jobs in the same Flink UI) but their TMs are isolated (they can reside in the same K8s cluster but have separate pods and K8s services)? This would be a hybrid of Session mode and Application mode. https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/deployment/overview/#deployment-modes
  • Emmanuel Leroy

    11/04/2022, 9:08 PM
    Hello, I’m running into an issue when using the Flink KafkaSource to subscribe to multiple topics. Everything works fine when using just one topic, but with more than one I get an error about
    ```
    org.apache.kafka.common.errors.UnsupportedVersionException: The broker does not support INIT_PRODUCER_ID
    ```
    and I’m told it is a feature used for transactions. On the same kafka platform, with a simple kafka consumer, I’m able to subscribe to multiple topics, so is the Flink KafkaSource making use of the transaction feature somehow?
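    For reference, a minimal sketch of subscribing to multiple topics with the KafkaSource builder (broker address, group id, and topic names are placeholders, not taken from the setup above); as far as I know INIT_PRODUCER_ID is a producer/transactions API, and KafkaSource is purely consumer-side:
    ```java
    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.connector.kafka.source.KafkaSource;
    import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // One source subscribing to more than one topic.
    KafkaSource<String> source = KafkaSource.<String>builder()
            .setBootstrapServers("kafka:9092")                 // placeholder broker
            .setTopics("topic-a", "topic-b")                   // multiple topics
            .setGroupId("my-group")                            // placeholder group id
            .setStartingOffsets(OffsetsInitializer.earliest())
            .setValueOnlyDeserializer(new SimpleStringSchema())
            .build();

    DataStream<String> stream =
            env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka-source");
    ```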
  • Steven Zhang

    11/04/2022, 10:39 PM
    Hi, when upgrading a FlinkDeployment running in session mode (an image update, for example), is the expected upgrade path to first suspend all running SessionJobs in savepoint upgrade mode so that a savepoint file is created, then change the FlinkDeployment, and then resume all jobs? I noticed that if I just directly edit the FlinkDeployment and trigger an upgrade, the session jobs don't automatically create a savepoint before the JM/TM pods restart.
  • RM

    11/05/2022, 1:55 PM
    👋 I have a Flink job on 1.13, configured with a min time between checkpoints of 2 minutes. After a few hours of running healthy with successful checkpoints, I notice the time between checkpoints widen from 2 to 5-9 minutes. Throughout, each checkpoint completes in under 10 seconds. The JM is healthy and I don't see any errors in the logs. I'd appreciate any pointers on what to look for! Thanks
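    For context, the "min time between checkpoints" referred to above maps to the checkpoint-config sketch below (the 2-minute values mirror the setup described; this is illustrative, not the poster's actual code):
    ```java
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.enableCheckpointing(2 * 60 * 1000);  // trigger a checkpoint every 2 minutes
    // enforce a minimum pause between the end of one checkpoint and the start of the next
    env.getCheckpointConfig().setMinPauseBetweenCheckpoints(2 * 60 * 1000);
    ```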
  • Adrian Chang

    11/05/2022, 4:11 PM
    Hello, is it possible to use a Redis sink connector in PyFlink? If yes, could someone point me to some documentation or examples, please?
  • Dan Hill

    11/05/2022, 10:05 PM
    Hi. I'm porting some Flink DataStream code to Flink SQL. The code does deduplication but only keeps track of keys for X hours. Reading the Flink SQL docs: • Deduplication - this doesn't indicate how long the state is kept around. • Window Deduplication - I'd want something closer to session windows. Thoughts? Should I keep this out of Flink SQL?
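    A possibly relevant knob, sketched below under the assumption that plain SQL deduplication state is only bounded by the idle-state TTL option (table and column names here are made up):
    ```java
    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
    // expire dedup keys that have been idle for X hours (here: 6 hours, illustrative)
    tEnv.getConfig().getConfiguration().setString("table.exec.state.ttl", "6 h");

    // the documented SQL deduplication pattern: keep the first row per key
    tEnv.executeSql(
        "SELECT id, payload FROM ("
      + "  SELECT *, ROW_NUMBER() OVER (PARTITION BY id ORDER BY proc_time ASC) AS row_num"
      + "  FROM events"
      + ") WHERE row_num = 1");
    ```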
  • Bill G

    11/06/2022, 10:34 PM
    Flink's Avro deserializer can't deserialize messages with a schema that includes a field named `schema`. It will throw an exception: `Expecting type to be a PojoTypeInfo`. The problem is that the Avro schema compiler generates the getter `getSchema$()`, because `getSchema()` is always generated to return the field `SCHEMA$`. Is it possible for Flink's deserializer to ever be generalized for this case, or is this not worthy of an issue? I don't necessarily have control of the schemas. In particular, messages generated by Debezium include a source schema with the field `schema` included. It is a nested field, so I can't drop or rename this field using the `ReplaceField` transform for Kafka, and I do not want to flatten the structure.
  • Arnon

    11/07/2022, 6:06 AM
    I'm trying to create a small POC based on https://nightlies.apache.org/flink/flink-docs-master/docs/try-flink/table_api/#/. It works fine on Flink 1.15, but I need to upgrade to 1.16+ in order to use the lastValue/firstValue functions. Then I get exceptions that I can't solve. Any ideas? Thanks
    ```
    2022-11-06T18:11:49.010167928Z Caused by: org.apache.flink.table.api.ValidationException: Unable to create a source for reading table 'default_catalog.default_database.transactions'.
    2022-11-06T18:11:49.010170313Z
    2022-11-06T18:11:49.010172619Z Table options are:
    2022-11-06T18:11:49.010174700Z
    2022-11-06T18:11:49.010176956Z 'connector'='kafka'
    2022-11-06T18:11:49.010179236Z 'format'='json'
    2022-11-06T18:11:49.010181470Z 'properties.bootstrap.servers'='kafka:9092'
    2022-11-06T18:11:49.010183706Z 'scan.startup.mode'='earliest-offset'
    2022-11-06T18:11:49.010185846Z 'topic'='transactions'
    :
    :
    2022-11-06T18:11:49.010475640Z Caused by: org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'kafka' that implements 'org.apache.flink.table.factories.DynamicTableFactory' in the classpath.
    2022-11-06T18:11:49.010478276Z
    2022-11-06T18:11:49.010480410Z Available factory identifiers are:
    2022-11-06T18:11:49.010482603Z
    2022-11-06T18:11:49.010484875Z blackhole
    2022-11-06T18:11:49.010486997Z datagen
    2022-11-06T18:11:49.010489065Z filesystem
    2022-11-06T18:11:49.010491014Z print
    ```
  • Matteo De Martino

    11/07/2022, 10:06 AM
    Hello all, bit of a strange issue; not sure I will be able to explain it properly, but I will try. I have created a table with SQL `CREATE TABLE` and I set up a `watermark` column called `_ingested_timestamp`:
    ```sql
    CREATE TABLE IF NOT EXISTS MY_TABLE
      (
          id                    INT NOT NULL,
          ...
          ...
          _ingested_timestamp   TIMESTAMP(3),
          WATERMARK FOR _ingested_timestamp AS _ingested_timestamp - INTERVAL '10' SECOND,
          PRIMARY KEY(id) NOT ENFORCED
      )
    WITH(...);
    ```
    I then have a `SELECT` using data from the above table, something like this:
    ```sql
    SELECT
     item.id,
     ...
     ...
     item._ingested_timestamp AS event_time
    FROM MY_TABLE AS item JOIN ...;
    ```
    However, this fails with an exception around `Conversion to relational algebra failed to preserve datatypes`. Specifically, there seems to be a mismatch due to the fact that `event_time` is expected to be `ROWTIME` but is not (or the other way around):
    ```
    validated type:
    RecordType(INTEGER NOT NULL id, ..., TIMESTAMP(3) *ROWTIME* event_time) NOT NULL
    converted type:
    RecordType(INTEGER NOT NULL id, ..., TIMESTAMP(3) event_time) NOT NULL
    ```
    As you can see, the only difference in the exception message seems to be the type of `event_time`. Can anyone help me out here? What exactly is the issue, and how can I use a rowtime field in SQL without facing this problem? Thanks
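    In case it helps, one workaround that may apply here (an assumption, not a verified diagnosis): casting the time attribute to a plain TIMESTAMP(3) in the projection drops the *ROWTIME* marker, so the validated and converted types agree. A sketch, with OTHER_TABLE standing in for the joined table:
    ```java
    // tableEnv is the surrounding TableEnvironment; OTHER_TABLE is hypothetical
    tableEnv.executeSql(
        "SELECT item.id, "
      + "       CAST(item._ingested_timestamp AS TIMESTAMP(3)) AS event_time "
      + "FROM MY_TABLE AS item JOIN OTHER_TABLE AS o ON item.id = o.id");
    ```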
  • Kosta Sovaridis

    11/07/2022, 10:50 AM
    Hi, I am using the statefun project and I would like to use the Avro Confluent Flink connector for ingress/egress; I am using the playground internal project as a base. Looking at the documentation, I am unsure how to proceed: is there an easy way to use the Flink connector, or do I need to create a new ingress/egress or add a custom SerDe?
  • Sumit Nekar

    11/07/2022, 12:27 PM
    Hello team, is there any documentation available on how to configure Filebeat to pull the logs of a FlinkDeployment on the Flink K8s operator?
  • Arnon

    11/07/2022, 12:30 PM
    How do I handle JSON in a Flink table? I have JSON messages like this, received from a Kafka topic:
    ```json
    {
      "order_id": 1,
      "price": 2.99,
      "product": {
        "name": "p1",
        "size": "big"
      }
    }
    ```
    How can I map them to a Flink table? Since all the nested objects are key-value, I could store them in a MAP data type, but I can't find how to map it. A sample in the Java API would be even better. Thanks
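    For what it's worth, a minimal Java Table API sketch (field names taken from the JSON above; topic and broker address are placeholders) that maps the nested object as a ROW instead of a MAP:
    ```java
    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
    tEnv.executeSql(
        "CREATE TABLE orders ("
      + "  order_id INT,"
      + "  price DECIMAL(10, 2),"
      + "  product ROW<name STRING, `size` STRING>"  // nested JSON object as a ROW
      + ") WITH ("
      + "  'connector' = 'kafka',"
      + "  'topic' = 'orders',"
      + "  'properties.bootstrap.servers' = 'kafka:9092',"
      + "  'format' = 'json',"
      + "  'scan.startup.mode' = 'earliest-offset'"
      + ")");
    // nested fields are then addressable with dot notation
    tEnv.executeSql("SELECT order_id, product.name, product.`size` FROM orders").print();
    ```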
  • Pedro Mázala

    11/07/2022, 2:23 PM
    Is `sink` a final step in Flink or not? I mean, can I have a stream that sinks to ES and, after sinking (only when it gets stored), run an extra step? Just like a callback.
  • Abhinav sharma

    11/07/2022, 3:26 PM
    Hi, I am using a Spring Boot application and configured an API endpoint which triggers a Flink program to aggregate the data. It's a DataStream API job, so I want a way to get the aggregated results whenever I call the API. The API should keep running in the background and Flink will keep aggregating the results; whenever I hit the endpoint I must have the results. Is that possible?
  • Emmanuel Leroy

    11/07/2022, 4:12 PM
    Hi, I’m trying to use the Oracle Cloud Infrastructure (OCI) HDFS connector with the Flink FileSink. The OCI HDFS connector abstracts OCI object storage for HDFS and uses the scheme `oci://`, much like the s3-fs-hadoop connector abstracts S3, but as a 3rd-party component. Using that, the Flink Hadoop FS is able to authenticate and get set up, but then it fails on the scheme check and I get the error:
    ```
    Recoverable writers on Hadoop are only supported for HDFS
    ```
    I have tried the S3 connector (flink-s3-fs-hadoop) with the S3 compatibility mode, which works fine but has some limitations on OCI, and I’d rather use the native HDFS connector. I’m trying to understand the RecoverableWriter functionality/limitation, to see whether the OCI HDFS connector actually fits the requirements and it’s merely the check on the scheme that is preventing me from using it (without having to fork Flink and bypass the check myself). Is anyone familiar with this check and what it is really checking for by enforcing the hdfs:// scheme? Thanks
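    For reference, the failing setup presumably looks roughly like this sketch (the oci:// URI is a placeholder); as far as I can tell, the exception surfaces at runtime when the sink asks the Hadoop filesystem for a recoverable writer on a non-hdfs:// scheme:
    ```java
    import org.apache.flink.api.common.serialization.SimpleStringEncoder;
    import org.apache.flink.connector.file.sink.FileSink;
    import org.apache.flink.core.fs.Path;

    FileSink<String> sink = FileSink
            .forRowFormat(new Path("oci://my-bucket@my-namespace/output"),  // placeholder URI
                          new SimpleStringEncoder<String>("UTF-8"))
            .build();
    // stream is the surrounding DataStream<String>; at runtime the Hadoop
    // recoverable-writer check rejects schemes other than hdfs://
    stream.sinkTo(sink);
    ```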
  • Jason Politis

    11/07/2022, 7:05 PM
    Good afternoon all. I have this table here that resides in a Kafka topic. The Kafka topic has only 2.3 million records, but the Flink task says that it has sent 3.4 million, and it keeps climbing. Shouldn't these numbers match? How can the task show that it's sending more records than the source topic it's pulling from? It hasn't had a chance to join or do any calculations (other than CASTing via a view).
  • RICHARD JOY

    11/07/2022, 7:55 PM
    Good day everyone! I have a question related to the Flink Kubernetes operator. Is it possible to add labels to the FlinkDeployment CRD so that I can pass certain application-specific labels in the custom resource object in the deployment? My organization has validation webhooks that don't allow deployments without certain app-specific labels. Below is the error log from the operator when the application job deployment is done.
    Caused by: org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.client.KubernetesClientException: Failure executing: POST at: https://10.19.0.1/apis/apps/v1/namespaces/ns-1/deployments. Admission webhook "validation.gatekeeper.sh" denied the request: [deployment-must-have-costcenter] All deployments must have a 'app_id', 'app_class', 'env' label set.
    Any help is so much appreciated. Thanks!
  • Emmanuel Leroy

    11/07/2022, 10:00 PM
    Hi, using the Flink operator, is there a way to re-use the JobManager to run multiple jobs? In the end, I’m looking for a single UI to monitor all jobs, so if there is a way to do that, I’m all ears.
  • Krish Narukulla

    11/07/2022, 10:16 PM
    Is there a Flink connector for `scylladb` or `cassandra`? Let me know; we would need such a capability.
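    For what it's worth, Flink has a Cassandra connector (flink-connector-cassandra); since it speaks CQL, it may also work against ScyllaDB's CQL interface, though that's an assumption I haven't verified. A minimal sketch (keyspace/table and host are placeholders):
    ```java
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.cassandra.CassandraSink;

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    DataStream<Tuple2<String, Long>> counts =
            env.fromElements(Tuple2.of("hello", 1L), Tuple2.of("world", 2L));

    // bind each tuple field to a ? placeholder in the CQL statement
    CassandraSink.addSink(counts)
        .setQuery("INSERT INTO demo.word_count (word, count) VALUES (?, ?);")
        .setHost("127.0.0.1")
        .build();
    ```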
  • Emmanuel Leroy

    11/07/2022, 11:39 PM
    Stupid question: how do I cancel/terminate a job from the UI? I launched a job manually by uploading a jar and using Submit, but I don’t see a way to stop it! 😕
  • Matt Fysh

    11/08/2022, 1:49 AM
    As a Python user with no knowledge of Java, how do I troubleshoot the following error that occurs on job submission:
    ```
    Caused by: java.lang.ClassNotFoundException: org.apache.flink.streaming.api.operators.python.process.ExternalPythonProcessOperator
    	at java.base/java.net.URLClassLoader.findClass(Unknown Source)
    	at java.base/java.lang.ClassLoader.loadClass(Unknown Source)
    	at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:67)
    ```
  • Jason Politis

    11/08/2022, 4:50 AM
    I'm using Flink SQL and have a table with 1.3 million records. I have many tens of tables that I have to left join to this table. Each one amplifies the volume of records in the output. What is the recommended way to flatten the results?
  • Suna Yin

    11/08/2022, 9:11 AM
    Has anybody run into Kafka Kerberos problems when using Kafka from a Flink jar:
  • Adrian Chang

    11/08/2022, 3:32 PM
    Hello, is it possible to use the DataStream Kafka connector with the Confluent Avro Schema Registry in PyFlink? I guess trying to use `AvroRowDeserializationSchema` will fail because of the first 5 bytes of the record containing the schema ID. Is it a good option to use the SQL connector and then convert the table into a stream? I know this will add some latency. Is there any other option? We use Python. Thanks
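    On the Java side, the flink-avro-confluent-registry format handles that 5-byte magic-byte/schema-ID prefix by fetching schemas from the registry; a Java sketch is below (registry URL, topic, and reader schema are placeholders, and I'm not certain how much of this PyFlink currently wraps):
    ```java
    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.flink.connector.kafka.source.KafkaSource;
    import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema;

    Schema readerSchema = new Schema.Parser().parse("{ ... }");  // placeholder reader schema
    KafkaSource<GenericRecord> source = KafkaSource.<GenericRecord>builder()
            .setBootstrapServers("kafka:9092")
            .setTopics("my-topic")
            .setValueOnlyDeserializer(
                ConfluentRegistryAvroDeserializationSchema.forGeneric(
                    readerSchema, "http://schema-registry:8081"))
            .build();
    ```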
  • Emmanuel Leroy

    11/08/2022, 4:59 PM
    Using the operator in session mode, my job jar is not found. Using the `local://` scheme errors with:
    ```
    Error:  org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not find a file system implementation for scheme 'local'.
    ```
    and when using `file://` I get:
    ```
    java.io.FileNotFoundException: /opt/flink/examples/streaming/flink-demo-1.0-SNAPSHOT.jar
    ```
    This is where I put the jar in my Docker image, and that works in a FlinkDeployment template. I had to use a remote URI to make it work. So where are the jars supposed to be located if I want to put them in the image when using session mode?
  • Sachin Saikrishna Manikandan

    11/08/2022, 5:32 PM
    Hello everyone. In Flink's Async I/O, how many instances of the RichAsyncFunction get initialized? When I run it locally, I can see more than one instance of that function being called. Does that mean that if I create a database client in its open() method, I should expect it to be initialized multiple times? Does this depend on the capacity parameter of the unorderedWaitWithRetry() method?
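    For illustration, the usual pattern looks like the sketch below (DbClient is hypothetical); as far as I understand it, open() runs once per parallel subtask, so with parallelism N you get N client instances, independent of the capacity argument of unorderedWaitWithRetry():
    ```java
    import java.util.Collections;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.async.ResultFuture;
    import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

    public class LookupFunction extends RichAsyncFunction<String, String> {
        private transient DbClient client;  // DbClient is a hypothetical async DB client

        @Override
        public void open(Configuration parameters) {
            // invoked once per parallel subtask, not once per job
            client = new DbClient("db-host:5432");
        }

        @Override
        public void asyncInvoke(String key, ResultFuture<String> resultFuture) {
            client.lookupAsync(key)  // assumed to return a CompletableFuture<String>
                  .thenAccept(value -> resultFuture.complete(Collections.singleton(value)));
        }

        @Override
        public void close() {
            if (client != null) {
                client.close();
            }
        }
    }
    ```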
  • Yaroslav Bezruchenko

    11/08/2022, 7:07 PM
    Hey, can you please suggest the best way to migrate state from an old version of a Flink app to a new one, if I changed the logic and need to redeploy both job and task managers? Are there any recommended practices to reduce the impact on production in this case?