# troubleshooting
  • raghav tandon
    04/23/2023, 7:18 AM
    I am facing a weird issue and cannot solve it or find any clue; any pointers would be really helpful. This is the transformation I am trying to do:
    DataStream[Event] -> Window -> ProcessWindowFunction -> DataStream[Event2] -> Kafka Sink
    Everything works, but in Kafka some values are getting shuffled, i.e. a value belonging to a different record shows up where it is not intended. I have a log statement right before
    out.collect()
    and the values are printed correctly at that log line.
    val userIds = events
          .keyBy(e => e.client)
          .window(TumblingProcessingTimeWindows.of(Time.seconds(1)))
          .trigger(
            PurgingTrigger.of(ProcessingTimeTrigger.create())
          ) 
          .process(userIdFetcher)
          .setParallelism(1)
          .uid(userIdFetcher.getClass().getSimpleName())
    final class UserIdFetcher(config: UserIdFetcherConfig, failedOutputTag: OutputTag[ReadErrorEvent])
      extends ProcessWindowFunction[WriteSuccessEvent, Diff, String, TimeWindow]
        with Serializable {

      // Window body: group the buffered events by command id and look up user ids per group.
      override def process(
        key: String,
        context: Context,
        elements: Iterable[WriteSuccessEvent],
        out: Collector[Diff]
      ): Unit =
        elements.groupBy[String](update => update.command.id.get).foreach {
          case (id, updates) =>
            retryableProcess(
              id,
              context,
              MongoBulkQuery(
                key,
                "User",
                updates.map(update => update.command.body.identifiers).toList,
                List(UserFields.Id)
              ),
              out
            )
        }

      def retryableProcess(
        id: String,
        context: Context,
        query: MongoBulkQuery,
        out: Collector[Diff],
        attemptNumber: Int = 0
      ): Unit =
        // The Mongo lookup completes asynchronously; out.collect is invoked from the future's callback.
        mongoClient.client(query.client, query.collection).findUserIDs(query).onComplete {
          case Success(values) =>
            values
              .map(user => user.getObjectId(MongoFields.Id).toHexString())
              .grouped(config.maxBatchSize)
              .foreach { userIdBatch =>
                val diff = Diff(query.client, id, userIdBatch.toList)
                out.collect(diff)
              }
        }
    }
    
    final case class Diff(
      @JsonProperty("dbName") client: String,
      @JsonProperty("id") collection: String,
      @JsonProperty("add") add: List[String]
    )
    But in Kafka this value randomly gets replaced with the value of a different
    id
    that is also being processed concurrently.
  • Mali
    04/23/2023, 9:09 AM
    Hello everyone, I am trying to read data from a Kafka topic with Flink SQL. But when I create the table with the raw format, I see rows like this:
    �������`�[{"testId"... |
    What should I do to remove these special characters? Many thanks. Note: I got a "Failed to deserialize JSON '�������`�'" error when I tried to read with the json format.
  • Amir Hossein Sharifzadeh
    04/24/2023, 2:00 PM
    Does anybody know how to delete messages from topics after consuming them? My source is Kafka.
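    Flink consumers do not remove records from Kafka; deletion happens on the Kafka side, either through retention settings or an explicit admin call. A minimal sketch of the admin-call route, using Kafka's AdminClient.deleteRecords to truncate a partition up to a chosen offset (broker address, topic and offset below are placeholders):
    import java.util.Map;
    import java.util.Properties;

    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.clients.admin.RecordsToDelete;
    import org.apache.kafka.common.TopicPartition;

    public class TruncateTopic {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker

            try (AdminClient admin = AdminClient.create(props)) {
                // Deletes everything in partition 0 of "my-topic" before offset 1000.
                Map<TopicPartition, RecordsToDelete> toDelete =
                    Map.of(new TopicPartition("my-topic", 0), RecordsToDelete.beforeOffset(1000L));
                admin.deleteRecords(toDelete).all().get();
            }
        }
    }
    Whether explicit deletion is a good idea depends on the use case; most setups simply rely on topic retention instead.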
  • Zachary Schmerber
    04/24/2023, 5:44 PM
    Hello, I am trying to find the best way to rename JSON key fields while leaving the values the same. I have already turned the DataStream into a stream of ObjectNode. What is the best way to rename the keys (log field names) at this point?
    // log source data from above Kafka Topic using JsonDeserializationSchema//
    
            DataStream<ObjectNode> filteredStream = env.fromSource(logSource, WatermarkStrategy.noWatermarks(), "Kafka winlog Source");
    
            DataStream<ObjectNode> winObjectStream = filteredStream.filter(node -> node.get("service_id").asText().equals("cdd31c3f-3b24-4526-848a-xxxxxxxxxxx"));
    
    
       //Placeholder for logically mapping the new Object Stream

    // DataStream that has successfully been transformed into Objects
            winObjectStream.sinkTo(winEventSink);
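    A minimal sketch of one way to do the renaming directly on the ObjectNode stream with Jackson's remove/set (the old and new field names below are made-up examples, not the actual winlog schema):
    DataStream<ObjectNode> renamedStream = winObjectStream
            .map(node -> {
                JsonNode value = node.remove("EventID"); // remove() detaches and returns the old value (or null)
                if (value != null) {
                    node.set("event_id", value);         // re-attach it under the new key
                }
                return node;
            })
            .returns(ObjectNode.class);                  // helps Flink's type extraction with lambdas
    This assumes the same Jackson JsonNode/ObjectNode types that the deserialization schema above already produces.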
  • Xavier
    04/24/2023, 6:31 PM
    Hello everyone, I have a Flink job with two sources, a File source and a Kafka source. Is there a way of registering a task listener (not for the whole job, only for a task) that gets notified when the File source is FINISHED, so that I can run an archival process (archive the file)? Thanks
  • Jeesmon Jacob
    04/24/2023, 6:49 PM
    Hi team, we are using
    slotmanager.redundant-taskmanager-num: "1"
    to run a redundant TM to speed up recovery. But what we have seen is that Flink keeps trying to schedule a new TM pod every few seconds until the redundant TM is ready. So for some time we see many extra TMs in the Pending state, which triggers unnecessary node autoscaling. Wondering if there is a configurable interval for Flink to detect whether the redundant TM is in the Ready state. We are using the Flink operator for deployment.
  • Nathanael England
    04/24/2023, 7:50 PM
    Is it possible to point to a directory of wheels with
    -pyFiles
    ? The wording here isn't too clear about what is expected to happen. I'm getting import failures when I point to the directory that contains all my wheels.
  • Nathanael England
    04/24/2023, 8:27 PM
    Okay, here's a better question after resolving my user error above. How can I get Flink to respect the
    .so
    included in the path? I can see from logging my dependencies' contents that the .so is making it to the job.
    ERROR:root:/tmp/pyflink/8d6f1590-3459-4e27-b118-2cf001999248/c6e62fed-b841-4a37-be6e-2001db67f477/usrlib/.deps/google/protobuf/pyext/cpp_message.py
    ERROR:root:/tmp/pyflink/8d6f1590-3459-4e27-b118-2cf001999248/c6e62fed-b841-4a37-be6e-2001db67f477/usrlib/.deps/google/protobuf/pyext/_message.cpython-38-x86_64-linux-gnu.so
    ERROR:root:/tmp/pyflink/8d6f1590-3459-4e27-b118-2cf001999248/c6e62fed-b841-4a37-be6e-2001db67f477/usrlib/.deps/google/protobuf/pyext/__init__.py
    The error I get is:
    ImportError: cannot import name '_message' from 'google.protobuf.pyext' (/tmp/pyflink/8d6f1590-3459-4e27-b118-2cf001999248/8541a21e-7834-425d-9a53-63350409a2a5/.deps/google/protobuf/pyext/__init__.py)
  • Matthew Kerian
    04/25/2023, 12:10 AM
    Hi team, how do I download an older version of Flink (1.15)? Homebrew doesn't have it, and neither does the download page.
  • Amenreet Singh Sodhi
    04/25/2023, 4:48 AM
    Hey Team! I am deploying my Flink job in application mode, but recently I have started seeing the following error, which was not there earlier. Now every time I redeploy the JM, this appears and the JM restarts:
    2023-04-23 18:13:29,093 ERROR org.apache.flink.runtime.rest.handler.legacy.files.StaticFileServerHandler [] - Caught exception
    java.io.IOException: Connection reset by peer
    	at sun.nio.ch.FileDispatcherImpl.read0(Native Method) ~[?:?]
    	at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39) ~[?:?]
    	at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:276) ~[?:?]
    	at sun.nio.ch.IOUtil.read(IOUtil.java:233) ~[?:?]
    	at sun.nio.ch.IOUtil.read(IOUtil.java:223) ~[?:?]
    	at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:358) ~[?:?]
    	at org.apache.flink.shaded.netty4.io.netty.buffer.PooledByteBuf.setBytes(PooledByteBuf.java:253) ~[event_executor-1.0-SNAPSHOT.jar:?]
    	at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1132) ~[event_executor-1.0-SNAPSHOT.jar:?]
    	at org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:350) ~[event_executor-1.0-SNAPSHOT.jar:?]
    	at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:151) [event_executor-1.0-SNAPSHOT.jar:?]
    	at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:719) [event_executor-1.0-SNAPSHOT.jar:?]
    	at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655) [event_executor-1.0-SNAPSHOT.jar:?]
    	at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581) [event_executor-1.0-SNAPSHOT.jar:?]
    	at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493) [event_executor-1.0-SNAPSHOT.jar:?]
    	at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986) [event_executor-1.0-SNAPSHOT.jar:?]
    	at org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) [event_executor-1.0-SNAPSHOT.jar:?]
    2023-04-23 18:13:06,279 ERROR org.apache.flink.runtime.rest.handler.legacy.files.StaticFileServerHandler [] - Error while responding to the http request.
    java.text.ParseException: Unparseable date: "${jndi:<ldap://127.0.0.1#log4shell-generic-dXyfRkM647dI4bcPETOo.w.nessus.org/nessus>}"
    	at java.text.DateFormat.parse(DateFormat.java:395) ~[?:?]
    	at org.apache.flink.runtime.rest.handler.legacy.files.StaticFileServerHandler.respondToRequest(StaticFileServerHandler.java:210) ~[event_executor-1.0-SNAPSHOT.jar:?]
    	at ....
    Any idea what the possible issue could be? Thanks
  • Gaurav Gupta
    04/25/2023, 8:16 AM
    Hi Team, I am new to Apache Flink and trying to use Flink with Spring Boot (Java). I want to inject some of my business-specific classes into a custom implementation of a process function. Can anyone suggest a way forward?
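    A minimal sketch of the common pattern, under the assumption that the heavyweight business objects cannot be serialized: ship only serializable configuration in the function's constructor and build the real dependency in open(), so it is created on the task managers instead of being serialized from the client. EnrichmentConfig and EnrichmentService are hypothetical names standing in for the business classes.
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.ProcessFunction;
    import org.apache.flink.util.Collector;

    public class EnrichmentProcessFunction extends ProcessFunction<String, String> {

        // Serializable settings travel with the job graph.
        private final EnrichmentConfig config;

        // Non-serializable business dependency, built per parallel instance in open().
        private transient EnrichmentService service;

        public EnrichmentProcessFunction(EnrichmentConfig config) {
            this.config = config;
        }

        @Override
        public void open(Configuration parameters) {
            // Recreate the dependency on the task manager, e.g. via a small factory
            // or by bootstrapping a lightweight Spring context here.
            this.service = new EnrichmentService(config);
        }

        @Override
        public void processElement(String value, Context ctx, Collector<String> out) {
            out.collect(service.enrich(value));
        }
    }
    A full Spring ApplicationContext is usually kept out of the operators; only the pieces the function actually needs are rebuilt in open().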
  • Krzysztof Chmielewski
    04/25/2023, 12:34 PM
    Hi all, I'm having some problems setting up Flink with the Hive catalog. I'm facing a LinkageError while submitting my SQL job (Flink 1.16.1) that tries to write to S3. The problem does not occur when I write to a local path. I've created the HADOOP_CLASSPATH env variable pointing to Hadoop 3.1.3. This is the exception:
    Caused by: java.lang.LinkageError: loader constraint violation: when resolving method 'void org.apache.hadoop.util.SemaphoredDelegatingExecutor.(com.google.common.util.concurrent.ListeningExecutorService, int, boolean)' the class loader org.apache.flink.util.ChildFirstClassLoader @2486925f of the current class, org/apache/hadoop/fs/s3a/S3AFileSystem, and the class loader 'app' for the method's defining class, org/apache/hadoop/util/SemaphoredDelegatingExecutor, have different Class objects for the type com/google/common/util/concurrent/ListeningExecutorService used in the signature (org.apache.hadoop.fs.s3a.S3AFileSystem is in unnamed module of loader org.apache.flink.util.ChildFirstClassLoader @2486925f, parent loader 'app'; org.apache.hadoop.util.SemaphoredDelegatingExecutor is in unnamed module of loader 'app')
    	at org.apache.hadoop.fs.s3a.S3AFileSystem.create(S3AFileSystem.java:769)
    	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1118)
    	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1098)
    	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:987)
    	at io.delta.storage.S3SingleDriverLogStore.write(S3SingleDriverLogStore.java:299)
    Any idea what could be causing this problem? The same job works fine when writing to a local path, so I'm guessing it is something with the S3 plugin, some dependency issue. Another job that does not use the Hive catalog can write to S3 on the same cluster.
  • Viktor Hrtanek
    04/25/2023, 12:36 PM
    Hi all, has anyone successfully run an Apache Beam application in Python (utilising a cross-language pipeline) on a dockerized Flink cluster?
  • Aurora Nockert
    04/25/2023, 12:41 PM
    Not exactly troubleshooting, but we’re accessing Kafka via Flink SQL, and I’m wondering what the best way is to handle the
    WITH('properties.sasl.jaas.config'='…')
    part; it would be nice if the query didn’t have to include the password. Is there some way to read it from an env variable, Java
    -D
    properties, or a secret store?
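    One minimal sketch, assuming the DDL is submitted from a Java program rather than the SQL client: read the credential from an environment variable (SASL_PASSWORD is a made-up name) and splice it into the statement at submission time, so the secret never appears in the SQL text itself.
    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class CreateKafkaTable {
        public static void main(String[] args) {
            TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

            // Credential comes from the environment (e.g. a mounted Kubernetes secret), not the query text.
            String password = System.getenv("SASL_PASSWORD");

            String jaas = String.format(
                "org.apache.kafka.common.security.plain.PlainLoginModule required "
                    + "username=\"my-user\" password=\"%s\";", password);

            tEnv.executeSql(
                "CREATE TABLE events (payload STRING) WITH (\n"
                    + "  'connector' = 'kafka',\n"
                    + "  'topic' = 'events',\n"
                    + "  'properties.bootstrap.servers' = 'broker:9092',\n"
                    + "  'properties.security.protocol' = 'SASL_SSL',\n"
                    + "  'properties.sasl.mechanism' = 'PLAIN',\n"
                    + "  'properties.sasl.jaas.config' = '" + jaas + "'\n"
                    + ")");
        }
    }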
  • Aurora Nockert
    04/25/2023, 12:42 PM
    I cannot find any articles on the subject really ._.
  • Sergey Postument
    04/25/2023, 1:35 PM
    Hi all. I use
    KafkaSink
    to save my events to Kafka, but during heavy load the Flink app restarts due to
    Caused by: org.apache.kafka.common.errors.RecordTooLargeException: The message is 6520160 bytes when serialized which is larger than the maximum request size you have configured with the max.request.size configuration.
    which is pretty strange, since the messages I save via KafkaSink are JSON strings of at most 2k characters, not 6520160 bytes (6.5 MB). Please point me to some documentation; what am I missing here? P.S. Flink 1.14.6
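    One thing worth ruling out: the Kafka producer underneath KafkaSink batches records, so a single produce request can be much larger than any individual message. A minimal sketch of overriding max.request.size on the sink (broker, topic and the 10 MB value are placeholders):
    import java.util.Properties;

    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
    import org.apache.flink.connector.kafka.sink.KafkaSink;

    public class SinkConfigSketch {
        public static KafkaSink<String> build() {
            // Producer-level overrides are passed straight through to the Kafka producer.
            Properties producerProps = new Properties();
            producerProps.setProperty("max.request.size", "10485760"); // 10 MB, example value

            return KafkaSink.<String>builder()
                .setBootstrapServers("broker:9092")                    // placeholder broker
                .setKafkaProducerConfig(producerProps)
                .setRecordSerializer(
                    KafkaRecordSerializationSchema.builder()
                        .setTopic("events")                            // placeholder topic
                        .setValueSerializationSchema(new SimpleStringSchema())
                        .build())
                .build();
        }
    }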
  • Mali
    04/25/2023, 2:17 PM
    Hello everyone, I am pretty new to Flink SQL. How can I explode JSON like this using Flink SQL?
    [{"abc":"123", "cde":"456"},{"efg":"789", "abc":"123"}]
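    A minimal sketch of one possible approach, assuming the JSON arrives as a STRING column: a user-defined table function that parses the array with Jackson and emits one row per key/value pair, which can then be applied with LATERAL TABLE. The function and column names below are made up.
    import org.apache.flink.table.annotation.DataTypeHint;
    import org.apache.flink.table.annotation.FunctionHint;
    import org.apache.flink.table.functions.TableFunction;
    import org.apache.flink.types.Row;

    import com.fasterxml.jackson.databind.JsonNode;
    import com.fasterxml.jackson.databind.ObjectMapper;

    // Emits one (key, value) row for every field of every object in a JSON array.
    @FunctionHint(output = @DataTypeHint("ROW<k STRING, v STRING>"))
    public class ExplodeJson extends TableFunction<Row> {

        private static final ObjectMapper MAPPER = new ObjectMapper();

        public void eval(String json) throws Exception {
            for (JsonNode obj : MAPPER.readTree(json)) {
                obj.fields().forEachRemaining(e -> collect(Row.of(e.getKey(), e.getValue().asText())));
            }
        }
    }
    It would be registered with tEnv.createTemporarySystemFunction("explode_json", ExplodeJson.class) and used as SELECT t.k, t.v FROM source_table, LATERAL TABLE(explode_json(raw_json)) AS t(k, v).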
  • Johannes Cox
    04/25/2023, 2:51 PM
    Hi everyone, we’re experiencing a strange phenomenon when using the Table API and converting a table to a data stream with `toDataStream()`: the watermarks that we had assigned in the
    CREATE TABLE
    statement seem to disappear and we encounter the following error:
    java.lang.RuntimeException: Record has Long.MIN_VALUE timestamp (= no timestamp marker). Is the time characteristic set to 'ProcessingTime', or did you forget to call 'DataStream.assignTimestampsAndWatermarks(...)'?
    	at org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows.assignWindows(SlidingEventTimeWindows.java:80) ~[flink-streaming-java-1.16.1.jar:1.16.1]
    	at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:284) ~[flink-streaming-java-1.16.1.jar:1.16.1]
    ...
    We are currently fixing it by assigning timestamps a second time, after the conversion, but it feels like this shouldn’t be necessary. Are we misunderstanding how watermarks work (especially with regard to mixing the Table and DataStream APIs), or did we possibly encounter a bug? Is reassigning watermarks a valid approach in general? To give more context, we’re using Flink 1.16.1 with the Java API and consume a Kinesis stream (although consuming from a file source yields the same behaviour). In the job we have two aggregations that use different
    SlidingEventTimeWindows
    and an
    intervalJoin
    to join the results.
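    For reference, a minimal sketch of the workaround described above, re-assigning timestamps and watermarks right after the conversion; the eventTime field name, the Instant conversion and the five-second bound are assumptions, not taken from the job.
    import java.time.Duration;
    import java.time.Instant;

    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
    import org.apache.flink.types.Row;

    public class ReassignWatermarksSketch {
        static DataStream<Row> convert(StreamTableEnvironment tableEnv, Table table) {
            DataStream<Row> stream = tableEnv.toDataStream(table);

            // Re-derive event time from a row field after the Table -> DataStream conversion.
            return stream.assignTimestampsAndWatermarks(
                WatermarkStrategy.<Row>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                    .withTimestampAssigner((row, previousTimestamp) ->
                        row.<Instant>getFieldAs("eventTime").toEpochMilli()));
        }
    }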
  • Christophe Bornet
    04/25/2023, 4:57 PM
    Hi all, in a k8s cloud environment, would you recommend using local SSDs for RocksDB, or can SSD-backed EBS be used?
  • Trystan
    04/25/2023, 4:58 PM
    Am I correct in thinking that the current DynamoDB sink doesn’t provide a way to do a conditional write, considering it does batchWrites under the hood and those don’t support conditions? I’ve poked around the different ways of constructing the DynamoDB item / sink but haven’t found a way yet. Just wanted to see if anyone has a clever solution short of writing our own sink for these cases (which is what we currently do).
  • Nathanael England
    04/25/2023, 5:31 PM
    How would one configure the REST endpoint of a Flink job deployed with the k8s operator to be a
    NodePort
    ? I can manually patch things after startup and access the web UI, but I'd like the web UI to be available on a consistent port. The only mention I see of
    NodePort
    in the Helm setup is for role-based access control.
  • Amir Hossein Sharifzadeh
    04/25/2023, 6:30 PM
    I am processing messages from a DataStream and passing them to a process function:
    Table raw_table = tableEnv.sqlQuery(data_query);
    DataStream<Row> join_stream = tableEnv.toDataStream(raw_table);
    SingleOutputStreamOperator<List<double[][][]>> outputStreamOperator = join_stream.process(new EMPADProcessor(commands, taskENT.getTaskId()));
    where
    public void processElement(Row row, ProcessFunction<Row, List<double[][][]>>.Context context, Collector<List<double[][][]>> collector) throws Exception {
    Inside
    processElement
    I am doing some data analysis, which takes a few moments for every single message (row), and this short processing time causes duplicated messages; I don’t know why.
  • Rion Williams
    04/25/2023, 8:06 PM
    Quick question that’s tangentially related to Flink, which I thought I’d pose here since someone has likely dealt with it previously. I’m currently trying to configure a Prometheus-based alert within Grafana to monitor Kafka consumer lag (
    kafka_consumergroup_lag
    ) for one of my Flink jobs, to determine when something upstream may have failed. This particular job doesn’t commit offsets at periodic intervals, and the point of degradation isn’t a static value that would apply across all environments (so I can’t simply use something like
    kafka_consumergroup_lag{topic="topic-for-flink"} > $someThreshold
    ). Does anyone have any ideas on this front? I’m basically just trying to set up an alert if my consumer lag for this particular job purely increases for an hour. I’ve tried a few different approaches involving derivatives, etc. but haven’t had much luck thus far.
  • kingsathurthi
    04/26/2023, 5:42 AM
    When I run the istioctl analyze command I get the output below; how do I make this Istio compliant?
    Warning [IST0117] (Deployment f0/flink-kubernetes-operator) No service associated with this deployment. Service mesh deployments must be associated with a service.
    Info [IST0118] (Service f0/pod1-ingester) Port name ingester (port: 8081, targetPort: 8081) doesn't follow the naming convention of Istio port.
    Info [IST0118] (Service f0/pod1-rest) Port name rest (port: 8081, targetPort: 8081) doesn't follow the naming convention of Istio port.
    Info [IST0118] (Service f0/pod2-ingester) Port name ingester (port: 8081, targetPort: 8081) doesn't follow the naming convention of Istio port.
    Info [IST0118] (Service f0/pod2-rest) Port name rest (port: 8081, targetPort: 8081) doesn't follow the naming convention of Istio port.
  • Kevin T
    04/26/2023, 5:59 AM
    Hi all, I've been trying to set up a Flink development environment to fix some bugs that I have discovered, following the
    <https://nightlies.apache.org/flink/flink-docs-master/docs/flinkdev/ide_setup/>
    instructions. But on step 7, when I executed
    mvn clean package -DskipTests
    I'm getting:
    [ERROR] Failed to execute goal net.alchim31.maven:scala-maven-plugin:3.2.2:testCompile (scala-test-compile) on project flink-scala_2.12: wrap: org.apache.commons.exec.ExecuteException: Process exited with an error: 240 (Exit value: 240) -> [Help 1]
    I have Scala 2.12.17, JDK 11, and Maven 3.9.0 installed. How do I fix this issue?
  • chankyeong won
    04/26/2023, 7:29 AM
    How can I use my own custom sink connector based on AsyncSinkBase? I implemented the components referenced in this document. How do I use the
    MySink
    class on a DataStream in my Flink application’s main method?
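    A minimal sketch, assuming MySink extends AsyncSinkBase and therefore implements Flink’s unified Sink interface: it can be attached to any DataStream with sinkTo. The constructor arguments are placeholders for whatever the custom sink needs.
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class MySinkJob {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            DataStream<String> events = env.fromElements("a", "b", "c");

            // MySink is the user's AsyncSinkBase implementation; its constructor or builder
            // would take the endpoint, element converter, batch sizes, etc.
            events.sinkTo(new MySink(/* ... */));

            env.execute("async-sink-example");
        }
    }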
  • Marcus Baule
    04/26/2023, 9:00 AM
    Hi all, I'm pretty new to Flink and am trying to use the PyFlink DataStream API. I want to connect to Confluent Cloud and read an Avro topic whose schema is maintained in the Confluent Schema Registry. I am facing some issues deserializing the messages, and I was not able to fully work out whether the DataStream API supports Confluent Schema Registry Avro messages, or whether I need to use the Table API, which has a Confluent Avro format. I receive the schema string properly from Confluent Cloud, but when I use it for deserialization
    deserialization_schema = AvroRowDeserializationSchema(
            avro_schema_string=schema_str_source
        )
    I receive an error when executing:
    org.apache.flink.avro.shaded.org.apache.avro.AvroRuntimeException: Malformed data. Length is negative: -1
    Thanks and cheers, Marcus
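    One possible explanation (an assumption, not confirmed in the thread): messages written with the Confluent serializers carry a magic byte and a schema id in front of the Avro payload, which a plain Avro deserializer then misreads. In the Java DataStream API this wire format is handled by ConfluentRegistryAvroDeserializationSchema; a minimal Java sketch for comparison (broker, topic, registry URL and the schema are placeholders):
    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.connector.kafka.source.KafkaSource;
    import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class ConfluentAvroSourceSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // Stand-in schema; in practice this is the schema string fetched from the registry.
            Schema schema = new Schema.Parser().parse(
                "{\"type\":\"record\",\"name\":\"Event\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"}]}");

            KafkaSource<GenericRecord> source = KafkaSource.<GenericRecord>builder()
                .setBootstrapServers("broker:9092")
                .setTopics("avro-topic")
                .setGroupId("flink-consumer")
                .setValueOnlyDeserializer(
                    ConfluentRegistryAvroDeserializationSchema.forGeneric(
                        schema, "https://schema-registry:8081"))
                .build();

            env.fromSource(source, WatermarkStrategy.noWatermarks(), "confluent avro source").print();
            env.execute();
        }
    }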