https://flink.apache.org/ logo
Join Slack
Powered by
# troubleshooting
  • a

    Alberto Otero Lorenzo

    06/15/2023, 9:58 AM
    Hi there! I’m trying to set up a minio storage for Flink HA services (k8s) setting, among others,
    highAvailability.storageDir
    to
    <s3://my-minio-host>
    . The problem comes with the credentials. Trying to set up from
    s3.access.key
    s3a.access.key
    AWS_ACCESS_KEY_ID
    with the correspondent secret vars, but no solution has effec and I keep getting the error at jobmanager start log:
    Copy code
    java.util.concurrent.CompletionException: com.amazonaws.SdkClientException: Unable to load AWS credentials from any provider in the chain: [EnvironmentVariableCredentialsProvider: Unable to load AWS credentials from environment variables (AWS_ACCESS_KEY_ID (or AWS_ACCESS_KEY) and AWS_SECRET_KEY (or AWS_SECRET_ACCESS_KEY)), SystemPropertiesCredentialsProvider: Unable to load AWS credentials from Java system properties (aws.accessKeyId and aws.secretKey), WebIdentityTokenCredentialsProvider: You must specify a value for roleArn and roleSessionName, com.amazonaws.auth.profile.ProfileCredentialsProvider@1f86353d: profile file cannot be null, com.amazonaws.auth.EC2ContainerCredentialsProviderWrapper@1b597309: Failed to connect to service endpoint
    Anyone could help me with this? Thank you.
  • d

    Dominic Lindsay

    06/15/2023, 12:57 PM
    Hi I am having trouble decoding protobuf values in pyflink received from the kafka data streaming connector: I am trying to deserialise a evaluation type:
    Copy code
    // Representation of a Model Evaluation that was made:
    // "This model produced this output when given this input".
    message Evaluation {
        string id = 1;
        Model model = 2;
        ModelIO io = 3;
        CorrelationData correlation_data = 4;
        google.protobuf.Timestamp timestamp = 5;
    }
    
    message Model {
        string name = 1;
        string version = 2;
    }
    
    message ModelIO {
        google.protobuf.Struct input = 1;
        google.protobuf.Struct output = 2;
    }
    
    message CorrelationData {
        google.protobuf.Struct data = 1;
    }
    Evaluations store timestamps as
    google.protobuf.Timestamp
    typed values. Evaluations are serialised as protobuf byte arrays and are written to a evaluations kafka topic as kafka messages:
    Copy code
    {
    key: evaluation.id
    msg: evaluation
    }
    Consumers are able to consume from topics. When they do they receive the kafka message object, it is serialised as a byte string/array/vector of the structure described above. When Flink java process consumes these message it will encode the
    msg
    object as a
    UTF-8
    string and pass the encoded string to python so that PyFlink can consume the message. We are then able to deserialise the object:
    Copy code
    def deserialise(value):
            bytes = value.encode('utf-8')
            evaluation = Evaluation.FromString(bytes)
            return evaluation
    However an error is raised:
    Copy code
    *** google.protobuf.message.DecodeError: Error parsing message with type 'evaluation.Evaluation'
    This is because the timestamp field is not correctly handled. The byte string looks like this:
    Copy code
    b'\n\npxgqdrnbtj\x12\x06\n\x01c\x12\x016\x1a3\n\x17\n\x15\n\x05input\x12\x0c\x1a\npoietemwix\x12\x18\n\x16\n\x06output\x12\x0c\x1a\nhbzualvwbz"\x17\n\x15\n\x13\n\x03ref\x12\x0c\x1a\nnoqqltgkfq*\x0c\x08\xef\xbf\xbd\xd1\xa7\xef\xbf\xbd\x06\x10\xef\xbf\xbd\xef\xbf\xbd\xef\xbf\xbd\xef\xbf\xbd\x03'
    The last 26 bytes (everything from and including the
    *
    ) encode the timestamp field. If we try to deserialise the object whilst omitting the timestamp field, no error is raised:
    Copy code
    python
    (pdb) Evaluation.FromString(bytes[:-26])
    id: "pxgqdrnbtj"
    model {
      name: "c"
      version: "6"
    }
    io {
      input {
        fields {
          key: "input"
          value {
            string_value: "poietemwix"
          }
        }
      }
      output {
        fields {
          key: "output"
          value {
            string_value: "hbzualvwbz"
          }
        }
      }
    }
    correlation_data {
      data {
        fields {
          key: "ref"
          value {
            string_value: "noqqltgkfq"
          }
        }
      }
    }
    I create a second script which uses
    confluent-kafka.Consumer
    and attempt to deserialised the
    msg
    field of a kafka message.
    Copy code
    consumer = Consumer({
            "bootstrap.servers": BOOTSTRAP_SERVER,
            "client.id": "sample-producer",
            "group.id": "consumer",
        })
    
        consumer.subscribe([TOPIC])
    
        while True:
            try:
                msg = consumer.poll(1.0)
                if not msg:
                    continue
                print(msg.value())
                breakpoint()
            except KeyboardInterrupt:
                break
    We can copy the byte string produced by this script into the flink program debug context:
    Copy code
    python
    (pdb) bs = b'\n\nadtiegqpdd\x12\x06\n\x01c\x12\x015\x1a3\n\x17\n\x15\n\x05input\x12\x0c\x1a\noscqcjntwt\x12\x18\n\x16\n\x06output\x12\x0c\x1a\ngwwkwcizol"\x17\n\x15\n\x13\n\x03ref\x12\x0c\x1a\ncsicpsesyc*\x0c\x08\xd5\xc6\xab\xa4\x06\x10\xe8\x9f\xa9\xba\x03'
    (pdb) Evaluation.FromString(bs)
    id: "adtiegqpdd"
    model {
      name: "c"
      version: "5"
    }
    io {
      input {
        fields {
          key: "input"
          value {
            string_value: "oscqcjntwt"
          }
        }
      }
      output {
        fields {
          key: "output"
          value {
            string_value: "gwwkwcizol"
          }
        }
      }
    }
    correlation_data {
      data {
        fields {
          key: "ref"
          value {
            string_value: "csicpsesyc"
          }
        }
      }
    }
    timestamp {
      seconds: 1686823765
      nanos: 927617000
    }
    This shows the protobuf timestamp type is available on the filesystem. Therefore I believe the issue is related to how PyFlink is reading kafka messages. Pyflinks kafka consumer is a java class which is called from a python process. - The python representation configures initialises a java object via
    java gateway
    , by configuring its charset to UTF-8. - The Java backend simply serialises the value fields and returns it to caller (source). I believe there is an issue with the encoding but I am currently unsure. Has anyone has any similar issues when handling protobuf messages from kafka?
    d
    • 2
    • 7
  • a

    Adam Augusta

    06/15/2023, 1:47 PM
    Is there any way to specify join hints in the Table DSL, or is that SQL only?
    m
    • 2
    • 3
  • s

    Slackbot

    06/15/2023, 3:49 PM
    This message was deleted.
    m
    • 2
    • 10
  • m

    mralfredgui

    06/15/2023, 4:09 PM
    Error with
    key_by
    and
    reduce
    operators when locally running pyflink 1.13. Here is the code:
    Copy code
    env = StreamExecutionEnvironment.get_execution_environment()
        env.set_parallelism(1)
        ds = env.from_collection(
            collection=[(1, 2), (2, 3), (1, 3), (2, 4)],
            type_info=Types.ROW([<http://Types.INT|Types.INT>(), <http://Types.INT|Types.INT>()]))
        concat_stream = ds.key_by(lambda x: x[0], key_type=<http://Types.INT|Types.INT>()).reduce(lambda a, b: (a[0], a[1] + b[1]))
        concat_stream.print()
        env.execute("tutorial_job")
    This is the key error message:
    Copy code
    AttributeError: 'tuple' object has no attribute 'get_fields_by_names'
    is there anything wrong with the code?
    d
    a
    • 3
    • 5
  • d

    Danila Maksimenko

    06/15/2023, 6:46 PM
    Hello everyone, I recently updated flink to 1.17.1 and I started to get this error .
    Copy code
    2023-06-15 18:41:30,374 INFO  org.apache.flink.runtime.security.token.DelegationTokenReceiverRepository [] - Delegation token receiver s3-hadoop loaded and initialized
    2023-06-15 18:41:30,375 ERROR org.apache.flink.runtime.security.token.DelegationTokenReceiverRepository [] - Failed to initialize delegation token receiver s3-hadoop
    java.lang.IllegalStateException: Delegation token receiver with service name {} has multiple implementations [s3-hadoop]
            at org.apache.flink.util.Preconditions.checkState(Preconditions.java:215) ~[flink-dist-1.17.1.jar:1.17.1]
            at org.apache.flink.runtime.security.token.DelegationTokenReceiverRepository.lambda$loadReceivers$0(DelegationTokenReceiverRepository.java:75) ~[flink-dist-1.17.1.jar:1.17.1]
            at java.util.Iterator.forEachRemaining(Iterator.java:116) ~[?:1.8.0_372]
            at org.apache.flink.runtime.security.token.DelegationTokenReceiverRepository.loadReceivers(DelegationTokenReceiverRepository.java:98) ~[flink-dist-1.17.1.jar:1.17.1]
            at org.apache.flink.runtime.security.token.DelegationTokenReceiverRepository.<init>(DelegationTokenReceiverRepository.java:60) ~[flink-dist-1.17.1.jar:1.17.1]
            at org.apache.flink.runtime.taskexecutor.TaskManagerRunner.startTaskManagerRunnerServices(TaskManagerRunner.java:245) ~[flink-dist-1.17.1.jar:1.17.1]
            at org.apache.flink.runtime.taskexecutor.TaskManagerRunner.start(TaskManagerRunner.java:293) ~[flink-dist-1.17.1.jar:1.17.1]
            at org.apache.flink.runtime.taskexecutor.TaskManagerRunner.runTaskManager(TaskManagerRunner.java:486) ~[flink-dist-1.17.1.jar:1.17.1]
            at org.apache.flink.runtime.taskexecutor.TaskManagerRunner.lambda$runTaskManagerProcessSecurely$5(TaskManagerRunner.java:530) ~[flink-dist-1.17.1.jar:1.17.1]
            at java.security.AccessController.doPrivileged(Native Method) ~[?:1.8.0_372]
            at javax.security.auth.Subject.doAs(Subject.java:422) [?:1.8.0_372]
            at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1899) [hadoop-common-3.3.5.jar:?]
            at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41) [flink-dist-1.17.1.jar:1.17.1]
            at org.apache.flink.runtime.taskexecutor.TaskManagerRunner.runTaskManagerProcessSecurely(TaskManagerRunner.java:530) [flink-dist-1.17.1.jar:1.17.1]
            at org.apache.flink.runtime.taskexecutor.TaskManagerRunner.runTaskManagerProcessSecurely(TaskManagerRunner.java:510) [flink-dist-1.17.1.jar:1.17.1]
            at org.apache.flink.runtime.taskexecutor.TaskManagerRunner.main(TaskManagerRunner.java:468) [flink-dist-1.17.1.jar:1.17.1]
    my hadoop version is 3.3.5, hive 3.1.3, scala 2.12, iceburg 1.3.0 and serval plugins S3, azure blob, AWS_SDK_VERSION 2.20.18. and for local dev we are using minio for s3. (edit) adding security.delegation.token.provider.s3-hadoop.enabled: false para to flinkconf.yaml has solved to issue of none start, but I would like a better solution without having to disable anything.
    s
    m
    • 3
    • 16
  • c

    Clen Moras

    06/15/2023, 10:50 PM
    Flink SQL> CREATE TABLE pokemon11 (
    id VARCHAR,
    name VARCHAR,
    type VARCHAR,
    hp VARCHAR,
    attack VARCHAR,
    defense VARCHAR,
    weakness VARCHAR,
    region VARCHAR
    )
    WITH (
    'connector' = 'hive',
    'path' = 's3a://clen-test/',
    'format' = 'csv',
    'csv.field-delimiter' = ',',
    'csv.ignore-parse-errors' = 'true'
    );
    [INFO] Table has been created. Flink SQL> select * from pokemon11; [ERROR] Could not execute SQL statement. Reason: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSourceFactory' in the classpath. Reason: Required context properties mismatch. The following properties are requested: connector=hive csv.field-delimiter=, csv.ignore-parse-errors=true format=csv path=s3a://aiops-datalake-test/ schema.0.data-type=VARCHAR(2147483647) schema.0.name=id schema.1.data-type=VARCHAR(2147483647) schema.1.name=name schema.2.data-type=VARCHAR(2147483647) schema.2.name=type schema.3.data-type=VARCHAR(2147483647) schema.3.name=hp schema.4.data-type=VARCHAR(2147483647) schema.4.name=attack schema.5.data-type=VARCHAR(2147483647) schema.5.name=defense schema.6.data-type=VARCHAR(2147483647) schema.6.name=weakness schema.7.data-type=VARCHAR(2147483647) schema.7.name=region The following factories have been considered: org.apache.flink.api.java.io.jdbc.JDBCTableSourceSinkFactory org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory org.apache.flink.table.sources.CsvBatchTableSourceFactory org.apache.flink.table.sources.CsvAppendTableSourceFactory I got the following error, but i m not sure what i m doing wrong. any suggestions.
    m
    • 2
    • 2
  • s

    Sucheth Shivakumar

    06/15/2023, 11:51 PM
    we started getting the below error after we bumped the flink version from 1.15 to 1.17, Any idea why is this happening ?
    Copy code
    org.apache.flink.util.FlinkException: Fatal error occurred in TaskExecutor <akka.tcp://flink@100.126.185.4:6122/user/rpc/taskmanager_0>.
    org.apache.flink.util.FlinkException: The TaskExecutor's registration at the ResourceManager <akka.tcp://flink@100.126.2.182:6123/user/rpc/resourcemanager_0> has been rejected: Rejected TaskExecutor registration at the ResourceManager because: The ResourceManager does not recognize this TaskExecutor.
    	at org.apache.flink.runtime.taskexecutor.TaskExecutor$ResourceManagerRegistrationListener.onRegistrationRejection(TaskExecutor.java:2442) ~[flink-dist-1.17.0.jar:1.17.0]
    	at org.apache.flink.runtime.taskexecutor.TaskExecutor$ResourceManagerRegistrationListener.onRegistrationRejection(TaskExecutor.java:2386) ~[flink-dist-1.17.0.jar:1.17.0]
    	at org.apache.flink.runtime.taskexecutor.TaskExecutorToResourceManagerConnection.onRegistrationRejection(TaskExecutorToResourceManagerConnection.java:109) ~[flink-dist-1.17.0.jar:1.17.0]
    	at org.apache.flink.runtime.taskexecutor.TaskExecutorToResourceManagerConnection.onRegistrationRejection(TaskExecutorToResourceManagerConnection.java:40) ~[flink-dist-1.17.0.jar:1.17.0]
    	at org.apache.flink.runtime.registration.RegisteredRpcConnection.lambda$createNewRegistration$0(RegisteredRpcConnection.java:269) ~[flink-dist-1.17.0.jar:1.17.0]
    	at java.util.concurrent.CompletableFuture.uniWhenComplete(Unknown Source) ~[?:?]
    	at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(Unknown Source) ~[?:?]
    	at java.util.concurrent.CompletableFuture$Completion.run(Unknown Source) ~[?:?]
    	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRunAsync$4(AkkaRpcActor.java:453) ~[flink-rpc-akka_cc24e6b4-98f0-431f-98c7-e598a837bc44.jar:1.17.0]
    	at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68) ~[flink-rpc-akka_cc24e6b4-98f0-431f-98c7-e598a837bc44.jar:1.17.0]
    	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:453) ~[flink-rpc-akka_cc24e6b4-98f0-431f-98c7-e598a837bc44.jar:1.17.0]
    	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:218) ~[flink-rpc-akka_cc24e6b4-98f0-431f-98c7-e598a837bc44.jar:1.17.0]
    	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:168) ~[flink-rpc-akka_cc24e6b4-98f0-431f-98c7-e598a837bc44.jar:1.17.0]
    	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24) [flink-rpc-akka_cc24e6b4-98f0-431f-98c7-e598a837bc44.jar:1.17.0]
    	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20) [flink-rpc-akka_cc24e6b4-98f0-431f-98c7-e598a837bc44.jar:1.17.0]
    	at scala.PartialFunction.applyOrElse(PartialFunction.scala:127) [flink-rpc-akka_cc24e6b4-98f0-431f-98c7-e598a837bc44.jar:1.17.0]
    	at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126) [flink-rpc-akka_cc24e6b4-98f0-431f-98c7-e598a837bc44.jar:1.17.0]
    	at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20) [flink-rpc-akka_cc24e6b4-98f0-431f-98c7-e598a837bc44.jar:1.17.0]
    	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175) [flink-rpc-akka_cc24e6b4-98f0-431f-98c7-e598a837bc44.jar:1.17.0]
    	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176) [flink-rpc-akka_cc24e6b4-98f0-431f-98c7-e598a837bc44.jar:1.17.0]
    	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176) [flink-rpc-akka_cc24e6b4-98f0-431f-98c7-e598a837bc44.jar:1.17.0]
    	at akka.actor.Actor.aroundReceive(Actor.scala:537) [flink-rpc-akka_cc24e6b4-98f0-431f-98c7-e598a837bc44.jar:1.17.0]
    	at akka.actor.Actor.aroundReceive$(Actor.scala:535) [flink-rpc-akka_cc24e6b4-98f0-431f-98c7-e598a837bc44.jar:1.17.0]
    	at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220) [flink-rpc-akka_cc24e6b4-98f0-431f-98c7-e598a837bc44.jar:1.17.0]
    	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:579) [flink-rpc-akka_cc24e6b4-98f0-431f-98c7-e598a837bc44.jar:1.17.0]
    	at akka.actor.ActorCell.invoke(ActorCell.scala:547) [flink-rpc-akka_cc24e6b4-98f0-431f-98c7-e598a837bc44.jar:1.17.0]
    	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270) [flink-rpc-akka_cc24e6b4-98f0-431f-98c7-e598a837bc44.jar:1.17.0]
    	at akka.dispatch.Mailbox.run(Mailbox.scala:231) [flink-rpc-akka_cc24e6b4-98f0-431f-98c7-e598a837bc44.jar:1.17.0]
    	at akka.dispatch.Mailbox.exec(Mailbox.scala:243) [flink-rpc-akka_cc24e6b4-98f0-431f-98c7-e598a837bc44.jar:1.17.0]
    	at java.util.concurrent.ForkJoinTask.doExec(Unknown Source) [?:?]
    	at java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(Unknown Source) [?:?]
    	at java.util.concurrent.ForkJoinPool.scan(Unknown Source) [?:?]
    	at java.util.concurrent.ForkJoinPool.runWorker(Unknown Source) [?:?]
    	at java.util.concurrent.ForkJoinWorkerThread.run(Unknown Source) [?:?]
  • y

    Yaroslav Bezruchenko

    06/16/2023, 12:30 AM
    Hey, I'm trying to run pretty heavy Flink job using PyFlink and Flink 1.16.2 on Kubernetes (using Flink Kubernetes Operator 1.5). After some time I'm receiving next exception. Any idea what it can be?
    Copy code
    2023-06-16 00:19:51,850 ERROR org.apache.beam.runners.fnexecution.control.FnApiControlClient [] - FnApiControlClient received an error.
    org.apache.beam.vendor.grpc.v1p43p2.io.grpc.StatusRuntimeException: CANCELLED: client cancelled
    	at org.apache.beam.vendor.grpc.v1p43p2.io.grpc.Status.asRuntimeException(Status.java:526) ~[blob_p-934ed733171eb46b5e06f37d5f96d10cd381ed93-977991e704a3c43c3b629ef06bb93e3c:1.16.2]
    
    ...
    
    2023-06-16 00:19:51,852 WARN  org.apache.beam.runners.fnexecution.logging.GrpcLoggingService [] - Logging client failed unexpectedly.
    org.apache.beam.vendor.grpc.v1p43p2.io.grpc.StatusRuntimeException: CANCELLED: client cancelled
    	at org.apache.beam.vendor.grpc.v1p43p2.io.grpc.Status.asRuntimeException(Status.java:526) ~[blob_p-934ed733171eb46b5e06f37d5f96d10cd381ed93-977991e704a3c43c3b629ef06bb93e3c:1.16.2]
    	at org.apache.beam.vendor.grpc.v1p43p2.io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onCancel(ServerCalls.java:291) [blob_p-934ed733171eb46b5e06f37d5f96d10cd381ed93-977991e704a3c43c3b629ef06bb93e3c:1.16.2]
    
    ...
    
    2023-06-16 00:19:51,850 ERROR org.apache.beam.sdk.fn.data.BeamFnDataGrpcMultiplexer        [] - Failed to handle for unknown endpoint
    org.apache.beam.vendor.grpc.v1p43p2.io.grpc.StatusRuntimeException: CANCELLED: client cancelled
    
    ....
    
    2023-06-16 00:19:51,861 ERROR org.apache.beam.runners.fnexecution.control.FnApiControlClient [] - FnApiControlClient closed, clearing outstanding requests {1=java.util.concurrent.CompletableFuture@73abe63d[Not completed, 1 dependents]}
    Next issues claim that there are deserialization issue with record, but actual cause is:
    Copy code
    Caused by: org.apache.beam.vendor.grpc.v1p43p2.io.grpc.StatusRuntimeException: CANCELLED: call already cancelled. Use ServerCallStreamObserver.setOnCancelHandler() to disable this exception
    d
    • 2
    • 3
  • k

    kingsathurthi

    06/16/2023, 7:33 AM
    Hi All, Is there any version compatibility matrix information available between CRD,Flink Operator and Flink
    m
    • 2
    • 3
  • a

    Ari Huttunen

    06/16/2023, 8:16 AM
    We have a peculiar problem with our pipeline reading from Kafka, doing aggregation, writing them back to Kafka. These parts work fine. (Ok, sort of fine, it's not particularly efficient, but it doesn't break.) What does break is making checkpoints. Currently the instances only have shared S3 storage in the internal data center. It's currently failing, and it's giving the error
    Copy code
    2023-06-16 11:13:05,718 WARN  org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory [] - Could not close the state stream for <s3p://some-bucket/flink.internal.checkpoints/18c1d780196f5badf2a4a83d7452d300/chk-1/24f92907-26eb-409d-bb73-da00443b4602>.
    java.io.IOException: No space left on device
    The partition where Flink is installed does not have much space space, and neither does
    /tmp
    , but we've tried moving everything to a partition that has a lot more storage, like this:
    Copy code
    io.tmp.dirs: {{ flink_tmp_dir }}
    process.working-dir: {{ flink_tmp_dir }}
    web.tmpdir: {{ flink_tmp_dir }}
    historyserver.web.tmpdir: {{ flink_tmp_dir }}
    jobmanager.archive.fs.dir: {{ flink_tmp_dir }}
    jobmanager.web.tmpdir: {{ flink_tmp_dir }}
    jobmanager.web.upload.dir: {{ flink_tmp_dir }}
    web.upload.dir: {{ flink_tmp_dir }}
    env.log.dir: {{ flink_log_dir }}
    S3 is not full. Nothing gets full locally as far as I can see with
    df -h
    . Any suggestions? Edit: I also tried installing Flink to a partition that has lots of space, but it didn't help. That partition is meant for data, so I moved it back.
    m
    • 2
    • 9
  • s

    Sunidhi Tiwari

    06/16/2023, 1:24 PM
    Hi guys. How to read partitioned data from a file system in Zeppelin and perform streaming SQL using Flink. The file system stores data in format yyyy-MM-DD--HH.
  • s

    Slackbot

    06/16/2023, 1:28 PM
    This message was deleted.
    d
    • 2
    • 1
  • l

    Lluc

    06/16/2023, 2:13 PM
    Hi, I've been searching for a few hours and don't see any way to do a
    JSON_OBJECT('k' VALUE "another json")
    to have nested JSONs, this is escaping the value as a string. Someone can give me a hand. Also, I want to add that our nested JSON is dynamic (doesn't have a predefined form) and we are forced to use Table API only 😕 We are migrating to Flink 1,17
  • d

    Danny Cranmer

    06/16/2023, 5:09 PM
    Hello all. Is there a way to ensure subTasks are evenly spread across TMs when the operator parallelism is less than the job parallelism? We have
    cluster.evenly-spread-out-slots
    set true and I can see that slots are evenly spread across TMs. However, the subTasks are not evenly spread across TMs.
  • y

    Yaroslav Bezruchenko

    06/16/2023, 5:49 PM
    How does autoscale work on Flink Kubernetes operator 1.5
    Copy code
    kubernetes.operator.job.autoscaler.enabled: "true"
    kubernetes.operator.job.autoscaler.stabilization.interval: "1m"
    kubernetes.operator.job.autoscaler.metrics.window: "3m"
    With this config my deployment restarts each 10 mins with next logs:
    Copy code
    Scaling vertices: Vertex ID a267aa7696f616c525ab17d1bd9850c8 | Parallelism 8 -> 4 | Processing capacity Infinity -> Infinity | Target data rate 8.55 Vertex ID 5570a4732e197e06bd668b2739d98f8c | Parallelism 8 -> 4 | Processing capacity Infinity -> Infinity | Target data rate 8.81 Vertex ID 2963852293169ba90d9d1e7d6308db5c | Parallelism 8 -> 4 | Processing capacity Infinity -> Infinity | Target data rate 0.00 Vertex ID c3c7122991bd9906cb9d1effc69b74a3 | Parallelism 8 -> 6 | Processing capacity 92.21 -> 47.00 | Target data rate 23.36 Vertex ID 456225327bbc50a04e000587e8caa7d1 | Parallelism 8 -> 4 | Processing capacity Infinity -> Infinity | Target data rate 22.74 Vertex ID c49c06ed19ebbb8d7fd89425e1732dc8 | Parallelism 8 -> 4 | Processing capacity Infinity -> Infinity | Target data rate 22.78 Vertex ID 6dc0226b15c44c9c2e1f9ea1a65fd400 | Parallelism 8 -> 4 | Processing capacity Infinity -> Infinity | Target data rate 1.84 Vertex ID 5c4ca2fea30dcf09bf3ee40c495fe808 | Parallelism 8 -> 4 | Processing capacity Infinity -> Infinity | Target data rate 0.00 Vertex ID 4952818ce422a2d9bf045a1179b37754 | Parallelism 8 -> 4 | Processing capacity Infinity -> Infinity | Target data rate 22.40 Vertex ID 47c512bbcb5e27815beb7632e2a43ec4 | Parallelism 8 -> 4 | Processing capacity Infinity -> Infinity | Target data rate 8.81 Vertex ID 7b1f408ee98c276c042c8126a0421765 | Parallelism 8 -> 4 | Processing capacity Infinity -> Infinity | Target data rate 22.71 Vertex ID d412772f7356afb1e854a3e17e6fc99e | Parallelism 8 -> 4 | Processing capacity Infinity -> Infinity | Target data rate 22.30 Vertex ID 02dc93997fd5aa6594c92b9fccde27e2 | Parallelism 8 -> 4 | Processing capacity Infinity -> Infinity | Target data rate 22.23 Vertex ID 3b1890aff09d9152377d394a8c3ad1d5 | Parallelism 8 -> 4 | Processing capacity Infinity -> Infinity | Target data rate 22.31 Vertex ID bf1889e653b6e7d928d2bd2604dca8dd | Parallelism 8 -> 4 | Processing capacity Infinity -> Infinity | Target data rate 19.88 Vertex ID e20cdc086f87b1f78afa7062ae50c3da | Parallelism 8 -> 4 | Processing capacity Infinity -> Infinity | Target data rate 19.93 Vertex ID 6cdc5bb954874d922eaee11a8e7b5dd5 | Parallelism 8 -> 4 | Processing capacity Infinity -> Infinity | Target data rate 0.00 Vertex ID de53745574bc42e95b1357827cc9f260 | Parallelism 8 -> 4 | Processing capacity Infinity -> Infinity | Target data rate 8.77 Vertex ID 1abb23b8bef7c5612f869be2fb3fcf61 | Parallelism 8 -> 4 | Processing capacity Infinity -> Infinity | Target data rate 8.39 Vertex ID 91ae58460ac994d528a45eda171f987a | Parallelism 8 -> 4 | Processing capacity Infinity -> Infinity | Target data rate 22.47 Vertex ID 7cbd5491ae25a563a6d4672f34d0f931 | Parallelism 8 -> 4 | Processing capacity Infinity -> Infinity | Target data rate 8.45 Vertex ID 01c3449094a3cab3b695255ee333decb | Parallelism 8 -> 4 | Processing capacity Infinity -> Infinity | Target data rate 20.10 Vertex ID cbc357ccb763df2852fee8c4fc7d55f2 | Parallelism 8 -> 4 | Processing capacity Infinity -> Infinity | Target data rate 8.50 Vertex ID 7b958a0c17d67bafbb848d71292964cd | Parallelism 8 -> 4 | Processing capacity Infinity -> Infinity | Target data rate 20.02 Vertex ID adc382fc56e42e5a1c3e97cd60017201 | Parallelism 8 -> 4 | Processing capacity Infinity -> Infinity | Target data rate 22.50 Vertex ID 1171dea6747ab509fdaefbe74f7195af | Parallelism 8 -> 4 | Processing capacity Infinity -> Infinity | Target data rate 0.00 Vertex ID 2fa5fc7df17f61cb0e1946288970d799 | Parallelism 8 -> 4 | Processing capacity Infinity -> Infinity | Target data rate 0.00 Vertex ID 79eb943b5d8344895c53163f0bcb07cb | Parallelism 8 -> 4 | Processing capacity Infinity -> Infinity | Target data rate 22.77 Vertex ID 790ed4d19311724271d676c43c6e3949 | Parallelism 8 -> 4 | Processing capacity Infinity -> Infinity | Target data rate 8.73 Vertex ID b1a2a2523a4642215643a6a4e58f0d05 | Parallelism 8 -> 4 | Processing capacity Infinity -> Infinity | Target data rate 0.00 Vertex ID 760b6fbf7de95dce57f3d3f87acab928 | Parallelism 8 -> 4 | Processing capacity Infinity -> Infinity | Target data rate 8.52
    g
    • 2
    • 2
  • z

    Zhong Chen

    06/16/2023, 5:59 PM
    Is there a way to force a task manager to restart in case it fails to instantiate a metrics reporter? I just found that one of my task managers failed to report metrics back. After I checked the logging, I found that it failed to instantiate the dd metrics reporter due to a temporary network issue. However, in my case, I prefer the task manager to fail fast instead of continuing running. My app is running in a k8s env, so I was thinking to add a health check for the pods to ensure they report metrics. I was wondering whether there is better approach?
  • b

    Bharathkrishna G M

    06/16/2023, 10:25 PM
    Hi, I had a question regarding CheckpointListener. Is it possible to register some function for onCheckpointComplete I'm looking if there's something like:
    Copy code
    env.addCheckpointListener(checkPtComplete())
    Or any alternate way I can listen to checkpoints and do some logic ?
    m
    • 2
    • 2
  • s

    Sunidhi Tiwari

    06/17/2023, 5:42 PM
    Hi, Is there any way to partitioned data sink data in a file system?
    Copy code
    final StreamingFileSink<String> sink = StreamingFileSink
        .forRowFormat(new Path(outputPath), new SimpleStringEncoder<String>("UTF-8"))
        .withRollingPolicy(
            DefaultRollingPolicy.builder()
                .withRolloverInterval(TimeUnit.MINUTES.toMillis(15))
                .withInactivityInterval(TimeUnit.MINUTES.toMillis(5))
                .withMaxPartSize(1024 * 1024 * 1024)
                .build())
    	.build();
    The above code will assign records to the default one hour time buckets. I want to write all the parts in same directory, don't want hourly bucket. Is there is way to do so?
    a
    • 2
    • 9
  • h

    Hygor Knust

    06/17/2023, 6:08 PM
    Hi all, Is there a way to ignore Avro (Confluent Schema Registry) deserialization errors using the Flink SQL API? For JSON, CSV and Protobuf there is a configuration suffix
    *.ignore-parse-errors
    , but it is not available for Avro.
    m
    a
    • 3
    • 13
  • d

    Dheeraj Panangat

    06/18/2023, 7:08 AM
    Hi Team, Can we store flink tables in rocks db state ? Our use case if to have data of 1 day in state, in form of tables, so that we can query and filter based on that data. Also I would be writing the state to a volume, which I believe should be retained if the pods terminate and re-spawn. Thanks
    g
    • 2
    • 1
  • m

    Mishel Liberman

    06/18/2023, 9:02 AM
    hey, I installed minio in my k8s cluster, flink failed to connect to minio through the s3 api when minio was using TLS, without TLS everything works, any idea how to overcome this issue?
  • k

    kiran kumar

    06/18/2023, 2:19 PM
    Hi Team, We are trying to set up Flink cluster mode in k8s. The setup has 1 jobmanager and 2 taskmanagers with k8s "kind: Deployment". We followed all the steps mentioned in the below documentation. But the issue we are facing is, when deployed it is running in standalone mode and not cluster mode. We are seeing only 1 task manager in Flink dashboard. Can anyone review our configurations and suggest where we are going wrong. This is the first time our company trying to set up Flink application.
    Copy code
    flink-properties:
      checkpoint-interval: 5000
      checkpoint-timeout: 15000
      checkpoint-pause-between: 500
      rest-address: sherlocksvc-jobmanager.sherlocksvc
      rest-port: 8082
      blob-server-port: 6124
      data-port: 6121
    
      jobmanager-rpc-address: sherlocksvc.sherlocksvc.sherlocksvc
      jobmanager-rpc-port: 6123
      taskmanager-rpc-port: 6122
      taskmanager-numberOfTaskSlots: 1
    https://blog.knoldus.com/flink-on-kubernetes/
    b
    • 2
    • 4
  • k

    kingsathurthi

    06/19/2023, 7:56 AM
    Hi All, After upgrading Flink operator to 1.5.0 and when we install flinkdeployment we are below issue. how to resolve this?. The rbac is present FYI
    Copy code
    NAME                    JOB STATUS   LIFECYCLE STATE
    flinkdeployment-uat              UPGRADING
    Error:
    Copy code
    Status:
      Cluster Info:
      Error:                          {"type":"org.apache.flink.kubernetes.operator.exception.ReconciliationException","message":"org.apache.flink.client.deployment.ClusterDeploymentException: Could not create Kubernetes cluster \"flinkdeployment-uat\".","throwableList":[{"type":"org.apache.flink.client.deployment.ClusterDeploymentException","message":"Could not create Kubernetes cluster \"flinkdeployment-uat\"."},{"type":"org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.client.KubernetesClientException","message":"Failure executing: POST at: <https://172.30.0.1/apis/apps/v1/namespaces/dev-istio/deployments>. Message: Forbidden!Configured service account doesn't have access. Service account may have been revoked. deployments.apps \"flinkdeployment-uat\" is forbidden: cannot set blockOwnerDeletion if an ownerReference refers to a resource you can't set finalizers on: , <nil>."}]}
      Job Manager Deployment Status:  MISSING
  • b

    Bhavya Soni

    06/19/2023, 9:23 AM
    Hi everyone, I am new to flink and using these libraries.
    Copy code
    apache-flink==1.14.4
    apache-flink-libraries==1.14.4
    pyflink==1.0
    I am facing an error regarding
    from pyflink.connector.gcp.pubsub import PubSubSource
    ModuleNotFoundError: No module named 'pyflink.connector'
    . I want to use the pubsub connector to stream the data to SIEM servers. I came across this documentation https://nightlies.apache.org/flink/flink-docs-release-1.17/api/python/reference/pyflink.datastream/connectors.html and wanted to know that is there any available implementation for pubsub connector in flink for python?
    a
    m
    d
    • 4
    • 24
  • a

    Ari Huttunen

    06/19/2023, 11:45 AM
    This doesn't seem like it's important, but some of my subtasks have oddly large accumulated time
    0ms / 0ms / 19527d 10h 49m 58s
    . Is that -1 by any chance? It also shows up as Busy (max): 100%, but I have some doubts of that as well.
  • j

    Jonathan Feinberg

    06/19/2023, 2:43 PM
    Hello everyone, I'm trying to get PyFlink (v1.15) Table API to work and have encountered an issue I don't understand and would appriciate some help, if possible. I have a simple insertion query using group by, like this:
    Copy code
    INSERT INTO sink_table
        SELECT
            `key`,
            `journeyRef`,
            MAX(`eventTimestamp`) AS `eventTimestamp`,
            SUM(`alightingCount`) AS `totalAlighting`
        FROM source_table
        GROUP BY `key`, `journeyRef`
    I am trying to source from filesystem source connector to filesystem sink connector, and I am getting the error:
    Copy code
    pyflink.util.exceptions.TableException: org.apache.flink.table.api.TableException: Table sink 'default_catalog.default_database.accjoin_filesystem' doesn't support consuming update changes which is produced by node GroupAggregate(groupBy=[key, journeyRef], select=[key, journeyRef, MAX(eventTimestamp) AS eventTimestamp, SUM(alightingCount) AS totalAlighting])
    However, if I replace the sink with a print connector, it works as expected. Is this a bug, or am I missing a limitation in how the filesystem connector works?
    m
    • 2
    • 5
  • k

    kiran kumar

    06/19/2023, 5:40 PM
    Hi All, Trying to deploy Flink in k8s. After deploying it is starting as "Mini cluster mode" and not normal cluster or standalone-job mode. For this we need to pass command in jobmanager deployment.yaml.
    Copy code
    command: ["/opt/flink/bin/standalone-job.sh"]
    But the issue is, we are dockerizing our flink application and deploying. The start command path "/opt/flink" is not present in our jobmanager pod. We are getting the path unknown. How does "/opt/flink/" path is created and how to find standalone-job.sh file ?
    g
    k
    • 3
    • 3
  • a

    Anmol

    06/20/2023, 5:35 AM
    Hi All has anybody used canary deployment in flink kubernetes operator 1.5.0 have some design/implementation questions, would be great if anybody could help. TIA
    ✅ 1
    g
    • 2
    • 3
  • y

    Yaroslav Bezruchenko

    06/20/2023, 9:02 AM
    Hey, I want to use state to deduplicate items received from Kafka (topic itself might contain duplication). I already have experience with MapState, but in ListState there is no contains() method. Is there any way to use ListState efficiently to check if item is in state, without iterating a whole state?
    m
    p
    s
    • 4
    • 8
1...888990...98Latest