Tal Sheldon
09/19/2023, 12:59 PMval settings = EnvironmentSettings
.newInstance()
.inStreamingMode()
.build()
val env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI()
val tableEnv = StreamTableEnvironment.create(env, settings)
val ddl =
"""CREATE TABLE invalid_events
(
...
) WITH (
'connector' = 'kafka',
'topic' = '...',
'properties.bootstrap.servers' = '..',
'properties.group.id' = 'testGroup11',
'value.format' = 'json',
'scan.startup.mode' = 'latest-offset',
'properties.security.protocol' = 'SASL_PLAINTEXT',
'properties.sasl.mechanism' = 'PLAIN',
'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.plain.PlainLoginModule required username=\"...\" password=\"...\";'
);"""
tableEnv.executeSql(ddl)
//
val result = tableEnv.sqlQuery(""" SELECT * FROM invalid_events""")
.toDataStream
.print()
env.execute("my job")
I keep getting:
Caused by: org.apache.kafka.common.KafkaException: Failed to create new KafkaAdminClient
....
Caused by: java.lang.IllegalArgumentException: Value not specified for key 'username' in JAAS config
But I checked the format a few times already - it should be fine. The username exists.
Any idea what is wrong?