# troubleshooting
  • Sergii Mikhtoniuk
    01/06/2023, 12:36 AM
    I'm upgrading from Flink 1.13 to 1.16 and see that the old trusty `ParquetRowInputFormat` is gone... The `ParquetColumnarRowInputFormat` gives me `DataStream[RowData]`, which I struggle a lot with:
    - I cannot even `stream.print()` it, as `toString` is not defined for `RowData`.
    - I cannot plug it into the Table API, as it results in a single column `f0 RAW(...)`, even though I properly define the types and names of each field when constructing `ParquetColumnarRowInputFormat`.
    I've seen suggestions to read Parquet via the Table API, but in my case I need more low-level control over which files are read and in which order, to then combine them into a single stream. Questions:
    - Is there a correct way to create a table from `DataStream[RowData]`?
    - If not, can I convert `DataStream[RowData]` into `DataStream[Row]` (I don't care too much about efficiency atm)?
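    A minimal sketch of the second option (`RowData` to `Row`), assuming the internal conversion utilities in `flink-table-runtime`; the field names and types below are hypothetical stand-ins for whatever schema was passed to `ParquetColumnarRowInputFormat`:
    ```java
    import org.apache.flink.api.common.functions.RichMapFunction;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.table.api.DataTypes;
    import org.apache.flink.table.data.RowData;
    import org.apache.flink.table.data.conversion.DataStructureConverter;
    import org.apache.flink.table.data.conversion.DataStructureConverters;
    import org.apache.flink.table.types.DataType;
    import org.apache.flink.types.Row;

    // Maps internal RowData to external Row via the table type system.
    public class RowDataToRow extends RichMapFunction<RowData, Row> {

        // Must mirror the schema given to ParquetColumnarRowInputFormat.
        private final DataType rowType = DataTypes.ROW(
                DataTypes.FIELD("id", DataTypes.BIGINT()),     // hypothetical field
                DataTypes.FIELD("name", DataTypes.STRING()));  // hypothetical field

        private transient DataStructureConverter<Object, Object> converter;

        @Override
        public void open(Configuration parameters) {
            converter = DataStructureConverters.getConverter(rowType);
            converter.open(getRuntimeContext().getUserCodeClassLoader());
        }

        @Override
        public Row map(RowData value) {
            return (Row) converter.toExternal(value);
        }
    }
    ```
    After mapping, `tableEnv.fromDataStream` should see proper columns instead of a single `f0 RAW(...)`.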
  • Nitin Agrawal
    01/06/2023, 3:07 AM
    Hi, the DataStream connectors include a Hybrid Source implementation, but I cannot find a Hybrid Source among the Table API connectors. Is it present, and if not, what is the recommended way to solve the Hybrid Source use case with the Table API connectors?
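    A hedged workaround sketch: build the `HybridSource` on the DataStream side and bridge it into the Table API with `fromDataStream` (the two sources below, their paths, topics, and broker address are illustrative placeholders):
    ```java
    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.connector.base.source.hybrid.HybridSource;
    import org.apache.flink.connector.file.src.FileSource;
    import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
    import org.apache.flink.connector.kafka.source.KafkaSource;
    import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
    import org.apache.flink.core.fs.Path;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

    public class HybridToTable {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

            // Bounded history first...
            FileSource<String> history = FileSource
                    .forRecordStreamFormat(new TextLineInputFormat(), new Path("s3://bucket/history"))
                    .build();

            // ...then switch to the live Kafka topic.
            KafkaSource<String> live = KafkaSource.<String>builder()
                    .setBootstrapServers("broker:9092")
                    .setTopics("events")
                    .setStartingOffsets(OffsetsInitializer.earliest())
                    .setValueOnlyDeserializer(new SimpleStringSchema())
                    .build();

            HybridSource<String> hybrid =
                    HybridSource.builder(history).addSource(live).build();

            DataStream<String> stream =
                    env.fromSource(hybrid, WatermarkStrategy.noWatermarks(), "hybrid-source");
            Table table = tEnv.fromDataStream(stream);
            table.execute().print();
        }
    }
    ```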
  • Suparn Lele
    01/06/2023, 4:53 AM
    Hi, I had asked this question previously but did not get any follow-up response. How can we run multiple pipelines sequentially in Flink batch jobs? Also, I read the following in the 1.15 docs: "Application Mode: runs the cluster exclusively for one application. The job's main method (or client) gets executed on the JobManager. Calling `execute`/`executeAsync` multiple times in an application is supported." What is the meaning of multiple `execute`/`executeAsync` calls being supported? Does it mean that in a batch job I can use multiple `execute` calls to control the flow of my job?
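    A minimal sketch of what multiple `execute()` calls look like in one `main()` (in Application Mode this runs on the JobManager; each blocking `execute()` submits one job and waits for it, so the pipelines run sequentially):
    ```java
    import org.apache.flink.api.common.RuntimeExecutionMode;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class SequentialPipelines {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setRuntimeMode(RuntimeExecutionMode.BATCH);

            // Job 1: submitted and run to completion first.
            env.fromElements(1, 2, 3).map(i -> i * 2).print();
            env.execute("pipeline-1");

            // Job 2: only submitted after pipeline-1 finishes, so the
            // second pipeline's flow can depend on the first completing.
            env.fromElements("a", "b").map(String::toUpperCase).print();
            env.execute("pipeline-2");
        }
    }
    ```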
  • Chen-Che Huang
    01/06/2023, 10:24 AM
    Hi all, we use `flink-k8s-operator 1.3` to deploy our Flink apps in a k8s cluster. Today, in one of our clusters, we encountered the following error. We expected this issue to be fixed since `flink-k8s-operator 1.2` because of https://issues.apache.org/jira/browse/FLINK-28272. We tried deleting the `webhook-server-cert` secret and restarting the operator pod, but the issue still happens. I found someone who encountered the same issue about two months ago and am not sure whether more users are having it. Any comment is appreciated 🙏
    ```
    one or more objects failed to apply, reason: Internal error occurred: failed calling webhook "validationwebhook.flink.apache.org": failed to call webhook: Post "https://flink-operator-webhook-service.flink-operator.svc:443/validate?timeout=10s": x509: certificate signed by unknown authority (possibly because of "x509: invalid signature: parent certificate cannot sign this kind of certificate" while trying to verify candidate authority certificate "FlinkDeployment Validator")
    ```
  • Lucas Alcântara Freire
    01/06/2023, 12:41 PM
    Hey guys, to monitor our Flink jobs we are using Datadog APM so we can track latency, RPS per job, and so on. Has anyone done something similar? The problem I am having right now is that I want to start a span for each event, so tracing should be the first step. But if I move the tracing to the first step, `finally` is called before the sink step; I guess this happens because of the `.keyBy` step.
    ```java
    input.filter(someFiltering())
         .keyBy(e -> e.getId())
         .flatMap(new Tracing<>("ResourceName")) // starts the Datadog span
         .flatMap(someWork())
         .process(someMoreWork())
         .addSink(saveInDB())
         .name("cool name");
    ```
    The `Tracing` flatMap method:
    ```java
    @Override
    public void flatMap(IN in, Collector<IN> collector) throws Exception {
        final Span span = GlobalTracer.get().buildSpan("job.processing")
                .withTag(DDTags.RESOURCE_NAME, this.resourceName)
                .start();

        // The span only wraps the synchronous collect() call; operators on
        // the other side of the keyBy shuffle run later, so finish() fires
        // before the sink is reached.
        try (final Scope scope = GlobalTracer.get().activateSpan(span)) {
            collector.collect(in);
        } finally {
            span.finish();
        }
    }
    ```
  • sumit gulati
    01/06/2023, 12:43 PM
    Hi all, I'm facing an issue with a Flink pipeline when deploying it to a k8s cluster. The error is:
    `Caused by: org.postgresql.util.PSQLException: The server requested SCRAM-based authentication, but no password was provided.`
    I'm sinking the stream to PostgreSQL using the JDBC sink connector. The connector picks up the password from a `Config.properties` file and works fine locally (pushing data to Postgres), but when deployed it fails to initialise the sink part.
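    For reference, a minimal sketch of wiring the credentials explicitly into the JDBC sink (URL, table, and credentials below are hypothetical); if this works on the cluster, the properties file is likely not being found or read at runtime there:
    ```java
    import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
    import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
    import org.apache.flink.connector.jdbc.JdbcSink;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class PostgresSinkSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            env.fromElements("a", "b")
               .addSink(JdbcSink.sink(
                   "INSERT INTO events (name) VALUES (?)",
                   (statement, name) -> statement.setString(1, name),
                   JdbcExecutionOptions.builder().withBatchSize(100).build(),
                   new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                       .withUrl("jdbc:postgresql://postgres:5432/mydb") // hypothetical URL
                       .withDriverName("org.postgresql.Driver")
                       .withUsername("flink")   // hypothetical
                       .withPassword("secret")  // verify this value is non-null on the cluster
                       .build()));

            env.execute("jdbc-sink-check");
        }
    }
    ```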
  • Mehul Batra
    01/06/2023, 4:39 PM
    Hi all, it seems this recipe is stale or not accessible. Can we remove it altogether, or share the correct link in the article? https://docs.immerok.cloud/docs/how-to-guides/development/enrichment-join-with-buffering/
  • Nick Caballero
    01/06/2023, 5:27 PM
    Hi all, was wondering if there’s anything out there to provide a centralized dashboard for multiple Flink clusters?
  • Fariz Hajiyev
    01/06/2023, 6:37 PM
    Hello, I have a question about an ingress source in Flink Statefun. Does the Statefun app need to be restarted if the number of shards in the ingress Kinesis stream is increased, or will it detect new shards automatically without a restart? (As far as I know, the Kinesis source operator in Apache Flink handles that automatically, but I am not sure about Statefun.)
  • Xi Cheng
    01/07/2023, 5:26 AM
    Hey, we saw a very weird issue with the Flink Kafka partition assigner on Flink 1.12 with Kafka client 2.4.1. We have a Flink job with 40 TMs, reading from 10 Kafka topics with 90 total partitions. We noticed that even when using `RoundRobinAssignor` instead of the default `RangeAssignor`, some task managers don't get any partitions assigned while others get many (5). We are not sure what leads to this imbalance in task manager Kafka partition assignment.
  • Andy Chambers
    01/07/2023, 5:24 PM
    Continuing this thread here… https://apache-flink.slack.com/archives/C03GV7L3G2C/p1673103210759869 Is there any guidance on what kind of stateful function storage sizes are operationally practical? For the problem I have in mind, I might need up to maybe 20 MB of local storage per “partition”. Does that sound like it would be problematically large?
  • Rafael Jeon
    01/08/2023, 11:32 AM
    The following Debezium option was used in CDC, but it did not work as intended:
    `debezium.snapshot.select.statement.overrides` = `'select * from payment where payment_no >= 3298000441'`
    This was with Flink 1.15.2 and the library flink-sql-connector-mysql-cdc-2.4-SNAPSHOT.jar. Should I try other options? I referenced the following link: https://ververica.github.io/flink-cdc-connectors/release-2.1/content/connectors/mysql-cdc.html
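    For what it's worth, in Debezium itself this override is two-part: the base property lists the fully qualified table(s) and a per-table property carries the SELECT. A hedged sketch of passing both through the `debezium.*` prefix of the mysql-cdc table options (database, columns, and connection details are hypothetical); note also that with the default incremental snapshot reader of the 2.x connector, Debezium snapshot options may not take effect at all:
    ```java
    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class CdcOverrideSketch {
        public static void main(String[] args) {
            TableEnvironment tEnv =
                    TableEnvironment.create(EnvironmentSettings.inStreamingMode());

            // Debezium expects the base property to name the table(s) and a
            // second, table-qualified property to carry the SELECT statement.
            tEnv.executeSql(
                "CREATE TABLE payment (" +
                "  payment_no BIGINT," +
                "  payment_status STRING," +      // hypothetical column
                "  PRIMARY KEY (payment_no) NOT ENFORCED" +
                ") WITH (" +
                "  'connector' = 'mysql-cdc'," +
                "  'hostname' = 'localhost'," +   // hypothetical connection details
                "  'port' = '3306'," +
                "  'username' = 'flink'," +
                "  'password' = 'secret'," +
                "  'database-name' = 'mydb'," +
                "  'table-name' = 'payment'," +
                "  'debezium.snapshot.select.statement.overrides' = 'mydb.payment'," +
                "  'debezium.snapshot.select.statement.overrides.mydb.payment' = " +
                "    'SELECT * FROM payment WHERE payment_no >= 3298000441'" +
                ")");
        }
    }
    ```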
  • Aviv Dozorets
    01/08/2023, 2:01 PM
    Question about `JobListener`: is there a way to trigger it for a streaming job when the state changes from `RUNNING` to anything else, especially if it's failing? So far I couldn't get `onJobExecuted` to trigger when the job is cancelled or failed.
  • Eric Liu
    01/09/2023, 6:45 AM
    Our Flink app constantly gets stuck after a few successful savepoints due to the error below. Does anyone know what the root cause could be / how to resolve it? We tried increasing `blob.fetch.num-concurrent` from 50 to 200, but the error still occurs.
    ```
    2023-01-09 06:26:28,780 ERROR org.apache.flink.runtime.blob.BlobServerConnection           [] - Error while executing BLOB connection.
    java.io.IOException: Unknown operation 71
    	at org.apache.flink.runtime.blob.BlobServerConnection.run(BlobServerConnection.java:116) [flink-dist-1.15.3.jar:1.15.3]
    ```
  • Tiansu Yu
    01/09/2023, 10:41 AM
    How do I create a custom `RecordWiseFileCompactor` for Parquet files (Flink 1.15)? I find that the documentation on compaction (https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/connectors/datastream/filesystem/#compaction) lacks an explanation / example of how to create a custom `CompactingFileReader`. There is one example I found (https://stackoverflow.com/questions/72716885/use-of-compaction-for-parquet-bulk-format), but the solution looks a bit verbose: it seems you have to implement two interfaces to let the `RecordWiseFileCompactor` read a Parquet file. I wonder if there are simpler ways to achieve this, e.g.:
    1. Is there a direct wrapper of `FileInputFormat` on top of `AvroParquetReader` lying somewhere in Flink? (I remember in 1.13 there was something like `ParquetAvroInputFormat` for generic records?)
    2. One should be able to supply a `FileInputFormat` to `InputFormatBasedReader.Factory` without wrapping it inside a `SerializableSupplierWithException`. Is this possible?
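    On the second point, `SerializableSupplierWithException` is a functional interface, so a lambda such as `() -> myInputFormat` should be accepted without any explicit wrapper class. For the first point, a rough, untested sketch of the two-interface route, wrapping `AvroParquetReader` (from parquet-avro) behind `RecordWiseFileCompactor.Reader`; this adapts the Stack Overflow approach and is not a built-in Flink wrapper:
    ```java
    import org.apache.avro.generic.GenericRecord;
    import org.apache.flink.connector.file.sink.compactor.RecordWiseFileCompactor;
    import org.apache.flink.core.fs.Path;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.parquet.avro.AvroParquetReader;
    import org.apache.parquet.hadoop.ParquetReader;
    import org.apache.parquet.hadoop.util.HadoopInputFile;

    import java.io.IOException;

    // Factory + Reader in one place: opens a Parquet file with
    // AvroParquetReader and replays its GenericRecords for compaction.
    public class ParquetRecordReaderFactory
            implements RecordWiseFileCompactor.Reader.Factory<GenericRecord> {

        @Override
        public RecordWiseFileCompactor.Reader<GenericRecord> createFor(Path path)
                throws IOException {
            ParquetReader<GenericRecord> reader = AvroParquetReader
                    .<GenericRecord>builder(HadoopInputFile.fromPath(
                            new org.apache.hadoop.fs.Path(path.toUri()),
                            new Configuration()))
                    .build();
            return new RecordWiseFileCompactor.Reader<GenericRecord>() {
                @Override
                public GenericRecord read() throws IOException {
                    return reader.read(); // null signals end of file
                }

                @Override
                public void close() throws IOException {
                    reader.close();
                }
            };
        }
    }

    // Usage: new RecordWiseFileCompactor<>(new ParquetRecordReaderFactory())
    ```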
  • Luis Calado
    01/09/2023, 11:55 AM
    Hey folks 👋 I'm trying to set up the Table API for Iceberg. Creating the table fails with
    `Could not find any factory for identifier 'iceberg' that implements 'org.apache.flink.table.factories.DynamicTableFactory'`
    I've added the dependencies `iceberg-flink-runtime`, `iceberg-flink`, and `flink-sql-connector-hive`. Using version 1.14.2.
  • Tawfik Yasser
    01/09/2023, 4:06 PM
    Hello, can anyone help me with this issue please? https://stackoverflow.com/questions/75059985/why-sometimes-with-apache-flink-different-keys-assigned-to-the-same-window TIA
  • mgu
    01/09/2023, 4:11 PM
    Hello! Can you help me with this question about timers? Is the linked answer correct, or is there another reason that makes Flink behave like that? https://stackoverflow.com/questions/74890802/flink-timer-not-triggered/75056281#75056281
  • Gaurav Miglani
    01/09/2023, 5:50 PM
    I have enabled auto-compaction in the streaming Flink table connector as mentioned here, but I still see Parquet files starting with `.uncompacted-`. I checked the Flink code of the compact operator (https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-files[…]/flink/connector/file/table/stream/compact/CompactOperator.java): at checkpoint time, files are committed to S3 as `.uncompacted-part-uuid`. I'm trying to create an Athena table on top of them, but it seems Athena ignores files starting with `.` (https://aws.amazon.com/premiumsupport/knowledge-center/athena-empty-results/). Is there any way I can resolve this, e.g. take the prefix from config? Also, even after enabling compaction, some file names in S3 are still `.uncompacted-part-uuid`. Is this the default behaviour 🤔
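    For context, a minimal sketch of the filesystem connector options involved (expressed here through Java; path is hypothetical). If I read `CompactOperator` correctly, the dot prefix is deliberate: `.uncompacted-` files stay hidden until the compactor rewrites them as visible `compacted-` files after the checkpoint completes, so only `.uncompacted-` files lingering past a successful checkpoint would be unexpected:
    ```java
    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class AutoCompactionSketch {
        public static void main(String[] args) {
            TableEnvironment tEnv =
                    TableEnvironment.create(EnvironmentSettings.inStreamingMode());

            tEnv.executeSql(
                "CREATE TABLE sink (id BIGINT, name STRING) WITH (" +
                "  'connector' = 'filesystem'," +
                "  'path' = 's3://my-bucket/events/'," +   // hypothetical path
                "  'format' = 'parquet'," +
                "  'auto-compaction' = 'true'," +
                "  'compaction.file-size' = '128MB'" +     // target size after compaction
                ")");
        }
    }
    ```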
  • Ashutosh Joshi
    01/09/2023, 7:19 PM
    I want to write a rule engine for my Flink application by implementing `KeyedProcessFunction`. The only problem I have is how to evaluate the expressions specified in a config file like the one below:
    ```yaml
    rules:
      - rule_101:
          rule.id: rule101
          is.active: true
          conditions: "(payment_status == 'payment-initiated' || payment_status == 'payment-pending') && event_time >= 30 minute"
          actions:
            - action1:
                field: status
                operator: assign
                value: request_cancel

            - action2:
                field: is_state_clean
                operator: assign
                value: true
    ```
    Here in `conditions` I have an expression in string format, and I need help evaluating it inside the process function.
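    One hedged option (among several: Janino, Aviator, a hand-rolled parser) is an embedded expression language such as MVEL, whose `==` compares values and which accepts single-quoted strings, so the sample condition nearly works as-is; a sketch, assuming the `30 minute` suffix is normalized to a plain number beforehand:
    ```java
    import org.mvel2.MVEL;

    import java.io.Serializable;
    import java.util.HashMap;
    import java.util.Map;

    // Sketch: evaluate a rule condition string against event fields.
    public class RuleEvaluator implements Serializable {

        public boolean matches(String condition, Map<String, Object> eventFields) {
            return (Boolean) MVEL.eval(condition, eventFields);
        }

        public static void main(String[] args) {
            Map<String, Object> fields = new HashMap<>();
            fields.put("payment_status", "payment-pending");
            fields.put("event_time", 31L); // pre-converted to minutes

            String condition =
                "(payment_status == 'payment-initiated' || payment_status == 'payment-pending')"
                + " && event_time >= 30";

            System.out.println(new RuleEvaluator().matches(condition, fields)); // true
        }
    }
    ```
    Inside the `KeyedProcessFunction` you would typically compile each condition once (e.g. `MVEL.compileExpression` in `open()`) and evaluate the compiled form per event.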
  • Maryam
    01/09/2023, 7:31 PM
    👋 I am trying to create a Table Aggregate Function UDF that emits a value on a window or after a specified number of input elements have arrived. Is this possible in the Flink Table API?
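    A minimal sketch of the count-based half: a `TableAggregateFunction` whose accumulator counts inputs and whose `emitValue` only emits once the threshold is hit (note `emitValue` is invoked after every update, so the guard does the throttling; whether this matches the intended "emit early" semantics exactly is uncertain):
    ```java
    import org.apache.flink.table.functions.TableAggregateFunction;
    import org.apache.flink.util.Collector;

    // Emits the running max only on every 10th input element.
    public class EveryNMax extends TableAggregateFunction<Long, EveryNMax.Acc> {

        public static class Acc {
            public long max = Long.MIN_VALUE;
            public long count = 0;
        }

        @Override
        public Acc createAccumulator() {
            return new Acc();
        }

        public void accumulate(Acc acc, Long value) {
            acc.max = Math.max(acc.max, value);
            acc.count++;
        }

        // Called after each update; only emit when the threshold is reached.
        public void emitValue(Acc acc, Collector<Long> out) {
            if (acc.count % 10 == 0) {
                out.collect(acc.max);
            }
        }
    }
    ```
    It would be used via `groupBy(...).flatAggregate(call(EveryNMax.class, $("v")))`; for the windowed variant, the same function can be applied after a group window.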
  • Adriana Beltran
    01/10/2023, 12:47 AM
    Hi, is there no makeProtobufType function in the Java SDK? I can't seem to find it, while I noticed that the Python SDK does feature `make_protobuf_type`, a function that creates a Statefun type backed by Proto.
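    I'm not aware of a direct equivalent in the Java SDK, but a hedged sketch of building a protobuf-backed `Type` yourself with `SimpleType` (here `MyMessage` is a hypothetical generated Protobuf class):
    ```java
    import org.apache.flink.statefun.sdk.java.TypeName;
    import org.apache.flink.statefun.sdk.java.types.SimpleType;
    import org.apache.flink.statefun.sdk.java.types.Type;

    // MyMessage is a hypothetical protoc-generated class.
    public final class MyMessageType {

        public static final Type<MyMessage> TYPE =
            SimpleType.simpleImmutableTypeFrom(
                TypeName.typeNameFromString("com.example/MyMessage"),
                MyMessage::toByteArray,   // protobuf serializer
                MyMessage::parseFrom);    // protobuf deserializer
    }
    ```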
  • Stephan Helas
    01/10/2023, 9:26 AM
    Hi, quick question. I'm using flink operator 1.2 together with argo-cd 2.5.3. After deploying a FlinkDeployment, the HEALTH state says "Progressing". I've checked the Argo test here: https://github.com/argoproj/argo-cd/blob/master/resource_customizations/flink.apache.org/FlinkDeployment/testdata/healthy_running.yaml. Everything checks out except `reconciliationStatus.success`; the k8s resource does not have this key. Is it a problem with the operator or with the argo-cd health check?
  • V N Rahul Bharadwaj Tumpala
    01/10/2023, 12:39 PM
    Hi, I am using Flink 1.14.3 in my project. The goal is to access a remote directory via SFTP and monitor it for file additions. After browsing the documentation, I found `FileSource` to be useful, but how do I tell the `Path` that it should access a remote directory?
  • Amol Khare
    01/10/2023, 3:05 PM
    Hi team, how come JDBC is available as a source with the Table API but not with the DataStream API (based on the documentation for the stable release)?
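    For completeness, a hedged sketch of one DataStream-side option that does exist: the (legacy) `JdbcInputFormat` used with `createInput` (connection details hypothetical). This yields a bounded stream; for continuous change capture a CDC connector is the usual choice instead:
    ```java
    import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
    import org.apache.flink.api.java.typeutils.RowTypeInfo;
    import org.apache.flink.connector.jdbc.JdbcInputFormat;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.types.Row;

    public class JdbcReadSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            JdbcInputFormat format = JdbcInputFormat.buildJdbcInputFormat()
                    .setDrivername("org.postgresql.Driver")
                    .setDBUrl("jdbc:postgresql://localhost:5432/mydb") // hypothetical
                    .setUsername("flink")   // hypothetical
                    .setPassword("secret")  // hypothetical
                    .setQuery("SELECT id, name FROM users")
                    .setRowTypeInfo(new RowTypeInfo(
                            BasicTypeInfo.LONG_TYPE_INFO,
                            BasicTypeInfo.STRING_TYPE_INFO))
                    .finish();

            DataStream<Row> rows = env.createInput(format);
            rows.print();
            env.execute("jdbc-read");
        }
    }
    ```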
  • Tiansu Yu
    01/10/2023, 4:40 PM
    I am having trouble using `AvroParquetReader`, which in turn uses `HadoopInputFile`, inside my Flink application. This runs on a local Flink 1.15 cluster which has flink-s3-fs-hadoop added to the plugins folder. It ran normally until I added a file compactor, where I need to use `AvroParquetReader` to read Parquet files. Now it complains that `java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3a.S3AFileSystem not found`. For Hadoop, I have a locally installed hadoop-2.10 and added its classpath, alongside the Hadoop plugin jar file, to HADOOP_CLASSPATH. So I really don't see why Hadoop could not find an S3AFileSystem to use.
  • Ruslan Danilin
    01/10/2023, 9:40 PM
    Hi guys! Does anybody by any chance have a working example of a pom.xml for Flink and Iceberg (with Hive catalog) integration? I'm having a hard time making it run. I'm getting errors like `Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.conf.Configuration`, even though I'm sure that this class is in my fat jar.
  • Jeremy Ber
    01/10/2023, 9:45 PM
    Having trouble finding any docs on how to define watermark strategies using the Table API. I can of course use SQL syntax, and there are lots of docs on that, but how do I define one using `.watermark(column, expression)`?
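    A minimal sketch of where `watermark(...)` lives in the Table API: on `Schema.Builder`, e.g. when bridging from a DataStream (the field names here are hypothetical):
    ```java
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.Schema;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

    public class WatermarkSketch {
        public static void main(String[] args) {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

            Schema schema = Schema.newBuilder()
                    // derive a timestamp column from a hypothetical epoch-millis field
                    .columnByExpression("ts", "TO_TIMESTAMP_LTZ(event_time, 3)")
                    // watermark(columnName, sqlExpression)
                    .watermark("ts", "ts - INTERVAL '5' SECOND")
                    .build();

            // events is a hypothetical DataStream whose rows carry event_time:
            // Table table = tEnv.fromDataStream(events, schema);
        }
    }
    ```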
  • sharad mishra
    01/10/2023, 9:51 PM
    Hi team, I’m running my Flink (1.16) application on YARN. I want to pass a conf file (an InfluxDB conf file) to my program, so that I can use it inside the program to parse the details. What is the best way to do this?
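    One common pattern (a hedged sketch, not the only option): ship the file with the YARN submission, e.g. `flink run -yt /path/to/confdir ...`, and read it from the container's working directory with `ParameterTool` (the file name and key below are hypothetical):
    ```java
    import org.apache.flink.api.java.utils.ParameterTool;

    public class ConfLoader {
        public static void main(String[] args) throws Exception {
            // Assumes influx.properties was shipped via -yt/--yarnship and is
            // localized into the working directory of the YARN containers.
            ParameterTool conf = ParameterTool.fromPropertiesFile("influx.properties");
            String url = conf.get("influx.url", "http://localhost:8086"); // hypothetical key
            System.out.println("InfluxDB url: " + url);
        }
    }
    ```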