# troubleshooting
  • Ranjeet Ranjan

    06/07/2022, 1:02 PM
    Hi guys, we have deployed Apache Flink on Kubernetes, but now I want to understand how we can implement autoscaling?
  • Jeesmon Jacob

    06/07/2022, 1:09 PM
    Question on flink-operator RBAC:
    Copy code
    - apiGroups:
      - ""
      resources:
      - pods
      - services
      - endpoints
      - persistentvolumeclaims
      - events
      - configmaps
      - secrets
      - nodes
      verbs:
      - '*'
    Where do we use the nodes resource in the operator? Is that permission really needed? Or can we reduce the verbs for nodes to just get/list/watch?
  • Gyula Fóra

    06/07/2022, 1:16 PM
    I feel that this channel is getting a bit out of hand, with more feature requests etc. than debugging/troubleshooting 😄 @Xintong Song what do you think?
  • Veeramani Moorthy

    06/07/2022, 1:22 PM
    I just wrote my first small program in Flink. I am using version 1.1.5.0 of flink-core, flink-java, flink-streaming-java, and flink-clients. I got the following error: "java.lang.IllegalStateException: No ExecutorFactory found to execute the application." What's wrong here?
  • Jeesmon Jacob

    06/07/2022, 7:32 PM
    Does the flink operator support HA through leader election, like most kubebuilder-based operators have by default? I tried to scale up the operator deployment, and it looks like all replicas are reconciling the CR. Is it true that we can run only one replica of the operator pod?
  • Xinbin Huang

    06/07/2022, 9:10 PM
    👋 I built a custom source using the source API in 1.13 to consume from a Kafka-like event bus. The implementation is similar to the Pulsar connector, which uses a one-thread-per-split model, but it does all the deserialization within the RecordEmitter. Do you have some tips on optimizing latency?
    • Would moving the deserialization into SplitReader.fetch be better?
    • The p99 time from the source receiving an event to emitting it is roughly the polling timeout set in SplitReader.fetch (similar to Kafka/Pulsar) plus 1s. Are there any tips on further reducing the fixed portion (i.e. the +1s)?
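    For illustration, here is a minimal sketch of what the emitter side could look like if deserialization were moved into SplitReader.fetch: the RecordEmitter then becomes a plain pass-through. The class and type names below are hypothetical placeholders.
    Copy code
    import org.apache.flink.api.connector.source.SourceOutput;
    import org.apache.flink.connector.base.source.reader.RecordEmitter;

    // Hypothetical pass-through emitter: fetch() already produced fully
    // deserialized records of type T, so emitting just forwards them downstream.
    public class PassThroughRecordEmitter<T, SplitStateT>
            implements RecordEmitter<T, T, SplitStateT> {

        @Override
        public void emitRecord(T record, SourceOutput<T> output, SplitStateT splitState) {
            output.collect(record);
        }
    }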
  • Zain Haider Nemati

    06/08/2022, 12:20 PM
    Hi, we are working on exporting Flink metrics to CloudWatch using the CloudWatch agent and StatsD. We want to track the performance of the JobManager node and the TaskManager nodes, and the status of each running job. Our production cluster is currently running in application mode with YARN as the resource provider. We have a couple of questions. These are the metric scopes we want to set up, but we need clarity on whether reporting starts once these are added to the conf file or whether we have to restart our jobs; and if we have to restart, how can we know the job_id and task_id before running the jobs? Any support around this would be appreciated!
    • JobManager: <host>
    • TaskManager: <host>, <tm_id>
    • Job: <job_id>, <job_name>
    • Task: <task_id>, <task_name>, <task_attempt_id>, <task_attempt_num>, <subtask_index>
  • Jeesmon Jacob

    06/08/2022, 1:27 PM
    In the flink operator RBAC, I see this:
    Copy code
    - apiGroups:
      - flink-operator
      resources:
      - "*"
      verbs:
      - "*"
    https://github.com/apache/flink-kubernetes-operator/blob/main/helm/flink-kubernetes-operator/templates/rbac.yaml#L24-L29 But it doesn't look like we have CRDs with that group, so I wanted to check whether this is a carry-over from an old version.
  • Eric

    06/09/2022, 12:29 AM
    Hello, I have a jar I'm trying to deploy in a container using the Flink operator. The operator is great and working well. But I have property files in the resource directory of my jar, and my application can't see these files. I've read this is because of how classes are loaded, but I haven't found a solution. How can I read the property files in the resource directory?
  • Eric

    06/09/2022, 12:31 AM
    It, of course, works fine running outside of flink
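    For reference, a minimal sketch of the classpath-resource loading pattern being discussed, going through the classloader that loaded the user code; the file name app.properties is a hypothetical placeholder.
    Copy code
    import java.io.IOException;
    import java.io.InputStream;
    import java.util.Properties;

    public class ResourceConfigLoader {

        // Loads a properties file bundled in the jar's resource directory by using the
        // classloader of this (user-code) class rather than the system classloader.
        public static Properties load() throws IOException {
            Properties props = new Properties();
            try (InputStream in = ResourceConfigLoader.class
                    .getClassLoader()
                    .getResourceAsStream("app.properties")) {
                if (in == null) {
                    throw new IOException("app.properties not found on the classpath");
                }
                props.load(in);
                return props;
            }
        }
    }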
  • Ildar Almakaev

    06/09/2022, 8:40 AM
    Hello, folks. I've got a few questions about data enrichment using Flink and its integration with Kafka/MSK on AWS. Let me share a little bit of context for my questions. I used to implement a data enrichment app with Kafka Streams, joining a stream of hot data (clickstream) with a dimension table (e.g. users); i.e. I joined a KStream (clickstream), which is stateless and immutable, on a KTable (users). Could you please share what options and which APIs are preferable for implementing such a data enrichment app in Flink? And how could I store the latest state of the user events?
  • John Gorman

    06/09/2022, 9:01 AM
    Hi folks, I'm having some user metrics issues. I've set up Prometheus as a metrics reporter and am able to get all the usual metrics from my task manager and job manager. However, I cannot seem to get any user metrics that I have created (one counter and one meter). At debug level I can see the metrics being added alongside the other metrics (I am not using a separate group in this case). I have not added any scope configuration (partially because I am not clear from the documentation on what the scopes actually do), but I understood that the default scopes should be fine to at least see the metrics; yet I don't see the counter or meter for the specific task manager being scraped. I see all the other operator-specific metrics (numRecordsIn, numRecordsOut, etc.) that were at the same operator level when I was debugging the addition of the operator-based user metrics, but I just don't see any custom user metrics. Am I right in assuming they should appear for the stock Prometheus reporter once it's configured in flink-conf.yaml with no scopes configured? Thanks for the help.
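    For reference, a minimal sketch of how user metrics are typically registered in a rich function (the metric names here are placeholders); this is the kind of setup the stock Prometheus reporter is expected to pick up without extra scope configuration.
    Copy code
    import org.apache.flink.api.common.functions.RichMapFunction;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.metrics.Counter;
    import org.apache.flink.metrics.Meter;
    import org.apache.flink.metrics.MeterView;

    public class CountingMapper extends RichMapFunction<String, String> {

        private transient Counter myCounter;
        private transient Meter myMeter;

        @Override
        public void open(Configuration parameters) {
            // Registered on the operator's metric group; scope formats only change how
            // the metric identifier is assembled, not whether it is reported.
            myCounter = getRuntimeContext().getMetricGroup().counter("myCounter");
            myMeter = getRuntimeContext().getMetricGroup().meter("myMeter", new MeterView(60));
        }

        @Override
        public String map(String value) {
            myCounter.inc();
            myMeter.markEvent();
            return value;
        }
    }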
  • Klaas Schmidt

    06/09/2022, 11:04 AM
    Hi. Is there anyone with experience in using Flink and .NET together? We have lots of code written in C# and need to decide whether to port it to Java or, preferably 😉, keep on using it.
  • Emily Morgan

    06/09/2022, 11:22 AM
    Hello! 🙂 I have an architecture question. I'm setting up a CDC pipeline that needs to handle database connectors (Debezium/Kafka) to possibly tens of thousands of users' databases. I am wondering whether it makes sense to create a separate job for each user/database connector, or whether it should be one job (or a few jobs?) that dynamically creates connectors based on some input. Any feedback greatly appreciated!
  • Veeramani Moorthy

    06/09/2022, 1:00 PM
    Hi, I have created a small Java program to experiment with Flink SQL & the Table API. The program reads from a table backed by a CSV file and writes that data into another table backed by Parquet. I am using flink-table-planner_2.12 as a Maven dependency with 'provided' scope. Still, I am getting the error "java.lang.ClassNotFoundException: org.apache.flink.table.planner.delegation.ParserFactory" while executing my program on a local Flink cluster. Any idea what went wrong here?
  • Hunter Medney

    06/09/2022, 4:08 PM
    Hi all, architectural / philosophical question: is Redis valuable for enrichment-type Flink jobs? Assume:
    • we have streams streamA and streamB
    • we have a Flink job that maintains a running aggregate of streamA called aggA, which reduces streamA to a set of key/value pairs
    • we need to enrich streamB with the latest aggregate values in aggA
    • additional jobs will need to perform similar enrichments on streams using the aggA key/values
    My initial thought would be to store the constantly-updating keyed data from aggA in Redis so that all the enrichment jobs can simply retrieve the latest aggregated value for aggA as they process each record in streams like streamB. My high-level understanding of Redis is that it propagates the latest key values in-memory to all Redis clients, so each Flink operation in each slot would always be working with the latest aggA values from Redis. Does this make sense? I haven't come across this kind of implementation associated with Flink yet, so I have a feeling I'm missing something 🙂
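    For illustration, a minimal sketch of the lookup pattern described above, assuming the aggA values are kept in Redis as plain string key/value pairs and using the Jedis client; the host, port, and "aggA:" key prefix are placeholders.
    Copy code
    import org.apache.flink.api.common.functions.RichMapFunction;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.configuration.Configuration;
    import redis.clients.jedis.Jedis;

    // Enriches each (key, payload) record from streamB with the latest aggA value
    // for that key, looked up from Redis. Each parallel subtask opens its own connection.
    public class RedisEnricher
            extends RichMapFunction<Tuple2<String, String>, Tuple2<String, String>> {

        private transient Jedis jedis;

        @Override
        public void open(Configuration parameters) {
            jedis = new Jedis("redis-host", 6379);  // placeholder connection details
        }

        @Override
        public Tuple2<String, String> map(Tuple2<String, String> record) {
            String latestAgg = jedis.get("aggA:" + record.f0);
            return Tuple2.of(record.f0, record.f1 + "|" + latestAgg);
        }

        @Override
        public void close() {
            if (jedis != null) {
                jedis.close();
            }
        }
    }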
  • Sahil Aulakh

    06/09/2022, 9:46 PM
    Hi all, we are using Flink version 1.13.5 for our application, and every time our job restarts, the Flink job metrics are flattened after the restart. We are also seeing a Name Collision warning. Is this a known issue, and is there any workaround? Please advise.
  • Sucheth Shivakumar

    06/10/2022, 5:02 AM
    Hi all, custom Avro GenericRecord deserialization works fine in Java but falls back to the Kryo serializer in Scala (Flink version 1.15.0). Can someone please point out what I'm doing wrong here?
    Copy code
    Caused by: com.esotericsoftware.kryo.KryoException: java.lang.UnsupportedOperationException
    Serialization trace:
    reserved (org.apache.avro.Schema$Field)
    fieldMap (org.apache.avro.Schema$RecordSchema)
    schema (org.apache.avro.generic.GenericData$Record)
  • Sucheth Shivakumar

    06/10/2022, 5:02 AM
    Custom deserializer :
    Copy code
    // Imports assumed for this snippet (Confluent schema-registry deserializer + Flink/Avro types):
    import io.confluent.kafka.schemaregistry.client.{CachedSchemaRegistryClient, SchemaRegistryClient}
    import io.confluent.kafka.serializers.{AbstractKafkaAvroSerDeConfig, KafkaAvroDeserializer, KafkaAvroDeserializerConfig}
    import org.apache.avro.generic.GenericRecord
    import org.apache.flink.api.common.serialization.{AbstractDeserializationSchema, DeserializationSchema}
    import org.apache.flink.api.common.typeinfo.TypeInformation
    import org.apache.flink.api.java.typeutils.TypeExtractor

    import scala.collection.JavaConverters.mapAsJavaMap

    class KafkaMessageDeserializationSchema(topic: String, schemaUrl: String) extends AbstractDeserializationSchema[GenericRecord] {

      // The Confluent deserializer is not serializable, so it is created lazily on the task.
      @transient private var inner: KafkaAvroDeserializer = _

      override def deserialize(message: Array[Byte]): GenericRecord = {
        checkInitialized()
        inner.deserialize(topic, message).asInstanceOf[GenericRecord]
      }

      override def open(context: DeserializationSchema.InitializationContext): Unit = super.open(context)

      override def isEndOfStream(nextElement: GenericRecord): Boolean = {
        false
      }

      // GenericRecord carries no compile-time schema, so this yields a generic type info.
      override def getProducedType: TypeInformation[GenericRecord] = TypeExtractor.getForClass(classOf[GenericRecord])

      private def checkInitialized(): Unit = {
        if (inner == null) {
          val props = Map(
            AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG -> schemaUrl,
            KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG -> false
          )
          val client: SchemaRegistryClient = new CachedSchemaRegistryClient(schemaUrl, AbstractKafkaAvroSerDeConfig.MAX_SCHEMAS_PER_SUBJECT_DEFAULT)
          inner = new KafkaAvroDeserializer(client, mapAsJavaMap(props))
        }
      }
    }
  • Sucheth Shivakumar

    06/10/2022, 5:02 AM
    Any idea why this is happening? The same code in Java works with no issues.
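    For context: GenericRecord carries no compile-time schema, so TypeExtractor.getForClass(classOf[GenericRecord]) yields a generic type that Flink serializes with Kryo. One commonly suggested approach is to expose Avro-aware type information instead; a minimal sketch in Java, assuming the reader schema is known up front and the flink-avro dependency is on the classpath.
    Copy code
    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo;

    public final class AvroTypeInfos {

        // Hypothetical helper: builds TypeInformation that serializes GenericRecord
        // with Flink's Avro serializer (instead of Kryo) for the given reader schema.
        public static TypeInformation<GenericRecord> forReaderSchema(Schema readerSchema) {
            return new GenericRecordAvroTypeInfo(readerSchema);
        }

        private AvroTypeInfos() {
        }
    }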
  • Uday Sharma

    06/10/2022, 2:18 PM
    Hi all, I am trying to install Flink using pip install apache-flink. I am getting the errors below, related to pemja, the JDK, and Visual Studio.
    Copy code
    pyexceptions.c
          C:\Software\jdk-11.0.15\include\jni.h(45): fatal error C1083: Cannot open include file: 'jni_md.h': No such file or directory
          error: command 'C:\\Program Files (x86)\\Microsoft Visual Studio\\2017\\BuildTools\\VC\\Tools\\MSVC\\14.16.27023\\bin\\HostX86\\x64\\cl.exe' failed with exit status 2
          [end of output]
    
      note: This error originates from a subprocess, and is likely not a problem with pip.
    error: legacy-install-failure
    
    × Encountered error while trying to install package.
    ╰─> pemja
    
    note: This is an issue with the package mentioned above, not pip.
    hint: See above for output from the failure.
  • Ildar Almakaev

    06/10/2022, 8:57 PM
    Happy Friday :) Could you please give me some tips to resolve an issue with the state store and data enrichment? Basically, I have 2 data streams created from Kafka topics: DataStream<User> userStream and DataStream<Log> logsStream. Both streams are repartitioned by the userId column using keyBy(). Finally, I just need to enrich the log events with the user data. AFAIK, I could do that using connect and process:
    Copy code
    DataStream<EnrichedLog> enrichedStream = logsStream
            .connect(userStream)
            .process(new ProcessingTimeJoin());
    The ProcessingTimeJoin is a CoProcessFunction<Log, User, EnrichedLog> with a state ValueState<User> userState. Please take a look at the code snippet:
    Copy code
    public class ProcessingTimeJoin extends CoProcessFunction<Log, User, EnrichedLog> {
    
        // Store latest reference data
        private ValueState<User> userState;
    
        @Override
        public void open(Configuration config) {
            ValueStateDescriptor<User> cDescriptor = new ValueStateDescriptor<>("data", TypeInformation.of(User.class));
            userState = getRuntimeContext().getState(cDescriptor);
        }
    
        @Override
        public void processElement1(Log logEvent,
                                    Context context,
                                    Collector<EnrichedLog> out) throws Exception {
            User user = userState.value();
            if (Objects.nonNull(user)) {
                EnrichedLog enrichedLog = Mapper.mapToEnrichedLog(logEvent, user);
                out.collect(enrichedLog);
            } else {
            log.warn("Can not enrich {}", logEvent);
            }
        }
    
        @Override
        public void processElement2(User user, Context context, Collector<EnrichedLog> collector) throws Exception {
            userState.update(user);
        }
    }
    Question :) AFAICS, there might be cases in the processElement1 function where the state store does not yet have the user data. Is there any way to fill the state before enriching (joining) the streams? I mean, consume all the available user data from the users Kafka topic, prepare the state with the user data, and only after that start consuming log events and enriching them?
  • Sucheth Shivakumar

    06/10/2022, 9:34 PM
    Hi all, is there a way I can read an Avro-serialized GenericRecord from Kafka and sink it to another Kafka topic with the same schema, without having to store the schema in between? I see that ConfluentRegistryAvroSerializationSchema.forGeneric() expects a schema to be provided; I do not want to store the schema, I just want to use the same schema that was read from the source Kafka topic, which is already present as part of the GenericRecord.
  • Veeramani Moorthy

    06/11/2022, 5:35 AM
    In the IntelliJ IDE, I am not able to get code completion for the Flink dependencies. I have created a Maven project, choosing Java as the language. What's wrong here?
  • Sucheth Shivakumar

    06/11/2022, 6:09 AM
    In IntelliJ IDEA, Avro generic type serialization works just fine, but when I submit the job to the Flink cluster it falls back to Kryo. May I know why it works from the IDE and fails when I submit it to the Flink cluster? Also, is there any possible solution to fix this?
  • Sucheth Shivakumar

    06/11/2022, 4:58 PM
    Hi all, how do I implement Flink Kafka streaming against an Avro-serialized topic that uses TopicRecordNameStrategy (the same topic can contain multiple event types)? I want to set up GenericRecord streaming; since a single topic can have multiple schemas, how should I implement this case in Flink streaming?
  • Bharath

    06/12/2022, 1:01 AM
    Hi all, I am using PyFlink to stream data from Kafka to S3. So far the messages in Kafka are in Avro. We are planning to migrate to protobuf, which is not a natively supported format in Flink. Any suggestions on how to handle a protobuf-format source in PyFlink?
  • Jaya Ananthram

    06/12/2022, 4:01 PM
    Hello 👋, I have a question about the Flink Kubernetes operator. Is there any minimum requirement for the Kubernetes version to use this operator? More context in the 🧵
  • Jeff Levesque

    06/13/2022, 1:20 AM
    Trying to integrate an example SQL to take a sliding_window.py (i.e. previous example) to the next level. I successfully tested the following modification (which also required some adjustments to create_print_table()):
    Copy code
        input_table = table_env.from_path(input_table_name)
        sliding_window_table = (
            input_table.window(
                Slide.over(sliding_window_over)
                .every(sliding_window_every)
                .on(sliding_window_on)
                .alias(sliding_window_alias)
            )
            .group_by('ticker, {}'.format(sliding_window_alias))
            .select('ticker, MIN(price) as min_price, MAX(price) as max_price, {0}.start as utc_start, {0}.end as utc_end'.format(
                sliding_window_alias
            ))
        )
    Since the above worked, I tried to modify it slightly as follows:
    Copy code
        input_table = table_env.from_path(input_table_name)
        sliding_window_table = (
            input_table.window(
                Slide.over(sliding_window_over)
                .every(sliding_window_every)
                .on(sliding_window_on)
                .alias(sliding_window_alias)
            )
            .group_by('ticker, {}'.format(sliding_window_alias))
            .select('FROM_UNIXTIME(28*60 * (UNIX_TIMESTAMP({0}.end) / (28*60))), ticker, MIN(price) as min_price, MAX(price) as max_price, {0}.start as utc_start, {0}.end as utc_end'.format(
                sliding_window_alias
            ))
        )
    However, when I run the above, I get the following error:
    Copy code
    raise Py4JJavaError(
    py4j.protocol.Py4JJavaError: An error occurred while calling o75.select.
    : org.apache.flink.table.api.ValidationException: Undefined function: FROM_UNIXTIME
    	at org.apache.flink.table.expressions.resolver.LookupCallResolver.lambda$visit$0(LookupCallResolver.java:53)
    	at java.base/java.util.Optional.orElseThrow(Optional.java:408)
    	at org.apache.flink.table.expressions.resolver.LookupCallResolver.visit(LookupCallResolver.java:49)
    	at org.apache.flink.table.expressions.resolver.LookupCallResolver.visit(LookupCallResolver.java:36)
    	at org.apache.flink.table.expressions.ApiExpressionVisitor.visit(ApiExpressionVisitor.java:35)
    	at org.apache.flink.table.expressions.LookupCallExpression.accept(LookupCallExpression.java:66)
    	at org.apache.flink.table.api.internal.TableImpl.lambda$preprocessExpressions$0(TableImpl.java:605)
    	at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
    	at java.base/java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:948)
    	at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
    	at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
    	at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:913)
    	at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
    	at java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:578)
    	at org.apache.flink.table.api.internal.TableImpl.preprocessExpressions(TableImpl.java:606)
    	at org.apache.flink.table.api.internal.TableImpl.access$300(TableImpl.java:66)
    	at org.apache.flink.table.api.internal.TableImpl$WindowGroupedTableImpl.select(TableImpl.java:775)
    	at org.apache.flink.table.api.internal.TableImpl$WindowGroupedTableImpl.select(TableImpl.java:770)
    	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    	at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    	at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    	at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
    	at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    	at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
    	at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
    	at java.base/java.lang.Thread.run(Thread.java:829)
  • Uday Sharma

    06/13/2022, 8:11 AM
    Hi all, I am using the Flink Java API but facing an issue while creating a connector for Kafka (streaming CSV as String), since there are no references available. I am trying to read CSV data from Kafka (as String) and load it into Snowflake. Can someone please help with this?
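    For reference, a minimal sketch of consuming Kafka record values as plain strings (one CSV line per record) with the KafkaSource connector; the broker address, topic, and group id are placeholders, and the Snowflake side would still need a separate sink.
    Copy code
    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.connector.kafka.source.KafkaSource;
    import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class CsvStringsFromKafka {

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // Each Kafka record value is consumed as one raw CSV line (String).
            KafkaSource<String> source = KafkaSource.<String>builder()
                    .setBootstrapServers("broker:9092")      // placeholder
                    .setTopics("csv-topic")                  // placeholder
                    .setGroupId("csv-consumer")              // placeholder
                    .setStartingOffsets(OffsetsInitializer.earliest())
                    .setValueOnlyDeserializer(new SimpleStringSchema())
                    .build();

            DataStream<String> csvLines =
                    env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka CSV Source");

            // Parsing each CSV line and writing to Snowflake would follow here.
            csvLines.print();

            env.execute("kafka-csv-strings");
        }
    }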