https://flink.apache.org/ logo
Join Slack
Powered by
# troubleshooting
  • a

    Aqib Mehmood

    06/13/2022, 8:24 AM
    Hi All, We're trying to ensure stateful resumption of flink-kinesis jobs in case of failure. Currently we're looking at two approaches. • Using the At_Timestamp option to start the job and passing the timestamp from a .properties file which is updated at each GetRecords iteration. Is this the correct way to ensure stateful resumption of kinesis jobs on flink? Please see below.
    Copy code
    FileInputStream ip = new FileInputStream(<.properties file location>);
    Reader ip = new FileReader(file); 
    Properties prop = new Properties();
    prop.load(ip);;
    String timeStart = prop.getProperty("timeStamp");
    consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "AT_TIMESTAMP");
    consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP, timeStart);
    • Secondly we're exploring the checkpointing option provided by flink to checkpoint progress at regular intervals. The issue with this approach is that we're running flink in application mode and thus have separate job manager for each job. Wouldn't the checkpointing data be lost if a job were to fail, bringing down the JM which is storing the checkpointing data? How can we use this checkpointing data to start another kinesis job in case of failure? Is there another way of ensuring stateful resumption of flink-kinesis jobs that we're missing out? Would love to know your thoughts. TIA
    h
    • 2
    • 3
  • v

    Veeramani Moorthy

    06/13/2022, 10:15 AM
    I have created a small Flink program using table API. I would like to unit test them by passing couple of tables as input & generate an output table & then compare the output table with expected output table. To unit test this, I need to create these tables in memory. What are the best way to create these tables in memory? I tried with datagen connector. After creating a table with datagen connector, I am not able to insert records into it. I tried with print connector as well, here I am not able to read the table records. I should be able to do both read & write operation in memory table. what is the best way to create these tables?
    m
    s
    • 3
    • 6
  • m

    Márk Bartos

    06/13/2022, 1:00 PM
    Hey. I'd like to ask for help regarding this issue: https://lists.apache.org/thread/zbk9qc00sqwf5xnfdtssrb7wntg8dp0c | Exception: class java.sql.Timestamp cannot be cast to class java.time.LocalDateTime | pyflink 1.15.0
    d
    • 2
    • 1
  • i

    Ildar Almakaev

    06/13/2022, 2:56 PM
    Hello, community. Is there any way to restore a ValueState if I use the default
    state.backend=hashmap
    ?
    y
    m
    • 3
    • 10
  • j

    John Gerassimou

    06/13/2022, 4:59 PM
    We have pipelines written in Beam, which are currently deployed to Dataflow. We've been working on a POC to determine the feasibility of running the pipelines with the Flink K8s Operator. The outcome of this is hopefully more reliability and cost savings. When running the pipelines on Flink, we notice lower throughputs. We tried to do some random tuning, such as adjusting parallelism, slots, etc. and didn't see significant improvements. Are there heuristics/best practices for tuning the cluster?
    👀 1
    m
    g
    k
    • 4
    • 28
  • e

    Erik Wickstrom

    06/14/2022, 4:05 AM
    Is it possible to take advantage of async i/o in Flink from Python? I need to perform a streaming ETL task and enrich some records with a dataset in Cassandra. (https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/dev/datastream/operators/asyncio/)
    👀 1
    r
    • 2
    • 1
  • b

    Bill Rao

    06/14/2022, 7:41 AM
    Hi, everyone. I have encountered an error that has left me clueless for two days. When I deploy pyflink over native Kubernetes, an error (
    Could not find a free permitted port on the machine.
    ) is thrown as following:
    Caused by: java.lang.RuntimeException: Could not find a free permitted port on the machine.
    at org.apache.flink.util.NetUtils.getAvailablePort(NetUtils.java:177) ~[flink-dist-1.15.0.jar:1.15.0]
    at org.apache.flink.client.python.PythonEnvUtils.lambda$startGatewayServer$3(PythonEnvUtils.java:365) ~[?:?]
    at java.lang.Thread.run(Unknown Source) ~[?:?]
    2022-06-14 07:25:28,086 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint        [] - Shutting KubernetesApplicationClusterEntrypoint down with application status UNKNOWN. Diagnostics Cluster entrypoint has been closed externally..
    2022-06-14 07:25:28,089 INFO  org.apache.flink.runtime.blob.BlobServer                     [] - Stopped BLOB server at 0.0.0.0:6124
    2022-06-14 07:25:28,092 INFO  org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint [] - Shutting down rest endpoint.
    2022-06-14 07:25:28,572 WARN  akka.actor.CoordinatedShutdown                               [] - Could not addJvmShutdownHook, due to: Shutdown in progress
    2022-06-14 07:25:28,573 WARN  akka.actor.CoordinatedShutdown                               [] - Could not addJvmShutdownHook, due to: Shutdown in progress
    Could anyone offer some clues? I reviewed the source code and found nothing useful. There is an IOException silenced in NetUtils, but otherwise, I cannot see why an error like this is relevant when starting pyflink. Have been suspecting privilege issues, but a minimal example of pyflink (word_count.py) runs with no abnormality.
    s
    • 2
    • 2
  • l

    Liubov

    06/14/2022, 10:00 AM
    Hi all. I have a question, which is "is it even possible" type of a question. Had it on stack overflow but still no luck there, so maybe you'll be able to help. Here is the question from stack overflow In our application we would like to use Apache Flink to process user events, enrich them with different sink types (each user can have several sink types) and then sink these events to corresponding different types of sinks. Say we may have 2 types of sinks(for example file and http) and each user wants to use one or both of these types, with his/her own http endpoint/file name for corresponding sink. Each event after enrichment knows if it should be sent to single or both sink types and exact endpoint/ filename. The problem is that some endpoints may become slower and as the result the entire stream, with all events for all users and sink types will be slowed down by backpressure. So, the question. Is there a way to, while still keeping reasonable backpressure, process events for different sinks and endpoints independently, so that slow endpoint of one user don't affect the entire stream and other users. Basically I can think of smth like very smart custom sink that can handle multiple endpoint or say track and drop slow events. Or maybe some intermediate Kafka with topic per user and partition per sink type. But that all looks a bit clumsy. Maybe there's a better solution with Flink capabilities.
    r
    k
    g
    • 4
    • 7
  • t

    Tom Xiao

    06/14/2022, 11:08 AM
    Hello guys, any idea how should I use JSON_ARRAY function? Below is a sample of field
    data
    . I want to extract platform field and cast it into an array
    Copy code
    {
      "id": 1,
      "name": "Bitcoin",
      "symbol": "BTC",
      "slug": "bitcoin",
      "cmc_rank": 1,
      "num_market_pairs": 9237,
      "circulating_supply": 18984362,
      "total_supply": 18984362,
      "max_supply": 21000000,
      "last_updated": "2022-03-15T23:00:00.000Z",
      "date_added": "2013-04-28T00:00:00.000Z",
      "tags": [
        "mineable",
        "pow",
        "sha-256",
        "store-of-value",
        "state-channel",
        "coinbase-ventures-portfolio",
        "three-arrows-capital-portfolio",
        "polychain-capital-portfolio",
        "binance-labs-portfolio",
        "blockchain-capital-portfolio",
        "boostvc-portfolio",
        "cms-holdings-portfolio",
        "dcg-portfolio",
        "dragonfly-capital-portfolio",
        "electric-capital-portfolio",
        "fabric-ventures-portfolio",
        "framework-ventures-portfolio",
        "galaxy-digital-portfolio",
        "huobi-capital-portfolio",
        "alameda-research-portfolio",
        "a16z-portfolio",
        "1confirmation-portfolio",
        "winklevoss-capital-portfolio",
        "usv-portfolio",
        "placeholder-ventures-portfolio",
        "pantera-capital-portfolio",
        "multicoin-capital-portfolio",
        "paradigm-portfolio"
      ],
      "quote": [
        {
          "USD": {
            "price": 39338.785492032424,
            "volume_24h": 23934000867,
            "percent_change_1h": -0.392318904627,
            "percent_change_24h": -0.826814091987,
            "percent_change_7d": 1.552812110667,
            "market_cap": 746821744421.0917,
            "last_updated": "2022-03-15T23:00:00.000Z"
          }
        }
      ]
    }
    s
    m
    • 3
    • 2
  • j

    Jeff Levesque

    06/15/2022, 12:52 AM
    I'm looking at PyFlink documentation regarding how to define UDF. Seems like there a variety of ways this can be done. I was wondering if there is a preferred approach?
    x
    s
    • 3
    • 3
  • s

    Shun Sun

    06/15/2022, 9:05 AM
    Hi all, I am trying to use flink to consume data from a SSL enabled Kafka, which requires for a path for keystone and truststone file, since we are running on yarn cluster, so the question is how to provide remotely located keystore files to kafak client in a flink. I know that on spark side, the --files can be used to attach files into classpath of either executor or driver, is there any similar way we can achieve that? I try the -kt, yarnship, but it fails on session mode. Looking forward any suggestion!
    👀 1
    d
    • 2
    • 7
  • t

    Tymur Yarosh

    06/15/2022, 10:48 AM
    Hi guys! Is there any built-in approach to enable auth for remote Stateful Functions? I don’t see auth configuration for the transport here https://nightlies.apache.org/flink/flink-statefun-docs-release-3.2/docs/modules/http-endpoint/
  • a

    Aqib Mehmood

    06/15/2022, 1:19 PM
    Hi All, We're trying to use checkpointing for stateful resumption of flink jobs. Checkpoints are by default automatically deleted on job cancellation. Here is the code to prevent the checkpoints from deleting after job cancellation.
    Copy code
    CheckpointConfig config = env.getCheckpointConfig();
    config.setExternalizedCheckpointCleanup(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
    The issue is that: This code doesn't seem to be working on our current flink version i.e. 1.13.1 Although it is working fine in version 1.13.6. These two version are almost similar so we aren't able to understand why this is. Have anyone else faced this issue or know how to overcome it? TIA
    m
    • 2
    • 5
  • d

    DJ

    06/15/2022, 2:32 PM
    Hi, I am trying out the window deduplciation SQL in 1.15. the dedup SQL is stuck for hours on my mac if the parallelism.default is more than 1. It returned quickly if parallelism.default is 1. Details see thread:
    s
    • 2
    • 7
  • j

    Jeesmon Jacob

    06/15/2022, 2:56 PM
    Hi there, anyone has an example of configuring json logging in kubernetes operator? Thanks
    m
    • 2
    • 7
  • r

    Rich Hanes

    06/15/2022, 7:27 PM
    Hi there, I think I found a bug in the training material: In the hourly tips exercise the README says <https://github.com/apache/flink-training/tree/release-1.15/hourly-tips#input-data%7CThe TaxiFareGenerator annotates the generated DataStream<TaxiFare> with timestamps and watermarks. Hence, there is no need to provide a custom timestamp and watermark assigner in order to correctly use event time.> However, the exercise does not have the watermarks set and the solution does. This made it hard to run through this exercise, as the error was unexpected. Please update the exercise with the watermarking code, or remove the comment from the README.
    d
    • 2
    • 7
  • t

    Theo Diefenthal

    06/15/2022, 10:46 PM
    I have a small issue with my Flink 1.13 -> 1.14 upgrade. I see my state along with the _metadata file doubling in size with each savepoint and restoration from it. Investigating into the issue, I see
    topic-partition-offset-states
    which doubles in size each time and now already has 4 million entries in the UNION list state. From flink code investigation, I see that this state belongs to FlinkKafkaConsumerBase. However, with my Flink 1.14 upgrade, I replaced the FlinkKafkaConsumer with the new KafkaSource, so this state should not be in use at all, making me questioning why its retained and why it doubles in size on each restart with savepoint. Hence the question: I start my job with "allowNonRestoreState". Could it be that even though unused, the state is still kept in Flink somewhere and used in the next savepoint? Note that I already resolved the issue by simly deleting the state and starting the job from scratch. Now the savepoint works as usual. But I'm just wondering what could have caused this behaviour
    👀 2
    m
    k
    • 3
    • 8
  • e

    Eric Hwang

    06/16/2022, 3:05 AM
    Does anyone know if there is a way to reconfigure a flink job after it’s started running? In our flink job, we are dynamically inferring configurations to use during initialization. We are trying to find a way to do the following: 1. Have the job somehow periodically detect if the configuration might be different somehow 2. Rerun the initialization to reconfigure all the tasks This is appealing to us because it means our flink job will naturally converge on the right configuration without us having to update each one when config parameters change (assuming the changes are backwards compatible). I had thought that maybe we could get the system to crash and restart if we detected changes, but it looks like the restart strategies only apply to individual tasks rather than the whole job. Has anyone else tried to do this before with any success?
    e
    k
    • 3
    • 17
  • z

    Zsombor Chikan

    06/16/2022, 2:19 PM
    Hey Folks, Do you know what will happen if we reach the number settled for
    historyserver.archive.retained-jobs
    ? The oldest archive job will be deleted etc.. or it will throw an
    IllegalConfigurationException
    ?
    ✅ 1
    c
    • 2
    • 2
  • r

    Rajendra

    06/16/2022, 4:24 PM
    i am facing issue with one of the job deployed on stand alone k8s flink cluster deployment constantly getting below error
    Copy code
    org.apache.flink.util.FlinkException: Execution XXXXXXXXX is unexpectedly no longer running on task executor 10.227.18.50:6122-34d81a.
    any help is appreciated thanks in advance.
  • d

    DJ

    06/16/2022, 4:57 PM
    I am trying hive sql dialect on flink sql client and got an error
    org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'hive' that implements 'org.apache.flink.table.planner.delegation.ParserFactory' in the classpath.
    m
    • 2
    • 8
  • h

    Hong Teoh

    06/17/2022, 12:14 PM
    Hi all, I’m looking to create a FLIP, but I don’t seem to have permission to create a new page - how can I go about getting this permission? Thanks in advance! This is my profile: https://cwiki.apache.org/confluence/display/~liangtl
    • 1
    • 1
  • j

    Jeesmon Jacob

    06/17/2022, 12:28 PM
    In the kubernetes operator doc it is mentioned that
    LastState
    upgrade mode is not supported for
    FlinkSessionJob
    . Is there a plan to support it in future? Thanks. https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-1.0/docs/custom-resource/overview/#limitations
    g
    • 2
    • 23
  • j

    Jeesmon Jacob

    06/17/2022, 7:48 PM
    Question on
    FlinkSessionJob
    in kubernetes operator. When I use
    job.args
    that references a config file, what will be the context of that config file? Is that going to be the location of config file in the docker image referenced in
    FlinkDeployment
    or local to the operator?
    Copy code
    job:
        jarURI: <https://xxx/my-job.jar>
        args:
        - -c
        - /path/to/my/job-config.yaml
    g
    a
    • 3
    • 34
  • s

    Sharon Xie

    06/17/2022, 11:32 PM
    Question about CDC connectors. Is there a way to connect to a CDC source and consume the raw events rather than as a “dynamic table”? The use case would be to connect to a table, and filter out all the “delete” events.
    💯 1
    ✅ 1
    s
    j
    • 3
    • 10
  • m

    Maciej Obuchowski

    06/18/2022, 2:12 PM
    Anyone had problems with submitting jobs via
    Copy code
    StreamExecutionEnvironment.createRemoteEnvironment
    r
    • 2
    • 4
  • d

    Damon Rolfs

    06/18/2022, 7:51 PM
    Hi. We are using Flink 1.13. Our job is not honoring the parallelism supplied to the job run (via REST
    /jars/:jarid/run
    ). The job has a basic three stage setup (source -> through -> sink). We do explicitly
    setParallelism
    on the source, which draws from Kafka, and do not set parallelism on the through and sink stages; i.e., our intention is for the latter parts to follow the parallelism assigned to job at start. We do not set
    parallelism.default
    in flink.conf. I've scoured both code and configuration and I can't find what could prevent the parallelism for the latter stages from using the parallelism supplied on job run. We start the cluster with 16 task managers, and manually follow the conventional (without reactive flink) rescale downward: take a savepoint+cancel; terminate 2 task managers; and then restart the jar with savepoint and parallelism=14. The outcome is the job attempts to run at 16 parallelism (latter stages) and fails with the
    NoResourceAvailableException
    because the required resources aren't available. Similarly, attempts to rescale at other sizes also fail with the job trying to run at 16. We don't
    setMaxParallelism
    relying on the 128 default. I appreciate any guidance on where I can look to find what's driving this nonelastic parallelism of 16. Thanks in advance! Damon
    c
    • 2
    • 16
  • j

    Jeff Levesque

    06/18/2022, 10:08 PM
    My PyFlink sliding window has the following form:
    Copy code
    sliding_window_table = (
            input_table.window(
                Slide.over(sliding_window_over)
                .every(sliding_window_every)
                .on(sliding_window_on)
                .alias(sliding_window_alias)
            )
            .group_by('ticker, {}'.format(sliding_window_alias))
            .select('''
                ticker,
                MIN(price) as min_price,
                MAX(price) as max_price,
                {0}.start as utc_start,
                {0}.end as utc_end
            '''.format(sliding_window_alias))
        )
    I want to find the first value within the sliding window, as well as the last value within the sliding window. So, I found general SQL syntax for this [1]:
    Copy code
    FIRST_VALUE(price) OVER (PARTITION BY CONVERT(date, dateColumn) ORDER BY dateColumn ASC) as first_price,
    FIRST_VALUE(price) OVER (PARTITION BY CONVERT(date, dateColumn) ORDER BY dateColumn DESC) as last_price
    I did a quick search to see if
    FIRST_VALUE
    is a function offered by flink (with hopes it would be supported by PyFlink). I found that it should be provided [2]. So, I tried the syntax in my sliding window as follows:
    Copy code
    sliding_window_table = (
            input_table.window(
                Slide.over(sliding_window_over)
                .every(sliding_window_every)
                .on(sliding_window_on)
                .alias(sliding_window_alias)
            )
            .group_by('ticker, {}'.format(sliding_window_alias))
            .select('''
                FIRST_VALUE(price),
                ticker,
                MIN(price) as min_price,
                MAX(price) as max_price,
                {0}.start as utc_start,
                {0}.end as utc_end
            '''.format(sliding_window_alias))
        )
    However, I received the following error:
    Copy code
    py4j.protocol.Py4JJavaError: An error occurred while calling o87.select.
    : org.apache.flink.table.api.ValidationException: Undefined function: FIRST_VALUE
    	at org.apache.flink.table.expressions.resolver.LookupCallResolver.lambda$visit$0(LookupCallResolver.java:53)
    	at java.base/java.util.Optional.orElseThrow(Optional.java:408)
    	at org.apache.flink.table.expressions.resolver.LookupCallResolver.visit(LookupCallResolver.java:49)
    	at org.apache.flink.table.expressions.resolver.LookupCallResolver.visit(LookupCallResolver.java:36)
    	at org.apache.flink.table.expressions.ApiExpressionVisitor.visit(ApiExpressionVisitor.java:35)
    	at org.apache.flink.table.expressions.LookupCallExpression.accept(LookupCallExpression.java:66)
    	at org.apache.flink.table.api.internal.TableImpl.lambda$preprocessExpressions$0(TableImpl.java:605)
    	at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
    	at java.base/java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:948)
    	at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
    	at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
    	at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:913)
    	at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
    	at java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:578)
    	at org.apache.flink.table.api.internal.TableImpl.preprocessExpressions(TableImpl.java:606)
    	at org.apache.flink.table.api.internal.TableImpl.access$300(TableImpl.java:66)
    	at org.apache.flink.table.api.internal.TableImpl$WindowGroupedTableImpl.select(TableImpl.java:775)
    	at org.apache.flink.table.api.internal.TableImpl$WindowGroupedTableImpl.select(TableImpl.java:770)
    	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    	at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    	at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    	at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
    	at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    	at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
    	at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
    	at java.base/java.lang.Thread.run(Thread.java:829)
    
    
    Process finished with exit code 1
    I notice that the flink documentation [2], is regarding flink v.1.16, yet I'm using
    apache-flink==1.13.2
    . So, I searched for documentation on flink v.1.13 [3], and found
    FIRST_VALUE
    is supported. [1] https://stackoverflow.com/a/61346024/2063478 [2] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/functions/systemfunctions/ [3] https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/dev/table/functions/systemfunctions/
    s
    • 2
    • 5
  • a

    Ali AIT-BACHIR

    06/20/2022, 12:21 PM
    Hi, We want to deploy Flink jobs in production in “application mode” and we are facing a struggling issue with connector class loaders in sink functions (Cassandra and RabbitMQ). Here is the error log we got:
    java.lang.LinkageError: loader constraint violation: loader (instance of sun/misc/Launcher$AppClassLoader) previously initiated loading for a different type with name "com/datastax/driver/core/Cluster$Builder"
    Thanks for your help.
    c
    • 2
    • 15
  • l

    Luis Figueiredo

    06/20/2022, 7:13 PM
    I'm running Flink in Application Mode. I'm able to create checkpoints, but when the application is restarted how can I restart programatically from the last checkpoint? Thanks in advance for all the help.
    m
    r
    • 3
    • 2
12345...98Latest