# troubleshooting
  • a

    anusca

    07/05/2022, 4:14 PM
    Hi, I'm using PyFlink and I'm getting the following error:
    py4j.protocol.Py4JJavaError: An error occurred while calling o84.executeInsert.
    : org.apache.flink.table.api.ValidationException: Unable to create a source for reading table 'default_catalog.default_database.MyTopic'.
    
    Table options are:
    
    'connector'='kafka'
    'format'='json'
    'properties.bootstrap.servers'='localhost:9092'
    'properties.group.id'='123212'
    'topic'='MyTopic'
    Does anyone know how I can solve it? I'm using the following module versions:
    numpy==1.19.5
    apache-flink-libraries==1.13.6
    apache-flink==1.13.6
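    For reference: this ValidationException is often caused by the Kafka SQL connector JAR not being on the job's classpath. A minimal PyFlink 1.13 sketch of adding it (the JAR path and version are illustrative, not from the original question):

    from pyflink.table import EnvironmentSettings, TableEnvironment

    env_settings = EnvironmentSettings.new_instance().in_streaming_mode().build()
    t_env = TableEnvironment.create(env_settings)

    # Make the Kafka SQL connector available to the job (path/version are illustrative).
    t_env.get_config().get_configuration().set_string(
        "pipeline.jars",
        "file:///path/to/flink-sql-connector-kafka_2.11-1.13.6.jar")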
  • r

    Roman Bohdan

    07/05/2022, 7:51 PM
    Hello. I'm working with iterations on Flink. With the iteration count set to 5 my system works fine, but when I increase it, for example to 10 or 20, I get a strange error. Could you please help me? 🥺
    2022-07-05 19:21:21,796 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor           [] [flink-akka.actor.default-dispatcher-17] Cannot find task to fail for execution d6683fb33067d87adecd0d476af6874e with exception:
    org.apache.flink.runtime.jobmaster.ExecutionGraphException: The execution attempt d6683fb33067d87adecd0d476af6874e was not found.
    	at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:450) ~[flink-dist_2.12-1.14.0.jar:1.14.0]
    	at jdk.internal.reflect.GeneratedMethodAccessor42.invoke(Unknown Source) ~[?:?]
    	at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source) ~[?:?]
    	at java.lang.reflect.Method.invoke(Unknown Source) ~[?:?]
    	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRpcInvocation$1(AkkaRpcActor.java:316) ~[flink-rpc-akka_28e6940b-1078-4bab-a7c4-a9d16a7c8915.jar:1.14.0]
    	at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83) ~[flink-rpc-akka_28e6940b-1078-4bab-a7c4-a9d16a7c8915.jar:1.14.0]
    	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:314) ~[flink-rpc-akka_28e6940b-1078-4bab-a7c4-a9d16a7c8915.jar:1.14.0]
    	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:217) ~[flink-rpc-akka_28e6940b-1078-4bab-a7c4-a9d16a7c8915.jar:1.14.0]
    	at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:78) ~[flink-rpc-akka_28e6940b-1078-4bab-a7c4-a9d16a7c8915.jar:1.14.0]
    	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:163) ~[flink-rpc-akka_28e6940b-1078-4bab-a7c4-a9d16a7c8915.jar:1.14.0]
    	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24) [flink-rpc-akka_28e6940b-1078-4bab-a7c4-a9d16a7c8915.jar:1.14.0]
    	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20) [flink-rpc-akka_28e6940b-1078-4bab-a7c4-a9d16a7c8915.jar:1.14.0]
    	at scala.PartialFunction.applyOrElse(PartialFunction.scala:123) [flink-rpc-akka_28e6940b-1078-4bab-a7c4-a9d16a7c8915.jar:1.14.0]
    	at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) [flink-rpc-akka_28e6940b-1078-4bab-a7c4-a9d16a7c8915.jar:1.14.0]
    	at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20) [flink-rpc-akka_28e6940b-1078-4bab-a7c4-a9d16a7c8915.jar:1.14.0]
    	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [flink-rpc-akka_28e6940b-1078-4bab-a7c4-a9d16a7c8915.jar:1.14.0]
    	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) [flink-rpc-akka_28e6940b-1078-4bab-a7c4-a9d16a7c8915.jar:1.14.0]
    	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) [flink-rpc-akka_28e6940b-1078-4bab-a7c4-a9d16a7c8915.jar:1.14.0]
    	at akka.actor.Actor.aroundReceive(Actor.scala:537) [flink-rpc-akka_28e6940b-1078-4bab-a7c4-a9d16a7c8915.jar:1.14.0]
    	at akka.actor.Actor.aroundReceive$(Actor.scala:535) [flink-rpc-akka_28e6940b-1078-4bab-a7c4-a9d16a7c8915.jar:1.14.0]
    	at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220) [flink-rpc-akka_28e6940b-1078-4bab-a7c4-a9d16a7c8915.jar:1.14.0]
    	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580) [flink-rpc-akka_28e6940b-1078-4bab-a7c4-a9d16a7c8915.jar:1.14.0]
    	at akka.actor.ActorCell.invoke(ActorCell.scala:548) [flink-rpc-akka_28e6940b-1078-4bab-a7c4-a9d16a7c8915.jar:1.14.0]
    	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270) [flink-rpc-akka_28e6940b-1078-4bab-a7c4-a9d16a7c8915.jar:1.14.0]
    	at akka.dispatch.Mailbox.run(Mailbox.scala:231) [flink-rpc-akka_28e6940b-1078-4bab-a7c4-a9d16a7c8915.jar:1.14.0]
    	at akka.dispatch.Mailbox.exec(Mailbox.scala:243) [flink-rpc-akka_28e6940b-1078-4bab-a7c4-a9d16a7c8915.jar:1.14.0]
    	at java.util.concurrent.ForkJoinTask.doExec(Unknown Source) [?:?]
    	at java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(Unknown Source) [?:?]
    	at java.util.concurrent.ForkJoinPool.scan(Unknown Source) [?:?]
    	at java.util.concurrent.ForkJoinPool.runWorker(Unknown Source) [?:?]
    	at java.util.concurrent.ForkJoinWorkerThread.run(Unknown Source) [?:?]
  • h

    HAJI

    07/06/2022, 12:49 AM
    I tried to install PyFlink 1.15 on an EC2 m6g.xlarge instance for Flink. I ran
    python -m pip install apache-flink==1.15.0
    and got:
    Collecting pyarrow<3.0.0,>=0.15.1
      Using cached pyarrow-2.0.0.tar.gz (58.9 MB)
      Installing build dependencies ... error
      ERROR: Command errored out with exit status 1:
       command: /usr/bin/python3 /usr/lib/python3.7/site-packages/pip install --ignore-installed --no-user --prefix /tmp/pip-build-env-wd5gcv6i/overlay --no-warn-script-location --no-binary :none: --only-binary :none: -i <https://pypi.org/simple> -- 'cython >= 0.29' 'numpy==1.14.5; python_version<'"'"'3.7'"'"'' 'numpy==1.16.0; python_version>='"'"'3.7'"'"'' setuptools setuptools_scm wheel
           cwd: None
      Complete output (367 lines):
      Ignoring numpy: markers 'python_version < "3.7"' don't match your environment
      Collecting cython>=0.29
        Using cached Cython-0.29.30-cp37-cp37m-manylinux_2_17_aarch64.manylinux2014_aarch64.manylinux_2_24_aarch64.whl (1.8 MB)
    
    ....
    
    
    ERROR: Command errored out with exit status 1: /usr/bin/python3 /usr/lib/python3.7/site-packages/pip install --ignore-installed --no-user
    I used Python 3.7.10. I also tried AWS EMR with Python 3.4 and Flink 1.14, and with Python 3.6 and Flink 1.14, but I still can't install PyFlink:
    Building wheels for collected packages: pyarrow
      Building wheel for pyarrow (pyproject.toml) ... error
      ERROR: Command errored out with exit status 1:
       command: /usr/local/bin/python3 /home/hadoop/.local/lib/python3.6/site-pes/pip/_vendor/pep517/in_process/_in_process.py build_wheel /tmp/tmp2hpyiu5
           cwd: /mnt/tmp/pip-install-vgb1drwj/pyarrow_82e244416a7f48bf88e94182cb4
    ...
    
      error: command 'cmake' failed with exit status 1
      ----------------------------------------
      ERROR: Failed building wheel for pyarrow
    Failed to build pyarrow
    ERROR: Could not build w
    How can I fix this?
  • l

    laxmi narayan

    07/06/2022, 8:34 AM
    Hi, I wanted to understand how I can use a map of maps (MapOfMap) as a state store.
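    One possible approach (a minimal PyFlink-flavored sketch, assuming a keyed DataStream job; all names are illustrative) is a MapState whose values are themselves maps:

    from pyflink.common.typeinfo import Types
    from pyflink.datastream.functions import KeyedProcessFunction, RuntimeContext
    from pyflink.datastream.state import MapStateDescriptor

    class MapOfMapFunction(KeyedProcessFunction):

        def open(self, runtime_context: RuntimeContext):
            # Outer map keyed by STRING; each value is itself a map of STRING -> LONG.
            descriptor = MapStateDescriptor(
                "map_of_maps",
                Types.STRING(),
                Types.MAP(Types.STRING(), Types.LONG()))
            self.map_of_maps = runtime_context.get_map_state(descriptor)

        def process_element(self, value, ctx):
            # Assumes value is a tuple like (outer_key, inner_key); purely illustrative.
            inner = self.map_of_maps.get(value[0]) or {}
            inner[value[1]] = inner.get(value[1], 0) + 1
            self.map_of_maps.put(value[0], inner)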
  • r

    Rashmin Patel

    07/06/2022, 8:47 AM
    Hi all, is there a way to derive a Flink TypeInformation from a Scala TypeTag/ClassTag? I am writing a generic DAG:
    class FlinkDataflow[T: TypeTag](val stream: DataStream[T]) extends IDataflow[T] {

      def map[O: TypeTag](f: T => O, id: String): FlinkDataflow[O] = {
        val typeInfo: TypeInformation[O] = TypeInformation.of(classOf[O])

        val resultStream: DataStream[O] = stream.map(new MapFunction[T, O] {
          override def map(value: T): O = {
            f.apply(value)
          }
        }).returns(typeInfo).uid(id).name(id)
        new FlinkDataflow[O](resultStream)
      }
    }
  • h

    HAJI

    07/06/2022, 8:49 AM
    I have Flink 1.15 and Python 3.7 and I want to use KafkaSource. Why can't I find a KafkaSource example? ㅠㅠ
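    For what it's worth, in 1.15 the Python DataStream API still exposes the Kafka connector as FlinkKafkaConsumer (the KafkaSource examples in the docs are mostly Java). A minimal PyFlink sketch, with illustrative topic, servers and JAR path:

    from pyflink.common.serialization import SimpleStringSchema
    from pyflink.datastream import StreamExecutionEnvironment
    from pyflink.datastream.connectors import FlinkKafkaConsumer

    env = StreamExecutionEnvironment.get_execution_environment()
    # The Kafka connector JAR must be on the classpath (path/version are illustrative).
    env.add_jars("file:///path/to/flink-sql-connector-kafka-1.15.0.jar")

    consumer = FlinkKafkaConsumer(
        topics="MyTopic",
        deserialization_schema=SimpleStringSchema(),
        properties={"bootstrap.servers": "localhost:9092", "group.id": "my-group"})
    stream = env.add_source(consumer)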
  • s

    Shashank Mishra

    07/06/2022, 4:35 PM
    Hi team, we are working on a real-time data streaming application using Flink StateFun (remote functions in Python). In our use case we have a small requirement to read data from S3 for a small data validation, and we want to make that call from the StateFun function. I researched a lot but didn't find anything relevant for Flink StateFun in Python. Can someone help with an example of reading data from S3 in a Flink StateFun remote function in Python? We are using Flink StateFun version 3.10.
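    Since remote functions are ordinary Python services, one option is simply calling S3 with boto3 inside the function body. A minimal sketch, assuming the Python StateFun SDK 3.x; the bucket, key and typename are illustrative:

    import boto3
    from statefun import StatefulFunctions

    functions = StatefulFunctions()
    s3 = boto3.client("s3")

    @functions.bind(typename="example/validator")
    def validate(context, message):
        # Fetch a small validation object from S3 (bucket and key are illustrative).
        obj = s3.get_object(Bucket="my-bucket", Key="validation/rules.json")
        rules = obj["Body"].read()
        # ... validate the incoming message against `rules` ...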
  • j

    Jaya Ananthram

    07/06/2022, 5:43 PM
    Hello 👋, I have a question about Flink's Avro SQL format. Does it support Avro nested data types (records)? According to the docs it looks like it does, but I don't find a test case here for a nested data type (and this is the schema used for that same test case). So I wanted to hear about experiences with nested data types, e.g. cons or drawbacks, if anyone has worked with this. Thanks.
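    For reference, nested Avro records map onto ROW types in the table DDL. A minimal PyFlink sketch, with illustrative table, field and topic names:

    from pyflink.table import EnvironmentSettings, TableEnvironment

    t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())

    # A nested Avro record is declared as a ROW type in Flink SQL.
    t_env.execute_sql("""
        CREATE TABLE users (
            id BIGINT,
            address ROW<street STRING, city STRING, zip_code STRING>
        ) WITH (
            'connector' = 'kafka',
            'topic' = 'users',
            'properties.bootstrap.servers' = 'localhost:9092',
            'format' = 'avro'
        )
    """)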
  • g

    Geoffrey Picron

    07/06/2022, 5:57 PM
    👋 Hello, team! I'm having trouble understanding how to set up multiple TaskManager replicas with the Kubernetes Flink Operator 1.0.1.
  • a

    Adrian Chang

    07/06/2022, 7:58 PM
    👋 Hello, team! I am using PyFlink 1.15.0 with the Table API. The source events come from Kafka every 5 minutes and I apply a HOP window of size 60 minutes with a slide of 5 minutes:
    GROUP BY HOP(tsMs, interval '5' MINUTE, interval '60' MINUTE)
    The issue is I never get the latest 5 minutes. For example, the window is evaluated at:
    2022-07-06 15:45:04.694672
    window start time:
    2022-07-06 18:44:59.999000
    window row time:
    2022-07-06 19:39:59.999000
    I was expecting a window row time of:
    2022-07-06 19:44:59.999000
    I suspect the reason is that I only receive source events every 5 minutes, combined with how the watermarks are generated. I tried
    t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
    t_env.get_config().get_configuration() \
        .set_string('parallelism.default', '1') \
        .set_string('pipeline.time-characteristic', 'EventTime') \
        .set_string('pipeline.auto-watermark-interval', '100 ms')
    but it doesn't make any difference. Could you confirm whether these parameters are valid for the Python Table API? If yes, could you provide an example of how to use them? Thanks
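    One note: in the Table API the event-time attribute and watermark are usually declared on the source table itself rather than via pipeline options (pipeline.time-characteristic is no longer needed in recent versions, since event time is the default). A minimal PyFlink sketch, with illustrative table/field names, connector settings and watermark bound:

    from pyflink.table import EnvironmentSettings, TableEnvironment

    t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
    t_env.get_config().get_configuration().set_string("parallelism.default", "1")

    # The event-time column and watermark are declared in the source DDL.
    t_env.execute_sql("""
        CREATE TABLE events (
            id STRING,
            tsMs TIMESTAMP(3),
            WATERMARK FOR tsMs AS tsMs - INTERVAL '5' SECOND
        ) WITH (
            'connector' = 'kafka',
            'topic' = 'events',
            'properties.bootstrap.servers' = 'localhost:9092',
            'format' = 'json'
        )
    """)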
  • j

    Jeremy Ber

    07/06/2022, 9:29 PM
    For the S3 Streaming File Sink, if I am using a custom BucketAssigner to write data to a specific folder (hour) in S3 based on my timestamps, is it possible this data could land in a different folder (hour) for some reason like backpressure / failed checkpoints / etc.? How might this happen? Example:
    timestamp: 2022-07-06T01:01:01
    bucket should be
    year=2022/month=07/day=06/hour=01
    but it’s landing in
    year=2022/month=07/day=06/hour=05
  • j

    Jaya Ananthram

    07/06/2022, 9:43 PM
    Hello 👋, a question related to SQL Avro Table API schema evolution (specifically Full Transitive mode). Assume we have a producer that publishes data to Kafka and multiple consumers (technically multiple Flink SQL jobs) that consume the data and stream it to multiple sinks. The question is: how can I make the consumers work in Full Transitive mode? That is, the consumers (Flink SQL jobs) should not break if I add a new optional field on the producer side at different points in time before updating the consumers' schema. Assume that the producers and consumers sit in different teams, synchronizing the rollout is not possible at all, and it is also expected that a consumer might restore a previous savepoint for various technical reasons. Is this supported in the Flink Table API? When I add a new field on the producer side (at the end) without updating the consumer schema, it simply fails because the table DDL still uses the previous version. For better flexibility I would prefer Full Transitive, so producers and consumers are completely decoupled and can upgrade whenever they find the time. Is this possible with the Flink SQL API? I don't find much information in the Flink docs about this topic. I believe in the data processing world people usually prefer Full Transitive mode for better flexibility, so I'm wondering how to achieve this 🤔
  • h

    HAJI

    07/07/2022, 1:44 AM
    Hello Flink >< I have a question. I am using PyFlink and I need the Kafka header data. Ref: https://stackoverflow.com/questions/47558289/way-to-read-data-from-kafka-headers-in-apache-flink — it can be done with an override. I traced the Python functions and they only link to the JVM. If I need to use the Kafka headers, is rebuilding the JAR the only option for me?
  • o

    Owen Lee

    07/07/2022, 7:55 AM
    Has anybody faced an issue where the watermark is higher in a downstream operator? Details are listed in the following question; this only happens with the new KafkaSource. https://stackoverflow.com/questions/72654182/flink-interval-join-datastream-with-kafkasource-drops-all-records/
  • h

    HAJI

    07/07/2022, 8:24 AM
    😰 Can I ask one more? Does PyFlink support sinking to Elasticsearch?
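    PyFlink can write to Elasticsearch through the Table API's elasticsearch connector. A minimal sketch, assuming the flink-sql-connector-elasticsearch7 JAR is on the classpath; the host and index names are illustrative:

    from pyflink.table import EnvironmentSettings, TableEnvironment

    t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())

    # Rows inserted into this table are written to Elasticsearch 7.
    t_env.execute_sql("""
        CREATE TABLE es_sink (
            user_id STRING,
            cnt BIGINT
        ) WITH (
            'connector' = 'elasticsearch-7',
            'hosts' = 'http://localhost:9200',
            'index' = 'my-index'
        )
    """)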
  • s

    Shqiprim Bunjaku

    07/07/2022, 8:34 AM
    Hello, does Flink SQL or the Table API support PIVOT? I cannot find this function in the list of built-in functions.
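    There is no built-in PIVOT in Flink SQL as far as I know, but pivot-style results can usually be emulated with conditional aggregation. A minimal PyFlink sketch (the table and column names are illustrative):

    from pyflink.table import EnvironmentSettings, TableEnvironment

    t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())

    # Pivot categories into columns with FILTERed aggregates.
    result = t_env.sql_query("""
        SELECT
            user_id,
            COUNT(*) FILTER (WHERE category = 'A') AS cnt_a,
            COUNT(*) FILTER (WHERE category = 'B') AS cnt_b
        FROM events
        GROUP BY user_id
    """)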
  • n

    Nithin kharvi

    07/07/2022, 8:51 AM
    Hi, we are facing an issue with Flink TaskManager metrics; the application is deployed on AKS. We are able to get the Flink JobManager metrics (system metrics) in Azure Log Analytics, but no metrics are scraped from the Flink TaskManagers. The job is deployed using the Flink Kubernetes operator CRD. We tried the two configs below in the CRD but still cannot get the TM metrics.
    1) spec: flinkConfiguration: metrics.reporters: prom, metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter
    2) spec: flinkConfiguration: kubernetes.operator.metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter, kubernetes.operator.metrics.reporter.prom.port: "9999"
    Are we missing any other configs related to Flink metrics?
  • a

    Aqib Mehmood

    07/07/2022, 8:59 AM
    Hi everyone, we're currently working on taking data from GCP Pub/Sub and sinking it into our Redshift tables using the JDBC connector. Here is the problem we're facing. I'm using this code to sink into JDBC:
    streamExecEnv.fromElements(new JSONObject(DataStream<String>.toString())).addSink(
                    JdbcSink.sink(
                            "insert into <table> (<column 1>, <column 2>) values (?, ?)",
                            (statement, response) -> {
                                statement.setObject(1, response.get("<key 1>"));
                                statement.setObject(2, response.get("<key 2>"));
                            },
    The DataStream<String>.toString() object on the first line, despite being in JSON form, does not behave like a string even with the toString() method, and thus cannot be converted into a JSONObject. So we're having difficulty sinking streaming data into JDBC from DataStream objects. Is there a more standard method of sinking real-time data into JDBC that I'm missing? Thank you in advance.
  • z

    Zain Haider Nemati

    07/07/2022, 11:15 AM
    Hello, I am using a JDBC sink with the Flink DataStream API v1.15. My map function returns a JSON object with multiple key/value pairs and I'm trying to access those in the JDBC sink; this is how it looks below. However, on execution I'm getting the error JSONObject["a"] not found in the sink. I have tested the map function and I'm getting properly formatted JSONs with all the keys. Any help around this would be appreciated, or a pointer to any repo on GitHub which uses the DataStream API to sink to JDBC. The examples in the docs are very simple and don't provide enough insight into this issue. TIA
    transformed_Stream.addSink(JdbcSink.sink(
            "insert into table (a,b,c) values (?, ?, ?)",
            (statement, response) -> {
                statement.setString(1, response.get("a").toString());
                statement.setString(2, response.get("b").toString());
                statement.setString(3, response.get("c").toString());
            },
    ));
    Error:
    Copy code
    Caused by: org.json.JSONException: JSONObject["a"] not found.
  • r

    Roman Bohdan

    07/07/2022, 12:44 PM
    Hello, guys. Need your suggestion 🥺. We're using Flink with iterations, and increasing the number of iterations requires a huge amount of CPU and memory. We have 10 iterations and the job needs 4 GB of memory. Can you please suggest how we can optimize memory usage? Is there any chance to use RocksDB or something else?
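    On the RocksDB question: keyed state can be moved off the JVM heap by enabling the RocksDB state backend. A minimal sketch (shown for PyFlink purely as an illustration; the Java/Scala API is analogous):

    from pyflink.datastream import StreamExecutionEnvironment
    from pyflink.datastream.state_backend import EmbeddedRocksDBStateBackend

    env = StreamExecutionEnvironment.get_execution_environment()
    # Keep keyed state in RocksDB (spilled to local disk) instead of on the JVM heap.
    env.set_state_backend(EmbeddedRocksDBStateBackend())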
  • s

    Susan Perkins

    07/07/2022, 6:25 PM
    Hi, I'm completely new to Flink and my team is using Flink StateFun, emphasis on the fun! It's working well for the most part. The problem comes when we try to do an initial migration of a huge data set, which throws hundreds of batch messages onto Kafka at once, each containing hundreds of messages. Meaning Kafka gets hundreds of thousands of messages to process, and dutifully sends them off to the master, which distributes them to the workers and then the functions. The functions take days and days to calculate state, and the whole thing usually falls down with out-of-memory errors, or the functions just quit, apparently because they can't communicate, because they or the workers time out waiting for each other (socket connection errors, socket hang-up errors). What is the best approach to handle a large amount of initial data and state calculations for large datasets?
  • a

    Adrian Chang

    07/07/2022, 6:41 PM
    Hello. I am using the Table API on Flink 1.15 to consume from a Kafka topic with 2 partitions. The events on that topic are produced every 5 minutes, which means Flink won't receive any event for 5 minutes, but I would like the watermark to advance in the meantime. I have tried the options table.exec.source.idle-timeout and pipeline.auto-watermark-interval, but the watermark is not updated when there are no events. I guess the purpose of table.exec.source.idle-timeout is for when some partitions go idle, not the entire topic. Am I right? If I regularly produce "ping" events to the Kafka topic, the watermark is updated as I expect. Is there any other solution for this scenario than producing "ping" events? Thanks
  • d

    Dan Hill

    07/08/2022, 1:05 AM
    Hi. I'm working on a failing MiniCluster test and want to get the job exception reported programmatically at the end of the test. I'm trying to use the /jobs/:jobid/exceptions REST API, but I don't get useful information from the exceptions API. I get the job status using
    clusterClient.getJobStatus(jobId).get
    The job status is FAILED. When I query the REST API for the exception, I get:
    {
      "root-exception": null,
      "timestamp": null,
      "all-exceptions": [],
      "truncated": false,
      "exceptionHistory": {
        "entries": [],
        "truncated": false
      }
    }
    The getJobStatus call returns a stack trace indicating that the checkpoint timed out. This happens very quickly, even though I explicitly set the timeout to a longer period. Is this a known issue with the exceptions API?
  • s

    salvalcantara

    07/08/2022, 4:11 AM
    Hi all! I just posted this to the user mailing list: https://lists.apache.org/thread/zgjmdzqywfd1jk7gcvs5s51rm0ltyjlc. As explained there, I'm trying to find out whether it's possible to read from Kafka topics on demand. There is an open initiative for making setKafkaSubscriber public on the Kafka source builder (see https://issues.apache.org/jira/browse/FLINK-24660), so I guess using a custom subscriber would be one way to go about it. However, I'm wondering if it could be achieved in a more reactive way, e.g. by using some sort of control signal for the source so that new topics are specified on demand (for example, by publishing the set of topics of interest to a control topic). That is something I've been thinking about lately; what are your thoughts? Is this something doable? Nonsensical?
  • g

    Gaurav Miglani

    07/08/2022, 6:59 AM
    I'm getting a Hadoop exception while running the k8s operator in HA mode; can someone help?
  • g

    Gaurav Miglani

    07/08/2022, 9:26 AM
    While using the Flink k8s operator, the rest service for my job is created as ClusterIP. How can I change it to a LoadBalancer? I tried with an Ingress, but that didn't work out. I have checked the advanced and basic examples here: https://github.com/apache/flink-kubernetes-operator/blob/main/examples, but nothing creates a load balancer. Do I need to explicitly define a k8s Service of type LoadBalancer?
    NAME                                        TYPE        CLUSTER-IP       EXTERNAL-IP   PORT(S)    AGE
    service/flink-operator-webhook-service      ClusterIP   172.20.171.206   <none>        443/TCP    25h
    service/test-job-rest                      ClusterIP   172.20.112.225   <none>        8081/TCP   45s
  • l

    Levani Kokhreidze

    07/08/2022, 9:39 AM
    Hi! While experimenting with the local recovery feature (Flink 1.15.1), I noticed that if the JobManager is restarted, TaskManagers always recover from remote state (IncrementalRemoteKeyedStateHandle), while if I restart the TaskManagers, local recovery is triggered. I'm wondering if this is a known limitation or if there is some additional config to tweak? Setup:
    • HA setup with ZooKeeper and S3 remote storage.
    • JobManager runs as a StatefulSet with a PersistentVolume; both process.jobmanager.working-dir and jobmanager.resource-id are correctly configured.
    • TaskManagers run as StatefulSets with PersistentVolumes; both process.taskmanager.working-dir and taskmanager.resource-id are correctly configured.
  • g

    Gaurav Miglani

    07/08/2022, 1:44 PM
    When I run a job with some load on Flink 1.15.1 using the k8s operator, I get a NullPointerException; can anyone help me with it?
    2022-07-08 10:53:36,253 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - MiniBatchAssigner[2] -> Calc[3] -> (LocalWindowAggregate[4], LocalWindowAggregate[12], LocalWindowAggregate[21], LocalWindowAggregate[30]) (33/40) (4904d8c6fa01f6c1d9856a829351795e) switched from RUNNING to FAILED on user-concurrency-streamverse-taskmanager-1-1 @ ip-10-171-54-88.ec2.internal (dataPort=41191).
    java.lang.NullPointerException: null
    	at org.apache.flink.table.runtime.operators.window.slicing.SliceAssigners$AbstractSliceAssigner.assignSliceEnd(SliceAssigners.java:558) ~[flink-table-runtime-1.15.1.jar:1.15.1]
    	at org.apache.flink.table.runtime.operators.aggregate.window.LocalSlicingWindowAggOperator.processElement(LocalSlicingWindowAggOperator.java:114) ~[flink-table-runtime-1.15.1.jar:1.15.1]
    	at org.apache.flink.streaming.runtime.tasks.ChainingOutput.pushToOperator(ChainingOutput.java:99) ~[flink-dist-1.15.1.jar:1.15.1]
    	at org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:80) ~[flink-dist-1.15.1.jar:1.15.1]
    	at org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:39) ~[flink-dist-1.15.1.jar:1.15.1]
    	at org.apache.flink.streaming.runtime.tasks.CopyingBroadcastingOutputCollector.collect(CopyingBroadcastingOutputCollector.java:40) ~[flink-dist-1.15.1.jar:1.15.1]
    	at org.apache.flink.streaming.runtime.tasks.CopyingBroadcastingOutputCollector.collect(CopyingBroadcastingOutputCollector.java:28) ~[flink-dist-1.15.1.jar:1.15.1]
    	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56) ~[flink-dist-1.15.1.jar:1.15.1]
    	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29) ~[flink-dist-1.15.1.jar:1.15.1]
    	at StreamExecCalc$56.processElement_split2(Unknown Source) ~[?:?]
    	at StreamExecCalc$56.processElement(Unknown Source) ~[?:?]
    	at org.apache.flink.streaming.runtime.tasks.ChainingOutput.pushToOperator(ChainingOutput.java:99) ~[flink-dist-1.15.1.jar:1.15.1]
    	at org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:80) ~[flink-dist-1.15.1.jar:1.15.1]
    	at org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:39) ~[flink-dist-1.15.1.jar:1.15.1]
    	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56) ~[flink-dist-1.15.1.jar:1.15.1]
    	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29) ~[flink-dist-1.15.1.jar:1.15.1]
    	at org.apache.flink.table.runtime.operators.wmassigners.RowTimeMiniBatchAssginerOperator.processElement(RowTimeMiniBatchAssginerOperator.java:74) ~[flink-table-runtime-1.15.1.jar:1.15.1]
    	at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:233) ~[flink-dist-1.15.1.jar:1.15.1]
    	at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134) ~[flink-dist-1.15.1.jar:1.15.1]
    	at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105) ~[flink-dist-1.15.1.jar:1.15.1]
    	at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) ~[flink-dist-1.15.1.jar:1.15.1]
    	at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:519) ~[flink-dist-1.15.1.jar:1.15.1]
    	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203) ~[flink-dist-1.15.1.jar:1.15.1]
    	at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804) ~[flink-dist-1.15.1.jar:1.15.1]
    	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753) ~[flink-dist-1.15.1.jar:1.15.1]
    	at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948) ~[flink-dist-1.15.1.jar:1.15.1]
    	at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927) ~[flink-dist-1.15.1.jar:1.15.1]
    	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741) ~[flink-dist-1.15.1.jar:1.15.1]
    	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563) ~[flink-dist-1.15.1.jar:1.15.1]
    	at java.lang.Thread.run(Unknown Source) ~[?:?]
  • a

    Adrian Chang

    07/08/2022, 8:22 PM
    Hi team, I don't find the class WatermarkGenerator in the Python code of Flink 1.15. Is it possible to have an equivalent of a periodic WatermarkGenerator in Python? I want to advance the watermark even if there are no events for a while.
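    To my knowledge a fully custom WatermarkGenerator is indeed not exposed in the Python API as of 1.15, but the built-in periodic strategies are. A minimal sketch, assuming the event carries an epoch-millis timestamp in field 0 (illustrative):

    from pyflink.common import Duration, WatermarkStrategy
    from pyflink.common.watermark_strategy import TimestampAssigner

    class EventTimestampAssigner(TimestampAssigner):
        def extract_timestamp(self, value, record_timestamp):
            # Assumes the event's epoch-millis timestamp sits in field 0 (illustrative).
            return value[0]

    watermark_strategy = (
        WatermarkStrategy
        .for_bounded_out_of_orderness(Duration.of_seconds(10))
        .with_timestamp_assigner(EventTimestampAssigner()))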