# troubleshooting
  • t

    Tal Sheldon

    03/22/2023, 10:47 AM
I currently have a (plain) Kafka consumer that consumes from a dynamic list of topics (every minute the list is recalculated; usually it's the same list). Can I do the same with Flink, and how? (Flink consuming from a dynamic list of Kafka topics.) Every minute some method computes the full topic list by some logic (topics can be added and removed by this logic, but usually the list stays the same).
    m
    • 2
    • 3
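A minimal sketch of one way to approach the question above with the KafkaSource, assuming the dynamic topic list can be expressed as a regex (the bootstrap servers, group id, and pattern below are placeholders): subscribe by topic pattern and enable periodic partition/topic discovery, so new matching topics are picked up automatically.
Copy code
import java.util.regex.Pattern;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;

KafkaSource<String> source = KafkaSource.<String>builder()
        .setBootstrapServers("localhost:9092")                 // placeholder
        .setTopicPattern(Pattern.compile("my-topics-.*"))      // placeholder regex covering the dynamic topics
        .setGroupId("my-group")                                 // placeholder
        .setStartingOffsets(OffsetsInitializer.earliest())
        .setValueOnlyDeserializer(new SimpleStringSchema())
        // re-scan Kafka for new topics/partitions every 60 seconds
        .setProperty("partition.discovery.interval.ms", "60000")
        .build();
As far as I know, discovery only adds new matches; topics removed mid-flight or a topic list driven by arbitrary logic that a regex cannot express would still need a custom source or a periodic restart.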
  • j

    Jalil Alchy

    03/22/2023, 12:59 PM
    Hey Everyone, I have a problem that feels off, but maybe I am doing this wrong. I want to create a KafkaSink, so I have a class that has a method:
    Copy code
    public KafkaSink<OutboxRecord> getKafkaSink() {
            KafkaRecordSerializationSchema<OutboxRecord> serializer = KafkaRecordSerializationSchema.<OutboxRecord>builder()
                    .setTopicSelector(x -> x.topic)
                    .setValueSerializationSchema(new KafkaOutboxRecordSerializer())
                    .build();
    
            Properties p = new Properties() {
                {
                    put("<http://transaction.timeout.ms|transaction.timeout.ms>", (Integer) 60000);
                }
            };
    
            return KafkaSink.<OutboxRecord>builder()
                    .setBootstrapServers("localhost:9092")
                    .setKafkaProducerConfig(p)
                    .setRecordSerializer(serializer)
                    .build();
        }
However, calling this method causes the application to throw a NotSerializableException. If I make the method static, it gets better. Is there a better way to do this that I am missing?
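One likely cause (a guess, not a confirmed diagnosis): the double-brace Properties initializer creates an anonymous inner class that keeps a reference to the enclosing, non-serializable class, and that reference gets captured when the sink is serialized; making the method static removes the enclosing instance, which is why it "gets better". A plain Properties instance avoids the capture without going static; a sketch:
Copy code
public KafkaSink<OutboxRecord> getKafkaSink() {
    KafkaRecordSerializationSchema<OutboxRecord> serializer = KafkaRecordSerializationSchema.<OutboxRecord>builder()
            .setTopicSelector(x -> x.topic)
            .setValueSerializationSchema(new KafkaOutboxRecordSerializer())
            .build();

    // Plain Properties instance: no anonymous subclass, no reference to the enclosing class.
    Properties p = new Properties();
    p.setProperty("transaction.timeout.ms", "60000");

    return KafkaSink.<OutboxRecord>builder()
            .setBootstrapServers("localhost:9092")
            .setKafkaProducerConfig(p)
            .setRecordSerializer(serializer)
            .build();
}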
  • a

    Amir Hossein Sharifzadeh

    03/22/2023, 5:17 PM
Hello folks. I need help creating a join on two streams but I'm not sure what the best way to do that is. I have two corresponding tables, EMPAD_RAW_TBL and EMPAD_BKGD_TBL, where each table has the same number of rows (64). Both tables have a chunk_i field with unique values (1..64). I am trying to join both tables (streams) and I would expect my joined query to give me 64 rows, but I see duplicated rows there.
    Copy code
    String data_query = "select EMPAD_RAW_TBL.chunk_i as chunk_i, EMPAD_RAW_TBL.data as raw_enc_data, EMPAD_RAW_TBL.n_total_chunks as n_total_chunks, " +
            "EMPAD_BKGD_TBL.data as bkgd_enc_data FROM EMPAD_RAW_TBL join EMPAD_BKGD_TBL on EMPAD_RAW_TBL.chunk_i = EMPAD_BKGD_TBL.chunk_i";
Table raw_table =
        tableEnv.sqlQuery(data_query);
DataStream<Row> raw_stream = tableEnv.toDataStream(raw_table);
raw_table contains 128 rows but I expect 64 rows. I don't know how to fix the issue here. Thank you.
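If the extra rows come from duplicate records arriving on the source streams (rather than from the join itself), one sketch is to deduplicate each table on chunk_i before joining. This assumes a processing-time attribute named proctime exists on the table; the column names are taken from the query above but not verified:
Copy code
// Keep only the first row seen per chunk_i, then join the deduplicated views.
String dedup_raw =
        "SELECT chunk_i, data, n_total_chunks FROM (" +
        "  SELECT *, ROW_NUMBER() OVER (PARTITION BY chunk_i ORDER BY proctime ASC) AS rn" +
        "  FROM EMPAD_RAW_TBL" +
        ") WHERE rn = 1";
tableEnv.createTemporaryView("EMPAD_RAW_DEDUP", tableEnv.sqlQuery(dedup_raw));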
  • h

    Herat Acharya

    03/22/2023, 11:38 PM
We are deploying natively on Kubernetes using kubernetes-session.sh and specifying taskmanager.numberOfTaskSlots=8. These denote task slots per task manager, right? So how does Flink know how many task managers to create? By the way, our source is Kafka and the sink is a database; Kafka will constantly have events.
  • l

    Lee xu

    03/23/2023, 1:07 AM
Hello, how is the memory of a Python UDF managed, and how can it be configured? I am having this problem now. You can look at the uploaded log file. Can you tell from the log file why the Python server failed?
    error-log.txt
    d
    • 2
    • 4
  • c

    Chen-Che Huang

    03/23/2023, 2:02 AM
Hello. I have a question about the restore mode of Flink. Flink 1.15 introduced three restore modes: CLAIM, NO_CLAIM, and LEGACY. Assume that my Flink application restores from a savepoint SVP-1 with restore mode CLAIM. As time goes by, my Flink application creates new savepoints SVP-2, SVP-3, and so on. From the doc, the CLAIM mode may delete SVP-1 when Flink thinks SVP-1 is no longer needed for recovery. How about SVP-2, SVP-3, and future savepoints? Will they also be deleted if Flink thinks they are no longer required? Thanks in advance for any reply.
    m
    • 2
    • 2
  • h

    Hu Guang

    03/23/2023, 3:48 AM
Hello, I have a question about Flink sliding windows and aggregation functions. We have a job which calculates max/min/sum over the last 24 hours (event time) for each id in a given stream. Our approach is to use a sliding event-time window on a keyed stream. But as the data grew, we found that the sliding window consumed a lot of disk space because the state is so large, so we added an aggregate function to the job. After adding the aggregate function, watermark generation seems to stop and the job appears stalled. The code is something like this:
    Copy code
    keyedStream.window(SlidingEventTimeWindows.of(Time.minutes(1), Time.seconds(5)))
               .aggregate(new Last24HAggregator(), new MyProcessWindowFunction())
               .addSink(new MySink());
Before we added the aggregation function, everything seemed fine. My guess is that maybe the aggregation function is not compatible with event-time processing semantics.
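For reference, a minimal sketch of the general shape an incremental aggregator can take, assuming the aggregated values are doubles (the real Last24HAggregator will differ). With an AggregateFunction the window keeps one small accumulator per key instead of buffering every element:
Copy code
import org.apache.flink.api.common.functions.AggregateFunction;

// Hypothetical accumulator: running min, max, and sum over doubles.
public class MinMaxSumAggregator
        implements AggregateFunction<Double, MinMaxSumAggregator.Acc, MinMaxSumAggregator.Acc> {

    public static class Acc {
        public double min = Double.MAX_VALUE;
        public double max = -Double.MAX_VALUE;
        public double sum = 0.0;
    }

    @Override
    public Acc createAccumulator() { return new Acc(); }

    @Override
    public Acc add(Double value, Acc acc) {
        acc.min = Math.min(acc.min, value);
        acc.max = Math.max(acc.max, value);
        acc.sum += value;
        return acc;
    }

    @Override
    public Acc getResult(Acc acc) { return acc; }

    @Override
    public Acc merge(Acc a, Acc b) {
        a.min = Math.min(a.min, b.min);
        a.max = Math.max(a.max, b.max);
        a.sum += b.sum;
        return a;
    }
}
As far as I know, the aggregate function itself does not influence watermark generation (that happens in the WatermarkStrategy at the source), so the stall is probably worth investigating separately.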
  • a

    Ashutosh Joshi

    03/23/2023, 10:38 AM
Hi, is there any way to pass an entire row as a parameter to a user-defined function (either a scalar or a table function)? I do not want to pass only selected fields; to keep it generic I need all the fields. Below are the steps I followed, but with no luck - 1 - create a custom function
    Copy code
    public class SelectAllFields extends TableFunction<Row> {
        public void eval(Row row) {
            collect(row);
        }
    
    }
    2 - register it -
    Copy code
    env.createTemporarySystemFunction("SelectAllFields", SelectAllFields.class);
3 - use it through SQL -
    Copy code
    select name, SelectAllFields(*) as meta from table1
  • k

    kingsathurthi

    03/23/2023, 10:47 AM
The flink-kubernetes-operator container has the vulnerabilities below; how can we fix them?
    g
    • 2
    • 3
  • t

    Tiansu Yu

    03/23/2023, 12:37 PM
We have been constantly facing an issue that brings Flink down: upstream bad data breaks the CustomDeserializationSchema step, and then the whole pipeline goes down. Is there a good way to prevent this from happening and only log / report a metric in such cases?
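A sketch of one common pattern, assuming the pipeline reads from Kafka with the new KafkaSource: wrap the existing deserialization schema so records that fail to parse are logged and dropped instead of failing the job (a metric counter could also be registered in open()):
Copy code
import java.io.IOException;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Hypothetical wrapper: delegates to an existing schema and skips records it cannot parse.
public class SkipBadRecordsSchema<T> implements KafkaRecordDeserializationSchema<T> {
    private static final Logger LOG = LoggerFactory.getLogger(SkipBadRecordsSchema.class);
    private final KafkaRecordDeserializationSchema<T> inner;

    public SkipBadRecordsSchema(KafkaRecordDeserializationSchema<T> inner) {
        this.inner = inner;
    }

    @Override
    public void deserialize(ConsumerRecord<byte[], byte[]> record, Collector<T> out) throws IOException {
        try {
            inner.deserialize(record, out);
        } catch (Exception e) {
            // Bad upstream data: log and drop instead of failing the whole pipeline.
            LOG.warn("Skipping undeserializable record at {}-{}@{}",
                    record.topic(), record.partition(), record.offset(), e);
        }
    }

    @Override
    public TypeInformation<T> getProducedType() {
        return inner.getProducedType();
    }
}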
  • t

    Tsering

    03/23/2023, 1:47 PM
Hi, good morning, afternoon, and evening. I have a question about EventTimeTrigger from Flink:
    Copy code
    @Override
        public TriggerResult onElement(
                Object element, long timestamp, TimeWindow window, TriggerContext ctx)
                throws Exception {
            if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
                // if the watermark is already past the window fire immediately
                return TriggerResult.FIRE;
            } else {
                ctx.registerEventTimeTimer(window.maxTimestamp());
                return TriggerResult.CONTINUE;
            }
        }
Suppose 5 elements of the same window go into the onElement function and hit registerEventTimeTimer five times; will onEventTime get invoked 5 times?
    e
    • 2
    • 2
  • t

    Trevor Burke

    03/23/2023, 6:56 PM
Hello! Trying to figure out what parameters to tweak next... I have a Flink application that reads Protobuf messages from a Kafka topic; these events are then persisted to S3. We are running Flink via the Lyft Flink Kubernetes Operator. I have parallelism set to 10, compared to 1 for our staging env. Of course, our staging env sees far fewer events than prod. I'm getting:
    Checkpoint expired before completing
    • 1
    • 2
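For reference, the checkpoint-related knobs usually looked at for "Checkpoint expired before completing", shown here with example values only (not a recommendation for this specific job):
Copy code
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
CheckpointConfig checkpointConfig = env.getCheckpointConfig();
checkpointConfig.setCheckpointTimeout(30 * 60 * 1000L);      // example: allow 30 minutes before a checkpoint expires
checkpointConfig.setMinPauseBetweenCheckpoints(60 * 1000L);  // example: breathing room between checkpoints
checkpointConfig.setMaxConcurrentCheckpoints(1);
checkpointConfig.enableUnalignedCheckpoints();               // can help when backpressure delays barriers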
  • a

    Amir Hossein Sharifzadeh

    03/23/2023, 7:56 PM
Quick question: does Flink support DISTINCT in a query? When I run my query with DISTINCT:
    Copy code
    String raw_query = "select DISTINCT chunk_i FROM EMPAD_RAW_TBL"; Table join_table =
            tableEnv.sqlQuery(raw_query);
    
    DataStream<Row> bkgd_stream = tableEnv.toDataStream(join_table);
    I get this error:
    org.apache.flink.table.api.TableException: Table sink '*anonymous_datastream_sink$1*' doesn't support consuming update changes which is produced by node GroupAggregate(groupBy=[chunk_i], select=[chunk_i])
    at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.createNewNode(FlinkChangelogModeInferenceProgram.scala:405)
    at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visit(FlinkChangelogModeInferenceProgram.scala:185)
    at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visitChild(FlinkChangelogModeInferenceProgram.scala:366)
    at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.$anonfun$visitChildren$1(FlinkChangelogModeInferenceProgram.scala:355)
    at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.$anonfun$visitChildren$1$adapted(FlinkChangelogModeInferenceProgram.scala:354)
    at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
    at scala.collection.immutable.Range.foreach(Range.scala:155)
    at scala.collection.TraversableLike.map(TraversableLike.scala:233)
    at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
    at scala.collection.AbstractTraversable.map(Traversable.scala:104)
    at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visitChildren(FlinkChangelogModeInferenceProgram.scala:354)
    at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visit(FlinkChangelogModeInferenceProgram.scala:128)
    at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram.optimize(FlinkChangelogModeInferenceProgram.scala:53)
    at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram.optimize(FlinkChangelogModeInferenceProgram.scala:43)
    at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.$anonfun$optimize$2(FlinkGroupProgram.scala:59)
    at scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:156)
    at scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:156)
    at scala.collection.Iterator.foreach(Iterator.scala:937)
    at scala.collection.Iterator.foreach$(Iterator.scala:937)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
    at scala.collection.IterableLike.foreach(IterableLike.scala:70)
    at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:156)
    at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:154)
    at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
    at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.$anonfun$optimize$1(FlinkGroupProgram.scala:56)
    at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.$anonfun$optimize$1$adapted(FlinkGroupProgram.scala:51)
    at scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:156)
    at scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:156)
    at scala.collection.immutable.Range.foreach(Range.scala:155)
    at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:156)
    at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:154)
    at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
    at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.optimize(FlinkGroupProgram.scala:51)
    at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:59)
    at scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:156)
    at scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:156)
    at scala.collection.Iterator.foreach(Iterator.scala:937)
    at scala.collection.Iterator.foreach$(Iterator.scala:937)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
    at scala.collection.IterableLike.foreach(IterableLike.scala:70)
    at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:156)
    at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:154)
    at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
    at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:55)
    at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:176)
    at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:83)
    at org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:87)
    at org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:315)
    at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:195)
    at org.apache.flink.table.api.bridge.internal.AbstractStreamTableEnvironmentImpl.toStreamInternal(AbstractStreamTableEnvironmentImpl.java:224)
    at org.apache.flink.table.api.bridge.internal.AbstractStreamTableEnvironmentImpl.toStreamInternal(AbstractStreamTableEnvironmentImpl.java:219)
    at org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.java:253)
    at org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.java:226)
    m
    • 2
    • 1
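Flink SQL does support DISTINCT, but in streaming mode it produces an updating (changelog) result, and toDataStream only accepts insert-only tables, which is what the error above is complaining about. A sketch of one way around it, using toChangelogStream:
Copy code
String raw_query = "SELECT DISTINCT chunk_i FROM EMPAD_RAW_TBL";
Table distinct_table = tableEnv.sqlQuery(raw_query);

// toChangelogStream accepts updating results; each Row carries a RowKind (+I, -U, +U, -D).
DataStream<Row> distinct_stream = tableEnv.toChangelogStream(distinct_table);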
  • m

    Mitchell Jeppson

    03/23/2023, 11:45 PM
Hey all! I have a Flink job that is reading from one Kafka topic and writing to another. What would be required to reset the offsets so my job starts fresh from the first Kafka messages and works through the entire topic again? Would I delete my checkpoint files? Clear the consumer offsets in Kafka? Both? Or something else?
    s
    • 2
    • 2
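A sketch of the source side, assuming the job uses the new KafkaSource and is started without restoring from a checkpoint or savepoint: in that case the configured OffsetsInitializer decides where reading starts, so earliest() re-reads the topic from the beginning (the broker address, topic, and group id below are placeholders):
Copy code
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;

KafkaSource<String> source = KafkaSource.<String>builder()
        .setBootstrapServers("localhost:9092")              // placeholder
        .setTopics("input-topic")                            // placeholder
        .setGroupId("my-job")                                // placeholder
        .setStartingOffsets(OffsetsInitializer.earliest())   // read the topic from the beginning
        .setValueOnlyDeserializer(new SimpleStringSchema())
        .build();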
  • a

    Abhinav Ittekot

    03/24/2023, 6:42 AM
hey everyone, we are working on migrating our Flink deployments from AWS EMR to Kubernetes (using the operator). On EMR we use emrfs to upload checkpoints to S3, whereas on Kubernetes we use Presto. We are trying to figure out a rollback plan wherein EMR is able to restore checkpoints created in Kubernetes in case we run into problems. When we tested this, the job in EMR failed due to rate-limit errors because it uses the Presto plugin, which needs some fine tuning. Perhaps it's performing some form of auto-discovery via the s3:// prefix passed for the checkpoint location. Is there a way to tell Flink to use emrfs, e.g. by changing the s3 prefix?
    p
    • 2
    • 1
  • s

    Siddhesh Kalgaonkar

    03/24/2023, 9:43 AM
    #C03G7LJTS2G I was trying to understand this line
    A shorter checkpointing interval causes higher overhead during regular processing but *can enable faster recovery* because less data needs to be reprocessed.
I get that if the interval is short the overhead is higher, but how does it help in faster recovery of the state? I am not sure about that. Can somebody help me understand this better?
  • s

    Siddhesh Kalgaonkar

    03/24/2023, 9:52 AM
#C03G7LJTS2G I want to retain only the last 3 months of operator/keyed state; is this possible using a callback method?
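An alternative to a timer callback, in case it fits: state TTL on the state descriptor, which lets the backend expire entries that have not been updated for roughly three months. A sketch, using a ValueState<Long> purely as a placeholder:
Copy code
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;

StateTtlConfig ttlConfig = StateTtlConfig
        .newBuilder(Time.days(90))                                        // roughly "last 3 months"
        .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)        // reset TTL on every write
        .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
        .build();

ValueStateDescriptor<Long> descriptor = new ValueStateDescriptor<>("my-state", Long.class);
descriptor.enableTimeToLive(ttlConfig);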
  • d

    Dan Sisson

    03/24/2023, 12:29 PM
    Good morning all! I'm having some trouble with setting a parameter at runtime to use for configuring the StatsD metric reporter. Running in Kubernetes, I'm trying to use an environment variable to define the host for the statsD client with this in my `flink-conf.yaml`:
    Copy code
    env.java.opts: -Dmetrics.reporter.stsd.host=${DD_AGENT_HOST}
    However, at startup, the reporter crashes with:
    Copy code
    2023-03-24 12:13:55,011 ERROR org.apache.flink.runtime.metrics.ReporterSetup               [] - Could not instantiate metrics reporter stsd. Metrics might not be exposed/reported.
    java.lang.IllegalArgumentException: Invalid host/port configuration. Host: null Port: 1125
    	at org.apache.flink.metrics.statsd.StatsDReporter.open(StatsDReporter.java:72) ~[?:?]
    ...
... so it seems metrics.reporter.stsd.host is not being picked up from the dynamic properties. Should it be possible to set metric reporter properties via env.java.opts? Thank you for any pointers!
    • 1
    • 1
  • r

    Rion Williams

    03/24/2023, 2:50 PM
Hi all, I have a pipeline that is currently reading from Kafka and writing to Elasticsearch. I was recently doing some testing of how it handles failures and was wondering if there's a best practice or recommendation for doing so. I know that the now-deprecated ElasticsearchSink.Builder class previously exposed an onFailure handler to capture these types of situations:
    Copy code
builder.onFailureHandler { request, throwable, statusCode, indexer -> 
        // Handle failures here
    }
However, after the sink was recently updated to use the Elasticsearch7SinkBuilder class, it doesn't appear to expose the same level of flexibility for capturing failures within the sink, outside of adjusting the backoff strategy. Ideally, I'd need the flexibility to process records in batches (via the internal BulkProcessor) with the ability to discern between the types of failures that might occur, similar to the following example:
    Copy code
    builder.onFailureHandler { request, throwable, statusCode, indexer -> 
        if (ExceptionUtils.findThrowable(throwable, EsRejectedExecutionException.class).isPresent()) {
            // Potentially transient/ES-related error, retry indexing
            indexer.add(action);
        } else if (ExceptionUtils.findThrowable(throwable, ElasticsearchParseException.class).isPresent()) {
            // Bad/invalid document, need to log and potentially send to a DLQ for further investigation
        } else {
            // Anything else, crash the job via an exception
       } 
    }
Additionally, when processing in batches, how are single failures within a BulkProcessor call typically handled? Is the entire batch retried, or does the processor only reattempt the requests that initially failed? For example, if I had a single request that attempted to index 100 documents, but one of them was a duplicate or could not be inserted into the index, how would that be handled? Ideally, I'd like to capture the document(s) that couldn't be written (i.e. throw them to a DLQ or something) or have some way to isolate what failed in the batch.
    t
    • 2
    • 3
  • y

    Yufei Chen

    03/24/2023, 3:58 PM
[Autoscaler] Quick question: does the Autoscaler work on a FlinkDeployment with deployment type = Session? In that case, can all jobs submitted via FlinkSessionJob be autoscaled? Thanks!
  • k

    Kevin Lam

    03/24/2023, 4:03 PM
👋 Has anyone who runs Flink on Kubernetes seen occasional, seemingly synchronized waves of restarts of different Flink clusters running on the same K8s cluster? I'm investigating an issue and it seems like the root cause may be leader elections being triggered because the ConfigMaps used for HA become unavailable. This causes the job to restart. Wondering if anyone has seen something similar, and if they have any advice 🙏
    s
    • 2
    • 2
  • g

    Gil Kirkpatrick

    03/24/2023, 8:47 PM
Greetings Flinkers, I'm trying to build a container for Flink that supports SQL queries on Kafka topics. Kafka, the jobmanager, taskmanager, sql-client, and sql-gateway all come up and run, but when I create a table like this:
    CREATE TABLE FOO (id STRING, name STRING)
    WITH (
    'connector.type'='kafka',
    'connector.version'='universal',
    'connector.topic'='foo',
    'format.type'='json',
    'connector.properties.bootstrap.servers'='kafka:29091',
    'connector.properties.group.id'='flink'
    );
    I get this error when I try to select from the table:
    [ERROR] Could not execute SQL statement. Reason:
    org.apache.flink.table.factories.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSourceFactory' in
    the classpath.
    Reason: Required context properties mismatch.
    The following properties are requested:
    connector.properties.bootstrap.servers=kafka:29091
    connector.properties.group.id=flink
    connector.topic=foo
    connector.type=kafka
    connector.version=universal
    format.type=json
schema.0.data-type=VARCHAR(2147483647)
    schema.0.name=id
schema.1.data-type=VARCHAR(2147483647)
    schema.1.name=name
    The following factories have been considered:
    org.apache.flink.table.sources.CsvBatchTableSourceFactory
    org.apache.flink.table.sources.CsvAppendTableSourceFactory
    I interpreted the error as a symptom of not having the appropriate JAR files in /opt/flink/lib, so I added flink-sql-connector-kafka.jar, flink-connector-kafka.jar, and flink-table-api-java-1.17.0.jar to the image, but the error persists. The container is based off the Flink 1.17.0 container image on Docker Hub.
    m
    f
    • 3
    • 10
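The WITH options in the DDL above use the legacy connector.type properties, which is why the planner goes looking for an old-style TableSourceFactory. With Flink 1.17 and flink-sql-connector-kafka on the classpath, the current option style looks roughly like the sketch below (shown via executeSql, but the same DDL can be typed into the SQL client; the startup mode and format are assumptions):
Copy code
tableEnv.executeSql(
        "CREATE TABLE FOO (" +
        "  id STRING," +
        "  name STRING" +
        ") WITH (" +
        "  'connector' = 'kafka'," +
        "  'topic' = 'foo'," +
        "  'properties.bootstrap.servers' = 'kafka:29091'," +
        "  'properties.group.id' = 'flink'," +
        "  'scan.startup.mode' = 'earliest-offset'," +   // assumption: adjust as needed
        "  'format' = 'json'" +
        ")");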
  • t

    Tawfik Yasser

    03/24/2023, 9:46 PM
    Hello, I got this error
    Serializer consumed more bytes than the record had. This indicates broken serialization. If you are using custom serialization types (Value or Writable), check their serialization methods. If you are using a Kryo-serialized type, check the corresponding Kryo serializer.
My event structure is as follows: {ip: 152.89.196.211, ts: 2023-03-04T23:12:26Z}, where ip is a String and ts is an Instant. Can anyone help me? TIA.
    d
    • 2
    • 8
  • s

    Sai Sharath Dandi

    03/24/2023, 10:11 PM
Hi folks, is there any way to extract an array of a single column from an array of rows in Flink SQL? I want to do something like the example below. person_array is an array of rows like ARRAY<ROW<name STRING, age INT>> and I want to extract just the name fields into another array ARRAY<STRING>. I'd like to know if there's a way to do this with the built-in functions before writing a UDF.
    Copy code
    SELECT
    ARRAY[person_array.name] as person_names
    FROM
      my_table
    -- this is obviously wrong syntax
    r
    • 2
    • 3
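If no built-in function fits, a sketch of what the UDF could look like (the type hint and positional field access are assumptions about the exact row type; name is read as field 0):
Copy code
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.types.Row;

// Hypothetical UDF: extracts the name field from each ROW in the array.
public class ExtractNames extends ScalarFunction {
    public String[] eval(@DataTypeHint("ARRAY<ROW<name STRING, age INT>>") Row[] persons) {
        if (persons == null) {
            return null;
        }
        String[] names = new String[persons.length];
        for (int i = 0; i < persons.length; i++) {
            // name is the first field of the ROW
            names[i] = persons[i] == null ? null : (String) persons[i].getField(0);
        }
        return names;
    }
}
Registered with createTemporarySystemFunction, it could then be used as SELECT ExtractNames(person_array) AS person_names FROM my_table.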
  • c

    czchen

    03/25/2023, 1:41 AM
We get
java.lang.ClassNotFoundException: Class org.apache.hadoop.security.JniBasedUnixGroupsMappingWithFallback not found
when upgrading from Flink 1.16.0 to Flink 1.17.0 (with GCS as persistent storage). Any idea how to fix the problem? We have /opt/flink/plugins/gs-fs-hadoop/flink-gs-fs-hadoop.jar symlinked to /opt/flink/opt/flink-gs-fs-hadoop-1.17.0.jar, so GCS should work fine, and it works fine in Flink 1.16. The following is how we configure the Flink container in the Dockerfile:
    Copy code
    RUN echo start \
        && echo "networkaddress.cache.ttl=0" >> $JAVA_HOME/lib/security/java.security \
        && ln -s /opt/flink/opt/flink-queryable-state-runtime-*.jar /opt/flink/lib/ \
        && ln -s /opt/flink/opt/flink-state-processor-api-*.jar /opt/flink/lib/ \
        && mkdir -p /opt/flink/plugins/gs-fs-hadoop \
        && ln -s /opt/flink/opt/flink-gs-fs-hadoop-*.jar /opt/flink/plugins/gs-fs-hadoop/flink-gs-fs-hadoop.jar \
        && mkdir -p /opt/flink/plugins/s3-fs-hadoop \
        && ln -s /opt/flink/opt/flink-s3-fs-hadoop-*.jar /opt/flink/plugins/s3-fs-hadoop/flink-s3-fs-hadoop.jar \
        && chown flink:flink /opt/flink/lib/* \
        && chmod 0644 /opt/flink/lib/* \
        && echo end
The following is the full exception log:
    Copy code
    java.util.concurrent.CompletionException: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.ClassNotFoundException: Class org.apache.hadoop.security.JniBasedUnixGroupsMappingWithFallback not found
            at java.util.concurrent.CompletableFuture.encodeThrowable(Unknown Source) ~[?:?]
            at java.util.concurrent.CompletableFuture.completeThrowable(Unknown Source) [?:?]
            at java.util.concurrent.CompletableFuture$AsyncSupply.run(Unknown Source) [?:?]
            at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) [?:?]
            at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) [?:?]
            at java.lang.Thread.run(Unknown Source) [?:?]
    Caused by: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.ClassNotFoundException: Class org.apache.hadoop.security.JniBasedUnixGroupsMappingWithFallback not found
            at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2720) ~[?:?]
            at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.security.Groups.<init>(Groups.java:107) ~[?:?]
            at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.security.Groups.<init>(Groups.java:102) ~[?:?]
            at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.security.Groups.getUserToGroupsMappingService(Groups.java:451) ~[?:?]
            at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.security.UserGroupInformation.initialize(UserGroupInformation.java:338) ~[?:?]
            at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.security.UserGroupInformation.ensureInitialized(UserGroupInformation.java:300) ~[?:?]
            at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.security.UserGroupInformation.getCurrentUser(UserGroupInformation.java:575) ~[?:?]
            at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.getUgiUserName(GoogleHadoopFileSystemBase.java:1226) ~[?:?]
            at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.listStatus(GoogleHadoopFileSystemBase.java:858) ~[?:?]
            at org.apache.flink.fs.gs.org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.listStatus(HadoopFileSystem.java:170) ~[?:?]
            at org.apache.flink.core.fs.PluginFileSystemFactory$ClassLoaderFixingFileSystem.listStatus(PluginFileSystemFactory.java:141) ~[flink-dist-1.17.0.jar:1.17.0]
            at org.apache.flink.runtime.highavailability.FileSystemJobResultStore.getDirtyResultsInternal(FileSystemJobResultStore.java:180) ~[flink-dist-1.17.0.jar:1.17.0]
            at org.apache.flink.runtime.highavailability.AbstractThreadsafeJobResultStore.withReadLock(AbstractThreadsafeJobResultStore.java:118) ~[flink-dist-1.17.0.jar:1.17.0]
            at org.apache.flink.runtime.highavailability.AbstractThreadsafeJobResultStore.getDirtyResults(AbstractThreadsafeJobResultStore.java:100) ~[flink-dist-1.17.0.jar:1.17.0]
            at org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.getDirtyJobResults(SessionDispatcherLeaderProcess.java:194) ~[flink-dist-1.17.0.jar:1.17.0]
            at org.apache.flink.runtime.dispatcher.runner.AbstractDispatcherLeaderProcess.supplyUnsynchronizedIfRunning(AbstractDispatcherLeaderProcess.java:198) ~[flink-dist-1.17.0.jar:1.17.0]
            at org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.getDirtyJobResultsIfRunning(SessionDispatcherLeaderProcess.java:188) ~[flink-dist-1.17.0.jar:1.17.0]
            ... 4 more
    Caused by: java.lang.RuntimeException: java.lang.ClassNotFoundException: Class org.apache.hadoop.security.JniBasedUnixGroupsMappingWithFallback not found
            at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2688) ~[?:?]
            at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2712) ~[?:?]
            at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.security.Groups.<init>(Groups.java:107) ~[?:?]
            at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.security.Groups.<init>(Groups.java:102) ~[?:?]
            at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.security.Groups.getUserToGroupsMappingService(Groups.java:451) ~[?:?]
            at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.security.UserGroupInformation.initialize(UserGroupInformation.java:338) ~[?:?]
            at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.security.UserGroupInformation.ensureInitialized(UserGroupInformation.java:300) ~[?:?]
            at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.security.UserGroupInformation.getCurrentUser(UserGroupInformation.java:575) ~[?:?]
            at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.getUgiUserName(GoogleHadoopFileSystemBase.java:1226) ~[?:?]
            at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.listStatus(GoogleHadoopFileSystemBase.java:858) ~[?:?]
            at org.apache.flink.fs.gs.org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.listStatus(HadoopFileSystem.java:170) ~[?:?]
            at org.apache.flink.core.fs.PluginFileSystemFactory$ClassLoaderFixingFileSystem.listStatus(PluginFileSystemFactory.java:141) ~[flink-dist-1.17.0.jar:1.17.0]
            at org.apache.flink.runtime.highavailability.FileSystemJobResultStore.getDirtyResultsInternal(FileSystemJobResultStore.java:180) ~[flink-dist-1.17.0.jar:1.17.0]
            at org.apache.flink.runtime.highavailability.AbstractThreadsafeJobResultStore.withReadLock(AbstractThreadsafeJobResultStore.java:118) ~[flink-dist-1.17.0.jar:1.17.0]
            at org.apache.flink.runtime.highavailability.AbstractThreadsafeJobResultStore.getDirtyResults(AbstractThreadsafeJobResultStore.java:100) ~[flink-dist-1.17.0.jar:1.17.0]
            at org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.getDirtyJobResults(SessionDispatcherLeaderProcess.java:194) ~[flink-dist-1.17.0.jar:1.17.0]
            at org.apache.flink.runtime.dispatcher.runner.AbstractDispatcherLeaderProcess.supplyUnsynchronizedIfRunning(AbstractDispatcherLeaderProcess.java:198) ~[flink-dist-1.17.0.jar:1.17.0]
            at org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.getDirtyJobResultsIfRunning(SessionDispatcherLeaderProcess.java:188) ~[flink-dist-1.17.0.jar:1.17.0]
            ... 4 more
    Caused by: java.lang.ClassNotFoundException: Class org.apache.hadoop.security.JniBasedUnixGroupsMappingWithFallback not found
            at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:2592) ~[?:?]
            at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2686) ~[?:?]
            at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2712) ~[?:?]
            at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.security.Groups.<init>(Groups.java:107) ~[?:?]
            at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.security.Groups.<init>(Groups.java:102) ~[?:?]
            at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.security.Groups.getUserToGroupsMappingService(Groups.java:451) ~[?:?]
            at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.security.UserGroupInformation.initialize(UserGroupInformation.java:338) ~[?:?]
            at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.security.UserGroupInformation.ensureInitialized(UserGroupInformation.java:300) ~[?:?]
            at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.security.UserGroupInformation.getCurrentUser(UserGroupInformation.java:575) ~[?:?]
            at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.getUgiUserName(GoogleHadoopFileSystemBase.java:1226) ~[?:?]
            at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.listStatus(GoogleHadoopFileSystemBase.java:858) ~[?:?]
            at org.apache.flink.fs.gs.org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.listStatus(HadoopFileSystem.java:170) ~[?:?]
            at org.apache.flink.core.fs.PluginFileSystemFactory$ClassLoaderFixingFileSystem.listStatus(PluginFileSystemFactory.java:141) ~[flink-dist-1.17.0.jar:1.17.0]
            at org.apache.flink.runtime.highavailability.FileSystemJobResultStore.getDirtyResultsInternal(FileSystemJobResultStore.java:180) ~[flink-dist-1.17.0.jar:1.17.0]
            at org.apache.flink.runtime.highavailability.AbstractThreadsafeJobResultStore.withReadLock(AbstractThreadsafeJobResultStore.java:118) ~[flink-dist-1.17.0.jar:1.17.0]
            at org.apache.flink.runtime.highavailability.AbstractThreadsafeJobResultStore.getDirtyResults(AbstractThreadsafeJobResultStore.java:100) ~[flink-dist-1.17.0.jar:1.17.0]
            at org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.getDirtyJobResults(SessionDispatcherLeaderProcess.java:194) ~[flink-dist-1.17.0.jar:1.17.0]
            at org.apache.flink.runtime.dispatcher.runner.AbstractDispatcherLeaderProcess.supplyUnsynchronizedIfRunning(AbstractDispatcherLeaderProcess.java:198) ~[flink-dist-1.17.0.jar:1.17.0]
            at org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.getDirtyJobResultsIfRunning(SessionDispatcherLeaderProcess.java:188) ~[flink-dist-1.17.0.jar:1.17.0]
            ... 4 more
    t
    • 2
    • 3
  • c

    ConradJam

    03/26/2023, 4:38 AM
Hi all. I would like to ask about the configuration of RocksDB incremental checkpoints and the Generic Incremental Checkpoint (GIC). In Flink 1.17, if GIC is enabled, can the RocksDB incremental checkpoint still be enabled or disabled independently? Are the two switches in conflict, i.e. does turning on GIC mean I no longer need to enable the RocksDB incremental checkpoint 👀? The community seems to have no documentation describing whether the two can be used together or whether only one of them can be enabled.
  • p

    Pavan kalyan athukuri

    03/26/2023, 10:18 AM
Hi, my Flink application runs fine in IntelliJ and consumes Kafka messages, but it does not consume Kafka messages when the jar is submitted to Flink. Does anyone know what the reason could be? The logs contain no errors.
  • n

    Niels Basjes

    03/26/2023, 9:47 PM
Hi, in my project I have a flink-table wrapper around the functionality offered by my project. Since Flink 1.17.0 the normal Flink wrapper still works fine, yet the flink-table wrapper fails during the build, mostly with:
    Copy code
    Error:  nl.basjes.parse.useragent.flink.table.TestTableFunctionClientHints.testMapFunctionReturnMap  Time elapsed: 5.42 s  <<< ERROR!
    java.lang.IllegalArgumentException: fromIndex(2) > toIndex(0)
    	at java.base/java.util.AbstractList.subListRangeCheck(AbstractList.java:509)
    	at java.base/java.util.AbstractList.subList(AbstractList.java:497)
    	at org.apache.calcite.rel.metadata.janino.CacheGeneratorUtil$CacheKeyStrategy$1.safeArgList(CacheGeneratorUtil.java:213)
    	at org.apache.calcite.rel.metadata.janino.CacheGeneratorUtil$CacheKeyStrategy$1.cacheKeyBlock(CacheGeneratorUtil.java:205)
    	at org.apache.calcite.rel.metadata.janino.CacheGeneratorUtil.cachedMethod(CacheGeneratorUtil.java:68)
    	at org.apache.calcite.rel.metadata.janino.RelMetadataHandlerGeneratorUtil.generateHandler(RelMetadataHandlerGeneratorUtil.java:121)
    	at org.apache.calcite.rel.metadata.JaninoRelMetadataProvider.generateCompileAndInstantiate(JaninoRelMetadataProvider.java:138)
    	at org.apache.calcite.rel.metadata.JaninoRelMetadataProvider.lambda$static$0(JaninoRelMetadataProvider.java:73)
    	at org.apache.flink.calcite.shaded.com.google.common.cache.CacheLoader$FunctionToCacheLoader.load(CacheLoader.java:165)
    	at org.apache.flink.calcite.shaded.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3529)
    	at org.apache.flink.calcite.shaded.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2278)
    	at org.apache.flink.calcite.shaded.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2155)
    	at org.apache.flink.calcite.shaded.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2045)
    	at org.apache.flink.calcite.shaded.com.google.common.cache.LocalCache.get(LocalCache.java:3951)
    	at org.apache.flink.calcite.shaded.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3974)
    	at org.apache.flink.calcite.shaded.com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4958)
    	at org.apache.calcite.rel.metadata.JaninoRelMetadataProvider.revise(JaninoRelMetadataProvider.java:197)
    	at org.apache.calcite.rel.metadata.RelMetadataQueryBase.revise(RelMetadataQueryBase.java:118)
    	at org.apache.calcite.rel.metadata.RelMetadataQuery.getPulledUpPredicates(RelMetadataQuery.java:844)
    	at org.apache.calcite.rel.rules.ReduceExpressionsRule$ProjectReduceExpressionsRule.onMatch(ReduceExpressionsRule.java:307)
    	at org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:337)
    ...
    b
    s
    r
    • 4
    • 12
  • u

    刘路

    03/27/2023, 8:11 AM
Is anyone using PyFlink on a MacBook Pro M1? I am using apache-flink 1.16.0 in PyCharm but I get a lot of exceptions.
    m
    d
    • 3
    • 50
  • r

    Rafael Jeon

    03/27/2023, 8:44 AM
    Hi All, I am currently using flink-cdc-connectors 2.4 to pipeline MySQL data. However, due to binlog loss, I need to resume from the snapshot. How can I read snapshot data starting from a specific date?
    • 1
    • 2