I’m trying to write a very simple Flink app that r...
# troubleshooting
t
I’m trying to write a very simple Flink app that reads from Confluent Kafka topic, and prints it with Flink SQL using a local Flink cluster.
Copy code
val settings = EnvironmentSettings
      .newInstance()
      .inStreamingMode()
      .build()

    val env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI()
    val tableEnv = StreamTableEnvironment.create(env, settings)

    val ddl =
      """CREATE TABLE invalid_events
(
...
) WITH (
      'connector' = 'kafka',
      'topic' = '...',
      'properties.bootstrap.servers' = '..',
      'properties.group.id' = 'testGroup11',
      'value.format' = 'json',
      'scan.startup.mode' = 'latest-offset',
      'properties.security.protocol' = 'SASL_PLAINTEXT',
      'properties.sasl.mechanism' = 'PLAIN',
      'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.plain.PlainLoginModule required username=\"...\" password=\"...\";'
      );"""
    
    tableEnv.executeSql(ddl)
    //
    val result = tableEnv.sqlQuery(""" SELECT * FROM invalid_events""")
      .toDataStream
      .print()
    

    env.execute("my job")
I keep getting:
Copy code
Caused by: org.apache.kafka.common.KafkaException: Failed to create new KafkaAdminClient
....
Caused by: java.lang.IllegalArgumentException: Value not specified for key 'username' in JAAS config
But I checked the format a few times already - it should be fine. The username exists. Any idea what is wrong?