# troubleshooting
  • M Harsha (10/28/2022, 4:52 PM)
    Hi all, I am unable to run the Flink application jar. It fails with the following exception:
    Caused by: java.lang.ClassCastException: cannot assign instance of org.apache.kafka.clients.consumer.OffsetResetStrategy to field org.apache.flink.connector.kafka.source.enumerator.initializer.ReaderHandledOffsetsInitializer.offsetResetStrategy of type org.apache.kafka.clients.consumer.OffsetResetStrategy in instance of org.apache.flink.connector.kafka.source.enumerator.initializer.ReaderHandledOffsetsInitializer
    The Debugging Classloading docs suggest setting the parameter ``classloader.resolve-order=parent-first``. I am setting this via Java code as follows:
        Configuration configuration = new Configuration();
        configuration.set(CLASSLOADER_RESOLVE_ORDER, "parent-first");
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(configuration);
        System.out.println(env.getConfiguration().toString());
    I see the following line in the output that prints the config:
    classloader.resolve-order=parent-first
    But I still see the error. Am I missing anything? Also, I am able to run the application via IntelliJ. TIA!
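    For reference, a minimal sketch (assuming the jar is submitted to a standalone/session cluster rather than run locally): classloader.resolve-order is a cluster-side option read from the cluster's own flink-conf.yaml, while a Configuration passed to getExecutionEnvironment generally only takes effect for local (IDE) execution, which would also explain why the run from IntelliJ behaves differently.
        # flink-conf.yaml on the JobManager/TaskManagers (cluster side), not in the job code
        classloader.resolve-order: parent-first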
  • Steven Zhang (10/28/2022, 7:47 PM)
    Hello, I was playing around with upgrading FlinkDeployment CRDs with running SessionJobs and was confused by the behavior I saw. This is based on https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-1.2/docs/custom-resource/job-management/#stateful-and-st[…]cation-upgrades. I set up my FlinkDeployment in standalone mode with a volume mounted (a local volume with storage on a k8s node) so it can write the JobManager HA information and savepoints to it. In the FlinkDeployment flinkConfiguration:
    flinkConfiguration:
        taskmanager.numberOfTaskSlots: "10"
        state.savepoints.dir: "file:///flink-data/savepoints"
        state.checkpoints.dir: "file:///flink-data/checkpoints"
        high-availability: "org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory"
        high-availability.storageDir: "file:///flink-data/ha"
    I started up two SessionJobs using that deployment with upgradeMode: savepoint. I SSHed into the k8s node with the volume and saw the HA directory created with job graphs in it:
    blob  submittedJobGraph220fe25801af  submittedJobGraph477b47fdef02
    I also manually triggered a savepoint with savepointTriggerNonce and saw the savepoint directory created. When I went to upgrade the FlinkDeployment image by modifying the CRD, the jobs entered RECONCILING and never exited, and the job graphs had disappeared from the HA directory on the node (the savepoints are still present). Am I missing something in my setup to get the jobs to continue running after the upgrade?
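    For context, a hypothetical sketch of the kind of standalone deployment with a mounted volume described above (all names are placeholders, not the actual setup):
        spec:
          mode: standalone
          podTemplate:
            spec:
              containers:
                - name: flink-main-container
                  volumeMounts:
                    - name: flink-data
                      mountPath: /flink-data
              volumes:
                - name: flink-data
                  persistentVolumeClaim:
                    claimName: flink-data-pvc
    Worth noting: with a node-local volume, the HA metadata and savepoints are only reachable again if the restarted JobManager pods land on the same node.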
  • Dan Dubois (10/28/2022, 11:57 PM)
    Hello. I'm trying to run Flink using Docker Compose. When I add taskmanager.memory.process.size: 32gb to my Task Manager FLINK_PROPERTIES in my compose.yml file, the Task Manager crashes with the following logs:
    OpenJDK 64-Bit Server VM warning: INFO: os::commit_memory(0x00000003ab000000, 18605932544, 0) failed; error='Not enough space' (errno=12)
    #
    # There is insufficient memory for the Java Runtime Environment to continue.
    # Native memory allocation (mmap) failed to map 18605932544 bytes for committing reserved memory.
    # An error report file with more information is saved as:
    # /opt/flink/hs_err_pid1.log
    The thing I don't understand is that my host machine has 64 GB of memory and the Docker containers are allocated at most 64 GB, so why wouldn't the JVM be able to allocate enough memory?
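    For comparison, a minimal compose.yml sketch with an explicit container memory limit (values are illustrative; taskmanager.memory.process.size has to fit inside whatever memory the container, or the Docker Desktop VM, actually gets):
        taskmanager:
          image: flink:1.15
          command: taskmanager
          environment:
            - |
              FLINK_PROPERTIES=
              jobmanager.rpc.address: jobmanager
              taskmanager.memory.process.size: 32gb
          deploy:
            resources:
              limits:
                memory: 40g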
  • Matt Fysh (10/29/2022, 10:38 AM)
    My app uses the Table API: I ingest from a Kinesis stream, create a temporary view, and run multiple filters on that view to create new temporary views with S3 sinks attached. I'm wondering why the UI only shows a single node in the graph, whereas the screenshots in some articles I've read contain a graph of operator nodes. Is it because multiple tables created in the Table API are compiled down into a single datastream operator? Or is there a way to change my usage so the DAG renders as expected in the UI, with multiple nodes for each step in the pipeline?
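    One thing that collapses steps into a single box is operator chaining. If the goal is just to see the individual operators in the web UI, a hedged PyFlink sketch (option name as in Flink's pipeline options) is:
        # disable operator chaining so each operator shows up as its own node in the UI
        table_env.get_config().get_configuration().set_string("pipeline.operator-chaining", "false")
    This usually costs some performance, so it is mostly useful for inspecting the topology.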
  • Ikvir Singh (10/29/2022, 5:07 PM)
    Hello 👋🏽 I'm finding that my CPU usage continuously increases during checkpointing. I'd like some guidance on what I might be doing wrong 🧵
  • Matt Fysh (10/30/2022, 4:27 AM)
    How do I prevent parallelism on the following query? I'm using PyFlink.
    table_env.sql_query('SELECT * FROM my_kinesis_source')
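    If the intent is to run the whole pipeline at parallelism 1, a minimal sketch (assuming the default-parallelism route is acceptable, since SQL does not expose per-operator parallelism):
        # force the job default parallelism to 1 before building the query
        table_env.get_config().get_configuration().set_string("parallelism.default", "1")
        table_env.sql_query('SELECT * FROM my_kinesis_source')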
  • Jirawech Siwawut (10/30/2022, 4:39 AM)
    Hi. I have an issue using a temporal join with Window TVF aggregation. I would like to join two streams and perform a global window aggregation, but it turns out I just get a group aggregation instead if I use SQL like this. FYI, I prefer to have an append-only datastream.
    select category_id, window_start, window_end, count(1)
        from
        (   select *
            FROM TABLE(HOP(TABLE transacton, DESCRIPTOR(event_time), INTERVAL '1' MINUTES, INTERVAL '20' MINUTES)) AS t
        ) t
        LEFT JOIN dimention_table FOR SYSTEM_TIME AS OF t.event_time
        ON dimention_table.id = t.restaurant_id
        AND dimention_table.id IS NOT NULL
        AND t.restaurant_id IS NOT NULL
        GROUP BY category_id, window_start, window_end
    And here is the execution plan
    +- GroupAggregate(groupBy=[category_id, window_start, window_end], select=[category_id, window_start, window_end, COUNT(DISTINCT $f3) AS $f3, COUNT(DISTINCT $f4) AS $f4])
       +- Exchange ... 
          +- Calc ...
             +- TemporalJoin...
                :- Exchange(distribution=[hash[restaurant_id]])
                :  +- Calc ... 
                :              +- WindowTableFunction(window=[HOP(time_col=[event_time], size=[2 min], slide=[1 min])])
                :                 +- ...
                +- Exchange(distribution=[hash[id]])
                   +- Calc ...
    How do I get a global window aggregation? I tried something like this, but it did not work with the temporal join. See the SQL below:
    SELECT category_id, window_start, window_end, count(1)
        FROM
        (   SELECT *
            FROM TABLE(HOP(TABLE transacton, DESCRIPTOR(event_time), INTERVAL '1' MINUTES, INTERVAL '20' MINUTES)) AS t
            LEFT JOIN dimention_table FOR SYSTEM_TIME AS OF t.event_time
            ON dimention_table.id = t.restaurant_id
            AND dimention_table.id IS NOT NULL
            AND t.restaurant_id IS NOT NULL
        ) t
        GROUP BY category_id, window_start, window_end
  • Tawfik Yasser (10/30/2022, 4:50 AM)
    Hello, I'm trying to execute a Flink application inside the Flink source code and I'm getting this error: No ExecutorFactory found to execute the application. I tried different solutions as well, but none of them worked. Could anyone advise me, please? TIA
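    For what it's worth, "No ExecutorFactory found to execute the application" is the error typically seen when flink-clients is missing from the runtime classpath; a hedged Maven sketch (version is a placeholder):
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>1.16.0</version>
        </dependency>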
  • Haim Ari (10/30/2022, 11:25 AM)
    Hello, is the 1.0.1 version of flink-operator still available somewhere? I'm using Flux with shared values. I already have custom values with postRenderers and a running session cluster. Now I need to add an identical session cluster but can't, as I see that only flink-kubernetes-operator-1.1.0 or 1.2.0 are available…
  • Haim Ari (10/30/2022, 11:30 AM)
    For example my custom values with 1.1.0 or 1.2.0 are failing with:
    k-operator.yaml:82:25: executing "flink-kubernetes-operator/templates/flink-operator.yaml" at <$v.name>: can't evaluate field name in type interface {}...
  • Matt Fysh (10/31/2022, 1:21 AM)
    I'm getting an OOM (out of memory) error (Java heap), but I'm not even in prod yet and I'm only pushing a single event through my pipeline for testing… this seems very strange. I'm running a map operator on a datastream and returning a complex, deeply nested value - could this be the problem? I'll put my TypeInfo inside the thread 👇
  • Krish Narukulla (10/31/2022, 1:46 AM)
    The Flink sources are not building in IntelliJ; I followed the instructions at https://nightlies.apache.org/flink/flink-docs-master/docs/flinkdev/ide_setup/. Only flink-table-code-splitter is not building.
  • Jim (10/31/2022, 6:57 AM)
    Hello, could someone elaborate on how the connection is made from the Flink cluster to Kafka? Is the master connecting to Kafka, or is the actual connection made from the worker nodes? I'm asking because it looks like the JobManager tries to read the truststore/keystore files for some reason.
  • Amenreet Singh Sodhi (10/31/2022, 7:36 AM)
    Hi all, can someone help me with this issue? When I try to mvn clean install flink-1.15.2 on my corporate network, I get the following error while building flink-runtime-web:
        Failed to execute goal com.github.eirslett:frontend-maven-plugin:1.11.0:
        install-node-and-npm (install node and npm) on project flink-runtime-web: Could not download npm: Could not download https://registry.npmjs.org/npm/-/npm-8.1.2.tgz: PKIX path building failed: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target
  • Haim Ari (10/31/2022, 8:03 AM)
    Hello, I'm facing this error when deploying flink-kubernetes-operator v1.0.1 (I currently need this specific version). Can someone advise on that? The same (cloned) HelmRelease using Flux is working fine on another cluster that is identical to this one.
    5m12s       Normal    SuccessfulCreate    replicaset/flink-kubernetes-operator-78fc4b8b86   Created pod: flink-kubernetes-operator-78fc4b8b86-dm7rd
    62s         Warning   FailedMount         pod/flink-kubernetes-operator-78fc4b8b86-dm7rd    MountVolume.SetUp failed for volume "keystore" : secret "webhook-server-cert" not found
    5m11s       Normal    Scheduled           pod/flink-kubernetes-operator-78fc4b8b86-dm7rd    Successfully assigned flink/flink-kubernetes-operator-78fc4b8b86-dm7rd to inst-pqqeh-singapore-ops-node-rtb-prd-intel
    3m9s        Warning   FailedMount         pod/flink-kubernetes-operator-78fc4b8b86-dm7rd    Unable to attach or mount volumes: unmounted volumes=[keystore], unattached volumes=[flink-operator-config-volume flink-operator-token-d7k2c keystore]: timed out waiting for the condition
    52s         Warning   FailedMount         pod/flink-kubernetes-operator-78fc4b8b86-dm7rd    Unable to attach or mount volumes: unmounted volumes=[keystore], unattached volumes=[keystore flink-operator-config-volume flink-operator-token-d7k2c]: timed out waiting for the condition
    12s         Normal    error               helmrelease/flink-kubernetes-operator             Helm install failed: timed out waiting for the condition...
    12s         Normal    error               helmrelease/flink-kubernetes-operator             reconciliation failed: Helm install failed: timed out waiting for the condition
    12s         Normal    error               helmrelease/flink-kubernetes-operator             reconciliation failed: install retries exhausted
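    In case it helps narrow things down: the keystore volume belongs to the operator's admission webhook, and the webhook-server-cert secret is normally issued by cert-manager. If cert-manager is not installed on this cluster, one hedged workaround is to deploy the chart with the webhook disabled (value name as in the operator Helm chart):
        webhook:
          create: false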
  • Tawfik Yasser (10/31/2022, 8:32 AM)
    I'm trying to run a simple Flink application and I'm getting this error
  • Tawfik Yasser (10/31/2022, 8:32 AM)
  • Tawfik Yasser (10/31/2022, 8:33 AM)
    I did a lot of searching but found nothing...
  • Gaurav Miglani (10/31/2022, 9:35 AM)
    Hi team, we have been running jobs using the Flink Kubernetes operator for the last 50 days, but somehow today we are getting a bad_certificate error in the flink-kubernetes-operator pod and are not able to submit jobs:
    2022-10-31 09:29:17,499 o.a.f.s.n.i.n.c.DefaultChannelPipeline [WARN ] An exceptionCaught() event was fired, and it reached at the tail of the pipeline. It usually means the last handler in the pipeline did not handle the exception.
    org.apache.flink.shaded.netty4.io.netty.handler.codec.DecoderException: javax.net.ssl.SSLHandshakeException: Received fatal alert: bad_certificate
    	at org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:477)
    	at org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276)
    	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    	at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
    	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    	at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
    	at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
    	at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:719)
    	at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655)
    	at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581)
    	at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
    	at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
    	at org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    	at org.apache.flink.shaded.netty4.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    	at java.base/java.lang.Thread.run(Unknown Source)
    Caused by: javax.net.ssl.SSLHandshakeException: Received fatal alert: bad_certificate
    	at java.base/sun.security.ssl.Alert.createSSLException(Unknown Source)
    	at java.base/sun.security.ssl.Alert.createSSLException(Unknown Source)
    	at java.base/sun.security.ssl.TransportContext.fatal(Unknown Source)
    	at java.base/sun.security.ssl.Alert$AlertConsumer.consume(Unknown Source)
    	at java.base/sun.security.ssl.TransportContext.dispatch(Unknown Source)
    	at java.base/sun.security.ssl.SSLTransport.decode(Unknown Source)
    	at java.base/sun.security.ssl.SSLEngineImpl.decode(Unknown Source)
    	at java.base/sun.security.ssl.SSLEngineImpl.readRecord(Unknown Source)
    	at java.base/sun.security.ssl.SSLEngineImpl.unwrap(Unknown Source)
    	at java.base/sun.security.ssl.SSLEngineImpl.unwrap(Unknown Source)
    	at java.base/javax.net.ssl.SSLEngine.unwrap(Unknown Source)
    	at org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslHandler$SslEngineType$3.unwrap(SslHandler.java:296)
    	at org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1342)
    	at org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslHandler.decodeJdkCompatible(SslHandler.java:1235)
    	at org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1284)
    	at org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:507)
    	at org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:446)
    	... 17 more
  • Tawfik Yasser (10/31/2022, 11:17 AM)
    I got this error, but I can't work out where the problem might be.
  • Arnon (10/31/2022, 12:56 PM)
    Hello, I'm new to Flink tables and trying to create a small POC based on https://nightlies.apache.org/flink/flink-docs-master/docs/try-flink/table_api/#/. I want to query a Flink table, in this case the transactions table; in the GUI I see that it is in the default catalog / default database. Using ./bin/sql-client.sh I can connect, but I can't find this table. What needs to be done? Thanks
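    One detail that often explains this: the tables registered by the walkthrough's Java program live in that session's in-memory default catalog, so a separate sql-client session does not see them and they have to be declared there as well. A rough sketch, with the connector options assumed from the playground's Kafka setup rather than taken from it:
        CREATE TABLE transactions (
            account_id       BIGINT,
            amount           BIGINT,
            transaction_time TIMESTAMP(3)
        ) WITH (
            'connector' = 'kafka',
            'topic' = 'transactions',
            'properties.bootstrap.servers' = 'kafka:9092',
            'scan.startup.mode' = 'earliest-offset',
            'format' = 'csv'
        );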
  • Piotr Pawlaczek (10/31/2022, 1:42 PM)
    Hi, I have a problem with using S3 for the FlinkSessionJob jarURI. I put the s3 properties in my session cluster:
    apiVersion: flink.apache.org/v1beta1
    kind: FlinkDeployment
    metadata:
      name: my-session-deployment
    spec:
      image: <<my-custom-image-with-s3-plugin>>
      flinkVersion: v1_15
      flinkConfiguration:
        taskmanager.numberOfTaskSlots: "2"
    s3.endpoint: endpoint.com
        s3.path.style.access: "true"
        s3.access-key: <<key>>
        s3.secret-key: "****"
    then when I define my FlinkSessionJob:
    apiVersion: flink.apache.org/v1beta1
    kind: FlinkSessionJob
    metadata:
      name: job
    spec:
      deploymentName: my-session-deployment
      job:
    jarURI: s3://test/flink-examples-streaming_2.12-1.15.2-TopSpeedWindowing.jar
        parallelism: 4
        upgradeMode: stateless
    I get the following error from FlinkSessionJob:
    org.apache.hadoop.fs.s3a.AWSClientIOException: doesBucketExist on dummy: com.amazonaws.SdkClientException: Unable to execute HTTP request: test.endpoint.com: Unable to execute HTTP request: test.endpoint.com
    It seems my s3.path.style.access: "true" property was ignored and the bucket name (test) was prepended to the endpoint's hostname (test.endpoint.com) instead of using endpoint.com/test/. Apart from that, I'm able to read from S3 from an application using exactly the same configuration as in this yaml, so the S3 config itself is correct. Could you please tell me what I'm doing wrong?
  • Tiansu Yu (10/31/2022, 2:37 PM)
    Flink stops emitting output to the sink once the window function is turned on. Details in 🧵
  • Gaurav Miglani (10/31/2022, 3:44 PM)
    With the Flink Kubernetes operator, I'm facing an issue with Flink 1.13 but not with Flink 1.15. When there is a problem with a connector jar, or any other issue during JobManager pod initialisation, in Flink 1.15 the JobManager pod keeps running and the load-balancer service comes up, so we can track the job status via the load-balancer URL and the /jobs API; the job shows as failed with the root exception, which is expected, and kubectl describe flinkdeployment also shows job status = FAILED, which is expected. But when we run the same job with Flink 1.13, the JobManager pod keeps going into CrashLoopBackOff, the load balancer never comes up, and we cannot track the current job status. Is this expected? Is there any way to keep the JobManager pod running in Flink 1.13, with the load-balancer URL showing the job in a failed state with the main exception? On describing the FlinkDeployment, the job status stays in reconciliation and after some time we get a "manual job check is needed" issue 🤔
  • Jin Yi (10/31/2022, 5:40 PM)
    will there still be full/any serialization perf benefits from implementing the Tuple interface (or extending a specific TupleN) when implementing a POJO for flink state and operator i/o? (https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/serialization/types_serialization/ and https://flink.apache.org/news/2020/04/15/flink-serialization-tuning-vol-1.html)
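    For concreteness, the two options being compared look roughly like this (hypothetical types, kept in separate files in practice):
        import org.apache.flink.api.java.tuple.Tuple2;

        // (a) extend a concrete TupleN, so Flink's Tuple serializer is used
        public class EventTuple extends Tuple2<String, Long> {
            public EventTuple() {}                          // no-arg constructor required
            public EventTuple(String key, Long ts) { super(key, ts); }
        }

        // (b) a plain POJO with public fields and a no-arg constructor, handled by the POJO serializer
        class EventPojo {
            public String key;
            public long ts;
            public EventPojo() {}
        }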
  • Eric Xiao (10/31/2022, 9:15 PM)
    Hi, we're exploring the SQL APIs with CDC data, and I was getting this error when trying to access the op column:
    tableEnv
          .sqlQuery("""
            SELECT
                *
            FROM line_item
            WHERE
                true
            AND op <> 'I+'
    """)
    > Column 'op' not found in any table
    Is there a special way for us to access the operation type of the CDC record directly in SQL? I tried adding the op column as a metadata column as well, and it complained that it could not find it.
  • Ankit Kothari (11/01/2022, 12:09 AM)
    Hi! I am trying to have a custom metric sent to Prometheus, and I am using StreamingFileSink to write our stream data to S3:
    private transient int customMetric = 0;
    final DataStream<String> source = ....
    final StreamingFileSink<String> sink = StreamingFileSink.forRowFormat(new Path(path), new SimpleStringEncoder<String>("UTF-8"))
    				.withBucketAssigner(new NamespaceSiteIdBucketAssigner<>())
    				.withRollingPolicy(OnCheckpointRollingPolicy.build()).withOutputFileConfig(getOutputFileConfig())
    				.build();
    
    sink.getRuntimeContext()
          .getMetricGroup()
          .gauge("MyGauge", new Gauge<Integer>() {
            @Override
            public Integer getValue() {
              return customMetric;
            }
          });
    
    source.map(line -> {
    			Map<String, Object> resultMap = ...
    			customMetric = resultMap.get("some-key");
    			return Utils.jsonParser.writeValueAsString(resultMap);
    		}).name(workflowId).uid(workflowId);
    On running this code snippet, I get the following error
    "The runtime context has not been initialized.","message":"The runtime context has not been initialized.","name":"java.lang.IllegalStateException","extendedStackTrace":"java.lang.IllegalStateException: The runtime context has not been initialized.\n\tat org.apache.flink.api.common.functions.AbstractRichFunction.getRuntimeContext(AbstractRichFunction.java:53) ~[flink-dist_2.12-1.11.1.jar:1.11.1]
    Is there some initialization to be done so that the runtime context object is created? Can someone please point out what I am missing here and how I can get this working? FYI, I am following this guide for metrics: https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/ops/metrics/#metric-types and this one for StreamingFileSink: https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/datastream/streamfile_sink/
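    A minimal sketch of the usual pattern (not the original code; class name and metric value are made up): metrics are registered from inside a rich function's open(), where getRuntimeContext() is valid, rather than on the sink object outside of any operator:
        import org.apache.flink.api.common.functions.RichMapFunction;
        import org.apache.flink.configuration.Configuration;
        import org.apache.flink.metrics.Gauge;

        public class ParsingMapper extends RichMapFunction<String, String> {
            private transient int customMetric;

            @Override
            public void open(Configuration parameters) {
                // register the gauge once the runtime context exists
                getRuntimeContext()
                    .getMetricGroup()
                    .gauge("MyGauge", (Gauge<Integer>) () -> customMetric);
            }

            @Override
            public String map(String line) throws Exception {
                customMetric = line.length();   // whatever value the gauge should report
                return line;
            }
        }
    Used as source.map(new ParsingMapper()), the gauge is registered per parallel subtask and the field no longer needs to be shared with the sink.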
  • Matt Fysh (11/01/2022, 12:56 AM)
    Has anyone tried to get started with pyflink but had to go to java or scala because pyflink was missing features and/or had critical bugs? I think I may have to do this as I am finding myself having to dig through pyflink source code too much to understand bugs & errors. Are there any guides on migrating a pyflink app to be rewritten in java or scala?
  • Suna Yin (11/01/2022, 2:31 AM)
    I'm trying to use Flink SQL to connect to Kafka and then write data to HBase, but the column name under the column family has to be derived dynamically from the value in the Kafka data. For example, given a Kafka record like {"tick":"093000","price":3.415}, I need to turn it into an HBase cell like "column=cf0930:00, timestamp=1665365500505, value=3.415", where the column family/qualifier cf0930:00 comes from the value of "tick" in the Kafka record and the cell value equals price. Does anyone know how to meet this requirement?
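    For background on why this is awkward: the HBase SQL connector maps column families and qualifiers statically in the DDL (each family is a ROW type with fixed field names), roughly like the sketch below (connector options are placeholders). A qualifier derived from each record's value does not fit that static model, so this usually ends up in the DataStream API with a custom HBase sink building the qualifier by hand.
        -- static mapping: every qualifier has to be declared up front
        CREATE TABLE hbase_sink (
            rowkey STRING,
            cf0930 ROW<`00` DOUBLE>,
            PRIMARY KEY (rowkey) NOT ENFORCED
        ) WITH (
            'connector' = 'hbase-2.2',
            'table-name' = 'ticks',
            'zookeeper.quorum' = 'zookeeper:2181'
        );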
  • Tawfik Yasser (11/01/2022, 3:47 AM)
    Where can I find something like documentation for the APIs' internals?