# troubleshooting

    Huaqiang Kang

    02/09/2023, 10:06 PM
    Hi everyone, I have a question about the Flink K8s Operator. For a Flink session job deployment, I can use S3 or HTTP as the jarURI. Is there a way to pass HTTP basic auth credentials in the session job deployment, because our jar files are located in a private JFrog Artifactory?

    Daiyan Chowdhury

    02/09/2023, 11:39 PM
    Hi everyone, I have a question about savepoints. I'm currently running into an issue where I stop a job with a savepoint, but when starting the job again I get an error that the savepoint file does not exist (the savepoint directory and metadata file do get created). Savepoint configs: state.backend: filesystem, state.savepoints.dir: file:///opt/flink/savepoint
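    For reference, a minimal sketch of the savepoint-related entries in flink-conf.yaml (the path is illustrative):

    state.backend: filesystem
    state.savepoints.dir: file:///opt/flink/savepoint

    One common cause of this symptom is that a file:// savepoint directory has to be visible under the same path to the JobManager and every TaskManager (for example via a shared volume); otherwise each process only sees the files it wrote itself and the restore fails with "file does not exist". A shared scheme such as s3:// or hdfs:// sidesteps this.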

    Ryner Menezes

    02/10/2023, 4:56 AM
    Hi everyone, I'm trying out a simple job in PyFlink to read data from a file in ADLS using the DataStream API. The required jar 'flink-azure-fs-hadoop-1.16.0.jar' is added to the plugins folder of both the task and job managers and is also passed to the execution environment using add_jars. Yet I keep running into this error: 'org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not find a file system implementation for scheme 'abfs'.' The required authentication access keys from the storage account are already added to the Flink config YAML file. Is there something that I'm missing?
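    For comparison, a sketch of the layout and configuration the Azure filesystem plugin expects (the folder name and storage account are illustrative placeholders):

    plugins/
      azure-fs-hadoop/
        flink-azure-fs-hadoop-1.16.0.jar

    # flink-conf.yaml
    fs.azure.account.key.<your_storage_account>.dfs.core.windows.net: <access-key>

    File systems are discovered through the plugin mechanism at process start-up, not through add_jars, so the jar has to sit in its own subfolder under plugins/ of every JobManager and TaskManager image for the abfs scheme to be registered.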

    kingsathurthi

    02/10/2023, 7:04 AM
    [Flink Operator 1.3.0] Can anyone help me understand why I'm getting this error in the taskmanager?
    2023-02-10 06:57:26,917 INFO  org.apache.flink.kubernetes.taskmanager.KubernetesTaskExecutorRunner [] - --------------------------------------------------------------------------------
    2023-02-10 06:57:26,977 INFO  org.apache.flink.kubernetes.taskmanager.KubernetesTaskExecutorRunner [] - Registered UNIX signal handlers for [TERM, HUP, INT]
    2023-02-10 06:57:26,990 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: blob.server.port, 6124
    2023-02-10 06:57:26,990 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: kubernetes.jobmanager.annotations, flinkdeployment.flink.apache.org/generation:2
    2023-02-10 06:57:26,990 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: job-result-store.storage-path, file:///flink-data/ha/job-result-store/gmf-gnodeb-sa-samsung/6dd3883c-7acc-47b7-8737-29dfea42547c
    2023-02-10 06:57:26,990 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: kubernetes.jobmanager.replicas, 2
    2023-02-10 06:57:26,990 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: execution.submit-failed-job-on-application-error, true
    2023-02-10 06:57:26,991 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: high-availability.cluster-id, kmf-flink
    2023-02-10 06:57:26,991 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: metrics.reporter.prom.factory.class, org.apache.flink.metrics.prometheus.PrometheusReporterFactory
    2023-02-10 06:57:26,991 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: state.savepoints.dir, file:///flink-data/savepoints
    2023-02-10 06:57:26,991 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: kubernetes.taskmanager.cpu, 1.0
    2023-02-10 06:57:26,991 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: kubernetes.service-account, flink

    Yang LI

    02/10/2023, 9:39 AM
    Hello guys, we have done an upgrade from Flink 1.13 to Flink 1.16, and now we see a significant increase in our Flink taskmanager backpressure time. Is there some config in Flink 1.16 we can tune to optimize the backpressure time? 🙏 Here is the metric we use to observe backpressure:
    flink_taskmanager_job_task_backPressuredTimeMsPerSecond
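    A hedged suggestion rather than a confirmed fix: since Flink 1.14 the network stack can shrink in-flight buffers automatically under backpressure (buffer debloating), and the settings below are the usual knobs to experiment with; the values are illustrative.

    taskmanager.network.memory.buffer-debloat.enabled: true
    taskmanager.network.memory.buffers-per-channel: 2
    taskmanager.network.memory.floating-buffers-per-gate: 8
    execution.buffer-timeout: 100 ms

    It is also worth checking which operator the increase comes from (the per-task busyTimeMsPerSecond / backPressuredTimeMsPerSecond breakdown in the web UI) before tuning anything globally.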

    Nicholas Erasmus

    02/10/2023, 11:24 AM
    Hi! I'm hoping to get some advice on an issue we're having:
    • We are migrating from Spark to Flink for our company's data product pipelines (CDC might remain in Spark)
    • The Sink is Delta Table (Parquet format)
    • We absolutely need parity with what is being produced in the Delta Table by Spark
    Unfortunately that last point seems to be impossible at present. So the particular field in the Parquet record currently looks like this:
    {
      ...
      "our_field_name": {
        "our_key_1": "our_value_1",
        "our_key_2": "our_value_2",
        ...
      }
    }
    In our Flink (Kotlin) application, we are defining the row as follows:
    RowType.RowField(
        "our_field_name",
        MapType(
            VarCharType(VarCharType.MAX_LENGTH),
            VarCharType(VarCharType.MAX_LENGTH)
        )
    )
    ... and we are setting the field with:
    rowData.setField(
        OUR_FIELD_POSITION,
        GenericMapData(
            our_field.entries.associate {
                StringData.fromString(it.key) to StringData.fromString(it.value)
            }
        )
    )
    When we inspect the Parquet records however, we see the following is produced:
    {
        ...
        "meta_info": {
            "key_value": [
                {
                    "key": "providerCode",
                    "value": "declined-provider-code"
                }
            ]
        }
    }
    I had a look at the source code in the link below, and the key_value, key, and value names that appear above are hardcoded when converting a `MAP`: https://github.com/apache/flink/blob/ffecb68b11e2cd914f92699112b5adb62a48724e/flin[…]/apache/flink/formats/parquet/utils/ParquetSchemaConverter.java I'd like to ask for advice on this issue. E.g. has anyone else had the requirement that we have, and has anyone looked into this? Would it be possible to submit a pull request to address this? Thank you 🙂
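    For context, this nesting matches the Parquet format's standard representation of a MAP logical type; a sketch based on the parquet-format spec, reusing the field name from the example above:

    optional group our_field_name (MAP) {
      repeated group key_value {
        required binary key (STRING);
        optional binary value (STRING);
      }
    }

    Readers that honour the MAP annotation reassemble this back into a map, so the difference you are seeing may come from how the writer annotates or names the repeated group rather than from the key_value/key/value wrapper itself.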

    Nihar Rao

    02/10/2023, 4:18 PM
    Hi, I am using Flink 1.15.3 and have written a custom metric reporter. I want to consume it as a plugin and have followed all the necessary steps. My reporter factory implements MetricReporterFactory and my actual reporter class implements MetricReporter. I have added the shaded jar under a subfolder in the plugins directory. I added the relevant metric.reporters.reporter_name.factory.class config in flink-conf.yaml, and when I look at the cluster logs, they clearly say that my reporter factory was not found and that it is also not in the list of available factories. To be sure of the factory class definition, I explored my jar using jar xf <name>.jar and the path clearly exists and is correctly defined in flink-conf.yaml. My plugins dir looks like:
    plugins/
      metrics-nihar/
        custom_reporter.jar
    Can you please let me know how to debug this further?
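    For comparison, a sketch of what a plugin-based reporter setup usually looks like; the reporter name and factory class below are illustrative, not your actual ones:

    plugins/
      metrics-nihar/
        custom_reporter.jar

    # inside custom_reporter.jar
    META-INF/services/org.apache.flink.metrics.reporter.MetricReporterFactory
        (a text file containing the fully qualified name of your factory class)

    # flink-conf.yaml
    metrics.reporter.nihar.factory.class: com.example.metrics.NiharReporterFactory

    Plugin factories are discovered via Java's ServiceLoader, so if the service file is missing from the shaded jar the factory will not appear in the "available factories" list even though the class itself is present in the jar.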

    René

    02/10/2023, 5:23 PM
    Hi all, in Flink SQL I'm seeing the following behaviour: with every update in the source database (which is read by Debezium) I first get an op D followed by an I (see picture). The source table is defined with an upsert-kafka connector, and the underlying topic is not compacted. Is there a way to get rid of these D operations (null values)? Thanks for any advice, best regards, René

    Artun Duman

    02/10/2023, 6:07 PM
    Hi everyone, can someone confirm my understanding of SplitEnumerator, specifically for Kafka sources? I see that it runs on the job manager and round-robins splits to the task managers. Does that mean we might be bottlenecked by the job manager process, and that there isn't a way to increase the number of Kafka partitions and make the reading/load balancing faster? Has anyone run into a limitation like this? Thanks!
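    For context, a sketch of where the work actually happens with the new source API (topic, servers and parallelism are illustrative): the enumerator on the JobManager only discovers partitions and assigns them as splits, while the SourceReaders that actually poll Kafka run inside the TaskManagers at the source operator's parallelism.

    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.connector.kafka.source.KafkaSource;
    import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class KafkaReadSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // The KafkaSourceEnumerator (JobManager side) only discovers partitions and hands
            // them out as splits; it does not read any records itself.
            KafkaSource<String> source = KafkaSource.<String>builder()
                    .setBootstrapServers("kafka:9092")                // illustrative
                    .setTopics("my-topic")                            // illustrative
                    .setGroupId("my-group")
                    .setStartingOffsets(OffsetsInitializer.latest())
                    .setValueOnlyDeserializer(new SimpleStringSchema())
                    .build();

            // The SourceReaders doing the actual polling run on the TaskManagers,
            // one reader per parallel source subtask.
            DataStream<String> stream = env
                    .fromSource(source, WatermarkStrategy.noWatermarks(), "kafka-source")
                    .setParallelism(8);                               // illustrative

            stream.print();
            env.execute("kafka-read-sketch");
        }
    }

    So the JobManager is usually not the read bottleneck; throughput is bounded by the number of partitions and the source parallelism.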

    Amir Hossein Sharifzadeh

    02/10/2023, 6:46 PM
    Docker has an issue with Apple M1 silicon: I am not able to run docker-compose up and I get this error: broker The requested image's platform (linux/amd64) does not match the detected host platform (linux/arm64/v8) and no specific platform was requested
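    A hedged workaround sketch for docker-compose on Apple Silicon (the image tag is illustrative; the service name comes from the error message): pinning the service platform forces the amd64 image to run under emulation on the arm64 host.

    services:
      broker:
        image: confluentinc/cp-kafka:7.3.0   # illustrative
        platform: linux/amd64                # run the amd64 image under emulation on arm64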

    Joe Crowley

    02/11/2023, 12:34 AM
    Hi all, has anyone here had success running a Beam application written in Go on a separate Flink cluster? If so, could you please share the commands used to get it up and running (e.g. docker run <job-server / flink-jobmanager / flink-taskmanager>). I am trying to find a documented example from start to finish using wordcount or similar, but no luck specific to Go.

    Eugenio Marotti

    02/11/2023, 10:22 AM
    Hi everyone. I'm implementing a data analysis pipeline in Flink and I have a problem converting a DataStream to a Table. I have this table defined from a join between two Kafka sources:
    Table legalFileEventsTable = legalFilesTable.join(eventsTable)
                .where($("id").isEqual($("id_fascicolo")))
                .select(
                        $("id").as("id_fascicolo"),
                        $("id_evento"),
                        $("giudice"),
                        $("nrg"),
                        $("codice_oggetto"),
                        $("ufficio"),
                        $("sezione"),
                        $("data_evento"),
                        $("evento"),
                        $("data_registrazione_evento")
                );
    Then I convert the joined table to a DataStream to apply some computation on the data. Here's the code I'm using:
    DataStream<Row> phasesDurationsDataStream = tEnv.toChangelogStream(legalFileEventsTable)
                .keyBy(r -> r.<Long>getFieldAs("id_fascicolo"))
                .process(new PhaseDurationCounterProcessFunction());
        phasesDurationsDataStream.print();
    The PhaseDurationCounterProcessFunction emits a Row like this:
    Row outputRow = Row.withNames(RowKind.INSERT);
            outputRow.setField("id_fascicolo", currentState.getId_fascicolo());
            outputRow.setField("nrg", currentState.getNrg());
            outputRow.setField("giudice", currentState.getGiudice());
            outputRow.setField("codice_oggetto", currentState.getCodice_oggetto());
            outputRow.setField("ufficio", currentState.getUfficio());
            outputRow.setField("sezione", currentState.getSezione());
            outputRow.setField("fase", currentState.getPhase());
            outputRow.setField("fase_completata", false);
            outputRow.setField("durata", currentState.getDurationCounter());
            out.collect(outputRow);
    After collecting the results from the process function I reconvert the DataStream to a Table and execute the pipeline:
    Table phasesDurationsTable = tEnv.fromChangelogStream(
                phasesDurationsDataStream,
                Schema.newBuilder()
                        .column("id_fascicolo", DataTypes.BIGINT())
                        .column("nrg", DataTypes.STRING())
                        .column("giudice", DataTypes.STRING())
                        .column("codice_oggetto", DataTypes.STRING())
                        .column("ufficio", DataTypes.STRING())
                        .column("sezione", DataTypes.STRING())
                        .column("fase", DataTypes.STRING())
                        .column("fase_completata", DataTypes.BOOLEAN())
                        .column("durata", DataTypes.BIGINT())
                        .primaryKey("id_fascicolo", "fase")
                        .build(),
                ChangelogMode.upsert()
        );
        env.execute();
    But during the startup I receive this exception:
    Unable to find a field named 'id_fascicolo' in the physical data type derived from the given type information for schema declaration.
     Make sure that the type information is not a generic raw type. Currently available fields are: [f0]
    It seems that the row information (names and types) isn't available yet, and so the exception is generated. I tried invoking env.execute() before the DataStream -> Table conversion; in that case the job starts, but I get no output if I print the phasesDurationsTable. Any suggestions on how to make this work?
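    A hedged sketch of how this error is commonly resolved (the field list below is abridged from the schema above and plugs into the snippet earlier in the message): the ProcessFunction emits generic Row objects, so the stream's type information collapses to a raw type with a single field f0; declaring the row type explicitly with returns(...) gives fromChangelogStream the named fields it is asking for.

    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.types.Row;

    // Same pipeline as above, but with explicit named Row type information
    // attached to the output of the process function.
    DataStream<Row> phasesDurationsDataStream = tEnv.toChangelogStream(legalFileEventsTable)
            .keyBy(r -> r.<Long>getFieldAs("id_fascicolo"))
            .process(new PhaseDurationCounterProcessFunction())
            .returns(Types.ROW_NAMED(
                    new String[] {"id_fascicolo", "nrg", "giudice", "codice_oggetto",
                                  "ufficio", "sezione", "fase", "fase_completata", "durata"},
                    Types.LONG, Types.STRING, Types.STRING, Types.STRING,
                    Types.STRING, Types.STRING, Types.STRING, Types.BOOLEAN, Types.LONG));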

    Jason Politis

    02/11/2023, 3:17 PM
    Hello everyone. I'm using Flink SQL and have major data skew. Out of all 12 subtasks, only one is currently active. Do you have any ideas on what to look for and techniques for shuffling our data better?

    Itamar Dvir

    02/11/2023, 6:50 PM
    Hey everyone, I have a Flink job which processes a data stream and eventually writes it to Redis. I want the user to be able to change the Redis connection settings dynamically. To enable this, I tried to send the connection settings in another stream in order to set the connection settings in the Redis sink according to the connection config. But I didn't find a way to connect the config stream directly to the sink. Moreover, even if I connect the streams before the sink and send the config and the data in the same stream to the sink, I still need some broadcast state in the sink so that all the sink instances stay synchronized to the latest connection settings. But as far as I can see, broadcast state only works for a BroadcastStream, not for sinks. How can something like that be done?
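    A hedged sketch of the usual pattern for this (Event, RedisConfig and attachConfig are hypothetical names, not from the original message): broadcast the config stream, combine it with the data stream in a BroadcastProcessFunction that tags each record with the latest connection settings, and keep the sink itself stateless.

    import org.apache.flink.api.common.state.MapStateDescriptor;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.BroadcastStream;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
    import org.apache.flink.util.Collector;

    public class DynamicRedisConfigSketch {

        // Hypothetical POJOs standing in for the real event and connection settings types.
        public static class Event { public String key; public String value; }
        public static class RedisConfig { public String host; public int port; }

        static final MapStateDescriptor<String, RedisConfig> CONFIG_DESCRIPTOR =
                new MapStateDescriptor<>("redis-config", Types.STRING, Types.POJO(RedisConfig.class));

        public static DataStream<Tuple2<Event, RedisConfig>> attachConfig(
                DataStream<Event> events, DataStream<RedisConfig> configUpdates) {

            // Every parallel instance of the downstream function receives all config updates.
            BroadcastStream<RedisConfig> broadcastConfig = configUpdates.broadcast(CONFIG_DESCRIPTOR);

            return events.connect(broadcastConfig)
                    .process(new BroadcastProcessFunction<Event, RedisConfig, Tuple2<Event, RedisConfig>>() {
                        @Override
                        public void processElement(Event event, ReadOnlyContext ctx,
                                                   Collector<Tuple2<Event, RedisConfig>> out) throws Exception {
                            // Read the most recently broadcast settings (may be null before the first update).
                            RedisConfig current = ctx.getBroadcastState(CONFIG_DESCRIPTOR).get("current");
                            out.collect(Tuple2.of(event, current));
                        }

                        @Override
                        public void processBroadcastElement(RedisConfig config, Context ctx,
                                                            Collector<Tuple2<Event, RedisConfig>> out) throws Exception {
                            // Store the update under the same key on every parallel instance.
                            ctx.getBroadcastState(CONFIG_DESCRIPTOR).put("current", config);
                        }
                    })
                    .returns(Types.TUPLE(Types.POJO(Event.class), Types.POJO(RedisConfig.class)));
        }
    }

    The sink then just (re)creates its Redis client whenever the settings attached to an incoming record differ from the ones it is currently using, which avoids needing broadcast state inside the sink itself.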

    Reme Ajayi

    02/11/2023, 8:49 PM
    Hey Everyone, How do I write the result of a join as an avro record or to a parquet file? From the Flink documentation, I can see that you need to supply the schema of the record or a POJO in order to create a new record. If you have two streams and you join them, how would you know the schema or POJO for your joined stream before joining the streams? Is there some way to extract the schema or create the POJO from your joined stream?

    Siddhesh Kalgaonkar

    02/12/2023, 8:50 AM
    As a general rule, operators are required to completely process a given watermark before forwarding it downstream. For example, WindowOperator will first evaluate all windows that should be fired, and only after producing all of the output triggered by the watermark will the watermark itself be sent downstream. In other words, all elements produced due to occurrence of a watermark will be emitted before the watermark.
    Why do we need to send the watermark downstream? Can anyone explain the exact meaning of this paragraph? TIA

    Siddhesh Kalgaonkar

    02/12/2023, 12:57 PM
    Hello #C03G7LJTS2G, just a question about PyFlink: do we have to use Mock/MagicMock and pytest for testing, the way we do in regular Python projects, or is there another way of testing it? Just need your guidance here, since I didn't find any information in the documentation https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/dev/datastream/testing/

    Amir Hossein Sharifzadeh

    02/12/2023, 9:50 PM
    I am able to print results from my query but can’t extract and iterate over them:
    t_env.sql_query(query).execute().print()
    but can’t iterate over results and it doesn’t print anything:
    table_result = t_env.sql_query(query).execute().collect()
    
    with table_result as results:
        for result in results:
            print(result)
    This is how I start the code:
    env = StreamExecutionEnvironment.get_execution_environment()
    t_env = StreamTableEnvironment.create(stream_execution_environment=env)
    t_env.get_config().get_configuration().set_boolean("python.fn-execution.memory.managed", True)
    
    t1 = f"""
            CREATE TABLE raw_table(
                raw_id INT,
                raw_data STRING
            ) WITH (
              'connector' = 'kafka',
              'topic' = 'empad_raw',
              'properties.bootstrap.servers' = 'kafka:9092',
              'properties.group.id' = 'MY_GRP',
              'scan.startup.mode' = 'latest-offset',
              'format' = 'json'
            )
            """
    
    t2 = f"""
            CREATE TABLE bkgd_table(
                raw_id INT,
                raw_data STRING
            ) WITH (
              'connector' = 'kafka',
              'topic' = 'empad_bkgd',
              'properties.bootstrap.servers' = 'kafka:9092',
              'properties.group.id' = 'MY_GRP',
              'scan.startup.mode' = 'latest-offset',
              'format' = 'json'
            )
            """
    
    t_env.execute_sql(t1)
    t_env.execute_sql(t2)
    
    query = "SELECT raw_table.raw_id, raw_table.raw_data, bkgd_table.raw_data " \
            "FROM raw_table JOIN bkgd_table ON raw_table.raw_id = bkgd_table.raw_id"
    
    t_env.sql_query(query).execute().print()
    I will need to get the results from my query and pass them to another function to process them. Thanks for any suggestions.

    Kyle Ahn

    02/12/2023, 11:15 PM
    In Flink, is there a way for an upstream ProcessAllWindowFunction prior to keyBy to share state with a downstream ProcessWindowFunction post keyBy?

    Jirawech Siwawut

    02/13/2023, 2:46 AM
    Hi. I could not access a table in Hive where the files are in an S3 bucket. Does anyone know where to configure this? I tried this, but it does not work:
    StreamExecutionEnvironment env;
            Properties props = new Properties();
            props.put("s3.access-key", "myaccess");
            props.put("s3.secret-key", "mysecret");
            props.put("s3.endpoint", "myendpoint");
            Configuration conf = ConfigurationUtils.createConfiguration(props);
            env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
    Here is the error
    Caused by: com.amazonaws.AmazonClientException: No AWS Credentials provided by BasicAWSCredentialsProvider EnvironmentVariableCredentialsProvider SharedInstanceProfileCredentialsProvider : com.amazonaws.AmazonClientException: Unable to load credentials from Amazon EC2 metadata service

    Warren McKeown

    02/13/2023, 2:58 PM
    My team and I are using v1.16 and have been trying to make use of Flink's asynchronous IO functionality, and we've run into an interesting problem. My main question is: what could be causing high memory usage on the Flink JobManager? The JobManager memory usage seems to fill up quite dramatically until it gets OOMKilled by Kubernetes. Regardless of how much memory I give the Flink process, i.e. 8GB, 16GB, 32GB, the issue persists. I did have some luck artificially constraining the heap size for the job along with some other memory settings, which at least stopped it from being OOMKilled by k8s, but once the heap has filled up, our Source step slows to a crawl and the whole job eventually exits with no record of it appearing in the Flink History Server. I am still quite new to Flink and JVM memory management/tuning as a whole, so forgive me if these are really naive JobManager memory settings:
    -Djobmanager.memory.process.size=32768mb
    -Djobmanager.memory.heap.size=24576mb
    -Djobmanager.memory.off-heap.size=7168mb
    -Djobmanager.memory.jvm-overhead.min=192mb
    -Djobmanager.memory.jvm-overhead.max=1024mb
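    For reference, a sketch of how these numbers relate in Flink's documented JobManager memory model (the values below are illustrative, not a recommendation):

    # jobmanager.memory.process.size = JVM heap + off-heap + JVM metaspace + JVM overhead
    jobmanager.memory.process.size: 32768m
    jobmanager.memory.heap.size: 24576m          # JVM heap
    jobmanager.memory.off-heap.size: 7168m       # direct / native memory
    jobmanager.memory.jvm-metaspace.size: 256m   # default
    jobmanager.memory.jvm-overhead.min: 192m
    jobmanager.memory.jvm-overhead.max: 1024m    # overhead is derived, then clamped to [min, max]

    If the heap keeps filling up no matter how large it is, the retained objects are on the heap, and a heap dump (for example -XX:+HeapDumpOnOutOfMemoryError via env.java.opts.jobmanager) tends to reveal more than raising the limits further.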

    Nihar Rao

    02/13/2023, 4:34 PM
    Hello, I am trying to use ScheduledDropwizardReporter for reporting metrics from my flink app. I am getting the following error on line 133 of this file.
    2023-02-13 10:58:05,901 WARN  org.apache.flink.runtime.metrics.MetricRegistryImpl          [] - Error while registering metric: numRecordsOut.
    java.lang.ClassCastException: class org.apache.flink.dropwizard.metrics.FlinkCounterWrapper cannot be cast to class com.codahale.metrics.Metric (org.apache.flink.dropwizard.metrics.FlinkCounterWrapper is in unnamed module of loader 'app'; com.codahale.metrics.Metric is in unnamed module of loader org.apache.flink.core.plugin.PluginLoader$PluginClassLoader @5833f5cd)
    Due to this error, the metric is not reported. On looking into this, FlinkCounterWrapper does extend codahale.Counter. Can anyone please tell me how to fix this?

    Nitin Agrawal

    02/14/2023, 4:46 AM
    Hello, if the attribute record_id is removed from both the query schema and the sink schema, the Flink job loads. If record_id is added, the job fails to load with the error below, even though the data types of the query and the sink match. record_id is created by the UUID() method as part of the Flink SQL.
    Query schema: [resource: STRING NOT NULL, attribute: STRING NOT NULL, duration: STRING, duration_type: STRING NOT NULL, id: STRING, value: STRING NOT NULL, year: STRING, month: STRING, day: STRING, timestamp: TIMESTAMP_LTZ(3) NOT NULL, record_id: CHAR(36) NOT NULL, _c11: TIMESTAMP_LTZ(3) NOT NULL]
    Sink schema:  [resource: STRING, id: STRING, duration: STRING, duration_type: STRING, attribute: STRING, value: STRING, year: STRING, month: STRING, day: STRING, record_id: CHAR(36), timestamp: TIMESTAMP_LTZ(3), _c11: TIMESTAMP_LTZ(3)]
    	at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372)
    	at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
    	at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:98)
    	at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:846)
    	at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:240)
    	at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1090)
    	at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1168)
    	at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
    	at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1168)
    Caused by: org.apache.flink.table.api.ValidationException: Column types of query result and sink for '*anonymous_datastream_sink$1*' do not match.
    Cause: Incompatible types for sink column 'record_id' at position 9.
    
    Query schema: [resource: STRING NOT NULL, attribute: STRING NOT NULL, duration: STRING, duration_type: STRING NOT NULL, id: STRING, value: STRING NOT NULL, year: STRING, month: STRING, day: STRING, timestamp: TIMESTAMP_LTZ(3) NOT NULL, record_id: CHAR(36) NOT NULL, _c11: TIMESTAMP_LTZ(3) NOT NULL]
    Sink schema:  [resource: STRING, id: STRING, duration: STRING, duration_type: STRING, attribute: STRING, value: STRING, year: STRING, month: STRING, day: STRING, record_id: CHAR(36), timestamp: TIMESTAMP_LTZ(3), _c11: TIMESTAMP_LTZ(3)]
    	at org.apache.flink.table.planner.connectors.DynamicSinkUtils.createSchemaMismatchException(DynamicSinkUtils.java:453)
    	at org.apache.flink.table.planner.connectors.DynamicSinkUtils.validateSchemaAndApplyImplicitCast(DynamicSinkUtils.java:265)
    	at org.apache.flink.table.planner.connectors.DynamicSinkUtils.convertSinkToRel(DynamicSinkUtils.java:208)
    	at org.apache.flink.table.planner.connectors.DynamicSinkUtils.convertExternalToRel(DynamicSinkUtils.java:151)
    	at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:276)
    	at org.apache.flink.table.planner.delegation.PlannerBase.$anonfun$translate$1(PlannerBase.scala:194)
    	at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
    	at scala.collection.Iterator.foreach(Iterator.scala:937)

    Suparn Lele

    02/14/2023, 5:41 AM
    Hi, I am trying to load data from PostgreSQL tables through the Table API. I am using BATCH as the execution mode. I am trying to load data from a PostgreSQL table, aggregate it and store it back to a sink. I have kept parallelism at 8, with 8 task slots and 1 task manager. When I try to load data I can see that 7/8 subtasks finish almost instantaneously and 1/8 subtask runs for an extended period of time loading data from the source. I am not understanding why this is happening. Could someone please help? Is there any configuration which I am missing? P.S. I am using Flink 1.15
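    A hedged sketch of one thing worth checking (table name, column and bounds are illustrative): the JDBC table source only splits the scan across subtasks when the partitioned-scan options are set; without them, a single subtask reads the whole table while the others finish immediately.

    CREATE TABLE source_table (
      id BIGINT,
      amount DOUBLE
    ) WITH (
      'connector' = 'jdbc',
      'url' = 'jdbc:postgresql://host:5432/db',
      'table-name' = 'my_table',
      'username' = '...',
      'password' = '...',
      'scan.partition.column' = 'id',
      'scan.partition.num' = '8',
      'scan.partition.lower-bound' = '1',
      'scan.partition.upper-bound' = '1000000'
    );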

    Pooja Shehrawat

    02/14/2023, 7:18 AM
    Hi Team, we are getting
    RocksDBException: file is too short (1756 bytes) to be an sstable: /opt/flink/rocksdb/job_
    Any help would be appreciated. Thanks.

    Superskyyy

    02/14/2023, 8:42 AM
    Hi, I'm very new to Flink and I'm trying to implement an online machine learning algorithm pipeline (a tree-based one written in Python) with pyflink-ml, but I can't seem to find a detailed quickstart on how exactly to turn my algorithm into a Python-based model that works with the Flink ML APIs. My model takes an unbounded stream of text and adjusts itself for each data point, while producing a prediction text. I have a few core questions after going through the codebase and docs, any help is appreciated:
    1. What is model data in the flink-ml library? I'm assuming it to be the trained model weights, am I right?
    2. I can only see Java-based models in the codebase while Python simply uses the wrapped Java models; is it true that we can implement Python-based models? (sorry I know this is a dumb question)
    3. If 2 is feasible, will a Python model somehow get translated to Java code under the hood, or could it become a bottleneck in terms of processing?

    Adesh Dsilva

    02/14/2023, 10:45 AM
    How does jobmanager.execution.failover-strategy=region work? If I have a two-task pipeline A->B and I run it with a parallelism of 5, and suppose 2 of the tasks fail (type B): will Flink restart only those 2 tasks (of type B), 4 tasks (both A & B), or all 10 tasks?

    Phil Sheets

    02/14/2023, 1:38 PM
    Hello! We are using Flink on AWS EKS, deployed with the Kubernetes Flink operator: Flink 1.16, Java 11, Flink Table API. We have created a service account for the Flink jobs that assumes an IAM role with the permissions required to read from an S3 bucket. When we shell into the Flink pod and install the awscli, we are able to read the object. However, when the Flink job runs, it gets a permission denied error. It works fine if the AWS tokens are exported as environment variables. Is there Flink configuration that needs to be set to use the AWS creds that are set by a service account?
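    A hedged suggestion rather than a confirmed fix: with IRSA the credentials come from a web-identity token file, so the S3A credential provider chain has to include the web-identity provider. Something along these lines in flink-conf.yaml is what is typically tried (the provider class comes from the AWS SDK v1 bundled with the S3 filesystem plugin):

    fs.s3a.aws.credentials.provider: com.amazonaws.auth.WebIdentityTokenCredentialsProvider

    This only applies to paths read through Flink's flink-s3-fs-hadoop plugin (s3:// or s3a://); keys with the fs.s3a. prefix in flink-conf.yaml are forwarded to the bundled Hadoop S3A filesystem.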

    Yaroslav Bezruchenko

    02/14/2023, 2:47 PM
    Hey, can someone suggest why a Flink deployment on Flink Operator for Kubernetes 1.3.1 would try to create a separate cluster?
    Warning  ClusterDeploymentException  2m39s               JobManagerDeployment  Could not create Kubernetes cluster "pipeline".

    Łukasz Maternia

    02/14/2023, 2:57 PM
    I'm using Flink Kubernetes operator 1.2 and Flink 1.15. I have to mount a persistent volume claim to the taskmanager only (RWO mode), but when I add the volume and volumeMounts to taskmanager.podTemplate there is a problem with the Flink operator creating the taskmanager pod. There is this error in the jobmanager logs:
    message=Forbidden: may not specify more than 1 volume type, reason=FieldValueForbidden, additionalProperties={}), StatusCause(field=spec.containers[0].volumeMounts[0].name, message=Not found: "local-persistent-volume"
    There are also other volumes in the common podTemplate, and I'm wondering if the operator is able to properly merge the main podTemplate.spec.volumes with taskmanager.podTemplate.spec.volumes.