https://flink.apache.org/ logo
Join Slack
Powered by
# troubleshooting
  • r

    Rohan Kumar

    04/03/2023, 6:36 AM
    Hi, I have a question regarding the kubernetes operator. I have added liveness probe in jobmanager and taskmanager podtemplates. But, even when the liveness probe fails the pod don’t restart. In the jobmanager pod events I can see
    Warning  Unhealthy       51s (x139 over 35m)    kubelet  Liveness probe errored: rpc error: code = Unknown desc = deadline exceeded ("DeadlineExceeded"): context deadline exceeded
    . Other pods with the same liveness probe restart when they fail. How to make it work?
    g
    • 2
    • 7
  • g

    Gaurav Gupta

    04/03/2023, 8:40 AM
    Hi All, Need some suggestion on Flink Modules using Stateful functions. I am trying to create to modularize my app to multiple StatefulFunctionEgressStreams; i.e. breaking the universal DAG to business module level smaller DAGs (StatefulFunctionEgressStreams) and chain them by creating DataStream of ingress messages from the egress stream of predecessor DAG (StatefulFunctionEgressStreams). Though I have assigned a unique uid to each ingress data stream, I still see the below error. `Exception in thread "main" java.lang.IllegalArgumentException: Hash collision on user-specified ID "feedback_union_uid1". Most likely cause is a non-unique ID. Please check that all IDs specified via
    uid(String)
    are unique.` Wanted to check if its allowed to create multiple StatefulFunctionEgressStreams in a single flink main function and chain them using Datastreams?
  • s

    Slackbot

    04/03/2023, 11:23 AM
    This message was deleted.
    s
    • 2
    • 1
  • a

    Aditya

    04/03/2023, 12:28 PM
    Does Apache Beam has compatibility with Flink Dynamic tables/ Table APIs. I have gone through beam's code on a high level and it looks like it does not support dynamic tables as of now. Just wanted to confirm if I am missing something or there is any workaround the same?
    m
    • 2
    • 1
  • a

    Abolfazl Ghahremani

    04/03/2023, 1:35 PM
    anyone has month here to talk? after user create an order(invoice) i want create same order(invoice) weekly ...how can implement it in flink as timely job user create new order={product:"p1", time :"now"} then system automaticlly =>next week new order={product:"p1", time :"week2"}=>next week new order={product:"p1", time :"week3"}=>...
    d
    m
    • 3
    • 9
  • f

    Felix Angell

    04/03/2023, 2:39 PM
    hey flink friends, we've recently been moving over to 1.15 pyflink and have noticed that a new operator has been introduced in the graph that seems to have a different parallelism set than expected (see screenshot). that said, in our code we enforce a max parallelism of 20. the chain is something along the lines of:
    Copy code
    env.add_source(
        kafka_consumer,
        ...
    )
    .uid("some id")
    .name("some id")
    .set_parallelism(20)
    .set_max_parallelism(20)
    .assign_timestamps_and_watermarks(watermark)
    .uid("some id")
    .name("some id")
    .set_parallelism(20)
    .set_max_parallelism(20)
    h
    d
    • 3
    • 18
  • a

    Abolfazl Ghahremani

    04/03/2023, 3:03 PM
    scalable cron job?how after user create an order(invoice) i want create same order(invoice) weekly ...how can implement it in flink as timely job user create new order={product:"p1", time :"now"} then system automaticlly =>next week new order={product:"p1", time :"week2"}=>next week new order={product:"p1", time :"week3"}=>...
  • a

    Abolfazl Ghahremani

    04/03/2023, 3:43 PM
    matching engine by flink i want match some data to some data for example new passenger to existing drivers ...or new order to some existing orders... if apache flink can not solve it so why flink exists.... why it is hard programing
  • d

    Demid P

    04/03/2023, 3:52 PM
    Hello, I have a trouble with checkpoints. Usually checkpoint takes less than 15s, but after sometime it cant do a checkpoint because of timeout (30 min). As I can see checkpoint cant be performed in one subtask in a KeyedCoProcessFunction with paralellism 32, for other subtasks checkpoint is done in normal time (less than 15s). What might be a cause? I tried to enable Unaligned Checkpoints, but it doesn't work.
    h
    d
    d
    • 4
    • 4
  • m

    Matthew Kerian

    04/03/2023, 5:42 PM
    Hello team 2 qq on best practices 1. I'd like to run a Flink job that polls/runs pipelines on a custom schedule. I've seen a few SO posts such as this. But I can't find any docs, examples, or blogs showing a more prod-ready solution. Specifically my main concern is related to fault tolerance. E.g. if I want to poll once per minute on time-bound data but our app goes down for 10 minutes I'd want to recover those 10 minutes in their 1 minute intervals. We use Airflow for this currently which seems more naturally suited to the task. But it'd simplify our workflow if we could run this within Flink 2. Is it an anti-pattern to store data longish-term within Flink? My idea is to have a pipeline that reduces daily data to a few metrics and store the last x days of data in a FIFO ds. Once a new day's worth of data is in we drop the oldest day's data and add our new data. I believe this should be possible with a keyed stream and checkpointing but I'm concerned about fault tolerance and if it'd be better to store this in a database and just query the db in order to get the data. Thanks
    m
    • 2
    • 9
  • j

    Jeesmon Jacob

    04/03/2023, 6:44 PM
    Hi team, qq on Application upgrade rollbacks in flink operator. It is marked as
    Experimental
    since v1.0 and still has
    Experimental
    tag in latest version v1.4. But wondering if anyone using it in production and ran into any issues or it is a pretty stable feature 🙂 Thanks!
    g
    • 2
    • 4
  • b

    Brian Lehman

    04/03/2023, 8:14 PM
    CEP question - I have a series of a dozen or so events that works great in detecting "perfect" matches (ie. all dozen events happen in sequence with the proper characteristics). I am trying to add partial matches, using the optional condition to each of the events in the sequence. In my use case, I want to return the "best" match, the one that has the most matching events, possibly even the perfect match. I can accomplish this with noSkip() strategy and some post processing to keep only the best match, but that is super slow at scale (searching for numerous patterns simultaneously through potentially millions of events). I would like to use the skipToLast or skipPastLast strategies, which works great (and incredibly fast) in my perfect match working case. But those skip strategies accept the quickest match (worst match in my use case) and then skip preventing it from even matching on the "best" match. Is there a way (or could there be a way in a future release) to encourage the CEP engine to attempt to find the entire pattern (even with optionals), such that the most complete instance would be returned rather than the first one meeting the least criteria? Thanks!
  • a

    Ali Zia

    04/03/2023, 9:30 PM
    Hi folks, I'm hoping to get some help thinking about a solution to a problem I'm trying to solve. I have a stream of data that gets a bunch of new data every hour. I want to calculate rankings amongst all the values in a given hour once all the data has arrived. For the purpose of this problem we can say that everything arrives in the first 15min. We want to group by a time field as well as some other fields. How can I calculate ranks in this situation? Thanks!!
  • s

    Sumit Nekar

    04/04/2023, 2:59 AM
    Hello Team, I am working on a usecase where data is being aggregated for a given window and then published to a sink. So this may not be a keyed window aggregation and I see windowsAll executes with just 1 parallelism. Any suggestions on achieving non keyed windowing?
    r
    • 2
    • 4
  • g

    Guruguha Marur Sreenivasa

    04/04/2023, 4:20 AM
    Hi team, we have Spotify Flink k8s Operator v0.4.2 running in production and have some concerns over HA. We have multiple flink clusters and are seeing that when a job manager goes down due to an EKS upgrade or some other reason than anything related to the job itself, the entire job is killed including its task managers and the namespace is wiped out. Has anyone faced this issue before or knows how to mitigate/validate this?
    g
    d
    • 3
    • 8
  • k

    kingsathurthi

    04/04/2023, 4:41 AM
    Hi Team, we have two k8s environment openshift and azure k8s, Job manager HA is configured and working fine in openshift environment( PVC access mode is readwritemany) But in Azure k8s Job manager is in pending state (PVC is bounded successfully) below is the event from the pending pod
    Copy code
    Events:
      Type     Reason       Age                  From               Message
      ----     ------       ----                 ----               -------
      Normal   Scheduled    2m22s                default-scheduler  Successfully assigned flink/kmgjobmanager-64d7777b9c-5b8bw to aks-nodepool1-12868363-vmss00001c
      Warning  FailedMount  38s (x7 over 2m12s)  kubelet            MountVolume.MountDevice failed for volume "pvc-daa9e622-da57-4fbe-bc3b-8470e1b3eef8" : rpc error: code = Internal desc = volume(mc_cns-ba-mni-dev-westeurope-rg_e2e-common-qa-01_westeurope#f5bc870483c1d49f186a69c#pvc-daa9e622-da57-4fbe-bc3b-8470e1b3eef8###flink) mount //f5bc870483c1d49f186a69c.file.core.windows.net/pvc-daa9e622-da57-4fbe-bc3b-8470e1b3eef8 on /var/lib/kubelet/plugins/kubernetes.io/csi/file.csi.azure.com/4ecfe2ad9d43dfe5ae52e1eb2bc8bbc22724aeaa08fff4469ecc4306c919fafa/globalmount failed with mount failed: exit status 32
    Mounting command: mount
    Mounting arguments: -t cifs -o mfsymlinks,actimeo=30,nosharesock,file_mode=0777,dir_mode=0777,<masked> //f5bc870483c1d49f186a69c.file.core.windows.net/pvc-daa9e622-da57-4fbe-bc3b-8470e1b3eef8 /var/lib/kubelet/plugins/kubernetes.io/csi/file.csi.azure.com/4ecfe2ad9d43dfe5ae52e1eb2bc8bbc22724aeaa08fff4469ecc4306c919fafa/globalmount
    Output: mount error(2): No such file or directory
    Refer to the mount.cifs(8) manual page (e.g. man mount.cifs) and kernel log messages (dmesg)
      Warning  FailedMount  19s  kubelet  Unable to attach or mount volumes: unmounted volumes=[flink-data], unattached volumes=[flink-config-volume pod-template-volume external-libs kube-api-access-d7n2d logs flink-data]: timed out waiting for the condition
    What could be the reason here
  • a

    Adesh Dsilva

    04/04/2023, 10:03 AM
    Hi We have a kafka consumer on flink that reads messages in avro and deserializes them to java objects. When we don't pass few values (for fields with default values) they are set as null which we then handle. However when I added a
    rebalance
    after the kafka consumer it started throwing null pointer exceptions. I think thats because the default values that come as null are serialized and deserialized again (not sure) and this time avro doesnt like it (since this time key exists but value is null) The real problem however is that I get this null pointer exception only in my local system whereas it works fine on EMR with real Kafka (MSK) Any idea why I am seeing this inconsistency?
  • m

    Miguel Ángel Fernández Fernández

    04/04/2023, 1:49 PM
    Hi team, I would like to update my topologies to flink 1.17, but I have a question regarding cassandra. In maven central I don't see a flink-connector-cassandra version for 1.17, should I use version 3.0.0-1.16?
    d
    • 2
    • 1
  • i

    Ivan Burmistrov

    04/04/2023, 3:14 PM
    Hi team, I have a question which didn't manage to find the answer. Assume we have a Flink SQL job in streaming mode and defined a table on top of Kafka, and after that use this table multiple times in the query. For instance like this:
    Copy code
    SELECT * FROM myKafkaStream WHERE col1 = "a"
    UNION ALL
    SELECT * FROM myKafkaStream WHERE col1 = "b"
    Is it a valid usage? I was under impression that it's not valid because it would effectively move the Kafka pointer twice. However tried it recently and it seems working, so I'm confused a bit
    m
    • 2
    • 5
  • d

    Dheeraj Panangat

    04/04/2023, 5:34 PM
    Hi Team, Following up on this Anyone familiar with this issue ?
    Copy code
    Caused by: java.lang.ClassCastException: class org.apache.flink.table.data.columnar.vector.heap.HeapBytesVector cannot be cast to class org.apache.flink.table.data.columnar.vector.LongColumnVector (org.apache.flink.table.data.columnar.vector.heap.HeapBytesVector and org.apache.flink.table.data.columnar.vector.LongColumnVector are in unnamed module of loader 'app')
    	at org.apache.flink.table.data.columnar.vector.VectorizedColumnBatch.getLong(VectorizedColumnBatch.java:88)
    	at org.apache.flink.table.data.columnar.ColumnarRowData.getLong(ColumnarRowData.java:107)
    	at org.apache.flink.table.data.RowData.lambda$createFieldGetter$245ca7d1$7(RowData.java:249)
    	at org.apache.flink.table.data.RowData.lambda$createFieldGetter$25774257$1(RowData.java:296)
    	at org.apache.flink.table.runtime.typeutils.RowDataSerializer.toBinaryRow(RowDataSerializer.java:207)
    	at org.apache.flink.table.runtime.typeutils.RowDataSerializer.serialize(RowDataSerializer.java:103)
    	at org.apache.flink.table.runtime.typeutils.RowDataSerializer.serialize(RowDataSerializer.java:48)
    m
    • 2
    • 1
  • v

    Varun Sayal

    04/04/2023, 5:52 PM
    Hi all, sorry if this is the wrong place to ask but where can I find out the process of applying to present at Flink Forward 2023?
  • v

    Varun Sayal

    04/04/2023, 5:53 PM
    My org are very heavy users and we’d love to submit a talk for review
    s
    • 2
    • 3
  • a

    AK

    04/04/2023, 6:23 PM
    Hello, I have a question about the recipe that is available here for the dead letter queue. I see the WatermarkStrategy strategy is not applied immediately from the source. In that case, how does the per partition watermark are supported like this?
  • r

    Radu Stoian

    04/04/2023, 6:30 PM
    Hi all, I am finding that adding the Flink test utils dependency to my project stops the logging in my tests. Has anyone run into this before? Note: I am using Flink 1.15.2, log4j 2.17.1, and sl4j 1.7.32
    m
    • 2
    • 4
  • t

    Tony Piazza

    04/04/2023, 10:39 PM
    We are using Debezium for CDC with MongoDB. The messages are being encoded as JSON and are successfully written to the Kafka topic. We want to create a Flink SQL table with 'connector' = 'kafka' and 'format' = 'debezium-json'. Unfortunately, when we try to query the table, we get the following exception:
    Copy code
    java.io.IOException: Failed to deserialize consumer record
    Full stack trace attached. Please let me know if you have any advice on what we might be doing wrong.
    flink-debezium-stack-trace.txt
    s
    • 2
    • 3
  • s

    Suparn Lele

    04/05/2023, 8:12 AM
    Hi I am using Flink 1.14. When setting properties for KafkaSourceBuilder, I came across this.
    Copy code
    kafkaSourceBuilder.setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST))
    So this committedOffsets are taken from KafkaBroker or ZK directly?
    m
    • 2
    • 25
  • r

    Rashmin Patel

    04/05/2023, 8:59 AM
    Hii Everyone We are doing an upgrade from flink 1.14.2 to 1.16.0, but facing a weird issue where ProcessingTime timer in
    ContinuousProcessingTimeTrigger
    goes into infinite loop (register+fire). Upon further debugging, it seems that this PR ( 👉 https://github.com/apache/flink/pull/17106/files) is causing the problem. Full Description: We are hitting this issue in time windowing operator.
    Copy code
    stream.
            .windowAll(TumblingEventTimeWindowAssigner.of(Time.days(1))
            .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(10)))
            .allowedLateness(Time.days(1))
    Let's say today is day
    T
    , then my windowState will also contain pane of
    T-1
    to `T`th day as well (as per allowedLateness = 1 day) As per
    ContinuousProcessingTimeTrigger.registerNextFireTimestamp(. . .)
    in 1.16.0,
    Copy code
    long nextFireTimestamp = Math.min(time + interval, window.maxTimestamp());
    Now for window pane of
    T-1
    to `T`th day
    window.maxTimestamp()
    will be
    T
    and
    time + interval
    will be some
    T + x
    (where T + x < T + 1) So min of both expressions will always be
    T
    , that will become timer.getTimestamp(). Now in
    InternalTimerServiceImpl.onProcessingTime(long time)
    ```while ((timer = processingTimeTimersQueue.peek()) != null && timer.getTimestamp() <= time) {
    // triggerTarget.onProcessingTime(timer);
    }```
    timer.getTimestamp() =
    T
    time =
    T + x
    So this will never come out of the loop !!! Can someone help me with this on how to proceed further ?
    m
    • 2
    • 10
  • r

    Richard Noble

    04/05/2023, 11:22 AM
    Hi All - I'm trying to run the example table UDF example from https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/dev/python/table/udfs/python_udfs/#table-functions. I get an error:
    Copy code
    org.apache.flink.table.api.TableException: This calc has no useful projection and no filter. It should be removed by CalcRemoveRule.
    d
    • 2
    • 2
  • v

    Vincent Chee

    07/20/2023, 7:56 AM
    Hey Flink community, Im currently exploring async io API in Flink. I notice timeout doesn’t get triggered when i use the flink’s Executors.directExecutor() as recommended here
    Copy code
    package github.jhchee.functions;
    
    import lombok.extern.slf4j.Slf4j;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.async.ResultFuture;
    import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
    import org.apache.flink.util.concurrent.Executors;
    
    import java.util.Collections;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.Executor;
    //import java.util.concurrent.Executors;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.TimeUnit;
    
    @Slf4j
    public class DelayAsyncFunction extends RichAsyncFunction<Integer, String> {
    
        private transient Executor executor;
    
        @Override
        public void open(Configuration parameters) {
            <http://log.info|log.info>("Initializing executor");
            executor = Executors.directExecutor();
            // executor = Executors.newFixedThreadPool(10); // this works
        }
    
        @Override
        public void asyncInvoke(Integer input, ResultFuture<String> resultFuture) {
            CompletableFuture.supplyAsync(() -> performAsyncOperation(input), executor)
                             .thenAccept(result -> resultFuture.complete(Collections.singletonList(result)));
        }
    
        @Override
        public void timeout(Integer input, ResultFuture<String> resultFuture) {
            String message = String.format("Failed to delay for %d seconds", input);
            resultFuture.complete(Collections.singletonList(message));
        }
    
        private String performAsyncOperation(Integer input) {
            try {
                TimeUnit.SECONDS.sleep(input);
            } catch (InterruptedException ignored) {
    
            }
            // Perform the asynchronous operation and return the result
            return String.format("Delay for %d seconds", input);
        }
    
        @Override
        public void close() {
            ((ExecutorService) executor).shutdown();
        }
    }
    • 1
    • 1
  • a

    amarjeet pasrija

    07/20/2023, 1:39 PM
    Hey we are trying to submit Python Job usiing Kubernetive Flink Operator in Session Cluster But the jobs is failing since it is showing not a valid directory event the directory path is correct as per the image /opt/flink/opt/flink-python-1.17.1.jar Can someone please help out here My Job file looks like apiVersion: flink.apache.org/v1beta1 kind: FlinkSessionJob metadata: name: flink-finance-job spec: deploymentName: flink-session-cluster job: jarURI: /opt/flink/opt/flink-python-1.17.1.jar # Note, this jarURI is actually a placeholder entryClass: “org.apache.flink.client.python.PythonDriver” args: [“-pyclientexec”, “/usr/local/bin/python3”, “-py”, “/opt/flink/usrlib/python_demo.py”] parallelism: 8 upgradeMode: stateless
    m
    • 2
    • 2
1...707172...98Latest