# troubleshooting

    Xiaosheng Wu

    10/18/2022, 4:48 PM
    Hi all, I'm brainstorming the best approach for making multiple async I/O calls in Flink. The requirement is to support async calls to multiple clients (for example an AWS service client and HttpAsyncClients). I found a flink-async-http-example here, and I'd like to know: if we need to call two clients for each record, is it OK to just iterate over the clients and make the calls, waiting for the Future objects to come back? Or are there any concerns with this approach, and what would be a recommended approach? Thank you!
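    For reference, a minimal sketch (not from the thread) of one way to fan out to two clients inside a single RichAsyncFunction: both calls are started without blocking and the ResultFuture is completed once both CompletableFutures finish. MyAwsClient and MyHttpClient are hypothetical non-blocking wrappers that return CompletableFuture<String>.
    ```java
    import java.util.Collections;
    import java.util.concurrent.CompletableFuture;

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.async.ResultFuture;
    import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

    public class TwoClientEnricher extends RichAsyncFunction<String, String> {

        // Hypothetical client wrappers, each returning CompletableFuture<String>.
        private transient MyAwsClient awsClient;
        private transient MyHttpClient httpClient;

        @Override
        public void open(Configuration parameters) {
            awsClient = new MyAwsClient();
            httpClient = new MyHttpClient();
        }

        @Override
        public void asyncInvoke(String record, ResultFuture<String> resultFuture) {
            CompletableFuture<String> awsResult = awsClient.lookup(record);
            CompletableFuture<String> httpResult = httpClient.lookup(record);

            // Start both calls without blocking the asyncInvoke thread and complete
            // the ResultFuture once both responses are in (or either one fails).
            awsResult
                .thenCombine(httpResult, (aws, http) -> record + "|" + aws + "|" + http)
                .whenComplete((out, error) -> {
                    if (error != null) {
                        resultFuture.completeExceptionally(error);
                    } else {
                        resultFuture.complete(Collections.singleton(out));
                    }
                });
        }
    }
    ```
    It would then be attached with something like `AsyncDataStream.unorderedWait(stream, new TwoClientEnricher(), 30, TimeUnit.SECONDS, 100)`.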

    M Harsha

    10/18/2022, 5:23 PM
    Hi all, I'm trying to set up a real-time data aggregation pipeline. It consumes messages from Kafka, does a time-based window aggregate, and dumps data to Postgres. The following is the job graph; the issue is that the 2nd node is becoming the bottleneck, which causes backpressure (and metrics are lagged). The tasks have a parallelism of 1, which I assume cannot be increased. How do I approach this problem?

    Sachin Saikrishna Manikandan

    10/18/2022, 5:27 PM
    Hello team, a question on using MySQL as a table source: we have an incoming stream of data from Kafka and we want to add some fields from MySQL to enrich the data stream. For this, we are using the Table API to convert the DataStream, then creating a temporary view and joining that with the table in MySQL via the JDBC connector. At high loads, this becomes very slow and backpressures the entire pipeline. The SQL query we use is a regular join with the Kafka stream on the left and the MySQL table on the right, with the WHERE condition on the primary key, enforced both in MySQL and in the Table API. At a high throughput of around 35-40K RPS, this query becomes the bottleneck. Are there any suggestions to improve the lookup, and is there a way to have some metrics around these JDBC calls? PS: We have enabled lookup caching as well. We are following something very similar to the first example here https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/data_stream_api/
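    For context, a hedged sketch of a JDBC lookup join (FOR SYSTEM_TIME AS OF), which is the join flavor the JDBC `lookup.cache.*` options actually apply to; a regular join against a JDBC table scans it as a bounded source instead. It assumes a StreamTableEnvironment `tableEnv` and a Kafka-backed view `events` with a processing-time attribute `proc_time`; table and column names are made up.
    ```java
    tableEnv.executeSql(
        "CREATE TABLE dim_users (" +
        "  user_id BIGINT," +
        "  segment STRING," +
        "  PRIMARY KEY (user_id) NOT ENFORCED" +
        ") WITH (" +
        "  'connector' = 'jdbc'," +
        "  'url' = 'jdbc:mysql://mysql-host:3306/mydb'," +
        "  'table-name' = 'users'," +
        "  'lookup.cache.max-rows' = '100000'," +
        "  'lookup.cache.ttl' = '10 min'" +
        ")");

    // Point lookups per incoming row, served from the lookup cache when possible.
    Table enriched = tableEnv.sqlQuery(
        "SELECT e.*, d.segment " +
        "FROM events AS e " +
        "JOIN dim_users FOR SYSTEM_TIME AS OF e.proc_time AS d " +
        "ON e.user_id = d.user_id");
    ```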

    Stephan Weinwurm

    10/18/2022, 11:42 PM
    Hey all, question about the new v2 HTTP async transport mode in Flink StateFun: https://nightlies.apache.org/flink/flink-statefun-docs-release-3.2/docs/modules/http-endpoint/ Is `pool_size` configured per TaskManager, or does the JobManager split the `pool_size` evenly across all available TaskManagers? We want to limit the number of concurrent connections to our StateFun endpoint to e.g. 1000 connections. Do we set `pool_size: 1000` or `pool_size: 1000 / num_task_managers`?

    Matt Fysh

    10/19/2022, 2:18 AM
    Has anyone had success getting the delta connector 0.5.0 working on AWS Kinesis Analytics? I'm looking into it now. I was hoping to use Python, but it looks like I might need to learn some Scala to get things going…

    Raghunadh Nittala

    10/19/2022, 2:49 AM
    Hi Team, we are using Flink to process our data streams in production and have a few questions. • As of now, we are setting the parallelism equal to the number of partitions in the source Kafka topic. Apart from this, are there any specific ways to arrive at a proper parallelism value? • We are using the Table API for a temporal lookup join with a MySQL table to filter the stream. Even though we have increased the parallelism for this operator, we're not able to scale out the MySQL calls. Is there any other way to scale out the JDBC calls made using the Table API?

    Canope Nerda

    10/19/2022, 3:44 AM
    Hey folks, I'm streaming data from Kafka to Iceberg through Flink SQL with event-time semantics. I noticed that although the watermark has advanced close to the present, the time attribute field used for the watermark in the sink is lagging far behind. Below is how the watermark is defined in Flink SQL:
    ```sql
    WATERMARK FOR ts AS ts - INTERVAL '0.001' SECOND
    ```
    I enabled early/late fire with the following configuration, and I expected to see the freshest data in the sink, which is not the case. Any suggestion is highly appreciated.
    ```yaml
    table.exec.emit.allow-lateness: "24 h"
    table.exec.emit.early-fire.enabled: "true"
    table.exec.emit.early-fire.delay: "0 s"
    table.exec.emit.late-fire.enabled: "true"
    table.exec.emit.late-fire.delay: "0 s"
    ```

    Canope Nerda

    10/19/2022, 3:48 AM
    Also attaching the job DAG. A 1-hour tumbling window is used for deduplication.

    Matt Fysh

    10/19/2022, 5:16 AM
    I'm probably doing something wrong here, but I can't seem to get `env.fromElements("a", "b", "c").print` to show up in Zeppelin.
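    In case it is useful, a minimal sketch outside Zeppelin (in Zeppelin the environment is pre-created by the interpreter): `print()` only registers a sink, so nothing runs until the job is executed, and the printed output normally lands in the TaskManager stdout rather than the notebook; `executeAndCollect()` brings results back to the caller instead.
    ```java
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.util.CloseableIterator;

    public class PrintDemo {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // Variant 1: print() adds a sink; the job only runs once execute() is called,
            // and the output ends up in the TaskManager stdout / logs.
            env.fromElements("a", "b", "c").print();
            env.execute("print-demo");

            // Variant 2: pull the results back into the calling process.
            try (CloseableIterator<String> it = env.fromElements("a", "b", "c").executeAndCollect()) {
                it.forEachRemaining(System.out::println);
            }
        }
    }
    ```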

    Matteo De Martino

    10/19/2022, 9:17 AM
    Hello all, I am new to Flink and I am looking into it for a PoC at work. I am facing some "operational" issues submitting a dummy Flink job to a session cluster I am running locally via Docker. I am building an uber jar of my Scala program, but when I submit the job (via the Flink Dashboard UI), I get the following:
    ```
    System.err: (none)
    2022-10-19T09:03:23.730088107Z 	at org.apache.flink.client.program.PackagedProgramUtils.generateException(PackagedProgramUtils.java:264) ~[flink-dist-1.15.2.jar:1.15.2]
    2022-10-19T09:03:23.730092595Z 	at org.apache.flink.client.program.PackagedProgramUtils.getPipelineFromProgram(PackagedProgramUtils.java:172) ~[flink-dist-1.15.2.jar:1.15.2]
    2022-10-19T09:03:23.730096510Z 	at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:82) ~[flink-dist-1.15.2.jar:1.15.2]
    2022-10-19T09:03:23.730101151Z 	at org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils$JarHandlerContext.toJobGraph(JarHandlerUtils.java:159) ~[flink-dist-1.15.2.jar:1.15.2]
    2022-10-19T09:03:23.730105491Z 	at org.apache.flink.runtime.webmonitor.handlers.JarPlanHandler.lambda$handleRequest$1(JarPlanHandler.java:107) ~[flink-dist-1.15.2.jar:1.15.2]
    2022-10-19T09:03:23.730109668Z 	at java.util.concurrent.CompletableFuture$AsyncSupply.run(Unknown Source) [?:?]
    2022-10-19T09:03:23.730113393Z 	at java.lang.Thread.run(Unknown Source) [?:?]
    2022-10-19T09:03:23.730116960Z Caused by: java.lang.NoSuchMethodError: 'scala.collection.immutable.ArraySeq scala.runtime.ScalaRunTime$.wrapRefArray(java.lang.Object[])'
    2022-10-19T09:03:23.730120758Z 	at generic.WindowFunctions$.<clinit>(WindowFunctions.scala:58) ~[?:?]
    2022-10-19T09:03:23.730124621Z 	at generic.WindowFunctions.main(WindowFunctions.scala) ~[?:?]
    2022-10-19T09:03:23.730128185Z 	at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:?]
    2022-10-19T09:03:23.730132380Z 	at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source) ~[?:?]
    2022-10-19T09:03:23.730136395Z 	at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source) ~[?:?]
    2022-10-19T09:03:23.730140226Z 	at java.lang.reflect.Method.invoke(Unknown Source) ~[?:?]
    2022-10-19T09:03:23.730143988Z 	at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355) ~[flink-dist-1.15.2.jar:1.15.2]
    2022-10-19T09:03:23.730148759Z 	at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222) ~[flink-dist-1.15.2.jar:1.15.2]
    2022-10-19T09:03:23.730161755Z 	at org.apache.flink.client.program.PackagedProgramUtils.getPipelineFromProgram(PackagedProgramUtils.java:158) ~[flink-dist-1.15.2.jar:1.15.2]
    ```
    The Flink version is 1.15.2 (via this docker image), and I am packaging a Scala 2.13 job as an uber jar (I tried several ways). I also tested the same code locally (in-memory Flink) and it worked fine. I am sure I am missing something obvious, but can anyone help me and point me in the right direction? 🙏 Thanks 🙇

    Yaroslav Bezruchenko

    10/19/2022, 9:56 AM
    Hey, is it okay to have two states per KeyedProcessFunction? Or is that too much?
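    Two (or more) states in one KeyedProcessFunction is a normal pattern; each state is scoped to the current key independently. A minimal sketch (using String elements purely for illustration):
    ```java
    import org.apache.flink.api.common.state.MapState;
    import org.apache.flink.api.common.state.MapStateDescriptor;
    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
    import org.apache.flink.util.Collector;

    public class TwoStateFunction extends KeyedProcessFunction<String, String, String> {

        private transient ValueState<Long> countState;          // one value per key
        private transient MapState<String, Long> perTypeCounts; // a map per key

        @Override
        public void open(Configuration parameters) {
            countState = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("count", Long.class));
            perTypeCounts = getRuntimeContext().getMapState(
                    new MapStateDescriptor<>("per-type-counts", String.class, Long.class));
        }

        @Override
        public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
            Long current = countState.value();
            countState.update(current == null ? 1L : current + 1);

            Long typeCount = perTypeCounts.get(value);
            perTypeCounts.put(value, typeCount == null ? 1L : typeCount + 1);

            out.collect(value + ": " + countState.value());
        }
    }
    ```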

    Canope Nerda

    10/19/2022, 10:29 AM
    Hi team, it seems early/late emit is not supported in window deduplication. Is there an ETA for adding this feature?

    M Harsha

    10/19/2022, 12:54 PM
    Hi all, the following is the configuration for the job submitted via the SQL client:
    ```sql
    SET 'execution.runtime-mode' = 'streaming';
    SET 'state.checkpoints.dir' = 'file:///tmp/flink-checkpoints/';
    SET 'execution.checkpointing.mode' = 'EXACTLY_ONCE';
    SET 'execution.time-characteristic'='event-time';
    SET 'execution.checkpointing.interval' = '5min';
    SET 'execution.checkpointing.min-pause' = '1min';
    SET 'execution.checkpointing.max-concurrent-checkpoints' = '1';
    SET 'execution.checkpointing.prefer-checkpoint-for-recovery' = 'true';
    SET 'parallelism.default' = '-1';
    SET 'jobmanager.scheduler' = 'adaptive';
    SET 'cluster.declarative-resource-management.enabled' = 'true';
    ```
    The job graph is as follows:
    ```
    TableSourceScan > Calc > LocalWindowAggregate > GlobalWindowAggregate > Calc > ConstraintEnforcer > Sink
    ```
    When I run the job, I see that the entire pipeline is running in one slot (checked via SlotStatus in the logs), and as the load increases I do not see it getting divided across the other slots. Say I set the default parallelism to 1 and use the default scheduler, this is the job breakdown:
    ```
    Task1 (1 slot): TableSourceScan > Calc > LocalWindowAggregate
    Task2 (1 slot): GlobalWindowAggregate > Calc > ConstraintEnforcer > Sink
    ```
    Am I missing something that is leading to the job not using the resources properly (adaptive scheduler)?

    clen.moras

    10/19/2022, 1:01 PM
    Hello, I was testing out FlinkSessionJobs on flink-kubernetes-operator version 1.2.0. It only appears to work with remote URLs and not with local ones:
    ```yaml
    apiVersion: flink.apache.org/v1beta1
    kind: FlinkSessionJob
    metadata:
      name: basic-session
    spec:
      deploymentName: basic-session-deployment
      job:
        jarURI: local:///opt/flink/examples/streaming/TopSpeedWindowing.jar
        parallelism: 4
        upgradeMode: stateless
    ```
    ```yaml
    apiVersion: flink.apache.org/v1beta1
    kind: FlinkSessionJob
    metadata:
      name: basic-session
    spec:
      deploymentName: basic-session-deployment
      job:
        jarURI: https://repo1.maven.org/maven2/org/apache/flink/flink-examples-streaming_2.12/1.15.2/flink-examples-streaming_2.12-1.15.2-TopSpeedWindowing.jar
        parallelism: 4
        upgradeMode: stateless
    ```
    whereas the MANIFEST.MF file is the same in both jars:
    ```
    root@ca5227b1db40:/opt/flink/examples/streaming/META-INF# cat MANIFEST.MF
    Manifest-Version: 1.0
    Implementation-Title: Flink : Examples : Streaming
    Implementation-Version: 1.15.2
    Archiver-Version: Plexus Archiver
    Built-By: cranmerd
    Specification-Vendor: The Apache Software Foundation
    Specification-Title: Flink : Examples : Streaming
    Implementation-Vendor-Id: org.apache.flink
    program-class: org.apache.flink.streaming.examples.windowing.TopSpeedW
     indowing
    Implementation-Vendor: The Apache Software Foundation
    Created-By: Apache Maven 3.2.5
    Build-Jdk: 1.8.0_342
    Specification-Version: 1.15.2
    ```
    What am I doing wrong here?

    Ilya Sterin

    10/19/2022, 2:39 PM
    I'm having an issue using RocksDB as a state backend on a Mac M1. RocksDB now runs natively on M1 and they also have a JNI library that is compatible. But I noticed that Flink uses frocksdb and frocksdbjni. Is there any reason that's used vs. the standard rocksdb/jni distribution? Can it work with the standard distro? https://mvnrepository.com/artifact/com.ververica/frocksdbjni hasn't been updated in over a year, so I'm unsure whether there is work on M1 compatibility. Thanks for any help. I'm new to Flink.

    Matteo De Martino

    10/19/2022, 2:56 PM
    Apologies in advance for the noob question, but: I have a field `x` that is a `DATE` and another field `y` that is a `TIMESTAMP`. What I need is to build a final `TIMESTAMP` (format `yyyy-MM-dd HH:mm:ss`) from `x` and only the time part of `y`. I tried something like: `TO_TIMESTAMP(DATE_FORMAT(x, 'yyyy-MM-dd') || ' ' || DATE_FORMAT(y, 'HH:mm:ss'), 'yyyy-MM-dd HH:mm:ss') AS CreationTime` But `DATE_FORMAT` cannot be used on `x` (which is a `DATE`), and I can't find a way to convert that into a timestamp or a string... What am I missing? 😅

    Bhupendra Yadav

    10/19/2022, 2:58 PM
    Hi everyone. We are evaluating apache/flink-kubernetes-operator for creating a Flink session cluster. Is it possible to submit a job to the session cluster via the REST API /jars/:jarid/run? I couldn't find anything about it in the docs, so asking here. PS: Right now we are using the GCP flink-operator, which in session mode provides the option to submit a job via the REST APIs.

    ding bei

    10/19/2022, 3:08 PM
    Hi, when I use a join to join two streams, how can I use the context to get the timestamp in the apply function? There is ProcessJoinFunction, but it does not fit this kind of situation.
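    One possible workaround (a hedged sketch, not from the thread): a windowed join's JoinFunction has no context object, but if you key and connect the two streams yourself, the KeyedCoProcessFunction context exposes `ctx.timestamp()` for every element. `streamA` and `streamB` are assumed to be `DataStream<String>` keyed by the value itself; the actual buffering/joining logic is left out.
    ```java
    DataStream<String> joined = streamA
        .connect(streamB)
        .keyBy(a -> a, b -> b)
        .process(new KeyedCoProcessFunction<String, String, String, String>() {
            @Override
            public void processElement1(String value, Context ctx, Collector<String> out) {
                Long ts = ctx.timestamp(); // event timestamp of this element (null if none assigned)
                // buffer the element in keyed state and join it against the other side here
                out.collect("left@" + ts + ": " + value);
            }

            @Override
            public void processElement2(String value, Context ctx, Collector<String> out) {
                Long ts = ctx.timestamp();
                out.collect("right@" + ts + ": " + value);
            }
        });
    ```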

    Tommy May

    10/19/2022, 4:53 PM
    Hello! I'm using the Flink k8s operator and Flink 1.15.2, and I am wondering if it's possible to configure the Flink process memory to be lower than the memory allocated to the container. We want to use a memory-backed volume for RocksDB, but this usage counts against the container memory. Since Flink thinks it has the full container memory to work with, we're hitting some OOMs when the state size increases. I've tried a variety of config combinations, but haven't gotten anything to work here yet.

    Jason Politis

    10/19/2022, 5:03 PM
    Hello everyone. Is there any reason why Flink should be throwing a NullPointerException for this statement?
    ```sql
    IF(
            COUNTRY IS NULL
            OR CHAR_LENGTH(RTRIM(COUNTRY)) = 0,
            'ETL_UNSPEC_STR',
            RTRIM(COUNTRY)
        )
    ```

    M Harsha

    10/19/2022, 5:40 PM
    Hi all, is it possible to create a keyed window aggregate using a SQL statement (submitting the job via the SQL client)? A non-keyed aggregate defaults to a parallelism of 1, which seems to be becoming a bottleneck in the pipeline, so I was wondering if the stream could be keyed.
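    A hedged sketch of a keyed windowed aggregate with the TUMBLE window TVF, assuming a source table `events` with a time attribute `ts` and a key column `user_id` (the same SELECT can be submitted from the SQL client directly). Grouping by the key column in addition to `window_start`/`window_end` gives an aggregate that is hash-distributed by key and can run with parallelism > 1.
    ```java
    Table keyedAgg = tableEnv.sqlQuery(
        "SELECT window_start, window_end, user_id, COUNT(*) AS cnt " +
        "FROM TABLE(" +
        "  TUMBLE(TABLE events, DESCRIPTOR(ts), INTERVAL '10' MINUTES)) " +
        "GROUP BY window_start, window_end, user_id");
    ```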

    Mingliang Liu

    10/19/2022, 6:32 PM
    Hi, I have a small dev/test Flink cluster and I would like to create tables and make them visible across different sessions/clients. I see I will have to use a fully-fledged Hive catalog. The default_catalog uses GenericInMemoryCatalog. The doc says:
    The GenericInMemoryCatalog is an in-memory implementation of a catalog. All objects will be available only for the lifetime of the session.
    I'm wondering why that decision was made. Is it possible to make it work across multiple sessions/clients? In-memory is fine (it's understood the data will be gone after a JVM restart).
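    For reference, a hedged sketch of registering a Hive catalog so table definitions outlive a single session (paths are placeholders, and it assumes the Hive connector dependency and a running metastore; the same statements can be run from the SQL client).
    ```java
    tableEnv.executeSql(
        "CREATE CATALOG persistent_catalog WITH (" +
        "  'type' = 'hive'," +
        "  'hive-conf-dir' = '/opt/hive-conf'" +
        ")");
    tableEnv.executeSql("USE CATALOG persistent_catalog");
    ```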

    ding bei

    10/20/2022, 2:33 AM
    Hey, when I use join or coGroup to connect two streams, I cannot get the timestamps; the only way to do this seems to be an interval join. Why is that?

    Matt Fysh

    10/20/2022, 6:15 AM
    Hi all, I'm running Flink 1.13.2 locally with Zeppelin 0.10, and I copied the flink-s3-fs-hadoop plugin into the plugins directory, but I don't think it worked: when trying to use the FileSink I see an error message beginning with "Could not find a file system implementation for scheme 's3a'."

    sambhav gupta

    10/20/2022, 7:01 AM
    Hey Flink community, is there a way to gauge/trace how many records were not written by a sink, i.e. potential data loss?

    Ildar Almakaev

    10/20/2022, 9:35 AM
    Hello, folks. I've got `ValueState` and `MapState` states:
    ```java
    private ValueState<User> userState;
    private MapState<String, List<Action>> delayedRecordsState;
    ```
    Could you help me figure out how to expose the number of records of each state as a JMX metric? Then I could monitor them in CloudWatch/Grafana, etc. Thanks for any help!
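    Not from the thread, but a hedged sketch of one common approach: keyed state has no built-in record-count metric, so the count is tracked manually next to the state updates and exposed as a gauge on the operator metric group, which the JMX reporter (or any other configured reporter) then publishes. `User` and `Action` are the classes from the snippet above; the counter is subtask-local (summed over all keys of that subtask) and not checkpointed, so it resets on restore.
    ```java
    import java.util.List;

    import org.apache.flink.api.common.state.MapState;
    import org.apache.flink.api.common.state.MapStateDescriptor;
    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.api.common.typeinfo.TypeHint;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.metrics.Gauge;
    import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
    import org.apache.flink.util.Collector;

    public class DelayedRecordsFunction extends KeyedProcessFunction<String, Action, Action> {

        private transient ValueState<User> userState;
        private transient MapState<String, List<Action>> delayedRecordsState;

        // Maintained manually on every state update.
        private transient long delayedRecordEntries;

        @Override
        public void open(Configuration parameters) {
            userState = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("user", User.class));
            delayedRecordsState = getRuntimeContext().getMapState(
                    new MapStateDescriptor<>("delayedRecords",
                            TypeInformation.of(String.class),
                            TypeInformation.of(new TypeHint<List<Action>>() {})));

            // Registered under the operator's metric scope and picked up by the reporters.
            getRuntimeContext().getMetricGroup()
                    .gauge("delayedRecordEntries", (Gauge<Long>) () -> delayedRecordEntries);
        }

        @Override
        public void processElement(Action action, Context ctx, Collector<Action> out) throws Exception {
            // Wherever entries are added to / removed from delayedRecordsState,
            // adjust delayedRecordEntries accordingly, e.g. delayedRecordEntries++;
            out.collect(action);
        }
    }
    ```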

    chunilal kukreja

    10/20/2022, 10:55 AM
    Hello Team! When I am using OffsetsInitializer.timestamp(<timestamp value>) for KafkaSource (1.15.2), as shown here:
    ```java
    KafkaSource<EventDataMapping> kafkaSource = KafkaSource.<EventDataMapping>builder()
            .setTopicPattern(streamNamePattern)
            .setClientIdPrefix(configProps.getJobProps().getClientId())
            .setStartingOffsets(OffsetsInitializer.timestamp(delta))
            .setDeserializer(kafkaRecordDeserializationSchema)
            //.setValueOnlyDeserializer(new SimpleStringSchema())
            .setProperties(kafkaConsumerProperties)
            .build();
    ```
    it throws the following error, "Invalid negative offset", while trying to read from a stream:
    ```
    Caused by: java.lang.IllegalArgumentException: Invalid negative offset
    ```
    I observed this happens when it tries to return records from a partition that hasn't received any messages. So I have 3 partitions, and when events are present in all three partitions it works fine and returns all the events in the 3 partitions after that particular timestamp. But if even one of the partitions is empty, it throws the error instead of returning the events. How do I handle this issue? Ideally it should get an empty response from a partition containing no events instead of an error, so that the code could look for events in the other partitions. (FYI, I am using Oracle Streaming Service.)

    Jhuanderson Macias

    10/20/2022, 1:05 PM
    Hey, I am following the documentation here, but when I query the table it returns the data under one column, f0. I am struggling to have the result conform to the schema. Any guidance would be appreciated. Thanks.
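    In case it helps, a hedged sketch: a single `f0` column often shows up when the stream's element type is generic/atomic (e.g. Row without explicit type information), so fromDataStream sees one opaque field. Giving the stream explicit, named Row type information lets the column names come through. Field names, `env`, and `tableEnv` are assumptions here; `Types` is `org.apache.flink.api.common.typeinfo.Types`.
    ```java
    DataStream<Row> rows = env
        .fromElements("alice", "bob")
        .map(name -> Row.of(name, name.length()))
        .returns(Types.ROW_NAMED(
            new String[] {"name", "name_length"},
            Types.STRING, Types.INT));

    Table table = tableEnv.fromDataStream(rows);
    table.printSchema(); // columns `name` and `name_length` instead of a single `f0`
    ```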

    Tim Bauer

    10/20/2022, 1:45 PM
    In Spark it is possible to define a watermark strategy just before running a windowed aggregation like this:
    ```scala
    val windowedCounts = words
        .withWatermark("timestamp", "10 minutes")
        .groupBy(
            window($"timestamp", "10 minutes", "5 minutes"),
            $"word")
        .count()
    ```
    Is there an equivalent approach in Flink? All I am finding on the Table API side is that watermarks need to be defined at the moment of table creation. It doesn't look like one can later provide a custom watermark for a custom aggregation, or am I wrong? On the DataStream API side there seems to exist `assignTimestampsAndWatermarks`. Background: I'm dealing with a stream in which only certain events have the event timestamp that I want to use as the watermark. I'm filtering out those events using the Table API, and after I have done so I would like to declare this now guaranteed-to-be-available timestamp field as my watermark.

    Varun Sayal

    10/20/2022, 2:35 PM
    Is there any way to list all the uids within a savepoint using the State Processor API?