# troubleshooting
  • r

    Ryan Culbertson

    09/19/2022, 9:14 PM
    Hi! Is it possible to run a job in the sql cli using local execution mode? (i.e. not submit the job to a cluster)
  • x

    Xiaosheng Wu

    09/19/2022, 9:55 PM
    Hi Flink community, I noticed very strange behavior after a recent version bump from Flink 1.14.4 to 1.15.2. My project consumes around 30K records per second from a sharded Kinesis stream. During a version upgrade, we follow the best practice of first triggering a savepoint from the running job, starting the new job from the savepoint, and then removing the old job. This logic has been tested multiple times on 1.14.4 without any issue. Usually, after an upgrade, our job shows a few minutes of delay in milliseconds behind latest, but it catches up quickly (within 15 minutes). Our savepoint is around one hundred MB, and our job DAG becomes 90-100% busy with some backpressure when we redeploy, but after 10-20 minutes it goes back to normal. Then the strange thing happened: when I redeployed with the 1.15.2 upgrade from a running 1.14.4 job, I could see that a savepoint had been created and the new job was running, and all the metrics looked fine, except that milliseconds behind latest suddenly jumped to 10 hours, and it takes days for my application to catch up with the latest record in the Kinesis stream. I don’t understand why it jumps from 0 seconds to 10+ hours when we restart the new job. The only main change I introduced with the version bump was changing failOnError from true to false, but I don’t think this is the root cause. I have one hypothesis: I tried redeploying the new 1.15.2 job with a changed parallelism, and redeploying from 1.15.2 did not introduce a big delay, so I assume the issue above only happens when we bump the version from 1.14.4 to 1.15.2? (Note the second jump in the screenshot I shared.) Any insights are welcome, thank you.
  • p

    Pouria Modaresi

    09/19/2022, 11:28 PM
    Hi everyone, does the WITH clause work in Apache Flink? I am receiving this error message:
    %flink.ssql(type=update)
    with cte as ( SELECT *,'Asia/Riyadh' as time_zone FROM alkhorayef_cam), cte1 as (select * from cte) select * from cte1
    Error: Invalid Sql statement: with cte as ( SELECT *,'Asia/Riyadh' as time_zone FROM alkhorayef_cam) cte1 as (select * from cte) select * from cte1
    The following commands are available:
    CREATE TABLE        Create table under current catalog and database.
    DROP TABLE          Drop table with optional catalog and database. Syntax: 'DROP TABLE [IF EXISTS] <name>;'
    CREATE VIEW         Creates a virtual table from a SQL query. Syntax: 'CREATE VIEW <name> AS <query>;'
    DESCRIBE            Describes the schema of a table with the given name.
    DROP VIEW           Deletes a previously created virtual table. Syntax: 'DROP VIEW <name>;'
    EXPLAIN             Describes the execution plan of a query or table with the given name.
    HELP                Prints the available commands.
    INSERT INTO         Inserts the results of a SQL SELECT query into a declared table sink.
    INSERT OVERWRITE    Inserts the results of a SQL SELECT query into a declared table sink and overwrite existing data.
    SELECT              Executes a SQL SELECT query on the Flink cluster.
    SET                 Sets a session configuration property. Syntax: 'SET <key>=<value>;'. Use 'SET;' for listing all properties.
    SHOW FUNCTIONS      Shows all user-defined and built-in functions.
    SHOW TABLES         Shows all registered tables.
    SOURCE              Reads a SQL SELECT query from a file and executes it on the Flink cluster.
    USE CATALOG         Sets the current catalog. The current database is set to the catalog's default one. Experimental! Syntax: 'USE CATALOG <name>;'
    USE                 Sets the current default database. Experimental! Syntax: 'USE <name>;'
    Hint: Make sure that a statement ends with ';' for finalizing (multi-line) statements.
  • p

    pok liu

    09/20/2022, 7:22 AM
    Hi, team! I installed Flink 1.15.1 on Ubuntu and ran SocketWindowWordCount.jar. When I try to take a savepoint, the program throws an exception. Here is the log:
    pokliu@pokliu-ubuntu:/usr/local/flink-1.15.1$ ./bin/flink run examples/streaming/SocketWindowWordCount.jar --hostname 10.10.20.98 --port 9999
    Job has been submitted with JobID 90d81a8e8893720c87c1dc12ad4607df
    
    ^Cpokliu@pokliu-ubuntu:/usr/local/flink-1.15.1$ ./bin/flink stop --savepointPath /tmp/flink-test/ 90d81a8e8893720c87c1dc12ad4607df
    Suspending job "90d81a8e8893720c87c1dc12ad4607df" with a CANONICAL savepoint.
    
    ------------------------------------------------------------
     The program finished with the following exception:
    
    org.apache.flink.util.FlinkException: Could not stop with a savepoint job "90d81a8e8893720c87c1dc12ad4607df".
    	at org.apache.flink.client.cli.CliFrontend.lambda$stop$5(CliFrontend.java:588)
    	at org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:1026)
    	at org.apache.flink.client.cli.CliFrontend.stop(CliFrontend.java:573)
    	at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1093)
    	at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1156)
    	at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
    	at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1156)
    Caused by: java.util.concurrent.ExecutionException: java.util.concurrent.CompletionException: org.apache.flink.runtime.checkpoint.CheckpointException: Task has failed.
    	at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
    	at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1928)
    	at org.apache.flink.client.cli.CliFrontend.lambda$stop$5(CliFrontend.java:586)
    	... 6 more
    Caused by: java.util.concurrent.CompletionException: org.apache.flink.runtime.checkpoint.CheckpointException: Task has failed.
    	at java.util.concurrent.CompletableFuture.encodeRelay(CompletableFuture.java:326)
    	at java.util.concurrent.CompletableFuture.completeRelay(CompletableFuture.java:338)
    	at java.util.concurrent.CompletableFuture.uniRelay(CompletableFuture.java:925)
    	at java.util.concurrent.CompletableFuture$UniRelay.tryFire(CompletableFuture.java:913)
    	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
    	at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990)
    	at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$1(AkkaInvocationHandler.java:252)
    	at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
    	at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
    	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
    	at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990)
    	at org.apache.flink.util.concurrent.FutureUtils.doForward(FutureUtils.java:1387)
    	at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$null$1(ClassLoadingUtils.java:93)
    	at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
    	at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$guardCompletionWithContextClassLoader$2(ClassLoadingUtils.java:92)
    	at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
    	at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
    	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
    	at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990)
    	at org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils$1.onComplete(AkkaFutureUtils.java:45)
    	at akka.dispatch.OnComplete.internal(Future.scala:299)
    	at akka.dispatch.OnComplete.internal(Future.scala:297)
    	at akka.dispatch.japi$CallbackBridge.apply(Future.scala:224)
    	at akka.dispatch.japi$CallbackBridge.apply(Future.scala:221)
    	at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
    	at org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils$DirectExecutionContext.execute(AkkaFutureUtils.java:65)
    	at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:68)
    	at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:284)
    	at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:284)
    	at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:284)
    	at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:621)
    	at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:25)
    	at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:23)
    	at scala.concurrent.Future.$anonfun$andThen$1(Future.scala:532)
    	at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:29)
    	at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:29)
    	at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
    	at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:63)
    	at akka.dispatch.BatchingExecutor$BlockableBatch.$anonfun$run$1(BatchingExecutor.scala:100)
    	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
    	at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:81)
    	at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:100)
    	at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:49)
    	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:48)
    	at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
    	at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1067)
    	at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1703)
    	at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:172)
    Caused by: org.apache.flink.runtime.checkpoint.CheckpointException: Task has failed.
    	at org.apache.flink.runtime.checkpoint.PendingCheckpoint.abort(PendingCheckpoint.java:545)
    	at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:2070)
    	at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveDeclineMessage(CheckpointCoordinator.java:1038)
    	at org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$declineCheckpoint$2(ExecutionGraphHandler.java:103)
    	at org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$processCheckpointCoordinatorMessage$3(ExecutionGraphHandler.java:119)
    	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    	at java.lang.Thread.run(Thread.java:748)
    Caused by: org.apache.flink.runtime.checkpoint.CheckpointException: Task name with subtask : Source: Socket Stream -> Flat Map (1/1)#0 Failure reason: Task has failed.
    	at org.apache.flink.runtime.taskmanager.Task.declineCheckpoint(Task.java:1388)
    	at org.apache.flink.runtime.taskmanager.Task.lambda$triggerCheckpointBarrier$3(Task.java:1331)
    	at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:836)
    	at java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:811)
    	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
    	at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990)
    	at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:343)
    Caused by: java.util.concurrent.CompletionException: java.net.SocketException: Socket closed
    	at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
    	at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
    	at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:957)
    	at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:940)
    	... 3 more
    Caused by: java.net.SocketException: Socket closed
    	at java.net.SocketInputStream.socketRead0(Native Method)
    	at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
    	at java.net.SocketInputStream.read(SocketInputStream.java:171)
    	at java.net.SocketInputStream.read(SocketInputStream.java:141)
    	at sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:284)
    	at sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:326)
    	at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:178)
    	at java.io.InputStreamReader.read(InputStreamReader.java:184)
    	at java.io.BufferedReader.read1(BufferedReader.java:210)
    	at java.io.BufferedReader.read(BufferedReader.java:286)
    	at java.io.Reader.read(Reader.java:140)
    	at org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction.run(SocketTextStreamFunction.java:110)
    	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
    	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:67)
    	at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:332)
  • t

    Tong Yue

    09/20/2022, 9:02 AM
    Hi everyone, which built-in function can be used to split a string into an array in Flink SQL? Or do I need a UDF? Split is a very common function in string processing; why does Flink not support it?
  • p

    Przemek

    09/20/2022, 10:56 AM
    Hi, a newbie's question - I can implement a source connector via implementing Source interface or with a much simpler RichSourceFunction. Any insights as to when/how I should do it?
  • p

    Pedro Mázala

    09/20/2022, 11:52 AM
    Hey folks. I have Kubernetes HA enabled on my Flink standalone deployment. Because of that, Flink creates a file on my stateful storage at default/submittedJobGraph$HASH. This file contains the job graph and its parameters. When I want to reload the parameters, I have to clear the files under this directory. Is there a way of storing the submitted job graph and still reading the parameters on each restart?
  • s

    Satya

    09/20/2022, 11:47 PM
    Hi everyone, is there a way to serialize AWSCredentialsProvider within a Flink custom source, or any other way to authenticate Flink to access AWS from a custom source? I am using SQS to read messages into Flink, and I am using a custom source to build the SQS reader, but I am getting an error (in comment):
  • r

    Rommel

    09/20/2022, 11:48 PM
    Hi, I created a JobManager YAML, a TaskManager YAML, and a service YAML to deploy Flink. When I deploy, the TaskManager can’t find the ResourceManager and reports this error message:
    Could not resolve ResourceManager address <akka.tcp://flink@flink-jm-svc-streaming-job:6123/user/rpc/resourcemanager_*>, retrying in 10000 ms: Could not connect to rpc endpoint under address <akka.tcp://flink@flink-jm-svc-streaming-job:6123/user/rpc/resourcemanager_*>.
  • e

    Echo Lee

    09/21/2022, 2:12 AM
    Hi all, we are trying to optimize TemporalJoin by introducing TemporalState based on FLIP-220: Temporal State (https://cwiki.apache.org/confluence/display/FLINK/FLIP-220%3A+Temporal+State). At the same time, we added readRange(long minTimestamp, long maxTimestamp); its internal logic follows RocksDBMapState#iterator(), and the entries are cached. In our benchmark, readRange performs 2 times better than MapState#iterator. We applied this to TemporalRowTimeJoinOperator and encountered a confusing problem during the performance test: the performance of TemporalJoin was good at the beginning but dropped rapidly after running for a period of time. How should I go about troubleshooting this problem?
    // TemporalRowTimeJoinOperator    
        private long emitResultAndCleanUpState(long currentWatermark) throws Exception {
            final long previousWatermark = Long.MIN_VALUE;
    
            List<RowData> rightRowsSorted =
                    getRightRowSortedBetween(rightRowtimeComparator, previousWatermark, currentWatermark);
    
            Iterator<TemporalValue<List<RowData>>> leftIterator = tLeftState.readRange(previousWatermark, currentWatermark)
                    .iterator();
            List<RowData> orderedLeftRecords = new ArrayList<>();
    
            while (leftIterator.hasNext()) {
                TemporalValue<List<RowData>> entry = leftIterator.next();
                List<RowData> leftRow = entry.getValue();
                orderedLeftRecords.addAll(leftRow);
                leftIterator.remove();
            }
    
            orderedLeftRecords.forEach(leftRow -> {
                long leftTime = getLeftTime(leftRow);
                Optional<RowData> rightRow = latestRightRowToJoin(rightRowsSorted, leftTime);
                if (rightRow.isPresent() && RowDataUtil.isAccumulateMsg(rightRow.get())) {
                    if (joinCondition.apply(leftRow, rightRow.get())) {
                        collectJoinedRow(leftRow, rightRow.get());
                    } else {
                        if (isLeftOuterJoin) {
                            collectJoinedRow(leftRow, rightNullRow);
                        }
                    }
                } else {
                    if (isLeftOuterJoin) {
                        collectJoinedRow(leftRow, rightNullRow);
                    }
                }
            });
    
            orderedLeftRecords.clear();
    
            TemporalValue<Iterable<RowData>> lastUnprocessed = tLeftState.getAtOrAfter(currentWatermark + 1);
            long lastUnprocessedTime = lastUnprocessed == null ? Long.MAX_VALUE : lastUnprocessed.getTimestamp();
            cleanupExpiredVersionInState(currentWatermark, rightRowsSorted);
    
            return lastUnprocessedTime;
        }
    
        private List<RowData> getRightRowSortedBetween(RowtimeComparator rightRowtimeComparator,
                long beginTimestampInclusive, long endTimestampInclusive) throws IOException {
            List<RowData> rightRows = new ArrayList<>();
    
            for (TemporalValue<RowData> rightRow : tRightState
                    .readRange(beginTimestampInclusive, endTimestampInclusive)) {
                rightRows.add(rightRow.getValue());
            }
            rightRows.sort(rightRowtimeComparator);
            return rightRows;
        }
  • b

    Balazs Varga

    09/21/2022, 7:48 AM
    Hi everyone, I have an SQL job on Flink 1.15.1 which does a tumble window aggregation from a Kafka source:
    CREATE TABLE test_unpartitioned (
      `name` VARCHAR(2147483647),
      `eventTimestamp` TIMESTAMP(3) WITH LOCAL TIME ZONE METADATA FROM 'timestamp',
      WATERMARK FOR `eventTimestamp` AS `eventTimestamp` - INTERVAL '3' SECOND
    ) WITH (...)
    
    INSERT INTO test_sink SELECT
          `name`,
          TUMBLE_START(eventTimestamp, INTERVAL '10' SECOND) AS wStart,
          COUNT(*) FROM test_unpartitioned
    GROUP BY  
          TUMBLE(eventTimestamp, INTERVAL '10' SECOND),
          `name`
    I’ve tried running the job with different combinations of parallelism and input topic partitions:
    • parallelism = 1, input topic has 1 partition: the job works
    • parallelism > 1, input topic has 1 partition: no results are produced
    • parallelism = 2, input topic has 2 partitions: the job works
    • when using proctime instead of event time, the p > 1 with 1 partition combination also works
    Please help me understand why the second case doesn’t produce results. My theory is that each partition of the input Kafka topic gets assigned to one of the subtasks of the source operator. Thus, when p > 1 but there is just 1 partition, only 1 subtask reads messages, so the watermark only advances there. Since the other instances do not get records, their watermarks don’t advance, so the next operator (the window) cannot advance its watermark either, because it depends on all of its predecessors. So the window never fires.
    • Is this reasoning correct?
    • Is this correct behavior from Flink? It seems a bit unusual that by increasing the parallelism we can break a job.
    Thanks in advance.
  • o

    Olivier

    09/21/2022, 8:01 AM
    Hi, I'm using FlinkKafkaProducer with integrated Avro serialization. Is there a way to do the serialization before producing to Kafka?
    ✅ 1
  • a

    Akila Wajirasena

    09/21/2022, 10:32 AM
    Hi everyone, I am using Flink's KeyedProcessFunction to do some custom processing, and I also use a value state to save my state. Sample code for the keyed process function and my state class is given below. I get the following exception when I enable checkpointing. Is there a way to fix this?
    java.lang.IncompatibleClassChangeError: Class scala.collection.mutable.HashMap does not implement the requested interface scala.collection.mutable.SortedMap
    2022-09-21 13:02:56,933 WARN  org.apache.flink.runtime.taskmanager.Task                    [] - KeyedProcess -> Sink: Unnamed (1/1)#6 (ff016f5bbf176977d893cecef4c158bb)
    switched from RUNNING to FAILED with failure cause: java.lang.IncompatibleClassChangeError: Class scala.collection.mutable.HashMap does not implement the requested interface scala.collection.mutable.SortedMap
            at FlinkEventProcessor$Processor.$anonfun$processElement$2(FlinkEventProcessor.scala:1081)
            at FlinkEventProcessor$Processor.processElement(FlinkEventProcessor.scala:1079)
            at FlinkEventProcessor$Processor.processElement(FlinkEventProcessor.scala:941)
            at org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:83)
            at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:205)
            at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
            at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
            at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66)
            at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:423)
            at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204)
            at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:681)
            at org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:636)
            at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647)
            at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:620)
            at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779)
            at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
            at java.base/java.lang.Thread.run(Thread.java:829)
    Sample code
    case class State(key: String, var lastSequenceNo: Long) {
      val outOfOrder: mutable.SortedMap[Long, KafkaMessage] = mutable.SortedMap.empty[Long, KafkaMessage]
    }
    
    
    class Processor()
      extends KeyedProcessFunction[String, KafkaMessage, Row] {
      private var valueState: ValueState[State] = _

      override def open(parameters: Configuration): Unit = {
        valueState = getRuntimeContext.getState(new ValueStateDescriptor("state", classOf[State]))
      }

      override def processElement(in: KafkaMessage,
                                  ctx: KeyedProcessFunction[String, KafkaMessage, Row]#Context,
                                  out: Collector[Row]): Unit = {
        // Assumption: the sample does not show how `state` is obtained;
        // here it is read from the value state (it may be null for a new key).
        val state = valueState.value()

        logger.debug("Processing out of order messages...")
        // Drain the buffered out-of-order messages, smallest sequence number first.
        while (state.outOfOrder.nonEmpty) {
          logger.debug("Processing out of order message")
          val min = state.outOfOrder.keys.head
          state.outOfOrder.remove(min)
        }
      }
    }
  • m

    M Harsha

    09/21/2022, 10:41 AM
    Is there any way to log the number of out-of-order messages dropped by Flink? I want to set up some alerting mechanism based on this.
    ✅ 1
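To make concrete what "out of order and dropped" means for such a counter, here is a minimal framework-free sketch. It is not Flink code and not a Flink metric: `count_late_events` is a hypothetical helper, and the watermark is assumed to trail the largest timestamp seen so far by a fixed bound.

```python
def count_late_events(timestamps, max_out_of_orderness):
    """Count events whose timestamp is not ahead of the current watermark.

    The watermark is modelled as (largest timestamp seen so far) minus
    max_out_of_orderness, a simplified bounded-out-of-orderness strategy.
    """
    watermark = float("-inf")
    late = 0
    for ts in timestamps:
        if ts <= watermark:
            late += 1  # a real job would drop or side-output this event
        watermark = max(watermark, ts - max_out_of_orderness)
    return late


events = [1000, 2000, 7000, 1500, 6500, 3000]  # event times in ms
print(count_late_events(events, max_out_of_orderness=3000))
```

In this model a counter like `late` is the number an alert would be based on; how it is exposed in a real job depends on the operators in use.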
  • b

    Breno Jacubovski

    09/21/2022, 12:28 PM
    I’m starting to work with event time and stateful operations in Apache Flink. I’m using IntervalJoin, and I would like to know if anyone has documentation on how to test a ProcessJoinFunction. I read about the test harness, but I got stuck trying to understand whether that is the way to do it or whether there is some other way. Thank you!
  • j

    Jirawech Siwawut

    09/21/2022, 1:57 PM
    Hi. I am trying to use window deduplication followed by a GROUP BY like this, but I found a strange issue. Note that I am using event time here. SQL:
    SELECT
        window_start
        ,window_end
        ,count(1) as cnt
    FROM
    (
        SELECT
        id
        ,window_end
        ,window_start
        ,ROW_NUMBER() OVER (PARTITION BY window_start, window_end, order_id ORDER BY event_time DESC) AS rownum
        FROM TABLE(HOP(TABLE mytable, DESCRIPTOR(event_time), INTERVAL '1' MINUTES, INTERVAL '60' MINUTES))
    )A
    WHERE rownum = 1
    GROUP BY window_start, window_end
    The result is quite strange: Flink aggregates the same window again
    +----+---------------------+---------------------+-------------------------+----------------------+
    | op |        window_start |          window_end |       current_timestamp |                  cnt |
    +----+---------------------+---------------------+-------------------------+----------------------+
    | +I | 2022-09-21 12:44:00 | 2022-09-21 13:44:00 | 2022-09-21 13:44:13.010 |                  265 |
    | +I | 2022-09-21 12:45:00 | 2022-09-21 13:45:00 | 2022-09-21 13:45:15.207 |                  360 |
    | +I | 2022-09-21 12:46:00 | 2022-09-21 13:46:00 | 2022-09-21 13:46:12.659 |                 1166 |
    | +I | 2022-09-21 12:47:00 | 2022-09-21 13:47:00 | 2022-09-21 13:47:13.624 |                 1594 |
    | +I | 2022-09-21 12:44:00 | 2022-09-21 13:44:00 | 2022-09-21 13:48:15.907 |                  120 |
    | +I | 2022-09-21 12:45:00 | 2022-09-21 13:45:00 | 2022-09-21 13:49:07.305 |                  348 |
  • m

    Marcos Vinícius

    09/21/2022, 3:36 PM
    Hi! I'm using the exactly-once version of the JDBC sink with Postgres 14. My insert consists of an INSERT INTO ... ON CONFLICT DO UPDATE ....; I'm using the ON CONFLICT clause because I need to aggregate and increment some counters. The first checkpoint finishes in the Flink web UI, but then there is always a locked insert and the sink hangs forever. My sink configs:
    JdbcExecutionOptions.builder()
                            .withBatchSize(1000)
                            .withBatchIntervalMs(0)
                            .withMaxRetries(0)
                            .build(),
                    JdbcExactlyOnceOptions.builder()
                            .withTransactionPerConnection(true)
                            .withRecoveredAndRollback(true)
                            .build()
    When I stop my job, there is a hanging statement (the locked one) and a hanging prepared transaction. If I roll back that transaction, the hanging statement finishes. Is anyone using the JDBC exactly-once sink with Postgres and has observed similar issues? Environment: Flink 1.4, Postgres 14, Java 8, latest JDBC driver, Windows, everything running locally.
  • l

    Lukasz Krawiec

    09/22/2022, 12:51 AM
    Hi! I am trying to use KafkaSource to consume from a topic with multiple event types. Schemas are registered in Confluent Schema Registry, and I'd like to deserialize the records into Avro's GenericRecord. It seems that this is not natively supported by Flink, as the ConfluentRegistryAvroDeserializationSchema.forGeneric method requires an org.apache.avro.Schema to be passed in, which in my case is not possible because the actual record's schema will only be known at runtime, when deserializing the object and retrieving the schema from the Schema Registry. Did anyone by any chance run into this use case and can suggest a workaround? Ideally I'd like this to work without having Flink fall back to Kryo serialization. Thanks for your time.
  • s

    Sergio Sainz

    09/22/2022, 2:03 AM
    Hello, team! I am investigating how to connect to JDBC sources. Sometimes the JDBC sources I want to connect to use a self-signed certificate, so I want to deploy the self-signed certificate dynamically to the Flink cluster (the cluster may be on k8s, standalone, or YARN). The instructions here [https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/security/security-ssl/#creating-and-deploying-keystores-and-truststores] recommend deploying the certificates to each node, to the container images, or to a mounted shared folder. But what if we want to add new certificates without accessing the nodes, without redeploying the images, and without a shared mounted folder? I was exploring whether a user-defined function or operator could help upload the certificate binary to the job through the SQL itself.
  • k

    Krish Narukulla

    09/22/2022, 5:08 AM
    Is there a Flink connector for Hive that works with Beeline? The Flink Hive connector pulls in many dependencies: https://repo1.maven.org/maven2/org/apache/flink/flink-connector-hive_2.12/1.15.2/flink-connector-hive_2.12-1.15.2.pom Is it possible to use a Hive thin client like Beeline?
  • s

    Sebastian Stiernborg

    09/22/2022, 7:08 AM
    Hello! Is there a reason why the Standalone Application cluster does not take args from the JobSpec (latest main, 1.2-SNAPSHOT, of the flink-kubernetes-operator)?
  • d

    Dmitry Smirnov

    09/22/2022, 7:45 AM
    Hey, I have a very stupid question. I have a Flink app that takes data from Kafka, processes it, and sends it back to a Kafka topic. Now I have to add an additional source: the app should periodically read one file from S3. That file contains some settings/instructions for processing the Kafka data stream. I have this datastream that should read the data periodically, but it happens only once or twice, then it stops working without any errors.
    job.env
            .readFile(
              inputFormat = new TextInputFormat(new Path()),
              filePath = job.s3Path,
              watchType = FileProcessingMode.PROCESS_CONTINUOUSLY,
              interval = job.s3ReadInterval.toMillis
            )...
    There's also another datastream:
    val regularStream = job.env
      .addSource(kafkaSource)...
    I'm not sure whether both datastreams can be used together in one app. Any ideas how this can be done? Thanks
  • t

    Tiansu Yu

    09/22/2022, 2:06 PM
    Type information does not get recognized in custom SourceFunction. Recently, I wrote an SQS source connector to consume messages from AWS SQS into a Flink datastream (based on a Stack Overflow answer). However, the same POJO class and deserialization schema that work successfully for Kinesis fail for this new SQS source connector. (I will post the code in the 🧵.) The error message states that
    Exception in thread "main" org.apache.flink.api.common.functions.InvalidTypesException: The return type of function 'Custom Source' could not be determined automatically, due to type erasure.
    I wonder why the `getProducedType` method in the `UserDeserializationSchema` is not getting recognized by my custom source connector?
    ✅ 1
  • f

    Felix Angell

    09/22/2022, 4:34 PM
    Heya, how relevant is this Stack Overflow post on sessionisation in Flink? We're trying to do something very similar: building a session ID for a group of events. Our impression was that we can use session windows for this, but the answer throws a spanner in the works. I'm curious whether it's still the recommended approach, as it's 3 years old now and the API has changed quite a bit, I'm sure.
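A minimal sketch of the gap-based idea behind this question, written in plain Python rather than the Flink API. It assumes a bounded batch of `(key, timestamp_ms)` events and starts a new session whenever two consecutive events for the same key are more than `gap_ms` apart. The function name `assign_session_ids`, the `gap_ms` parameter, and the `"<key>-<n>"` id format are all hypothetical; out-of-order events, watermarks, and state cleanup, which a real streaming job has to handle, are deliberately left out.

```python
from typing import Dict, Iterable, List, Tuple


def assign_session_ids(
    events: Iterable[Tuple[str, int]], gap_ms: int
) -> List[Tuple[str, int, str]]:
    """Tag each (key, timestamp_ms) event with a per-key session id.

    A new session starts for a key when the time since that key's previous
    event is greater than gap_ms. This is an illustration only, not Flink code.
    """
    last_seen: Dict[str, int] = {}   # key -> timestamp of the previous event
    session_no: Dict[str, int] = {}  # key -> running session number
    tagged: List[Tuple[str, int, str]] = []

    # Sort by key, then by time, so each key's events are visited in order.
    for key, ts in sorted(events, key=lambda e: (e[0], e[1])):
        previous = last_seen.get(key)
        if previous is None or ts - previous > gap_ms:
            session_no[key] = session_no.get(key, 0) + 1
        last_seen[key] = ts
        tagged.append((key, ts, f"{key}-{session_no[key]}"))
    return tagged


sample_events = [("u1", 0), ("u1", 1000), ("u2", 500), ("u1", 10000)]
sample_sessions = assign_session_ids(sample_events, gap_ms=5000)
```

With a 5-second gap, the first two `u1` events share one session and the third opens a new one, while `u2` gets its own session. The sketch only shows how an id can follow from the gap rule; it does not settle whether session windows or a keyed process function is the better fit in Flink.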
  • l

    Leo Xiong

    09/22/2022, 7:31 PM
    How many people are using Beam on Flink?
    👎 8
    👍 1
  • a

    Adrian Chang

    09/22/2022, 8:22 PM
    Hello, is `numba` compatible with PyFlink? I am getting this error:
    File "/usr/local/lib/python3.8/site-packages/numba/core/typing/templates.py", line 1278, in register_global
        if getattr(mod, val.__name__) is not val:
    AttributeError: module 'pyflink.fn_execution.beam.beam_sdk_worker_main' has no attribute 'print'
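The traceback line `getattr(mod, val.__name__)` suggests the object being registered reports the name `print` while the module it is looked up in has no attribute of that name. The sketch below reproduces that shape of failure with made-up names. It assumes `mod` is resolved via `sys.modules[val.__module__]` (not shown in the traceback), and the module `fake_worker_main` and function `_custom_print` are hypothetical. It does not claim this is what PyFlink or numba actually do internally.

```python
import sys
import types


def is_reachable_by_name(val) -> bool:
    """Look an object up by its own __name__ in its own __module__.

    Mirrors the check on the traceback line; the sys.modules lookup is an
    assumption about the surrounding code.
    """
    mod = sys.modules[val.__module__]
    return getattr(mod, val.__name__) is val


# Hypothetical module standing in for a worker entry point.
fake_module = types.ModuleType("fake_worker_main")
sys.modules["fake_worker_main"] = fake_module


def _custom_print(*args, **kwargs):
    """A replacement print that is stored under a different attribute name."""


# The function claims to be `fake_worker_main.print` ...
_custom_print.__name__ = "print"
_custom_print.__module__ = "fake_worker_main"
# ... but the module only exposes it as `custom_print`.
fake_module.custom_print = _custom_print

try:
    is_reachable_by_name(_custom_print)
    failure_message = None
except AttributeError as exc:
    failure_message = str(exc)
```

Here `failure_message` ends up holding an `AttributeError` text of the same form as the one in the question, whereas an ordinary builtin such as `len` passes the check.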
  • h

    Hannah Hagen

    09/22/2022, 11:02 PM
    Hello, dropping my first question into this channel. Thanks in advance for any help! I am looking for advice on setting up a productive local dev environment for PyFlink. I am on a Mac laptop with the new M1 chip. I can't run PyFlink locally due to incompatibility with the M1 chip, so I have spun up a Docker container to emulate the older Intel architecture. I am running simple PyFlink files within the container via `python my-flink-script.py`. However, I find that simple scripts take a painfully long time to run. For example, the following code snippet (taken from the docs) takes about 1 minute 4 seconds to run. 😢 (code snippet in thread 🧵)
    A couple of questions:
    • Is the long runtime due to emulating a different architecture, to running in a Docker container, or to something else? If anyone can share the runtime on their machine, that would help me set my expectations.
    • Would it be faster to submit the script to a local Flink cluster (instead of running the Python file directly)? I assumed not, but want to double-check.
    • How can I set up a development environment where I can quickly write and run code? Waiting a minute after writing a line of code is a bit painful 😅
    Thanks for any pointers!
  • s

    Sumit Nekar

    09/23/2022, 8:09 AM
    Hello, I am using our own custom Flink image as the base image to build an image that is referenced in my FlinkDeployment file. When I deploy the FlinkDeployment, I see the following error:
    "failed to create containerd task: OCI runtime create failed: container_linux.go:380: starting container process caused: exec: "/docker-entrypoint.sh": stat /docker-entrypoint.sh: no such file or directory: unknown"
    Dockerfile for my image:
    FROM private-repo/flink-base:1.13
    RUN mkdir /opt/flink/usrlib
    ADD my-application/target/my-application.jar /opt/flink/usrlib/my-application.jar
    But the Flink base image (`flink-base:1.13`) defines its entrypoint like this:
    ENTRYPOINT ["/entry-point.sh"]
    So I tried overriding using command in my FlinkDeployment file.
    containers:
      - name: flink-main-container
        command: ["/entry-point.sh"]
    I still see the same error. Am I missing something?
  • a

    Adesh Dsilva

    09/23/2022, 11:09 AM
    Is there any plan to add `.addGroup("MyMetrics")` to the Counter interface, or a way to specify groups while calling `.inc()`? Currently we have to create metric counters in `open()`, and if we have 10-15 groups with different combinations we end up creating 50-60 counter objects in `open()`. This is not ideal, even if we write some dynamic method that creates all the counters in `open()` and keeps them in a map. Clients like Datadog provide a convenient way to pass tags when you call `inc()`. Is it possible to add this in Flink as well?
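A minimal sketch of the "counters in a map" idea mentioned above, with the twist that each counter is created on first use instead of up front. It is written in plain Python for brevity and does not use any Flink classes: `StandInCounter`, `LazyCounters`, and `make_counter` are hypothetical names, and the factory is only a placeholder for wherever a job would register a counter under its groups. Whether registering metrics after `open()` suits a given setup is not something this sketch settles.

```python
from typing import Callable, Dict, List, Tuple


class StandInCounter:
    """Hypothetical stand-in for a metrics counter; not a Flink class."""

    def __init__(self) -> None:
        self.count = 0

    def inc(self, n: int = 1) -> None:
        self.count += n


CounterKey = Tuple[Tuple[str, ...], str]


class LazyCounters:
    """Creates each counter on first use and reuses it afterwards."""

    def __init__(
        self, factory: Callable[[Tuple[str, ...], str], StandInCounter]
    ) -> None:
        self._factory = factory
        self._cache: Dict[CounterKey, StandInCounter] = {}

    def inc(self, name: str, *groups: str, n: int = 1) -> None:
        """Increment the counter `name` under the given group path."""
        key = (groups, name)
        counter = self._cache.get(key)
        if counter is None:
            # First time this group/name combination is seen: create it once.
            counter = self._factory(groups, name)
            self._cache[key] = counter
        counter.inc(n)

    def value(self, name: str, *groups: str) -> int:
        """Current count, or 0 if the counter was never touched."""
        counter = self._cache.get((groups, name))
        return counter.count if counter is not None else 0


created: List[CounterKey] = []


def make_counter(groups: Tuple[str, ...], name: str) -> StandInCounter:
    # Placeholder: a real job would register the counter with its metric
    # system here, using `groups` as the group path.
    created.append((groups, name))
    return StandInCounter()


counters = LazyCounters(make_counter)
counters.inc("errors", "MyMetrics", "parser")
counters.inc("errors", "MyMetrics", "parser", n=2)
counters.inc("errors", "MyMetrics", "sink")
```

The call site then reads much like a tagged `inc()`, while only the combinations that actually occur are ever created.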