# troubleshooting
  • h

    huang hills

    03/02/2023, 7:31 AM
    When I restart the Flink cluster (standalone mode), all Flink jobs disappear. Why?
  • m

    Mitchell Jeppson

    03/02/2023, 7:36 AM
    Hey team! I am relatively new to the Flink world and am having some issues I hope you can help clarify. Thanks in advance! I am creating two temporary views, each one like so: `tableEnv.createTemporaryView(tableName, stream)`. I am then joining the two views by querying them, something like this:
    Copy code
    val resultTable = streamTableEnvironment.sqlQuery(myQuery)
    
    streamTableEnvironment.toChangelogStream(resultTable)
    The issue I am seeing is that the entire view is republished each time a new message is consumed and added to the table. I assume this is the expected behaviour, but is there any way to configure it to publish only the new row in the case of an insert, or only the updated row in the case of an update? I can provide more detailed code examples if that would be helpful. Thank you!
  • a

    Amresh Venugopal

    03/02/2023, 8:19 AM
    Hey everyone. I am struggling to follow the documentation for using the S3 plugins. This is how I have set up the plugin, as per the File Systems doc.
    Copy code
    ❯ tree plugins
    plugins
    ├── README.txt
    └── s3-fs-presto
        └── flink-s3-fs-presto-1.16.1.jar
    I'll add my flink-conf.yaml in the thread. When I run:
    Copy code
    flink run some.jar
    I see the following exception in `flink-amresh.venugopal-client-ip-172-31-50-66.log`. The `.jar` I am using works if I don't use S3 for checkpointing.
    Copy code
    Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not find a file system implementation for scheme 's3p'. The scheme is directly supported by Flink through the following plugin(s): flink-s3-fs-presto. Please ensure that each plugin resides within its own subfolder within the plugins directory. See https://nightlies.apache.org/flink/flink-docs-stable/docs/deployment/filesystems/plugins/ for more information. If you want to use a Hadoop file system for that scheme, please add the scheme to the configuration fs.allowed-fallback-filesystems. For a full list of supported file systems, please see https://nightlies.apache.org/flink/flink-docs-stable/ops/filesystems/.
    	at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:515)
    	at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:409)
    	at org.apache.flink.core.fs.Path.getFileSystem(Path.java:274)
  • a

    Adesh Dsilva

    03/02/2023, 10:54 AM
    Hi, is there an example async sink implementation for simple HTTP calls using CompletableFuture? Using async I/O followed by a discarding sink seems simpler than writing my own sink writer implementation.
  • p

    Parth

    03/02/2023, 11:40 AM
    Hello folks, I want to automate Flink deployment on AWS EMR. Currently I am able to automate submitting the Flink application as a step on the EMR cluster. My next step is to terminate the current job with a savepoint and start the new job from that savepoint. If anyone has any ideas, kindly share your thoughts; it would be helpful for me. ❤️
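    One way the stop-and-resume step might be scripted is sketched below in Python. It only builds the two Flink CLI invocations (`flink stop --savepointPath ...` and `flink run -s ...`) as argument lists; the function names, job ID, jar name, and S3 paths are hypothetical placeholders, and any YARN/EMR-specific target options are deliberately left out, so treat it as a starting point rather than a working EMR step.

```python
import shlex


def stop_with_savepoint_cmd(job_id, savepoint_dir, flink_bin="flink"):
    """Build the CLI call that stops a running job and writes a savepoint."""
    return [flink_bin, "stop", "--savepointPath", savepoint_dir, job_id]


def resume_from_savepoint_cmd(jar_path, savepoint_path, flink_bin="flink"):
    """Build the CLI call that starts a job (detached) from a savepoint."""
    return [flink_bin, "run", "-d", "-s", savepoint_path, jar_path]


# Hypothetical values, for illustration only.
stop_cmd = stop_with_savepoint_cmd("a1b2", "s3://my-bucket/savepoints")
run_cmd = resume_from_savepoint_cmd(
    "new-job.jar", "s3://my-bucket/savepoints/savepoint-1"
)

# Print the commands instead of executing them; pass the lists to
# subprocess.run(..., check=True) once the paths and job ID are real.
print(shlex.join(stop_cmd))
print(shlex.join(run_cmd))
```

    The savepoint path passed to the second command has to be the one reported by the stop command, so a real script would need to capture that output first.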
  • n

    Nick Pocock

    03/02/2023, 4:47 PM
    Does anyone have experience unit testing Flink Stateful Functions (StateFun) with the Go SDK? I am having a few issues.
  • d

    dario bonino

    03/02/2023, 5:10 PM
    Hi all, we are building a source connector using the new source API and we are trying to provide new splits on request (implementing the handleSplitRequest method from SplitEnumerator). When we run the source, the task manager fails after reading a few splits, with the error `Have records for a split that was not registered`. However, splits are added through the context, so they should be registered. Has anyone run into a similar issue? If so, are there any points we should pay attention to when debugging it?
  • s

    sharad mishra

    03/02/2023, 8:23 PM
    Hello, does Flink-Streaming-InfluxDB (here) support an exactly-once (two-phase commit) sink?
  • j

    James McGuire

    03/03/2023, 1:16 AM
    Hi folks, I'm having a little trouble using Flink SQL to set up a table with a `JSON_OBJECT` component that references a `DATE` column. Here's the schema I'm trying to create and the error:
    Copy code
    CREATE TABLE test_table (
      some_date DATE,
      object AS JSON_OBJECT(
        KEY 'some_date' VALUE some_date 
      )
    )
    COMMENT ''
    WITH (
      'connector'='datagen'
    )
    ; 
    Flink SQL> select * from default_catalog.default_database.test_table ;
    [ERROR] Could not execute SQL statement. Reason:
    org.apache.flink.table.planner.codegen.CodeGenException: Type 'DATE' is not scalar or cannot be converted into JSON.
  • s

    Sandeep R

    03/03/2023, 3:21 AM
    Hi team, I am looking for an example that reads data from Kafka and inserts it into Hive with Flink. Either Java or Python works. The data is in JSON format and needs no processing. Can this be done with Flink SQL? Thank you.
  • r

    Raghunadh Nittala

    03/03/2023, 4:11 AM
    Hi Folks, I’m trying to use the TableStream API with the ‘upsert-kafka’ connector to write some data to a Kafka topic. To do this, I want to test the queries by inserting some sample data. This is the proto for reference:
    Copy code
    message EventChanged {
      string id = 1;
      map<string, google.protobuf.Any> changes = 2;
    }
    
    message SampleRule {
      string event_type = 1;
      EventChanged change_events = 2;
    }
    Here are my queries:
    Copy code
    CREATE TABLE test_changes (
        message_key STRING NOT NULL,
        event_type STRING NOT NULL,
        event_changed ROW<id STRING, changes MAP<STRING, BYTES>>,
    CONSTRAINT pk_test_changes PRIMARY KEY (message_key) NOT ENFORCED
    ) WITH (
        'connector' = 'upsert-kafka',
        'topic' = 'test-changes',
        'properties.bootstrap.servers' = 'localhost:9092',
        'key.format' = 'raw',
        'value.format' = 'protobuf',
        'value.protobuf.message-class-name' = 'TestProto$SampleRule',
        'value.fields-include' = 'EXCEPT_KEY'
    );
    My question is: is the BYTES data type correct for a protobuf Any field? How can I prepare the data for this type of field? I tried the statement `INSERT INTO test_changes(message_key, event_type, event_changed) VALUES('abc123', 'efg', ROW('123', MAP['www', '123']));` but this didn’t work.
  • s

    Sudhan Madhavan

    03/03/2023, 6:27 AM
    Hello everyone, I am trying to write some custom metrics in my Flink application. As per the Flink guidelines, the metric group can only be fetched via getRuntimeContext() inside a RichFunction. My use case needs custom metrics in any operator, regardless of whether it is a RichFunction. Is there a way to do this, or does anyone have suggestions? Thanks!
  • r

    Rohan Kumar

    03/03/2023, 9:20 AM
    Hello everyone, I have a Kafka topic where the same user’s data always goes to the same partition. I want to compute a simple running aggregate, such as a count, for each user in Flink. How should I write the Flink job so that there is little or no data shuffle? What else can I do to optimize the application and get it ready for scaling when we have a lot of users and data?
  • a

    Aditya

    03/03/2023, 1:21 PM
    Hi everyone, I'm looking into Dynamic Tables and the Table API and want to understand them a bit more deeply. Is there a design doc or Confluence page about them? I am especially interested in how the logical plan for the Table API is converted to a physical plan, and how the state of dynamic tables is maintained.
  • r

    Richard Noble

    03/03/2023, 1:26 PM
    Hi all, is this the right place to ask a question about Flink Table Store?
  • g

    Guruguha Marur Sreenivasa

    03/03/2023, 2:44 PM
    Hi all, are there any recommendations for what the JVM Metaspace should be set to for the JobManager? We currently have it set to 512MB for our session cluster in production, and current usage is 411MB. I don't think we can deploy many more jobs to our cluster. Also, any help in understanding how metaspace works is appreciated! Thank you.
  • a

    Alex Escobedo

    03/03/2023, 3:25 PM
    Hi all! I have a Flink FileSource reading from an S3 path. My use case is to migrate the files in our old S3 data lake bucket from JSON to Parquet in the new bucket. From the documentation, it seems that Flink first has to walk all the file paths recursively before starting the conversion job. This is okay for small paths, but in prod, enumerating a few GB took extremely long (it didn't finish after more than half an hour), never mind the few TB that are the actual goal. Is there any way to consume the files in a reactive way (1.15)? Or is there another technology that would suit me better?
  • s

    Singh

    03/03/2023, 6:40 PM
    👋 In our Flink deployments, there is often a use case where we want a Flink pipeline that was previously running with state to start from scratch, that is, with no state. When using the Flink K8s operator, does that mean we can change a running production pipeline spec from `upgradeMode: savepoint` to `upgradeMode: stateless`? The operator docs mention that `stateless` isn’t recommended for production use. Is that a general recommendation, or does it mean `stateless` wouldn’t work for this case? Thanks!
  • l

    Leon Xu

    03/03/2023, 7:02 PM
    Hi all, a question about Flink CDC for Postgres: why doesn't Flink CDC support parallel reading during the snapshot phase in Postgres? Is there a plan to support that in the near future? This does not seem to be an issue for MySQL.
  • j

    James Timotiwu

    03/03/2023, 9:16 PM
    Hi, Has anyone looked into tokenizing event attributes in a high throughput event stream? There's a clear path towards hash tokenization using a cipher like AES, which can scale and run entirely in a Process Function. But this won't meet all compliance requirements, such as GDPR right to be forgotten. What's being considered is a lookup table with a pseudorandom/random unique identifier. The lookup table needs to be accessed quickly and securely persisted. We'd have to make constant reads to avoid key collisions, so parallelization might not be possible. Would like to see if the community has worked on something similar before, and how they were able to leverage certain properties in Flink to enable this!
  • m

    Mali

    03/04/2023, 7:45 AM
    Hello everyone, I am pretty new to Apache Flink. I deployed flink-k8s-operator via Helm inside my EKS cluster and built a custom PyFlink image. My example applications run without issue, but I need to consume data from RabbitMQ. While trying to connect to RabbitMQ I am getting the error shown below. My PyFlink code is:
    Copy code
    import os
    from pyflink.common import SimpleStringSchema
    from pyflink.datastream.connectors.rabbitmq import RMQConnectionConfig, RMQSource
    from pyflink.datastream.stream_execution_environment import StreamExecutionEnvironment
    
    # logs configs
    RABBITMQ_LOGS_HOST = os.getenv("RABBITMQ_LOGS_HOST")
    RABBITMQ_LOGS_USER = os.getenv("RABBITMQ_LOGS_USER")
    RABBITMQ_LOGS_PASSWD = os.getenv("RABBITMQ_LOGS_PASSWD")
    RABBITMQ_LOGS_PORT = os.getenv("RABBITMQ_LOGS_PORT")
    QUEUE_NAME = os.getenv("QUEUE_NAME")
    
    
    env = StreamExecutionEnvironment.get_execution_environment()
    env.add_jars("file:///opt/flink/lib/flink-connector-rabbitmq-1.16.1.jar","file:///opt/flink/lib/flink-sql-connector-rabbitmq-1.16.0")
    env.enable_checkpointing(600000)
    
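    # set_port() expects an int, while os.getenv() returns a string.
    # The password must come from RABBITMQ_LOGS_PASSWD, not the port variable.
    rabbitmq_connection_config = RMQConnectionConfig.Builder() \
        .set_host(RABBITMQ_LOGS_HOST) \
        .set_port(int(RABBITMQ_LOGS_PORT)) \
        .set_user_name(RABBITMQ_LOGS_USER) \
        .set_password(RABBITMQ_LOGS_PASSWD) \
        .set_prefetch_count(30000).build()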
    My deployment yaml is;
    Copy code
    apiVersion: flink.apache.org/v1beta1
    kind: FlinkDeployment
    metadata:
      name: wuxi
      namespace: flink
    spec:
      ingress:
        template:  "my-private-ingress-url"
        className: "nginx"
        annotations: {}
      image: "my-private-repo"
      imagePullPolicy: "Always"
      flinkVersion: v1_16
      flinkConfiguration:
        taskmanager.numberOfTaskSlots: "1"
      serviceAccount: flink
      jobManager:
        podTemplate:
          apiVersion: v1
          kind: Pod
          metadata:
           name: pod-template
          spec:
            serviceAccount: flink
            containers:
            # Do not change the main container name
              - name: flink-main-container
                envFrom:
                - secretRef:
                    name: flink-secret
        resource:
          memory: "4096m"
          cpu: 1
      taskManager:
        resource:
          memory: "4096m"
          cpu: 1
        replicas: 1
      job:
        jarURI: local:///opt/flink/opt/flink-python_2.12-1.16.1.jar # Note, this jarURI is actually a placeholder
        entryClass: "org.apache.flink.client.python.PythonDriver"
        args: ["-pyclientexec", "/usr/local/bin/python3", "-py", "/opt/flink/usrlib/main.py"]
        parallelism: 1
        upgradeMode: stateless
    I am getting the following errors:
    ERROR:root:Exception while sending command.
    Traceback (most recent call last):
      File "/opt/flink/opt/python/py4j-0.10.9.3-src.zip/py4j/java_gateway.py", line 1224, in send_command
        raise Py4JNetworkError("Answer from Java side is empty")
    py4j.protocol.Py4JNetworkError: Answer from Java side is empty

    During handling of the above exception, another exception occurred:

    Traceback (most recent call last):
      File "/opt/flink/opt/python/py4j-0.10.9.3-src.zip/py4j/java_gateway.py", line 1038, in send_command
        response = connection.send_command(command)
      File "/opt/flink/opt/python/py4j-0.10.9.3-src.zip/py4j/java_gateway.py", line 1228, in send_command
        raise Py4JNetworkError(
    py4j.protocol.Py4JNetworkError: Error while receiving
    Traceback (most recent call last):
      File "/usr/local/lib/python3.8/runpy.py", line 194, in _run_module_as_main
        return _run_code(code, main_globals, None,
      File "/usr/local/lib/python3.8/runpy.py", line 87, in _run_code
        exec(code, run_globals)
      File "/tmp/pyflink/cb21431412312412321412231/421312125676768445654642/main.py", line 22, in <module>
        rabbitmq_connection_config = RMQConnectionConfig.Builder() \
      File "/opt/flink/opt/python/pyflink.zip/pyflink/datastream/connectors/rabbitmq.py", line 83, in __init__
      File "/opt/flink/opt/python/py4j-0.10.9.3-src.zip/py4j/java_gateway.py", line 1547, in __getattr__
    py4j.protocol.Py4JError: org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig.Builder does not exist in the JVM
    ERROR org.apache.flink.runtime.util.ClusterUncaughtExceptionHandler [] - WARNING: Thread 'Thread-14' produced an uncaught exception. If you want to fail on uncaught exceptions, then configure cluster.uncaught-exception-handling accordingly
    java.lang.NoClassDefFoundError: com/rabbitmq/client/ConnectionFactory
        at java.lang.Class.getDeclaredMethods0(Native Method) ~[?:?]
        at java.lang.Class.privateGetDeclaredMethods(Unknown Source) ~[?:?]
        at java.lang.Class.privateGetPublicMethods(Unknown Source) ~[?:?]
        at java.lang.Class.getMethods(Unknown Source) ~[?:?]
        at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:284) ~[flink-python-1.16.1.jar:1.16.1]
        at org.apache.flink.api.python.shaded.py4j.commands.ReflectionCommand.getMember(ReflectionCommand.java:140) ~[flink-python-1.16.1.jar:1.16.1]
        at org.apache.flink.api.python.shaded.py4j.commands.ReflectionCommand.execute(ReflectionCommand.java:91) ~[flink-python-1.16.1.jar:1.16.1]
        at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238) ~[flink-python-1.16.1.jar:1.16.1]
        at java.lang.Thread.run(Unknown Source) [?:?]
    Caused by: java.lang.ClassNotFoundException: com.rabbitmq.client.ConnectionFactory
        at jdk.internal.loader.BuiltinClassLoader.loadClass(Unknown Source) ~[?:?]
        at jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(Unknown Source) ~[?:?]
        at java.lang.ClassLoader.loadClass(Unknown Source) ~[?:?]
        ... 9 more
    2023-03-04 07:04:48,953 INFO org.apache.flink.client.python.PythonDriver [] - ERROR:root:Exception while sending command.
    Traceback (most recent call last):
      File "/opt/flink/opt/python/py4j-0.10.9.3-src.zip/py4j/java_gateway.py", line 1224, in send_command
        raise Py4JNetworkError("Answer from Java side is empty")
    py4j.protocol.Py4JNetworkError: Answer from Java side is empty
    Can anyone help me solve this issue, please? Many thanks for your effort 🙂
  • s

    saqlain pasha

    03/04/2023, 11:16 AM
    Hi, I am getting this exception while using Flink operator 1.4 with K8s HA. We also see that not all TMs are shown in the Flink Dashboard UI, and it only works after many restarts. Without HA this exception does not occur. We are running Flink in standalone mode.
    Copy code
    org.apache.flink.runtime.rpc.exceptions.FencingTokenException: Fencing token not set: Ignoring message LocalFencedMessage(8e6c7b9d45fca21399ee867b4db74649, LocalRpcInvocation(requestResourceOverview(Time))) sent to akka.tcp://flink@realtime-data-publisher.realtimeproc:6123/user/rpc/resourcemanager_0 because the fencing token is null.
    	at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:67)
    	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
    	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
    	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
    	at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
    	at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
    	at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
    	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
    	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
    	at akka.actor.Actor.aroundReceive(Actor.scala:517)
    	at akka.actor.Actor.aroundReceive$(Actor.scala:515)
    	at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
    	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
    	at akka.actor.ActorCell.invoke(ActorCell.scala:561)
    	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
    	at akka.dispatch.Mailbox.run(Mailbox.scala:225)
    	at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
    	at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    	at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    	at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    	at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
  • s

    Sumit Nekar

    03/05/2023, 10:40 AM
    Hello, I am deploying a Flink job using flink-kubernetes-operator in application mode. My requirement is quicker recovery from TM failures or exceptions. I want to have standby slots (TM pods) so that if one TM is lost, the job can find a free slot immediately. The restart strategy is 5m with 20 attempts. Without backup/extra slots, I see the job waiting a long time for slots to be provisioned. I am using 1 slot per TM. How can I configure the FlinkDeployment to have extra slots? I need some input on this.
  • s

    Sumit Nekar

    03/05/2023, 6:27 PM
    Hello, I get the following error whenever I deploy a FlinkDeployment using flink-kubernetes-operator. The JobManager pod always restarts the first time and comes up the second time. The problem is that this results in the TM pods being recreated. Any input would be appreciated. Thanks.
    Copy code
    2023-03-05 18:22:06,182 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcService             [] - Starting RPC endpoint for org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager at akka://flink/user/rpc/resourcemanager_0 .
    2023-03-05 18:22:06,193 INFO  org.apache.flink.kubernetes.kubeclient.resources.KubernetesLeaderElector [] - Create KubernetesLeaderElector streaming-pipeline-dispatcher-leader with lock identity 5b92b514-7d6a-4203-a50e-c251de97cd74.
    2023-03-05 18:22:06,288 INFO  org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService [] - Starting DefaultLeaderElectionService with KubernetesLeaderElectionDriver{configMapName='streaming-pipeline-dispatcher-leader'}.
    2023-03-05 18:22:06,288 INFO  org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - Starting the resource manager.
    2023-03-05 18:22:06,293 INFO  org.apache.flink.kubernetes.kubeclient.resources.KubernetesLeaderElector [] - New leader elected a1ce701a-760f-4e87-9e8d-6153f6ee3216 for streaming-pipeline-restserver-leader.
    2023-03-05 18:22:06,293 INFO  org.apache.flink.kubernetes.kubeclient.resources.KubernetesLeaderElector [] - New leader elected a1ce701a-760f-4e87-9e8d-6153f6ee3216 for streaming-pipeline-dispatcher-leader.
    2023-03-05 18:22:06,388 INFO  org.apache.flink.runtime.leaderretrieval.DefaultLeaderRetrievalService [] - Starting DefaultLeaderRetrievalService with KubernetesLeaderRetrievalDriver{configMapName='streaming-pipeline-resourcemanager-leader'}.
    2023-03-05 18:22:06,491 INFO  org.apache.flink.runtime.leaderretrieval.DefaultLeaderRetrievalService [] - Starting DefaultLeaderRetrievalService with KubernetesLeaderRetrievalDriver{configMapName='streaming-pipeline-dispatcher-leader'}.
    2023-03-05 18:22:07,077 INFO  org.apache.flink.kubernetes.KubernetesResourceManagerDriver  [] - Recovered 0 pods from previous attempts, current attempt id is 1.
    2023-03-05 18:22:07,078 INFO  org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - Recovered 0 workers from previous attempt.
    2023-03-05 18:22:07,078 INFO  org.apache.flink.kubernetes.kubeclient.resources.KubernetesLeaderElector [] - Create KubernetesLeaderElector streaming-pipeline-resourcemanager-leader with lock identity 5b92b514-7d6a-4203-a50e-c251de97cd74.
    2023-03-05 18:22:07,089 INFO  org.apache.flink.kubernetes.kubeclient.resources.KubernetesLeaderElector [] - New leader elected a1ce701a-760f-4e87-9e8d-6153f6ee3216 for streaming-pipeline-resourcemanager-leader.
    2023-03-05 18:22:07,117 INFO  org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService [] - Starting DefaultLeaderElectionService with KubernetesLeaderElectionDriver{configMapName='streaming-pipeline-resourcemanager-leader'}.
    2023-03-05 18:22:19,497 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint        [] - RECEIVED SIGNAL 15: SIGTERM. Shutting down as requested.
    2023-03-05 18:22:19,498 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint        [] - Shutting KubernetesApplicationClusterEntrypoint down with application status UNKNOWN. Diagnostics Cluster entrypoint has been closed externally..
    2023-03-05 18:22:19,499 INFO  org.apache.flink.runtime.blob.BlobServer                     [] - Stopped BLOB server at 0.0.0.0:6124
    2023-03-05 18:22:19,499 INFO  org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint [] - Shutting down rest endpoint.
    2023-03-05 18:22:19,593 INFO  akka.remote.RemoteActorRefProvider$RemotingTerminator        [] - Shutting down remote daemon.
    2023-03-05 18:22:19,593 INFO  akka.remote.RemoteActorRefProvider$RemotingTerminator        [] - Shutting down remote daemon.
    2023-03-05 18:22:19,595 INFO  akka.remote.RemoteActorRefProvider$RemotingTerminator        [] - Remote daemon shut down; proceeding with flushing remote transports.
    2023-03-05 18:22:19,595 INFO  akka.remote.RemoteActorRefProvider$RemotingTerminator        [] - Remote daemon shut down; proceeding with flushing remote transports.
    WARNING: An illegal reflective access operation has occurred
    WARNING: Illegal reflective access by org.apache.flink.shaded.akka.org.jboss.netty.util.internal.ByteBufferUtil (file:/opt/flink/lib/flink-dist_2.12-1.13.6.jar) to method java.nio.DirectByteBuffer.cleaner()
  • s

    Shaon

    03/06/2023, 12:16 AM
    Hi, I am trying to aggregate over a keyed stream every 10 seconds; however, I would like the aggregation to be global (cumulative). E.g. if I am running a count aggregation and 5 records arrive between 0-10s and 10 between 10-20s, I would like the aggregation to show 15 instead of 10 (the count between 10-20s). How can this be achieved?
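    The requested behaviour can be sketched outside Flink in plain Python. This is an illustration of the expected semantics only, not Flink API code; the function name `cumulative_counts` and the `(timestamp_seconds, key)` event shape are assumptions made for this example. Each 10-second window emits the running total per key rather than the per-window count.

```python
from collections import defaultdict


def cumulative_counts(events, window_seconds=10):
    """Return (window_end, key, running_count) for every window with data.

    `events` is an iterable of (timestamp_seconds, key) pairs.
    """
    # Count events per tumbling window and key first.
    per_window = defaultdict(lambda: defaultdict(int))
    for ts, key in events:
        window_end = (int(ts) // window_seconds + 1) * window_seconds
        per_window[window_end][key] += 1

    # Walk the windows in time order and carry the total forward per key.
    running = defaultdict(int)
    results = []
    for window_end in sorted(per_window):
        for key, count in sorted(per_window[window_end].items()):
            running[key] += count
            results.append((window_end, key, running[key]))
    return results


# 5 records in [0, 10) and 10 records in [10, 20) for the same key.
events = [(t, "a") for t in range(5)] + [(t, "a") for t in range(10, 20)]
print(cumulative_counts(events))  # [(10, 'a', 5), (20, 'a', 15)]
```

    The second window reports 15, the total so far, where a plain tumbling-window count would report 10.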
  • z

    Zhiyu Tian

    03/06/2023, 2:04 AM
    Hi, I am submitting the job through the Web UI, but it fails with the error below. I also found that it is a known bug: https://issues.apache.org/jira/browse/FLINK-20995.
    Server Response Message:
    org.apache.flink.runtime.rest.handler.RestHandlerException: Could not execute application.
        at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$1(JarRunHandler.java:113)
        at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:836)
        at java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:811)
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
        at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1609)
        at java.lang.Thread.run(Thread.java:748)
    Caused by: java.util.concurrent.CompletionException: org.apache.flink.util.FlinkRuntimeException: Could not execute application.
        at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
        at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
        at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1606)
        ... 1 more
    Caused by: org.apache.flink.util.FlinkRuntimeException: Could not execute application.
        at org.apache.flink.client.deployment.application.DetachedApplicationRunner.tryExecuteJobs(DetachedApplicationRunner.java:88)
        at org.apache.flink.client.deployment.application.DetachedApplicationRunner.run(DetachedApplicationRunner.java:70)
        at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$0(JarRunHandler.java:107)
        at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
        ... 1 more
    Caused by: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Job client must be a CoordinationRequestGateway. This is a bug.
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372)
        at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
        at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:98)
        at org.apache.flink.client.deployment.application.DetachedApplicationRunner.tryExecuteJobs(DetachedApplicationRunner.java:84)
        ... 4 more
    Caused by: java.lang.IllegalArgumentException: Job client must be a CoordinationRequestGateway. This is a bug.
        at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:138)
        at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.setJobClient(CollectResultFetcher.java:90)
        at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.setJobClient(CollectResultIterator.java:101)
        at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.lambda$executeAsync$14(StreamExecutionEnvironment.java:2193)
        at java.util.ArrayList.forEach(ArrayList.java:1257)
        at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:2193)
        at org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:206)
        at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:2169)
        at org.apache.flink.streaming.api.datastream.DataStream.executeAndCollectWithClient(DataStream.java:1469)
        at org.apache.flink.streaming.api.datastream.DataStream.executeAndCollect(DataStream.java:1334)
        at com.microsoft.mt.example.KafkaTest2.main(KafkaTest2.java:111)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
        ... 7 more
  • r

    RM

    03/06/2023, 6:47 PM
    Hi! We observe inconsistencies in checkpoint trigger times (gaps of up to ~50 minutes), ignoring the configured 2-minute checkpointing interval and 1-minute pause in between. Interestingly, the logs and UI show that the checkpoints complete in under 10s. While the pipeline is healthy, the non-uniform checkpointing intervals are a concern. Has anyone experienced this? Infra: 1. Flink: 1.13 2. Checkpoint storage: FileSystemCheckpointStorage 3. State backend: HashMapStateBackend.
  • d

    Dima Sheludko

    03/06/2023, 7:41 PM
    Hi! I ran into an error:
    Copy code
    Caused by: com.facebook.presto.hive.s3.PrestoS3FileSystem$UnrecoverableS3OperationException: com.amazonaws.services.s3.model.AmazonS3Exception: Forbidden (Service: Amazon S3; Status Code: 403; Error Code: 403 Forbidden; Request ID: ; S3 Extended Request ID: ; Proxy: null), S3 Extended Request ID:  (Path: s3p://myapp/flink-checkpoints/98d9c76cf8bb4fae909c4bcd0a45c271/chk-27/b89524c6-bf57-425d-82c7-846861f085c3)
    	at com.facebook.presto.hive.s3.PrestoS3FileSystem.lambda$getS3ObjectMetadata$5(PrestoS3FileSystem.java:677)
    	at com.facebook.presto.hive.RetryDriver.run(RetryDriver.java:139)
    	at com.facebook.presto.hive.s3.PrestoS3FileSystem.getS3ObjectMetadata(PrestoS3FileSystem.java:664)
    	at com.facebook.presto.hive.s3.PrestoS3FileSystem.getS3ObjectMetadata(PrestoS3FileSystem.java:648)
    	at com.facebook.presto.hive.s3.PrestoS3FileSystem.getFileStatus(PrestoS3FileSystem.java:353)
    	at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1760)
    	at com.facebook.presto.hive.s3.PrestoS3FileSystem.create(PrestoS3FileSystem.java:398)
    	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1195)
    	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1175)
    	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1064)
    	at org.apache.flink.fs.s3presto.common.HadoopFileSystem.create(HadoopFileSystem.java:154)
    	at org.apache.flink.fs.s3presto.common.HadoopFileSystem.create(HadoopFileSystem.java:37)
    	at org.apache.flink.core.fs.PluginFileSystemFactory$ClassLoaderFixingFileSystem.create(PluginFileSystemFactory.java:170)
    	at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.create(SafetyNetWrapperFileSystem.java:130)
    	at org.apache.flink.core.fs.EntropyInjector.createEntropyAware(EntropyInjector.java:76)
    	at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.createStream(FsCheckpointStreamFactory.java:453)
    I am working with AWS S3, and here is the list of allowed actions:
    Copy code
    "GetObject",
    "GetObjectAcl",
    "GetObjectVersion",
    "DeleteObject",
    "DeleteObjects",
    "HeadObject",
    "ListObjects",
    "ListObjectsV2",
    "PutObject",
    "PutObjectAcl",
    "PutObjectRetention",
    "UploadPart",
    "ListBucket"
    Flink successfully created a recovery in S3 but cannot create a checkpoint. It seems that I need to extend the list of allowed actions to be able to store checkpoints, but I cannot determine which actions to allow. Does anybody have any familiarity with this problem or any ideas on how to fix it?
  • r

    Ryner Menezes

    03/06/2023, 8:45 PM
    Hello all, has anyone built a Docker image for 1.17 and used it successfully? It would be helpful if anyone could share the Dockerfile.
  • t

    Thijs van de Poll

    03/06/2023, 11:06 PM
    Hi all! I am new to Flink and I am trying to test its capabilities in a POC where I create a pipeline with the `postgres-cdc` connector as the source, writing to Delta Lake via Trino. However, when using the `jdbc` connector to write to Trino I get the following error:
    Copy code
    Caused by: java.lang.IllegalStateException: Could not find any jdbc dialect factory that can handle url 'jdbc:trino://localhost:8080/deltalake' that implements 'org.apache.flink.connector.jdbc.dialect.JdbcDialectFactory' in the classpath.
    What would be the best way to sink the data to Trino? Any suggestions? Thanks!