# troubleshooting
  • Amol Khare, 02/14/2023, 7:30 PM
    Hi Team, I am trying out the Async I/O operation on a KeyedStream. I observed that events for the same key can get processed in parallel inside the AsyncFunction; I am using the async r2dbc library to fetch data from Postgres for event enrichment. I, however, always want them processed sequentially. How can I mandate sequential processing per key within the async operation?
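    For context, a minimal sketch of the kind of setup described above, using Flink's Async I/O API on a keyed stream; the event type, key field, and lookup below are hypothetical stand-ins for the r2dbc enrichment call:
    import java.util.Collections;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.TimeUnit;

    import org.apache.flink.streaming.api.datastream.AsyncDataStream;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.functions.async.ResultFuture;
    import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

    // Hypothetical event types, for illustration only.
    class Event { public String key; }
    class EnrichedEvent { public Event event; public String extra; }

    class EnrichmentFunction extends RichAsyncFunction<Event, EnrichedEvent> {
        @Override
        public void asyncInvoke(Event input, ResultFuture<EnrichedEvent> resultFuture) {
            // A plain CompletableFuture stands in for the non-blocking r2dbc query.
            CompletableFuture
                    .supplyAsync(() -> "enrichment-for-" + input.key)
                    .thenAccept(extra -> {
                        EnrichedEvent out = new EnrichedEvent();
                        out.event = input;
                        out.extra = extra;
                        resultFuture.complete(Collections.singleton(out));
                    });
        }
    }

    class AsyncEnrichmentExample {
        static DataStream<EnrichedEvent> enrich(DataStream<Event> events) {
            // orderedWait preserves the emission order of results, but requests for
            // the same key can still be in flight concurrently; per-key sequential
            // execution is not something this operator guarantees on its own.
            return AsyncDataStream.orderedWait(
                    events.keyBy(e -> e.key),
                    new EnrichmentFunction(),
                    5, TimeUnit.SECONDS,
                    100); // capacity: max concurrent in-flight requests
        }
    }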
  • sap1ens, 02/14/2023, 10:52 PM
    Hey folks, does anyone here use Cloudflare R2 storage with Flink?
  • Nathanael England, 02/15/2023, 12:03 AM
    What's the right way to get around the following error when writing a unit test for pyflink?
    Could not found the Java class 'org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration.Builder'. The Java dependencies could be specified via command line argument '--jarfile' or the config option 'pipeline.jars'
  • Sumit Aich, 02/15/2023, 7:03 AM
    Hi team, is there a doc on how to roll back the Flink K8s Operator to an older version?
  • Pedro Cunha, 02/15/2023, 10:00 AM
    Hey there guys. I have a Flink job running in version `1.14.4` and I have no issues running the docker images for that version. When I try to update to `1.14.6` (and any version above that) I start getting these errors when the `jobmanager` starts:
    Run if [ "$(docker inspect --format='{{.State.Health.Status}}' jobmanager)" != "healthy" ]; then
    Container is not healthy
    Error: ] The execution result is empty.
     Starting Job Manager
    Error: ] Could not get JVM parameters and dynamic configurations properly.
    Error: ] Raw output from BashJavaUtils:
    Warning:  [0.004s][warning][os,thread] Failed to start thread "GC Thread#0" - pthread_create failed (EPERM) for attributes: stacksize: 1024k, guardsize: 4k, detached.
     #
     # There is insufficient memory for the Java Runtime Environment to continue.
     # Cannot create worker GC thread. Out of system resources.
     # An error report file with more information is saved as:
     # /opt/flink/hs_err_pid141.log
    Error: Process completed with exit code 1.
    Does anyone have any idea why this might be happening?
  • chunilal kukreja, 02/15/2023, 1:22 PM
    Hi Team, I have a use case to implement where a Kafka source reads an unbounded stream of events, followed by keyBy(tenantId + specialId), a global window, asyncIO, and a Kafka sink. In asyncIO I have a couple of business functions to execute based on “specialId”. The execution should be done serially, so that one batch for a given key gets executed and only on its returned response should the next batch for the same key get executed. But with asyncIO the execution is performed in parallel and any of the subtasks picks up a batch and starts executing it. How can I make the execution per key serial using Flink-provided operators or logic?
  • Abhinav sharma, 02/15/2023, 1:33 PM
    Hi, I am trying to sink the Flink results to a Kafka topic. I am serializing a record using a SerializationSchema, and when I print I can see the serialized message. But when I call data.sinkTo(sink) it starts giving [kafka-producer-network-thread | producer-Kafka-sink-0-1] ……… Node x disconnected
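    For comparison, a minimal KafkaSink sketch wired up with a value SerializationSchema; the bootstrap servers, topic name, and String element type are placeholders, not the asker's actual setup:
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.connector.base.DeliveryGuarantee;
    import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
    import org.apache.flink.connector.kafka.sink.KafkaSink;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class KafkaSinkExample {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            DataStream<String> data = env.fromElements("a", "b", "c");

            KafkaSink<String> sink = KafkaSink.<String>builder()
                    .setBootstrapServers("broker:9092")          // placeholder
                    .setRecordSerializer(
                            KafkaRecordSerializationSchema.builder()
                                    .setTopic("output-topic")    // placeholder
                                    .setValueSerializationSchema(new SimpleStringSchema())
                                    .build())
                    .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
                    .build();

            // Same call pattern as in the question above.
            data.sinkTo(sink);
            env.execute("kafka-sink-example");
        }
    }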
  • Kosta Sovaridis, 02/15/2023, 1:53 PM
    Hi, I am using a FlinkDeployment with Flink session jobs. I saw that only savepoint upgrade is supported for session jobs. I was wondering whether the Kubernetes HA based recovery could also be used to upgrade session jobs.
  • Matyas Orhidi, 02/15/2023, 3:41 PM
    qq folks: the property `execution.checkpointing.interval` is seemingly being ignored when testing locally in a minicluster, why is that? I can only enable checkpointing for testing purposes using `env.enableCheckpointing(...)` 🤷
    private static MiniClusterExtension createMiniCluster() {
            final Configuration config = new Configuration();
            config.setString("state.checkpoints.dir", "file:///tmp/flink/checkpoint");
            config.setString("execution.checkpointing.interval", "10s"); // seemingly ignored
    
            return new MiniClusterExtension(
                    new MiniClusterResourceConfiguration.Builder()
                            .setConfiguration(config)
                            .setNumberSlotsPerTaskManager(2)
                            .build());
        }
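    For reference, a minimal sketch (assuming Flink 1.15+, and assuming the interval is meant as job-level rather than cluster-level configuration) of supplying execution.checkpointing.interval through the environment's own Configuration instead of the MiniCluster config:
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class CheckpointIntervalSketch {
        public static void main(String[] args) {
            // Job-level configuration handed to the environment itself; the
            // checkpointing options in it are applied to the CheckpointConfig.
            Configuration config = new Configuration();
            config.setString("execution.checkpointing.interval", "10s");
            config.setString("state.checkpoints.dir", "file:///tmp/flink/checkpoint");

            StreamExecutionEnvironment env =
                    StreamExecutionEnvironment.getExecutionEnvironment(config);

            // Programmatic equivalent: env.enableCheckpointing(10_000);
            System.out.println(env.getCheckpointConfig().getCheckpointInterval());
        }
    }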
  • Amir Hossein Sharifzadeh, 02/15/2023, 7:35 PM
    Hello. I am currently working on a data streaming project and our team decided to use the Table API to process the data that we send through the Kafka producer. We are sending thousands of messages on two different topics. Every single message contains a large string (2^14 characters), and each message is serialized as JSON. I used the Table API to run a query over two tables (a SQL JOIN) and pass the results to a data processing function when they are ready: 1) Is the Table API able to handle queries on data that contains large strings? 2) If so, what options are there to make processing faster? 3) I use PyFlink and it does not seem to be fast enough; might the Table API be faster through Java or Scala? If so, which one is fastest at this point? Thank you very much for any suggestions. Here is a sample of my code:
    #### On Producer side:
        for raw_id in tqdm.tqdm(range(0, Nt)):
            data_array = fid.read(chunk_size)
            b64_string = str(base64.b64encode(data_array), 'utf-8')
            empad_json = create_sample_empad_json(raw_id, b64_string, nFileLen, chunk_size)
            # print(b64_string)
            producer.send(topic, empad_json)
            producer.flush()
    
    def create_sample_empad_json(raw_id, raw_data, file_size, chunk_size):
        return {'raw_id': int(raw_id), 'raw_data': raw_data, 'file_size': file_size, 'chunk_size': chunk_size}
    
    #### On Flink side:
    def stream_processing():
        env = StreamExecutionEnvironment.get_execution_environment()
        t_env = StreamTableEnvironment.create(stream_execution_environment=env)
        t_env.get_config().get_configuration().set_boolean("python.fn-execution.memory.managed", True)
    
        t1 = f"""
                CREATE TABLE raw_table(
                    raw_id INT,
                    raw_data STRING,
                    file_size INT,
                    chunk_size INT
                ) WITH (
                  'connector' = 'kafka',
                  'topic' = 'empadraw',
                  'properties.bootstrap.servers' = 'kafka:9092',
                  'properties.group.id' = 'MY_GRP',
                  'scan.startup.mode' = 'latest-offset',
                  'format' = 'json'
                )
                """
    
        t2 = f"""
                CREATE TABLE bkgd_table(
                    raw_id INT,
                    raw_data STRING,
                    file_size INT,
                    chunk_size INT
                ) WITH (
                  'connector' = 'kafka',
                  'topic' = 'empadbkgd',
                  'properties.bootstrap.servers' = 'kafka:9092',
                  'properties.group.id' = 'MY_GRP',
                  'scan.startup.mode' = 'latest-offset',
                  'format' = 'json'
                )
                """
    
        t_env.execute_sql(t1)
        t_env.execute_sql(t2)
    
        query = "SELECT raw_table.raw_id as raw_id, raw_table.raw_data as raw_enc_val, raw_table.file_size as raw_file_size, raw_table.chunk_size as raw_chunk_size, " \
                "bkgd_table.raw_data as bkgd_enc_val, bkgd_table.file_size as bkgd_file_size, bkgd_table.chunk_size as bkgd_chunk_size " \
                "FROM raw_table JOIN bkgd_table ON raw_table.raw_id = bkgd_table.raw_id"
    
        table_result = t_env.sql_query(query).execute().collect()
        process_results(table_result)
    
    def process_results(table_result):
        with table_result as results:
            for result in results:
                # Columns follow the SELECT order: raw_id, raw_enc_val, raw_file_size,
                # raw_chunk_size, bkgd_enc_val, bkgd_file_size, bkgd_chunk_size
                chunk_id = int(result[0])
                raw_chunk = str(result[1])
                bkgd_chunk = str(result[4])
                file_size = int(result[2])
                chunk_size = int(result[3])

                process_data(chunk_id, raw_chunk, bkgd_chunk, file_size, chunk_size)
  • Sofya T. Irwin, 02/15/2023, 10:07 PM
    Has anyone seen this error while running a standalone session (this is coming from the taskmanager log within the container):
    java.io.IOException: Could not connect to BlobServer at address localhost/127.0.0.1:37269
    I’m trying to run Flink 1.16 in a Docker container setup (an older version of Flink was working OK with the same setup and I’m just trying to port it to the later version).
  • Reme Ajayi, 02/16/2023, 12:56 AM
    Hi, I am trying to join two DataStreams from Kafka and output to a sink; however, my joined stream does not output any data. In the overview window of the Flink UI it also looks like my sink doesn't exist, and after a few minutes my job crashes and runs out of memory. What is wrong with my join logic?
    DataStream<Enhanced> joinedStream = EntriesStream.join(historyStream)
    				.where(new KeySelector<GenericRecord, String>() {
    
    					@Override
    					public String getKey(GenericRecord value) throws Exception {
    						return value.get("la_id").toString();
    
    					}
    				}).equalTo(new KeySelector<GenericRecord, String>() {
    
    					@Override
    					public String getKey(GenericRecord value) throws Exception {
    						return value.get("id").toString();
    					}
    				}).window(TumblingProcessingTimeWindows.of(Time.minutes(5)))
    				.apply(new JoinFunction<GenericRecord, GenericRecord, Enhanced>() {
    
    
    					@Override
    					public Enhanced join(GenericRecord first, GenericRecord second) throws Exception {
    						return new Enhanced(
    								Long.parseLong(first.get("c_at").toString()),
    								first.get("c_type").toString(),
    								first.get("id").toString(),
    								Integer.parseInt(first.get("d_cts").toString()),
    								Integer.parseInt(first.get("c_cts").toString()),
    								second.get("prov").toString(),
    								second.get("bb_S_T").toString(),
    								second.get("p_id").toString(),
    								second.get("s_ccurr").toString()
    						);
    					}
    				});
    Things I have tried so far: 1. Increased task slots on the task manager 2. Used processing time instead of event time
  • Sylvia Lin, 02/16/2023, 1:07 AM
    Hey folks! Is Flink 1.16 supporting s3 hadoop irsa on AWS EKS now? Seeing this ticket, but FileSink on s3a still doesn't work with AWS EKS irsa. Any workaround to sink files with s3p? https://issues.apache.org/jira/browse/FLINK-23487?page=com.atlassian.jira.plugin.system.issuetabpanels%3Aall-tabpanel
  • Nathanael England, 02/16/2023, 2:03 AM
    Can you use the `logging` module in pyflink? I know my code is being executed, but I don't see any of the logs from inside my process function. This is in a unit test, so I'm not sure if there's something configuring them to be hidden.
  • Jonathan Weaver, 02/16/2023, 3:39 AM
    Does anyone know how to correctly clean up threaded resources on an exception (say a network blip) that results in a restart? I have some DB pools, and if an exception occurs elsewhere in the DAG, when the process restarts the pool thread is simply abandoned and keeps living in the JVM process, leaving connections open. Only if the exception occurs inside the actual DB task can I catch it and properly close the pool.
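    A minimal sketch of the rich-function lifecycle hooks usually used for this; the pool type is a hypothetical stand-in for a real connection pool, and close() is typically also invoked when a task is cancelled or fails, not only on a clean shutdown:
    import org.apache.flink.api.common.functions.RichMapFunction;
    import org.apache.flink.configuration.Configuration;

    // Hypothetical pool standing in for a real connection pool (HikariCP, r2dbc, ...).
    class ConnectionPool implements AutoCloseable {
        static ConnectionPool create() { return new ConnectionPool(); }
        String query(String key) { return "row-for-" + key; }
        @Override public void close() { /* shut down pool threads and connections */ }
    }

    public class PooledLookupFunction extends RichMapFunction<String, String> {

        private transient ConnectionPool pool;

        @Override
        public void open(Configuration parameters) {
            // Created once per parallel task instance.
            pool = ConnectionPool.create();
        }

        @Override
        public String map(String key) {
            return pool.query(key);
        }

        @Override
        public void close() {
            // Release pool threads and connections when the task is torn down.
            if (pool != null) {
                pool.close();
            }
        }
    }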
  • Nathanael England, 02/16/2023, 4:16 AM
    I see process functions have access to an `open` method to perform heavier initialization tasks on startup. When working with a broadcast process function, is there a way to get the broadcast state in there? I see methods available for getting other types of state but not broadcast.
  • Emmanuel Leroy, 02/16/2023, 4:45 AM
    I’m looking for ways to improve the throughput of my Flink job. I used the code here: https://stackoverflow.com/questions/33904793/kafka-flink-performance-issues by @rmetzger, which was helpful in figuring out my throughput, and I can get 50k msg/core/s and ~70MB/s from a single topic / 1 partition. However I am confused: the job ‘busy’ status is barely 3%, the CPU is ~200mi on the TM, and I know my Kafka cluster can give me at least 100MB/s per partition, so somehow I’m only at 70% of what I could expect, with the job not being very busy. If I read from 2 topics, 1 partition each, I get to 140MB/s, so the job seems perfectly able to handle that, yet with 1 topic / 1 partition it is not able to reach max throughput. I tuned `fetch.max.bytes`, `max.partition.fetch.bytes`, `max.records`, as well as `receive.buffer.bytes` to much larger numbers than the defaults, making little to no difference, so I feel like it is not a consumer config issue. I’m just not sure what else it may be at this point. Any pointers on how to troubleshoot this would be appreciated. Thanks
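    For reference, a hedged sketch of where those consumer properties are passed on the KafkaSource builder; the broker, topic, group id, and the values themselves are illustrative, not recommendations:
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.connector.kafka.source.KafkaSource;
    import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;

    public class TunedKafkaSourceExample {
        public static KafkaSource<String> build() {
            return KafkaSource.<String>builder()
                    .setBootstrapServers("broker:9092")   // placeholder
                    .setTopics("input-topic")             // placeholder
                    .setGroupId("throughput-test")        // placeholder
                    .setStartingOffsets(OffsetsInitializer.earliest())
                    .setValueOnlyDeserializer(new SimpleStringSchema())
                    // Passed straight through to the underlying Kafka consumer.
                    .setProperty("fetch.max.bytes", "104857600")
                    .setProperty("max.partition.fetch.bytes", "33554432")
                    .setProperty("max.poll.records", "5000")
                    .setProperty("receive.buffer.bytes", "1048576")
                    .build();
        }
    }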
  • Guruguha Marur Sreenivasa, 02/16/2023, 5:29 AM
    Hi all, we have a Flink cluster running in session mode with about 25 jobs running across 80 pods. I have a hunch that Flink doesn't commit offsets correctly at regular intervals, causing the consumer lag to shoot up for some time before rapidly coming down. Is there a setting that I can look at, or is there a metric that shows at what times the Flink consumer committed Kafka offsets?
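    As a point of reference, the KafkaSource commits offsets back to Kafka as part of checkpoint completion when checkpointing is enabled, so externally visible lag tends to move in steps of the checkpoint interval; a minimal sketch of the related knobs (broker, topic, and group id are placeholders):
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.connector.kafka.source.KafkaSource;
    import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class OffsetCommitSketch {
        public static void main(String[] args) {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // Offsets are committed on checkpoint completion, so the commit cadence
            // follows this interval.
            env.enableCheckpointing(60_000);

            KafkaSource<String> source = KafkaSource.<String>builder()
                    .setBootstrapServers("broker:9092")   // placeholder
                    .setTopics("input-topic")             // placeholder
                    .setGroupId("my-group")               // placeholder
                    .setStartingOffsets(OffsetsInitializer.earliest())
                    .setValueOnlyDeserializer(new SimpleStringSchema())
                    // Defaults to true; set explicitly here for visibility.
                    .setProperty("commit.offsets.on.checkpoint", "true")
                    .build();
            // The connector also exposes per-partition committedOffsets / currentOffsets
            // metrics under the KafkaSourceReader group (worth verifying on your version).
        }
    }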
  • Tony Wang, 02/16/2023, 6:09 AM
    According to @David Anderson’s answer to this question: https://stackoverflow.com/questions/73413870/flink-sql-watermark-strategy-after-join-operation, interval joins should have watermarks. However I tried a very simple interval join:
    create view test4 as(select *, current_watermark(test1.order_time) as curr, if(test1.order_time > test2.order_time, test1.order_time, test2.order_time) as mt from test1 inner join test2 on test1.product = test2.product where test1.order_time BETWEEN test2.order_time - INTERVAL '1' SECOND and test2.order_time);
    and `desc test4` showed that the view doesn't have a watermark defined. Did I mess up somewhere?
  • Tony Wang, 02/16/2023, 6:16 AM
    For that matter, using the temporal join also didn't seem to produce watermarks:
    create view test4 as(select *, current_watermark(test1.order_time) as curr, if(test1.order_time > test2.order_time, test1.order_time, test2.order_time) as mt from test1 inner join test2 FOR SYSTEM_TIME AS OF test2.order_time on test1.product = test2.product);
  • Surkhay Surkhayli, 02/16/2023, 7:10 AM
    Hello, I hope someone will be able to help me figure out this issue. I am not very familiar with Flink. The version is 1.13.2. It happens on the server side when I run tests on pipelines, and they fail. What may be the possible solution for this one?
    [info]   org.apache.flink.runtime.client.JobExecutionException: Job completed with illegal application status: UNKNOWN.
    [info]   at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:148)
    [info]   at org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$3(MiniClusterJobClient.java:137)
    [info]   at java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:642)
    [info]   at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
    [info]   at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073)
    [info]   at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$0(AkkaInvocationHandler.java:237)
    [info]   at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
    [info]   at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
    [info]   at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
    [info]   at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073)
  • Amenreet Singh Sodhi, 02/16/2023, 7:43 AM
    Hi Team, I am deploying my Flink job in application mode on a k8s cluster and using external NFS to store checkpoints, but for some reason, when I redeploy the job / a new job (with changes to the previous jar), the job starts on the JM from the last checkpoint of the previous job. If I cancel the job, then it starts the actual job and checkpointing from scratch with id 0. What could be the possible reason for this? Thanks
  • Reme Ajayi, 02/16/2023, 11:34 AM
    Hi, I am trying to join two Kafka DataStreams and output to another Kafka topic; however, my joined stream does not output any data. After some time, my program crashes and runs out of memory, which I think is a result of the join not working. My code doesn't throw any errors, but the join doesn't produce any output. My join logic is below; please suggest possible solutions. P.S: Things I have tried so far: 1. Increased task slots on the task manager 2. Added watermarks and timestamps to my Kafka sources 3. Used processing time as my time characteristic
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    		// Use event time from the source record
    		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    		// Working state is kept on the heap (HashMapStateBackend); checkpoints for recovery go to the storage path configured below
    		env.setStateBackend(new HashMapStateBackend());
    		// Perform checkpoint every minute
    		env.enableCheckpointing(60000);
    		env.getCheckpointConfig().setCheckpointStorage(config.getCheckpointPath());
    
    		KafkaSource<GenericRecord> history_source = KafkaSource.<GenericRecord>builder()
    				.setBootstrapServers(BOOTSTRAP_SERVERS)
    			    .setTopics("history_topic")
    				.setStartingOffsets(OffsetsInitializer.earliest())
    				.setGroupId("test-001")
    				.setProperty("security.protocol", "SASL_SSL")
    				.setProperty("sasl.mechanism", "PLAIN")
    				.setProperty("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username='" + CONFLUENT_API_KEY + "' password='" + CONFLUENT_SECRET + "';")
    				.setProperty("client.dns.lookup","use_all_dns_ips")
    				.setValueOnlyDeserializer(ConfluentRegistryAvroDeserializationSchema.forGeneric(historySchema, SCHEMA_REGISTRY_URL, SCHEMA_REGISTRY_CONFIG))
    				.build();
    
    		KafkaSource<GenericRecord> entries_source = KafkaSource.<GenericRecord>builder()
    				.setBootstrapServers(BOOTSTRAP_SERVERS)
    				.setTopics("entries_topic")
    				.setStartingOffsets(OffsetsInitializer.earliest())
    				.setGroupId("test-001")
    				.setProperty("security.protocol", "SASL_SSL")
    				.setProperty("sasl.mechanism", "PLAIN")
    				.setProperty("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username='" + CONFLUENT_API_KEY + "' password='" + CONFLUENT_SECRET + "';")
    				.setProperty("client.dns.lookup","use_all_dns_ips")
    				.setValueOnlyDeserializer(ConfluentRegistryAvroDeserializationSchema.forGeneric(EntriesSchema, SCHEMA_REGISTRY_URL, SCHEMA_REGISTRY_CONFIG))
    				.build();
    
    		DataStream<GenericRecord> historyStream = env.fromSource(history_source,  WatermarkStrategy.<GenericRecord>forMonotonousTimestamps().withTimestampAssigner((event, timestamp) -> {
    			return timestamp;
    		}), "History Source");
    		DataStream<GenericRecord> journalEntriesStream = env.fromSource(entries_source,  WatermarkStrategy.<GenericRecord>forMonotonousTimestamps().withTimestampAssigner((event, timestamp) -> {
    			return timestamp;
    		}), "Entries Source");
    
    
    		DataStream<Enhanced> joinedStream = journalEntriesStream.join(historyStream)
    				.where(new KeySelector<GenericRecord, String>() {
    
    					@Override
    					public String getKey(GenericRecord value) throws Exception {
    						return value.get("la_id").toString();
    
    					}
    				}).equalTo(new KeySelector<GenericRecord, String>() {
    
    					@Override
    					public String getKey(GenericRecord value) throws Exception {
    						return value.get("id").toString();
    					}
    				}).window(TumblingEventTimeWindows.of(Time.seconds(30)))
    				.apply(new JoinFunction<GenericRecord, GenericRecord, Enhanced>() {
    
    
    					@Override
    					public Enhanced join(GenericRecord first, GenericRecord second) throws Exception {
    						return new Enhanced(
    								Long.parseLong(first.get("c_at").toString()),
    								first.get("c_type").toString(),
    								first.get("id").toString(),
    								Integer.parseInt(first.get("d_cts").toString()),
    								Integer.parseInt(first.get("c_cts").toString()),
    								second.get("prov").toString(),
    								second.get("bb_S_T").toString(),
    								second.get("p_id").toString(),
    								second.get("s_ccurr").toString()
    						);
    					}
    				});
    
    		joinedStream.print();
  • Adesh Dsilva, 02/16/2023, 12:20 PM
    Hi, I am trying to find how much time it took for a message on Kafka to be picked up by my Flink app. These two metrics seem relevant: `currentFetchEventTimeLag` and `currentEmitEventTimeLag`. I can see the metric `currentEmitEventTimeLag` but not `currentFetchEventTimeLag` in the Flink UI. Any idea why? What is the difference between the two? Will backpressure cause differences between them?
  • Matthias, 02/16/2023, 2:21 PM
    A FlinkSQL question: I'm running a simple test using the sql client (1.16.1): 1. I set up a table with sequential data:
    CREATE TABLE MyTable (
       a bigint,
       b int not null,
       c varchar,
       d timestamp(3)
    ) with ('connector' = 'datagen', 'rows-per-second' = '1', 'fields.a.kind' = 'sequence', 'fields.a.start' = '0', 'fields.a.end' = '1000000');
    2. Then, I start a simple query on that table:
    SELECT a FROM MyTable;
    Which results in a constant flow of ever increasing integer values to be printed in the sql client. 3. The job shows up in the logs and the UI. I stop the job with a savepoint:
    $ ./bin/flink stop --savepointPath ../1.16.1-savepoint <job-id>
    This creates a savepoint and stops the job. 4. I want to restart the job from within the sql client:
    SET 'execution.savepoint.path' = '<path-to-savepoint>';
    SELECT a FROM MyTable;
    My expectation would be that the sql client starts printing the incrementing numbers, leaving out the numbers that were already printed in the run before the savepoint was created. But the sql client is not printing anything at all. With a more complex example, I actually see in the Flink UI that data is sent between the tasks. So it's not that nothing is happening. But it seems like I'm missing something here 🤔
  • Kellan Burket, 02/16/2023, 2:35 PM
    Hi, I'm trying to figure out how to serialize Scala case classes without Kryo, but haven't had any luck. When I turn off generic types I get this error:
    org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Generic types have been disabled in the ExecutionConfig and type KeyedOutputEvent2 is treated as a generic type.
    Seems like this should be possible based on the documentation.
  • Tony Wang, 02/16/2023, 4:38 PM
    I am trying to run FlinkSQL on a ton of historical batch data, say in Parquet files. What are some recommended configurations to get best performance here?
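    A minimal hedged sketch of running a Table API query in batch execution mode over Parquet files with the filesystem connector; the table name, path, and schema are placeholders:
    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class BatchSqlOverParquet {
        public static void main(String[] args) {
            // Batch execution mode lets the planner use blocking shuffles and
            // batch operators, which generally suits bounded historical data.
            TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());

            tEnv.executeSql(
                    "CREATE TABLE events (" +
                    "  id BIGINT," +
                    "  payload STRING" +
                    ") WITH (" +
                    "  'connector' = 'filesystem'," +
                    "  'path' = 'file:///data/events/'," +   // placeholder path
                    "  'format' = 'parquet'" +
                    ")");

            tEnv.executeSql("SELECT COUNT(*) FROM events").print();
        }
    }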
  • Alexander Preuss, 02/16/2023, 5:01 PM
    Hi, I’m trying to use the Flink 1.16.1 Kafka Connector with `OAUTHBEARER` authentication. I’m configuring the `KafkaSource` like this:
    KafkaSource<String> source = KafkaSource.<String>builder()
    				.setBootstrapServers(bootstrapServers)
    				.setTopics(inputTopic)
    				.setStartingOffsets(OffsetsInitializer.earliest())
    				.setValueOnlyDeserializer(new SimpleStringSchema())
    				.setProperty("security.protocol", "SASL_SSL")
    				.setProperty("sasl.login.callback.handler.class", OauthLoginCallbackHandler.class.getName())
    				.setProperty("sasl.mechanism", "OAUTHBEARER")
    				.setProperty("sasl.jaas.config", "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required"
    				+ " oauth.issuer.url=\"%s\""
    				+ " oauth.credentials.url=\"%s\""
    				+ " oauth.audience=\"%s\";")
    				.build();
    When I submit my job I run into the following `NoClassDefFoundError`:
    2023-02-16 15:54:06,180 ERROR org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Uncaught exception in the SplitEnumerator for Source Source: Kafka source while starting the SplitEnumerator.. Triggering job failover.
    org.apache.kafka.common.KafkaException: Failed to create new KafkaAdminClient
    	at org.apache.kafka.clients.admin.KafkaAdminClient.createInternal(KafkaAdminClient.java:546) ~[blob_p-e7cf6c9007e3786b6a409f1c61d4d65e35d634ce-99d0f3b0d7940f398cc7b5441915b1dd:?]
    	at org.apache.kafka.clients.admin.KafkaAdminClient.createInternal(KafkaAdminClient.java:478) ~[blob_p-e7cf6c9007e3786b6a409f1c61d4d65e35d634ce-99d0f3b0d7940f398cc7b5441915b1dd:?]
    	at org.apache.kafka.clients.admin.Admin.create(Admin.java:133) ~[blob_p-e7cf6c9007e3786b6a409f1c61d4d65e35d634ce-99d0f3b0d7940f398cc7b5441915b1dd:?]
    	at org.apache.kafka.clients.admin.AdminClient.create(AdminClient.java:39) ~[blob_p-e7cf6c9007e3786b6a409f1c61d4d65e35d634ce-99d0f3b0d7940f398cc7b5441915b1dd:?]
    	at org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator.getKafkaAdminClient(KafkaSourceEnumerator.java:410) ~[blob_p-e7cf6c9007e3786b6a409f1c61d4d65e35d634ce-99d0f3b0d7940f398cc7b5441915b1dd:?]
    	at org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator.start(KafkaSourceEnumerator.java:151) ~[blob_p-e7cf6c9007e3786b6a409f1c61d4d65e35d634ce-99d0f3b0d7940f398cc7b5441915b1dd:?]
    	at org.apache.flink.runtime.source.coordinator.SourceCoordinator.lambda$start$1(SourceCoordinator.java:225) ~[flink-dist-1.16.1.jar:1.16.1]
    	at org.apache.flink.runtime.source.coordinator.SourceCoordinator.lambda$runInEventLoop$10(SourceCoordinator.java:449) ~[flink-dist-1.16.1.jar:1.16.1]
    	at org.apache.flink.util.ThrowableCatchingRunnable.run(ThrowableCatchingRunnable.java:40) [flink-dist-1.16.1.jar:1.16.1]
    	at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) [?:?]
    	at java.util.concurrent.FutureTask.run(Unknown Source) [?:?]
    	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source) [?:?]
    	at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) [?:?]
    	at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) [?:?]
    	at java.lang.Thread.run(Unknown Source) [?:?]
    Caused by: org.apache.kafka.common.KafkaException: java.lang.NoClassDefFoundError: com/fasterxml/jackson/databind/ObjectMapper
    	at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:184) ~[blob_p-e7cf6c9007e3786b6a409f1c61d4d65e35d634ce-99d0f3b0d7940f398cc7b5441915b1dd:?]
    	at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:192) ~[blob_p-e7cf6c9007e3786b6a409f1c61d4d65e35d634ce-99d0f3b0d7940f398cc7b5441915b1dd:?]
    	at org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:81) ~[blob_p-e7cf6c9007e3786b6a409f1c61d4d65e35d634ce-99d0f3b0d7940f398cc7b5441915b1dd:?]
    	at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:105) ~[blob_p-e7cf6c9007e3786b6a409f1c61d4d65e35d634ce-99d0f3b0d7940f398cc7b5441915b1dd:?]
    	at org.apache.kafka.clients.admin.KafkaAdminClient.createInternal(KafkaAdminClient.java:517) ~[blob_p-e7cf6c9007e3786b6a409f1c61d4d65e35d634ce-99d0f3b0d7940f398cc7b5441915b1dd:?]
    	... 14 more
    Caused by: java.lang.NoClassDefFoundError: com/fasterxml/jackson/databind/ObjectMapper
    	at org.apache.kafka.common.security.oauthbearer.internals.unsecured.OAuthBearerUnsecuredJws.toMap(OAuthBearerUnsecuredJws.java:299) ~[blob_p-e7cf6c9007e3786b6a409f1c61d4d65e35d634ce-99d0f3b0d7940f398cc7b5441915b1dd:?]
    	at org.apache.kafka.common.security.oauthbearer.internals.unsecured.OAuthBearerUnsecuredJws.<init>(OAuthBearerUnsecuredJws.java:83) ~[blob_p-e7cf6c9007e3786b6a409f1c61d4d65e35d634ce-99d0f3b0d7940f398cc7b5441915b1dd:?]
    	at org.apache.kafka.common.security.oauthbearer.internals.unsecured.OAuthBearerUnsecuredLoginCallbackHandler.handleTokenCallback(OAuthBearerUnsecuredLoginCallbackHandler.java:211) ~[blob_p-e7cf6c9007e3786b6a409f1c61d4d65e35d634ce-99d0f3b0d7940f398cc7b5441915b1dd:?]
    	at org.apache.kafka.common.security.oauthbearer.internals.unsecured.OAuthBearerUnsecuredLoginCallbackHandler.handle(OAuthBearerUnsecuredLoginCallbackHandler.java:163) ~[blob_p-e7cf6c9007e3786b6a409f1c61d4d65e35d634ce-99d0f3b0d7940f398cc7b5441915b1dd:?]
    	at org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule.identifyToken(OAuthBearerLoginModule.java:316) ~[blob_p-e7cf6c9007e3786b6a409f1c61d4d65e35d634ce-99d0f3b0d7940f398cc7b5441915b1dd:?]
    	at org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule.login(OAuthBearerLoginModule.java:301) ~[blob_p-e7cf6c9007e3786b6a409f1c61d4d65e35d634ce-99d0f3b0d7940f398cc7b5441915b1dd:?]
    	at javax.security.auth.login.LoginContext.invoke(Unknown Source) ~[?:?]
    	at javax.security.auth.login.LoginContext$4.run(Unknown Source) ~[?:?]
    	at javax.security.auth.login.LoginContext$4.run(Unknown Source) ~[?:?]
    	at java.security.AccessController.doPrivileged(Native Method) ~[?:?]
    	at javax.security.auth.login.LoginContext.invokePriv(Unknown Source) ~[?:?]
    	at javax.security.auth.login.LoginContext.login(Unknown Source) ~[?:?]
    	at org.apache.kafka.common.security.oauthbearer.internals.expiring.ExpiringCredentialRefreshingLogin.login(ExpiringCredentialRefreshingLogin.java:204) ~[blob_p-e7cf6c9007e3786b6a409f1c61d4d65e35d634ce-99d0f3b0d7940f398cc7b5441915b1dd:?]
    	at org.apache.kafka.common.security.oauthbearer.internals.OAuthBearerRefreshingLogin.login(OAuthBearerRefreshingLogin.java:150) ~[blob_p-e7cf6c9007e3786b6a409f1c61d4d65e35d634ce-99d0f3b0d7940f398cc7b5441915b1dd:?]
    	at org.apache.kafka.common.security.authenticator.LoginManager.<init>(LoginManager.java:62) ~[blob_p-e7cf6c9007e3786b6a409f1c61d4d65e35d634ce-99d0f3b0d7940f398cc7b5441915b1dd:?]
    	at org.apache.kafka.common.security.authenticator.LoginManager.acquireLoginManager(LoginManager.java:105) ~[blob_p-e7cf6c9007e3786b6a409f1c61d4d65e35d634ce-99d0f3b0d7940f398cc7b5441915b1dd:?]
    	at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:170) ~[blob_p-e7cf6c9007e3786b6a409f1c61d4d65e35d634ce-99d0f3b0d7940f398cc7b5441915b1dd:?]
    	at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:192) ~[blob_p-e7cf6c9007e3786b6a409f1c61d4d65e35d634ce-99d0f3b0d7940f398cc7b5441915b1dd:?]
    	at org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:81) ~[blob_p-e7cf6c9007e3786b6a409f1c61d4d65e35d634ce-99d0f3b0d7940f398cc7b5441915b1dd:?]
    	at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:105) ~[blob_p-e7cf6c9007e3786b6a409f1c61d4d65e35d634ce-99d0f3b0d7940f398cc7b5441915b1dd:?]
    	at org.apache.kafka.clients.admin.KafkaAdminClient.createInternal(KafkaAdminClient.java:517) ~[blob_p-e7cf6c9007e3786b6a409f1c61d4d65e35d634ce-99d0f3b0d7940f398cc7b5441915b1dd:?]
    	... 14 more
    Caused by: java.lang.ClassNotFoundException: com.fasterxml.jackson.databind.ObjectMapper
    	at java.net.URLClassLoader.findClass(Unknown Source) ~[?:?]
    	at java.lang.ClassLoader.loadClass(Unknown Source) ~[?:?]
    	at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:67) ~[flink-dist-1.16.1.jar:1.16.1]
    	at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74) ~[flink-dist-1.16.1.jar:1.16.1]
    	at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:51) ~[flink-dist-1.16.1.jar:1.16.1]
    	at java.lang.ClassLoader.loadClass(Unknown Source) ~[?:?]
    	at org.apache.kafka.common.security.oauthbearer.internals.unsecured.OAuthBearerUnsecuredJws.toMap(OAuthBearerUnsecuredJws.java:299) ~[blob_p-e7cf6c9007e3786b6a409f1c61d4d65e35d634ce-99d0f3b0d7940f398cc7b5441915b1dd:?]
    	at org.apache.kafka.common.security.oauthbearer.internals.unsecured.OAuthBearerUnsecuredJws.<init>(OAuthBearerUnsecuredJws.java:83) ~[blob_p-e7cf6c9007e3786b6a409f1c61d4d65e35d634ce-99d0f3b0d7940f398cc7b5441915b1dd:?]
    	at org.apache.kafka.common.security.oauthbearer.internals.unsecured.OAuthBearerUnsecuredLoginCallbackHandler.handleTokenCallback(OAuthBearerUnsecuredLoginCallbackHandler.java:211) ~[blob_p-e7cf6c9007e3786b6a409f1c61d4d65e35d634ce-99d0f3b0d7940f398cc7b5441915b1dd:?]
    	at org.apache.kafka.common.security.oauthbearer.internals.unsecured.OAuthBearerUnsecuredLoginCallbackHandler.handle(OAuthBearerUnsecuredLoginCallbackHandler.java:163) ~[blob_p-e7cf6c9007e3786b6a409f1c61d4d65e35d634ce-99d0f3b0d7940f398cc7b5441915b1dd:?]
    	at org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule.identifyToken(OAuthBearerLoginModule.java:316) ~[blob_p-e7cf6c9007e3786b6a409f1c61d4d65e35d634ce-99d0f3b0d7940f398cc7b5441915b1dd:?]
    	at org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule.login(OAuthBearerLoginModule.java:301) ~[blob_p-e7cf6c9007e3786b6a409f1c61d4d65e35d634ce-99d0f3b0d7940f398cc7b5441915b1dd:?]
    	at javax.security.auth.login.LoginContext.invoke(Unknown Source) ~[?:?]
    	at javax.security.auth.login.LoginContext$4.run(Unknown Source) ~[?:?]
    	at javax.security.auth.login.LoginContext$4.run(Unknown Source) ~[?:?]
    	at java.security.AccessController.doPrivileged(Native Method) ~[?:?]
    	at javax.security.auth.login.LoginContext.invokePriv(Unknown Source) ~[?:?]
    	at javax.security.auth.login.LoginContext.login(Unknown Source) ~[?:?]
    	at org.apache.kafka.common.security.oauthbearer.internals.expiring.ExpiringCredentialRefreshingLogin.login(ExpiringCredentialRefreshingLogin.java:204) ~[blob_p-e7cf6c9007e3786b6a409f1c61d4d65e35d634ce-99d0f3b0d7940f398cc7b5441915b1dd:?]
    	at org.apache.kafka.common.security.oauthbearer.internals.OAuthBearerRefreshingLogin.login(OAuthBearerRefreshingLogin.java:150) ~[blob_p-e7cf6c9007e3786b6a409f1c61d4d65e35d634ce-99d0f3b0d7940f398cc7b5441915b1dd:?]
    	at org.apache.kafka.common.security.authenticator.LoginManager.<init>(LoginManager.java:62) ~[blob_p-e7cf6c9007e3786b6a409f1c61d4d65e35d634ce-99d0f3b0d7940f398cc7b5441915b1dd:?]
    	at org.apache.kafka.common.security.authenticator.LoginManager.acquireLoginManager(LoginManager.java:105) ~[blob_p-e7cf6c9007e3786b6a409f1c61d4d65e35d634ce-99d0f3b0d7940f398cc7b5441915b1dd:?]
    	at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:170) ~[blob_p-e7cf6c9007e3786b6a409f1c61d4d65e35d634ce-99d0f3b0d7940f398cc7b5441915b1dd:?]
    	at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:192) ~[blob_p-e7cf6c9007e3786b6a409f1c61d4d65e35d634ce-99d0f3b0d7940f398cc7b5441915b1dd:?]
    	at org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:81) ~[blob_p-e7cf6c9007e3786b6a409f1c61d4d65e35d634ce-99d0f3b0d7940f398cc7b5441915b1dd:?]
    	at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:105) ~[blob_p-e7cf6c9007e3786b6a409f1c61d4d65e35d634ce-99d0f3b0d7940f398cc7b5441915b1dd:?]
    	at org.apache.kafka.clients.admin.KafkaAdminClient.createInternal(KafkaAdminClient.java:517) ~[blob_p-e7cf6c9007e3786b6a409f1c61d4d65e35d634ce-99d0f3b0d7940f398cc7b5441915b1dd:?]
    	... 14 more
    Any idea why this happens?