Slackbot
02/14/2023, 6:55 PM
Amol Khare
02/14/2023, 7:30 PM
sap1ens
02/14/2023, 10:52 PM
Nathanael England
02/15/2023, 12:03 AM
Could not found the Java class 'org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration.Builder'. The Java dependencies could be specified via command line argument '--jarfile' or the config option 'pipeline.jars'
Sumit Aich
02/15/2023, 7:03 AM
Pedro Cunha
02/15/2023, 10:00 AM
I'm currently on 1.14.4 and I have no issues running the docker images for that version. When I try to update to 1.14.6 (and any version above that) I start getting these errors when the jobmanager starts:
Run if [ "$(docker inspect --format='{{.State.Health.Status}}' jobmanager)" != "healthy" ]; then
Container is not healthy
Error: ] The execution result is empty.
Starting Job Manager
Error: ] Could not get JVM parameters and dynamic configurations properly.
Error: ] Raw output from BashJavaUtils:
Warning: [0.004s][warning][os,thread] Failed to start thread "GC Thread#0" - pthread_create failed (EPERM) for attributes: stacksize: 1024k, guardsize: 4k, detached.
#
# There is insufficient memory for the Java Runtime Environment to continue.
# Cannot create worker GC thread. Out of system resources.
# An error report file with more information is saved as:
# /opt/flink/hs_err_pid141.log
Error: Process completed with exit code 1.
Does anyone have any idea why this might be happening?
chunilal kukreja
02/15/2023, 1:22 PM
Abhinav sharma
02/15/2023, 1:33 PM
Kosta Sovaridis
02/15/2023, 1:53 PM
Matyas Orhidi
02/15/2023, 3:41 PM
execution.checkpointing.interval is seemingly being ignored when testing locally in a minicluster. Why is that? I can only enable checkpointing for testing purposes using env.enableCheckpointing(...) 🤷
private static MiniClusterExtension createMiniCluster() {
    final Configuration config = new Configuration();
    config.setString("state.checkpoints.dir", "file:///tmp/flink/checkpoint");
    // ignored --> config.setString("execution.checkpointing.interval", "10s");
    return new MiniClusterExtension(
            new MiniClusterResourceConfiguration.Builder()
                    .setConfiguration(config)
                    .setNumberSlotsPerTaskManager(2)
                    .build());
}
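If the assumption is right that execution.checkpointing.interval is a per-job option (so it never gets picked up from the cluster config handed to the MiniCluster), a minimal sketch of passing it to the job itself instead would be:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Sketch: hand the interval to the job's own configuration.
Configuration jobConfig = new Configuration();
jobConfig.setString("execution.checkpointing.interval", "10s");
// getExecutionEnvironment(Configuration) applies job-level options
// such as execution.checkpointing.* to this environment.
StreamExecutionEnvironment env =
        StreamExecutionEnvironment.getExecutionEnvironment(jobConfig);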
Amir Hossein Sharifzadeh
02/15/2023, 7:35 PM
#### On Producer side:
import base64
import tqdm

# fid, Nt, chunk_size, nFileLen, producer and topic are defined elsewhere.
for raw_id in tqdm.tqdm(range(0, Nt)):
    data_array = fid.read(chunk_size)
    b64_string = str(base64.b64encode(data_array), 'utf-8')
    empad_json = create_sample_empad_json(raw_id, b64_string, nFileLen, chunk_size)
    # print(b64_string)
    producer.send(topic, empad_json)
    producer.flush()

def create_sample_empad_json(raw_id, raw_data, file_size, chunk_size):
    return {'raw_id': int(raw_id), 'raw_data': raw_data, 'file_size': file_size, 'chunk_size': chunk_size}
#### On Flink side:
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

def stream_processing():
    env = StreamExecutionEnvironment.get_execution_environment()
    t_env = StreamTableEnvironment.create(stream_execution_environment=env)
    t_env.get_config().get_configuration().set_boolean("python.fn-execution.memory.managed", True)
    t1 = f"""
        CREATE TABLE raw_table(
            raw_id INT,
            raw_data STRING,
            file_size INT,
            chunk_size INT
        ) WITH (
            'connector' = 'kafka',
            'topic' = 'empadraw',
            'properties.bootstrap.servers' = 'kafka:9092',
            'properties.group.id' = 'MY_GRP',
            'scan.startup.mode' = 'latest-offset',
            'format' = 'json'
        )
    """
    t2 = f"""
        CREATE TABLE bkgd_table(
            raw_id INT,
            raw_data STRING,
            file_size INT,
            chunk_size INT
        ) WITH (
            'connector' = 'kafka',
            'topic' = 'empadbkgd',
            'properties.bootstrap.servers' = 'kafka:9092',
            'properties.group.id' = 'MY_GRP',
            'scan.startup.mode' = 'latest-offset',
            'format' = 'json'
        )
    """
    t_env.execute_sql(t1)
    t_env.execute_sql(t2)
    query = "SELECT raw_table.raw_id as raw_id, raw_table.raw_data as raw_enc_val, raw_table.file_size as raw_file_size, raw_table.chunk_size as raw_chunk_size, " \
            "bkgd_table.raw_data as bkgd_enc_val, bkgd_table.file_size as bkgd_file_size, bkgd_table.chunk_size as bkgd_chunk_size " \
            "FROM raw_table JOIN bkgd_table ON raw_table.raw_id = bkgd_table.raw_id"
    table_result = t_env.sql_query(query).execute().collect()
    process_results(table_result)

def process_results(table_result):
    with table_result as results:
        for result in results:
            # Column order follows the SELECT above: raw_id, raw_enc_val,
            # raw_file_size, raw_chunk_size, bkgd_enc_val, bkgd_file_size, bkgd_chunk_size
            chunk_id = int(result[0])
            raw_chunk = str(result[1])
            file_size = int(result[2])
            chunk_size = int(result[3])
            bkgd_chunk = str(result[4])
            process_data(chunk_id, raw_chunk, bkgd_chunk, file_size, chunk_size)
Sofya T. Irwin
02/15/2023, 10:07 PM
java.io.IOException: Could not connect to BlobServer at address localhost/127.0.0.1:37269
I’m trying to run flink 1.16 in a Docker container setup (an older version of Flink was working ok with the same setup and I’m just trying to port it to the later version).
Slackbot
02/15/2023, 11:28 PM
Reme Ajayi
02/16/2023, 12:56 AM
DataStream<Enhanced> joinedStream = EntriesStream.join(historyStream)
.where(new KeySelector<GenericRecord, String>() {
@Override
public String getKey(GenericRecord value) throws Exception {
return value.get("la_id").toString();
}
}).equalTo(new KeySelector<GenericRecord, String>() {
@Override
public String getKey(GenericRecord value) throws Exception {
return value.get("id").toString();
}
}).window(TumblingProcessingTimeWindows.of(Time.minutes(5)))
.apply(new JoinFunction<GenericRecord, GenericRecord, Enhanced>() {
@Override
public Enhanced join(GenericRecord first, GenericRecord second) throws Exception {
return new Enhanced(
Long.parseLong(first.get("c_at").toString()),
first.get("c_type").toString(),
first.get("id").toString(),
Integer.parseInt(first.get("d_cts").toString()),
Integer.parseInt(first.get("c_cts").toString()),
second.get("prov").toString(),
second.get("bb_S_T").toString(),
second.get("p_id").toString(),
second.get("s_ccurr").toString()
);
}
});
Things I have tried so far:
1. Increased task slots on the task manager
2. Used processing time instead of event time
Sylvia Lin
02/16/2023, 1:07 AM
Nathanael England
02/16/2023, 2:03 AM
Is there anything special I need to do to see output from the logging module in pyflink? I know my code is being executed, but I don't see any of the logs from inside my process function. This is in a unit test, so I'm not sure if there's something configuring them to be hidden.
Jonathan Weaver
02/16/2023, 3:39 AM
Nathanael England
02/16/2023, 4:16 AM
I use the open method to perform heavier initialization tasks on startup. When working with a broadcast process function, is there a way to get the broadcast state in there? I see methods available for getting other types of state but not broadcast.
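As far as I can tell, broadcast state is only handed out through the context objects of the two process methods, not in open(). A sketch of the pattern (Strings stand in for real event/rule types, which are made up here):

import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

class EnrichFn extends BroadcastProcessFunction<String, String, String> {
    // The descriptor can be created eagerly (e.g. as a field), but the state
    // itself is only reachable via ctx.getBroadcastState(...) at processing time.
    private final MapStateDescriptor<String, String> desc =
            new MapStateDescriptor<>("rules", Types.STRING, Types.STRING);

    @Override
    public void processBroadcastElement(String rule, Context ctx, Collector<String> out) throws Exception {
        ctx.getBroadcastState(desc).put("latest", rule); // writable here
    }

    @Override
    public void processElement(String event, ReadOnlyContext ctx, Collector<String> out) throws Exception {
        String rule = ctx.getBroadcastState(desc).get("latest"); // read-only here
        out.collect(event + "/" + (rule == null ? "no-rule" : rule));
    }
}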
Emmanuel Leroy
02/16/2023, 4:45 AM
I've tried increasing fetch.max.bytes, max.partition.fetch.bytes, max.records, as well as receive.buffer.bytes to much larger numbers than the defaults, making little to no difference, so I feel like it is not a consumer config issue.
I'm just not sure what else it may be at this point.
Any pointers to how to troubleshoot this would be appreciated.
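For reference, a rough sketch of how these can be passed through the KafkaSource builder (broker, topic and the values are placeholders, and I'm assuming max.records above was meant to be Kafka's max.poll.records):

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;

KafkaSource<String> source = KafkaSource.<String>builder()
        .setBootstrapServers("broker:9092")   // placeholder
        .setTopics("my-topic")                // placeholder
        .setStartingOffsets(OffsetsInitializer.earliest())
        .setValueOnlyDeserializer(new SimpleStringSchema())
        // consumer tuning knobs mentioned above; values are illustrative
        .setProperty("fetch.max.bytes", "104857600")
        .setProperty("max.partition.fetch.bytes", "33554432")
        .setProperty("max.poll.records", "5000")
        .setProperty("receive.buffer.bytes", "1048576")
        .build();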
Thanks
Guruguha Marur Sreenivasa
02/16/2023, 5:29 AM
Tony Wang
02/16/2023, 6:09 AM
create view test4 as(select *, current_watermark(test1.order_time) as curr, if(test1.order_time > test2.order_time, test1.order_time, test2.order_time) as mt from test1 inner join test2 on test1.product = test2.product where test1.order_time BETWEEN test2.order_time - INTERVAL '1' SECOND and test2.order_time);
and the desc test4 showed that the view doesn't have a watermark defined. Did I mess up somewhere?
Tony Wang
02/16/2023, 6:16 AM
create view test4 as(select *, current_watermark(test1.order_time) as curr, if(test1.order_time > test2.order_time, test1.order_time, test2.order_time) as mt from test1 inner join test2 FOR SYSTEM_TIME AS OF test2.order_time on test1.product = test2.product);
Surkhay Surkhayli
02/16/2023, 7:10 AM
[info] org.apache.flink.runtime.client.JobExecutionException: Job completed with illegal application status: UNKNOWN.
[info] at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:148)
[info] at org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$3(MiniClusterJobClient.java:137)
[info] at java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:642)
[info] at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
[info] at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073)
[info] at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$0(AkkaInvocationHandler.java:237)
[info] at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
[info] at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
[info] at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
[info] at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073)
Amenreet Singh Sodhi
02/16/2023, 7:43 AM
Reme Ajayi
02/16/2023, 11:34 AM
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Use event time from the source record
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
// Keep working state on the heap; checkpoints go to the storage configured below (e.g. S3) for recovery
env.setStateBackend(new HashMapStateBackend());
// Perform checkpoint every minute
env.enableCheckpointing(60000);
env.getCheckpointConfig().setCheckpointStorage(config.getCheckpointPath());
KafkaSource<GenericRecord> history_source = KafkaSource.<GenericRecord>builder()
.setBootstrapServers(BOOTSTRAP_SERVERS)
.setTopics("history_topic")
.setStartingOffsets(OffsetsInitializer.earliest())
.setGroupId("test-001")
.setProperty("security.protocol", "SASL_SSL")
.setProperty("sasl.mechanism", "PLAIN")
.setProperty("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username='" + CONFLUENT_API_KEY + "' password='" + CONFLUENT_SECRET + "';")
.setProperty("client.dns.lookup","use_all_dns_ips")
.setValueOnlyDeserializer(ConfluentRegistryAvroDeserializationSchema.forGeneric(historySchema, SCHEMA_REGISTRY_URL, SCHEMA_REGISTRY_CONFIG))
.build();
KafkaSource<GenericRecord> entries_source = KafkaSource.<GenericRecord>builder()
.setBootstrapServers(BOOTSTRAP_SERVERS)
.setTopics("entries_topic")
.setStartingOffsets(OffsetsInitializer.earliest())
.setGroupId("test-001")
.setProperty("security.protocol", "SASL_SSL")
.setProperty("sasl.mechanism", "PLAIN")
.setProperty("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username='" + CONFLUENT_API_KEY + "' password='" + CONFLUENT_SECRET + "';")
.setProperty("client.dns.lookup","use_all_dns_ips")
.setValueOnlyDeserializer(ConfluentRegistryAvroDeserializationSchema.forGeneric(EntriesSchema, SCHEMA_REGISTRY_URL, SCHEMA_REGISTRY_CONFIG))
.build();
DataStream<GenericRecord> historyStream = env.fromSource(history_source, WatermarkStrategy.<GenericRecord>forMonotonousTimestamps().withTimestampAssigner((event, timestamp) -> {
return timestamp;
}), "History Source");
DataStream<GenericRecord> journalEntriesStream = env.fromSource(entries_source, WatermarkStrategy.<GenericRecord>forMonotonousTimestamps().withTimestampAssigner((event, timestamp) -> {
return timestamp;
}), "Entries Source");
DataStream<Enhanced> joinedStream = journalEntriesStream.join(historyStream)
.where(new KeySelector<GenericRecord, String>() {
@Override
public String getKey(GenericRecord value) throws Exception {
return value.get("la_id").toString();
}
}).equalTo(new KeySelector<GenericRecord, String>() {
@Override
public String getKey(GenericRecord value) throws Exception {
return value.get("id").toString();
}
}).window(TumblingEventTimeWindows.of(Time.seconds(30)))
.apply(new JoinFunction<GenericRecord, GenericRecord, Enhanced>() {
@Override
public Enhanced join(GenericRecord first, GenericRecord second) throws Exception {
return new Enhanced(
Long.parseLong(first.get("c_at").toString()),
first.get("c_type").toString(),
first.get("id").toString(),
Integer.parseInt(first.get("d_cts").toString()),
Integer.parseInt(first.get("c_cts").toString()),
second.get("prov").toString(),
second.get("bb_S_T").toString(),
second.get("p_id").toString(),
second.get("s_ccurr").toString()
);
}
});
joinedStream.print();
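For what it's worth, a sketch of an alternative watermark strategy I could still test (the five-second bound and the idleness timeout are guesses on my part): with event-time windows, an idle or slow Kafka partition can hold the watermark back, so declaring idleness may let the 30s windows fire:

import java.time.Duration;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;

WatermarkStrategy<GenericRecord> strategy =
        WatermarkStrategy.<GenericRecord>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                // mark partitions idle after 1 minute so they stop holding back watermarks
                .withIdleness(Duration.ofMinutes(1));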
Adesh Dsilva
02/16/2023, 12:20 PM
I'm looking at currentFetchEventTimeLag and currentEmitEventTimeLag. I can see the metric currentEmitEventTimeLag but not currentFetchEventTimeLag in the Flink UI. Any idea why?
What is the difference between the two? Will backpressure cause differences between them?
Matthias
02/16/2023, 2:21 PM
1. I create a table:
CREATE TABLE MyTable (
> a bigint,
> b int not null,
> c varchar,
> d timestamp(3)
> ) with ('connector' = 'datagen', 'rows-per-second' = '1', 'fields.a.kind' = 'sequence', 'fields.a.start' = '0', 'fields.a.end' = '1000000');
2. Then, I start a simple query on that table:
SELECT a FROM MyTable;
Which results in a constant flow of ever increasing integer values to be printed in the sql client.
3. The job shows up in the logs and the UI. I stop the job with a savepoint:
$ ./bin/flink stop --savepointPath ../1.16.1-savepoint <job-id>
This creates a savepoint and stops the job.
4. I want to restart the job from within the sql client:
SET 'execution.savepoint.path' = '<path-to-savepoint>';
SELECT a FROM MyTable;
---
My expectation would be that the sql client starts printing the incrementing numbers, leaving out the numbers that were already printed in the run before the savepoint was created. But the sql client is not printing anything at all. With a more complex example, I actually see in the Flink UI that data is sent between the tasks. So, it's not that nothing is happening. But it seems like I'm missing something here 🤔
Kellan Burket
02/16/2023, 2:35 PM
org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Generic types have been disabled in the ExecutionConfig and type KeyedOutputEvent2 is treated as a generic type.
Seems like this should be possible based on the documentation.
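Two ways around it, sketched here (assuming KeyedOutputEvent2 currently doesn't qualify as a Flink POJO):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Option 1: allow Kryo-serialized generic types again.
env.getConfig().enableGenericTypes();
// Option 2 (no API call needed): make KeyedOutputEvent2 a proper Flink POJO
// (public class, public no-arg constructor, public fields or getters/setters)
// so it is no longer classified as a generic type.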
Tony Wang
02/16/2023, 4:38 PM
Alexander Preuss
02/16/2023, 5:01 PM
I'm trying to connect to Kafka using OAUTHBEARER authentication.
I'm configuring the KafkaSource like this:
KafkaSource<String> source = KafkaSource.<String>builder()
.setBootstrapServers(bootstrapServers)
.setTopics(inputTopic)
.setStartingOffsets(OffsetsInitializer.earliest())
.setValueOnlyDeserializer(new SimpleStringSchema())
.setProperty("security.protocol", "SASL_SSL")
.setProperty("sasl.login.callback.handler.class", OauthLoginCallbackHandler.class.getName())
.setProperty("sasl.mechanism", "OAUTHBEARER")
.setProperty("sasl.jaas.config", "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required"
+ " oauth.issuer.url=\"%s\""
+ " oauth.credentials.url=\"%s\""
+ " oauth.audience=\"%s\";";)
.build();
When I submit my job I run into the following `NoClassDefFoundError`:
2023-02-16 15:54:06,180 ERROR org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Uncaught exception in the SplitEnumerator for Source Source: Kafka source while starting the SplitEnumerator.. Triggering job failover.
org.apache.kafka.common.KafkaException: Failed to create new KafkaAdminClient
at org.apache.kafka.clients.admin.KafkaAdminClient.createInternal(KafkaAdminClient.java:546) ~[blob_p-e7cf6c9007e3786b6a409f1c61d4d65e35d634ce-99d0f3b0d7940f398cc7b5441915b1dd:?]
at org.apache.kafka.clients.admin.KafkaAdminClient.createInternal(KafkaAdminClient.java:478) ~[blob_p-e7cf6c9007e3786b6a409f1c61d4d65e35d634ce-99d0f3b0d7940f398cc7b5441915b1dd:?]
at org.apache.kafka.clients.admin.Admin.create(Admin.java:133) ~[blob_p-e7cf6c9007e3786b6a409f1c61d4d65e35d634ce-99d0f3b0d7940f398cc7b5441915b1dd:?]
at org.apache.kafka.clients.admin.AdminClient.create(AdminClient.java:39) ~[blob_p-e7cf6c9007e3786b6a409f1c61d4d65e35d634ce-99d0f3b0d7940f398cc7b5441915b1dd:?]
at org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator.getKafkaAdminClient(KafkaSourceEnumerator.java:410) ~[blob_p-e7cf6c9007e3786b6a409f1c61d4d65e35d634ce-99d0f3b0d7940f398cc7b5441915b1dd:?]
at org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator.start(KafkaSourceEnumerator.java:151) ~[blob_p-e7cf6c9007e3786b6a409f1c61d4d65e35d634ce-99d0f3b0d7940f398cc7b5441915b1dd:?]
at org.apache.flink.runtime.source.coordinator.SourceCoordinator.lambda$start$1(SourceCoordinator.java:225) ~[flink-dist-1.16.1.jar:1.16.1]
at org.apache.flink.runtime.source.coordinator.SourceCoordinator.lambda$runInEventLoop$10(SourceCoordinator.java:449) ~[flink-dist-1.16.1.jar:1.16.1]
at org.apache.flink.util.ThrowableCatchingRunnable.run(ThrowableCatchingRunnable.java:40) [flink-dist-1.16.1.jar:1.16.1]
at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) [?:?]
at java.util.concurrent.FutureTask.run(Unknown Source) [?:?]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source) [?:?]
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) [?:?]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) [?:?]
at java.lang.Thread.run(Unknown Source) [?:?]
Caused by: org.apache.kafka.common.KafkaException: java.lang.NoClassDefFoundError: com/fasterxml/jackson/databind/ObjectMapper
at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:184) ~[blob_p-e7cf6c9007e3786b6a409f1c61d4d65e35d634ce-99d0f3b0d7940f398cc7b5441915b1dd:?]
at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:192) ~[blob_p-e7cf6c9007e3786b6a409f1c61d4d65e35d634ce-99d0f3b0d7940f398cc7b5441915b1dd:?]
at org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:81) ~[blob_p-e7cf6c9007e3786b6a409f1c61d4d65e35d634ce-99d0f3b0d7940f398cc7b5441915b1dd:?]
at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:105) ~[blob_p-e7cf6c9007e3786b6a409f1c61d4d65e35d634ce-99d0f3b0d7940f398cc7b5441915b1dd:?]
at org.apache.kafka.clients.admin.KafkaAdminClient.createInternal(KafkaAdminClient.java:517) ~[blob_p-e7cf6c9007e3786b6a409f1c61d4d65e35d634ce-99d0f3b0d7940f398cc7b5441915b1dd:?]
... 14 more
Caused by: java.lang.NoClassDefFoundError: com/fasterxml/jackson/databind/ObjectMapper
at org.apache.kafka.common.security.oauthbearer.internals.unsecured.OAuthBearerUnsecuredJws.toMap(OAuthBearerUnsecuredJws.java:299) ~[blob_p-e7cf6c9007e3786b6a409f1c61d4d65e35d634ce-99d0f3b0d7940f398cc7b5441915b1dd:?]
at org.apache.kafka.common.security.oauthbearer.internals.unsecured.OAuthBearerUnsecuredJws.<init>(OAuthBearerUnsecuredJws.java:83) ~[blob_p-e7cf6c9007e3786b6a409f1c61d4d65e35d634ce-99d0f3b0d7940f398cc7b5441915b1dd:?]
at org.apache.kafka.common.security.oauthbearer.internals.unsecured.OAuthBearerUnsecuredLoginCallbackHandler.handleTokenCallback(OAuthBearerUnsecuredLoginCallbackHandler.java:211) ~[blob_p-e7cf6c9007e3786b6a409f1c61d4d65e35d634ce-99d0f3b0d7940f398cc7b5441915b1dd:?]
at org.apache.kafka.common.security.oauthbearer.internals.unsecured.OAuthBearerUnsecuredLoginCallbackHandler.handle(OAuthBearerUnsecuredLoginCallbackHandler.java:163) ~[blob_p-e7cf6c9007e3786b6a409f1c61d4d65e35d634ce-99d0f3b0d7940f398cc7b5441915b1dd:?]
at org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule.identifyToken(OAuthBearerLoginModule.java:316) ~[blob_p-e7cf6c9007e3786b6a409f1c61d4d65e35d634ce-99d0f3b0d7940f398cc7b5441915b1dd:?]
at org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule.login(OAuthBearerLoginModule.java:301) ~[blob_p-e7cf6c9007e3786b6a409f1c61d4d65e35d634ce-99d0f3b0d7940f398cc7b5441915b1dd:?]
at javax.security.auth.login.LoginContext.invoke(Unknown Source) ~[?:?]
at javax.security.auth.login.LoginContext$4.run(Unknown Source) ~[?:?]
at javax.security.auth.login.LoginContext$4.run(Unknown Source) ~[?:?]
at java.security.AccessController.doPrivileged(Native Method) ~[?:?]
at javax.security.auth.login.LoginContext.invokePriv(Unknown Source) ~[?:?]
at javax.security.auth.login.LoginContext.login(Unknown Source) ~[?:?]
at org.apache.kafka.common.security.oauthbearer.internals.expiring.ExpiringCredentialRefreshingLogin.login(ExpiringCredentialRefreshingLogin.java:204) ~[blob_p-e7cf6c9007e3786b6a409f1c61d4d65e35d634ce-99d0f3b0d7940f398cc7b5441915b1dd:?]
at org.apache.kafka.common.security.oauthbearer.internals.OAuthBearerRefreshingLogin.login(OAuthBearerRefreshingLogin.java:150) ~[blob_p-e7cf6c9007e3786b6a409f1c61d4d65e35d634ce-99d0f3b0d7940f398cc7b5441915b1dd:?]
at org.apache.kafka.common.security.authenticator.LoginManager.<init>(LoginManager.java:62) ~[blob_p-e7cf6c9007e3786b6a409f1c61d4d65e35d634ce-99d0f3b0d7940f398cc7b5441915b1dd:?]
at org.apache.kafka.common.security.authenticator.LoginManager.acquireLoginManager(LoginManager.java:105) ~[blob_p-e7cf6c9007e3786b6a409f1c61d4d65e35d634ce-99d0f3b0d7940f398cc7b5441915b1dd:?]
at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:170) ~[blob_p-e7cf6c9007e3786b6a409f1c61d4d65e35d634ce-99d0f3b0d7940f398cc7b5441915b1dd:?]
at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:192) ~[blob_p-e7cf6c9007e3786b6a409f1c61d4d65e35d634ce-99d0f3b0d7940f398cc7b5441915b1dd:?]
at org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:81) ~[blob_p-e7cf6c9007e3786b6a409f1c61d4d65e35d634ce-99d0f3b0d7940f398cc7b5441915b1dd:?]
at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:105) ~[blob_p-e7cf6c9007e3786b6a409f1c61d4d65e35d634ce-99d0f3b0d7940f398cc7b5441915b1dd:?]
at org.apache.kafka.clients.admin.KafkaAdminClient.createInternal(KafkaAdminClient.java:517) ~[blob_p-e7cf6c9007e3786b6a409f1c61d4d65e35d634ce-99d0f3b0d7940f398cc7b5441915b1dd:?]
... 14 more
Caused by: java.lang.ClassNotFoundException: com.fasterxml.jackson.databind.ObjectMapper
at java.net.URLClassLoader.findClass(Unknown Source) ~[?:?]
at java.lang.ClassLoader.loadClass(Unknown Source) ~[?:?]
at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:67) ~[flink-dist-1.16.1.jar:1.16.1]
at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74) ~[flink-dist-1.16.1.jar:1.16.1]
at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:51) ~[flink-dist-1.16.1.jar:1.16.1]
at java.lang.ClassLoader.loadClass(Unknown Source) ~[?:?]
at org.apache.kafka.common.security.oauthbearer.internals.unsecured.OAuthBearerUnsecuredJws.toMap(OAuthBearerUnsecuredJws.java:299) ~[blob_p-e7cf6c9007e3786b6a409f1c61d4d65e35d634ce-99d0f3b0d7940f398cc7b5441915b1dd:?]
at org.apache.kafka.common.security.oauthbearer.internals.unsecured.OAuthBearerUnsecuredJws.<init>(OAuthBearerUnsecuredJws.java:83) ~[blob_p-e7cf6c9007e3786b6a409f1c61d4d65e35d634ce-99d0f3b0d7940f398cc7b5441915b1dd:?]
at org.apache.kafka.common.security.oauthbearer.internals.unsecured.OAuthBearerUnsecuredLoginCallbackHandler.handleTokenCallback(OAuthBearerUnsecuredLoginCallbackHandler.java:211) ~[blob_p-e7cf6c9007e3786b6a409f1c61d4d65e35d634ce-99d0f3b0d7940f398cc7b5441915b1dd:?]
at org.apache.kafka.common.security.oauthbearer.internals.unsecured.OAuthBearerUnsecuredLoginCallbackHandler.handle(OAuthBearerUnsecuredLoginCallbackHandler.java:163) ~[blob_p-e7cf6c9007e3786b6a409f1c61d4d65e35d634ce-99d0f3b0d7940f398cc7b5441915b1dd:?]
at org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule.identifyToken(OAuthBearerLoginModule.java:316) ~[blob_p-e7cf6c9007e3786b6a409f1c61d4d65e35d634ce-99d0f3b0d7940f398cc7b5441915b1dd:?]
at org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule.login(OAuthBearerLoginModule.java:301) ~[blob_p-e7cf6c9007e3786b6a409f1c61d4d65e35d634ce-99d0f3b0d7940f398cc7b5441915b1dd:?]
at javax.security.auth.login.LoginContext.invoke(Unknown Source) ~[?:?]
at javax.security.auth.login.LoginContext$4.run(Unknown Source) ~[?:?]
at javax.security.auth.login.LoginContext$4.run(Unknown Source) ~[?:?]
at java.security.AccessController.doPrivileged(Native Method) ~[?:?]
at javax.security.auth.login.LoginContext.invokePriv(Unknown Source) ~[?:?]
at javax.security.auth.login.LoginContext.login(Unknown Source) ~[?:?]
at org.apache.kafka.common.security.oauthbearer.internals.expiring.ExpiringCredentialRefreshingLogin.login(ExpiringCredentialRefreshingLogin.java:204) ~[blob_p-e7cf6c9007e3786b6a409f1c61d4d65e35d634ce-99d0f3b0d7940f398cc7b5441915b1dd:?]
at org.apache.kafka.common.security.oauthbearer.internals.OAuthBearerRefreshingLogin.login(OAuthBearerRefreshingLogin.java:150) ~[blob_p-e7cf6c9007e3786b6a409f1c61d4d65e35d634ce-99d0f3b0d7940f398cc7b5441915b1dd:?]
at org.apache.kafka.common.security.authenticator.LoginManager.<init>(LoginManager.java:62) ~[blob_p-e7cf6c9007e3786b6a409f1c61d4d65e35d634ce-99d0f3b0d7940f398cc7b5441915b1dd:?]
at org.apache.kafka.common.security.authenticator.LoginManager.acquireLoginManager(LoginManager.java:105) ~[blob_p-e7cf6c9007e3786b6a409f1c61d4d65e35d634ce-99d0f3b0d7940f398cc7b5441915b1dd:?]
at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:170) ~[blob_p-e7cf6c9007e3786b6a409f1c61d4d65e35d634ce-99d0f3b0d7940f398cc7b5441915b1dd:?]
at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:192) ~[blob_p-e7cf6c9007e3786b6a409f1c61d4d65e35d634ce-99d0f3b0d7940f398cc7b5441915b1dd:?]
at org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:81) ~[blob_p-e7cf6c9007e3786b6a409f1c61d4d65e35d634ce-99d0f3b0d7940f398cc7b5441915b1dd:?]
at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:105) ~[blob_p-e7cf6c9007e3786b6a409f1c61d4d65e35d634ce-99d0f3b0d7940f398cc7b5441915b1dd:?]
at org.apache.kafka.clients.admin.KafkaAdminClient.createInternal(KafkaAdminClient.java:517) ~[blob_p-e7cf6c9007e3786b6a409f1c61d4d65e35d634ce-99d0f3b0d7940f398cc7b5441915b1dd:?]
... 14 more
Any idea why this happens?
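For debugging I'm thinking of adding a quick classpath check to the job's main() (just a sketch; it assumes the NoClassDefFoundError means jackson-databind is not visible to the user-code classloader, e.g. missing from the fat jar):

// Sketch: verify jackson-databind is visible to user code.
try {
    Class.forName("com.fasterxml.jackson.databind.ObjectMapper");
    System.out.println("jackson-databind found on the user classpath");
} catch (ClassNotFoundException e) {
    System.out.println("jackson-databind missing: bundle it into the job jar");
}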