https://flink.apache.org/ logo
Join Slack
Powered by
# troubleshooting
  • a

    Antonio Manuel

    07/21/2022, 4:40 PM
    Hi everyone, I am interested in learning Apache Flink, and I have some doubts. What do companies use in production environment for running applications Flink? Kubernetes, AWS EMR, or ….? Where could I look for a material to learn Flink? Any advice?
    h
    a
    • 3
    • 2
  • e

    Emily Morgan

    07/22/2022, 7:52 AM
    Hey 🙂 Is there a way to set a “enabled mechanism” for SASL protocol between the Flink cluster and Kafka? I keep running into this error:
    Copy code
    java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.IllegalSaslStateException: Unexpected handshake request with client mechanism SCRAM-SHA-256, enabled mechanisms are []
    which is preventing me from reading from my Kafka source topic. The Kafka instance requires SASL_PLAINTEXT protocol with SCRAM-SHA-256 mechanism. When I run Flink locally (and only Kafka is running in a docker container) I do not get this error, however when I run both services inside respective docker containers it occurs. Flink and Kafka services are running on the same network as they are in the same docker-compose file. This is what the Flink docker-compose setup looks like:
    Copy code
    services:
      jobmanager:
        image: flink:1.15-scala_2.12
        expose:
          - "6123"
        ports:
          - "8081:8081"
        command: jobmanager
        environment:
          - |
            FLINK_PROPERTIES=
            jobmanager.rpc.address: jobmanager
      taskmanager:
        image: flink:1.15-scala_2.12
        depends_on:
          - jobmanager
        command: taskmanager
        scale: 1
        environment:
          - |
            FLINK_PROPERTIES=
            jobmanager.rpc.address: jobmanager
            taskmanager.numberOfTaskSlots: 1
            parallelism.default: 1
    ...
    ✅ 1
    s
    j
    • 3
    • 5
  • j

    Jaya Ananthram

    07/22/2022, 8:22 AM
    Have anyone tried to use Flink MSK IAM authentication with the table API with MSK source? I am getting some exceptions with class loader (🧵). I am building a shaded jar for my flink job with
    aws-msk-iam-auth
    . As per this, I think I should not add
    aws-msk-iam-auth
    in the plugin because Kafka comes in my shaded jar. Any idea?
    d
    • 2
    • 7
  • j

    Jeesmon Jacob

    07/22/2022, 2:06 PM
    Hi there, is there any potential issue in submitting a job directly using
    flink
    cli to a cluster created through
    FlinkDeployment
    ? Trying to see if there will be any conflicts in kubernetes operator when it sees a job not managed by the operator. Also, in this scenario, is there a plan (in future) to sync unmanaged job to a
    FlinkSessionJob
    so that it can be managed by operator? I have seen similar approach in Strimzi Kafka operator for a two way sync for Topics. So trying to see if anyone thought about it 🙂
    g
    • 2
    • 15
  • y

    Yahor Paulikau

    07/22/2022, 6:20 PM
    Hi folks. After recent upgrade from 1.12.7 to 1.15.1 we’re struggling to read Avro topic from Kafka (not confluent) using maven avro plug-in generated class and KafkaSource. All examples I have seen are using confluent registry deserializer. Will appreciate any help or working examples - please see the thread for details.
    c
    s
    • 3
    • 15
  • s

    Sucheth Shivakumar

    07/23/2022, 5:20 AM
    Running into Caused by: java.lang.IllegalStateException: Too many schema objects created for topic while serializing avro GenericRecord in flink. I'm using custom serializer as below,
    Copy code
    public class AvroDataSerializationSchema implements SerializationSchema<GenericRecord> {
        private final String registryUrl;
        private final String topic;
        private transient KafkaAvroSerializer kafkaAvroSerializerClient = null;
    
        public AvroDataSerializationSchema(String registryUrl, String topic) {
            this.registryUrl = registryUrl;
            this.topic = topic;
        }
    
       
        @Override
        public void open(InitializationContext context) throws Exception {
            SerializationSchema.super.open(context);
        }
    
       
        @Override
        public byte[] serialize(GenericRecord element) {
            checkInitialized();
            return kafkaAvroSerializerClient.serialize(topic, element);
        }
    
        private void checkInitialized() {
            if (kafkaAvroSerializerClient == null) {
                Map<String, Object> props = new HashMap<>();
                props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, registryUrl);
                props.put("value.subject.name.strategy", "io.confluent.kafka.serializers.subject.TopicRecordNameStrategy");
                // props.put("schema.compatibility.level", "forward_transitive");
                //   props.put("auto.register.schemas", false);
    
                SchemaRegistryClient client = new CachedSchemaRegistryClient(
                        registryUrl,
                        AbstractKafkaAvroSerDeConfig.MAX_SCHEMAS_PER_SUBJECT_DEFAULT);
                kafkaAvroSerializerClient = new KafkaAvroSerializer(client, props);
            }
        }
    Problem is schema in the GenericRecord has new reference for every generic record, and in the CachedSchemaRegistryClient they are storing schema reference in the map as a key and schema id as a value. with the new instance of the schema in every GenericRecord it is overflowing the map size of 1000. Below is the code from CachedSchemaRegistryClient
    Copy code
    public synchronized int register(String subject, Schema schema, int version, int id) throws IOException, RestClientException {
            Map<Schema, Integer> schemaIdMap = (Map)this.schemaCache.computeIfAbsent(subject, (k) -> {
                return new HashMap();
            });
            Integer cachedId = (Integer)schemaIdMap.get(schema);
            if (cachedId != null) {
                if (id >= 0 && id != cachedId) {
                    throw new IllegalStateException("Schema already registered with id " + cachedId + " instead of input id " + id);
                } else {
                    return cachedId;
                }
            } else if (schemaIdMap.size() >= this.identityMapCapacity) {
                throw new IllegalStateException("Too many schema objects created for " + subject + "!");
            } else {
                int retrievedId = id >= 0 ? this.registerAndGetId(subject, schema, version, id) : this.registerAndGetId(subject, schema);
                schemaIdMap.put(schema, retrievedId);
                ((Map)this.idCache.get((Object)null)).put(retrievedId, schema);
                return retrievedId;
            }
        }
    Anyone encountered this before ? Can someone please point put what am i doing wrong here ?
    c
    s
    • 3
    • 7
  • a

    Ali Zia

    07/23/2022, 3:13 PM
    Hi folks, running into an issue with UDTFs on PyFlink using 1.14 and 1.15. Can you please advise? Thanks!
    Copy code
    Exception:
    
    pyflink.util.exceptions.TableException: org.apache.flink.table.api.TableException: Functions implemented for the old type system are not supported.
    Copy code
    from pyflink.table import TableEnvironment, EnvironmentSettings, CsvTableSource
    from pyflink.table.expressions import col
    from pyflink.table.types import DataTypes
    from pyflink.table.udf import udf
    
    
    kinesis_source_ddl = """
        CREATE TABLE source1 (
          a STRING,
          b STRING,
          datetime TIMESTAMP(3),
          test ARRAY<STRING>
        )
        WITH (
          'connector' = 'kinesis',
          'stream' = 'test',
          'aws.region' = 'us-west-2',
          'scan.stream.initpos' = 'LATEST',
          'format' = 'json',
          'json.timestamp-format.standard' = 'ISO-8601'
        )
    """
    
    @udf(result_type=DataTypes.FLOAT())
    def semantic_analysis():
        return [0, 0, 0]
    
    t_env.execute_sql(kinesis_source_ddl)
    t_env.from_path("source1").select(
        col("a"),
        col("b"),
        col("datetime"),
        col("test"),
        semantic_analysis(col("a")).alias("semantic_scores")
    ).execute()
    d
    • 2
    • 2
  • j

    Jaya Ananthram

    07/24/2022, 11:41 AM
    Hello, I am facing a strange random issue with the table API (Kafka as source and S3 parquet sink, technically MSK with SASL). Most of the time the source is not reading any message after fresh deployment (ie - I am deleting the clsuter and doing a fresh deployment with a new consumer group id) from Kafka. But randomly it works with the same code. I tried to enable DEBUG log for the Flink Kafka connector (org.apache.flink.connector) and Kafka (org.apache) and I don't find any suspect apart from the following logs. The metrics show zero bytes received and no traces for any message consumed by flink most of the time. Any idea?
    Copy code
    org.apache.kafka.clients.consumer.RetriableCommitFailedException: Offset commit failed with a retriable exception. You should retry committing the latest consumed offsets.
    Caused by: org.apache.kafka.common.errors.DisconnectException
    s
    • 2
    • 23
  • j

    Jeff Levesque

    07/25/2022, 12:50 AM
    Does (Py)Flink have in-memory sink? I know Spark has it's structure streaming, which has
    MemorySink
    [1] and DataBricks has
    memory
    tables [2]. Can I do something similar in (Py)Flink, or should I sink to AWS S3, or AWS Kinesis? --- [1] https://jaceklaskowski.gitbooks.io/spark-structured-streaming/content/spark-sql-streaming-MemorySink.html [2] https://docs.databricks.com/getting-started/spark/streaming.html#start-the-streaming-job
    m
    • 2
    • 3
  • s

    Sucheth Shivakumar

    07/25/2022, 2:57 AM
    Hi All, Can someone please help me with setting env variable and accessing it in the task-managers ?
    Copy code
    env.java.opts: -DDATA_STREAM_TENANT=local
    i can set this in flink-conf.yml and access it in main class where im setting ExecutionEnvironment. but System.getProperty("DATA_STREAM_TENANT") returns null else where. can someone please help me on how to do it. ?
    l
    a
    j
    • 4
    • 22
  • c

    chunilal kukreja

    07/25/2022, 7:34 AM
    hi Team, is this ticket resolved https://issues.apache.org/jira/browse/FLINK-7286? As i still see ‘0’ as output for kafkasource & kakfka sink (latest flink release 1.15) for
    numRecordsOutPerSecond
    m
    • 2
    • 3
  • c

    chunilal kukreja

    07/25/2022, 9:11 AM
    hi @Martijn Visser/team, I need to get below mentioned benchmark for my flink job i.e; b) record event time vs record ingestion time vs record out time via sink my dataflow is kafka source + keyby/process() + kafka sink As current metrics seems to be restricted around numRecordsin & numRecordsOut, i need get actual timestamps. Can you guys suggest some way to achieve this?
    m
    c
    • 3
    • 8
  • d

    Dieter

    07/25/2022, 12:01 PM
    I have an question about eventtime and processing time. I get time series data from hardware sensors. Those sensor values are timestamped but out of some reasons the way to the Flink system can be delayed for a different amount for different senors. To simulate this I'm using Kafka and send on two topics sensor values. The delay is simulated by adjusting the eventtime when sending the signal via kafka: val msg = "$topic - eventtime: ${format(eventTime)} realtime: ${format(now)}" val record = ProducerRecord<String, String>(topic, null, eventTime, null, msg) In Flink I'm using val source = KafkaSource.builder<String>() .setBootstrapServers("localhost:9092") .setTopics(producers.map { it.topic() }) .setGroupId("flink-kafka-test") .setStartingOffsets(OffsetsInitializer.latest()) .setValueOnlyDeserializer(SimpleStringSchema()) .build() to setup a source and the following to create some output on stdout: val kafkaStream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source") val pf : ProcessFunction<String,String> = object : ProcessFunction<String,String>() { val sdf = SimpleDateFormat("HHmmss.SSSZ") override fun processElement(value: String, ctx: Context, out: Collector<String>) { println(" pf: ${sdf.format(Date(ctx.timestamp()))} msg: $value") } } kafkaStream.process(pf) Here is some example output: pf: 131936.248+0200 msg: test2 - eventtime: 131936.248+0200 realtime: 131936.848+0200 pf: 131936.875+0200 msg: test1 - eventtime: 131936.875+0200 realtime: 131936.875+0200 pf: 131937.249+0200 msg: test2 - eventtime: 131937.249+0200 realtime: 131937.849+0200 pf: 131938.375+0200 msg: test1 - eventtime: 131938.375+0200 realtime: 131938.375+0200 pf: 131938.249+0200 msg: test2 - eventtime: 131938.249+0200 realtime: 131938.849+0200 We can see that the eventtime of the lastline lies before the previous line. I thought that using Watermarks should help here, so I tried the the following: val kafkaStream = env.fromSource(source, WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(2)), "Kafka Source") But this does not order the messages based on the eventtime. Is it possible to let flink sort the messages based on the eventtime? How would I do that?
    c
    d
    • 3
    • 4
  • j

    Jin Yi

    07/25/2022, 2:48 PM
    is there a protobuf to rowdata converter? do i need to write one for each of my protos (or rather, do it generically at a GenericMessageV3 level)?
    c
    • 2
    • 11
  • d

    Dylan Meissner

    07/25/2022, 3:08 PM
    Installing Flink operator via Helm in Quickstart says to
    Copy code
    helm repo add flink-operator-repo <https://downloads.apache.org/flink/flink-kubernetes-operator-1.1.0/>
    helm install flink-kubernetes-operator flink-operator-repo/flink-kubernetes-operator
    Which works. But in Operations > Helm it invites us to
    Copy code
    helm install flink-kubernetes-operator helm/flink-kubernetes-operator
    Which fails because the repo doesn’t exist. Is there a plan for the second method to work?
    m
    g
    • 3
    • 14
  • s

    Sucheth Shivakumar

    07/25/2022, 4:29 PM
    Hi All, I have a job from kafka to kafka with at least once delivery guarantee. However if there is any exception in the job, flink keeps pushing data repeatedly to the kafka sink. Can someone please help me on understanding this behaviour and how to handle it ?
  • g

    Giannis Polyzos

    07/25/2022, 4:40 PM
    https://github.com/apache/flink-kubernetes-operator/blob/main/examples/basic-session-deployment-only.yaml ^ checking the examples here for a session cluster, i would expect to create taskmanagers as well and have the available slots displayed - but seems like this is not the case? What would be the best approach to create a session cluster with predefined resources so that users can submit jar files from the Flink UI or the cli (as a use case requires it)?
    m
    • 2
    • 17
  • e

    Erik Wickstrom

    07/25/2022, 9:54 PM
    I’m trying to get to the bottom of a backpressure issue in a new flink application I’m testing. I’m using the flink k8s operator to deploy my job. I have a series of events that are coming from a Kinesis source (table
    A
    ), and I’m performing a
    LEFT JOIN
    against a kafka source (table
    B
    ) that contains a few hundred GB of slowly changing state (my intention is for that state to get cached into flink’s rocksdb backend.). I’m using a Versioned Table View to accomplish this. The task reading in the large state for table
    B
    from Kafka is 100% backpressured, and the task populating the Versioned Table View is Busy at 100% (Using the
    ROW_NUMBER()
    SQL window function pattern described in the docs linked above. I tried updating my
    FlinkDeployment.parallelism: 2
    from
    1
    — but the Flink UI is still reporting “Parallelism: 1” for all tasks. Am I dialing the right knob to fix this issue? Or should I be looking at something else?
    g
    • 2
    • 2
  • e

    Echo Lee

    07/26/2022, 12:48 AM
    Hello, does the community plan to support side output of Flink SQL? For example, scenarios for processing delayed data in windowed aggregation.
    m
    • 2
    • 2
  • r

    rohit jangid

    07/26/2022, 1:56 AM
    Hi, I am new to Flink and have a use case to consume events from Kafka to Flink App and then store the processed events in S3 with Server Side Encryption using CMK. I was hoping this to be a common usecase but couldn't find any resources on this. Has anybody tried configuring this ? Thanks
    m
    • 2
    • 1
  • c

    chunilal kukreja

    07/26/2022, 6:42 AM
    hi team, how can i get ingestion time when pulling in records using kafka source?
    • 1
    • 1
  • a

    Ali AIT-BACHIR

    07/26/2022, 12:33 PM
    Hi everyone, I found this text that recommends how to optimse backpressure in Flink : "This simple flow of buffers between fixed-sized pools enables Flink to have a robust backpressure mechanism, where tasks never produce data faster than can be consumed." Does anyone know what are the exact config names of these partameters (number of buffers and size of a buffer per task) ? By the way, we are using the version 1.13.6 of Flink. Kind regards, Ali
    g
    c
    • 3
    • 9
  • j

    Jeesmon Jacob

    07/26/2022, 1:01 PM
    Hi there, any tips on updating existing flink-kubernetes-operator CRDs (deployed through helm chart) from one version to another in a fully automated way? As I understood helm will not upgrade crds from new chart if it already exists.
    Copy code
    Some caveats (and explanations)
    
    There is no support at this time for upgrading or deleting CRDs using Helm. This was an explicit decision after much community discussion due to the danger for unintentional data loss. Furthermore, there is currently no community consensus around how to handle CRDs and their lifecycle. As this evolves, Helm will add support for those use cases.
    https://helm.sh/docs/chart_best_practices/custom_resource_definitions/ Interested to see how others are tackling this issue for automation.
    g
    • 2
    • 17
  • a

    Ali AIT-BACHIR

    07/26/2022, 3:13 PM
    Hi, In sliding windows there may be overlapping windows. One point may be assigned to multiple windows. Is this point duplicated in memory or not (by using a reference to that object) ? Kind regards,
    d
    • 2
    • 4
  • c

    chunilal kukreja

    07/26/2022, 4:49 PM
    Hi Team, To get the record out time i.e. when record is pushed out of my kafkasink, do i need to implement custom RichSinkFunction<>? or is there some other way i can get that time?
    g
    d
    • 3
    • 5
  • r

    Roman Bohdan

    07/26/2022, 10:41 PM
    Hello, guys. Sorry if late. I want to ask you regarding savepoints. I want to save state after redeploy using savepoints. I have already got acquainted with your documentation, but still need help. 🥺 How can i configure flink to get last savepoint state on taskmanager start? Can you please give me quick guide?
    s
    d
    • 3
    • 8
  • e

    Erik Wickstrom

    07/27/2022, 12:36 AM
    When using the Flink Kubernetes Operator, how are people attaching volumes to their pods to store state? I successfully launched 1 task manager with a PVC pointing to an EBS volume (I’m running EKS on AWS). However, if I increase my parallelism, the taskmanager pods get stuck in the Init phase because they are getting scheduled on different machines than the EBS PVC. Normally, I’d used a k8s StatefulSet for a case like this. But the
    FlinkDeployment.spec.podTemplate
    indicates that the operator wants to manage the pod directly.
    g
    y
    +4
    • 7
    • 25
  • s

    salvalcantara

    07/27/2022, 8:38 AM
    Hello! I'm considering to use Flink SQL for implementing an alerting system. Alerts are of the form
    cpu.utilization > 75% and mem.utilization > 75%
    , which can be easily translated into SQL. However, I'm not totally convinced that alerting is a good use case for Flink SQL. My concern is more on the deployment/management side, namely: • Users are expected to define their own alerts, so at the end of the day we will have one sql job per alert, which might be very expensive and/or difficult to manage... In general, can anyone point me to existing alerting applications built on top of Flink? The best I could find for now is this blog post: https://flink.apache.org/news/2020/01/15/demo-fraud-detection.html, which is based on DataStream API instead, maybe confirming my Flink SQL unsuitability hypothesis...
    ✅ 1
    h
    • 2
    • 11
  • p

    Parthiban PR

    07/27/2022, 8:46 AM
    Can anyone help me resolve the below error,
    Copy code
    Caused by: org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'kafka' that implements 'org.apache.flink.table.factories.DynamicTableFactory' in the classpath.
    I get this error when deploying my project JAR into flink’s machine. Flink version: 1.14.0 Kafka version: kafka_2.12-3.1.1
    ✅ 1
    m
    • 2
    • 21
  • h

    Hunter

    07/27/2022, 9:04 AM
    Hi,guys,When using flink14.x to write data to Kafka, the written topic does not exist in Kafka, flink will automatically create it, and then write data to the automatically created topic successfully, but it cannot be seen in the data directory of Kafka The corresponding data file, and I can read the data normally with flink connect kafka source, it is also normal with Kafka api, I don't know why this happens? Can you give me some troubleshooting advice? Or tell me the location of the code that automatically creates the kafka topic, I want to take a look,Thanks
    ✅ 1
    c
    • 2
    • 2
1...8910...98Latest