# troubleshooting
t
I’m doing a POC for Flink SQL. When reading from Kafka, I created the topic connection with the Flink DDL:
```sql
CREATE TABLE table_name ( ... ) WITH ( 'connector' = 'kafka', ... )
```
I’m reading from Confluent Kafka with security protocol:
```properties
'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.plain.PlainLoginModule required username="xxx" password="xxx";'
```
The API keys end up in the DDL, and I'd like to avoid that. How can I create this table with DDL without putting these secret configs in it? Ideally I could inject them from in-code variables (fetched from a secure place).
e
Hey, I think you can do this with the PyFlink Table API using a table descriptor: https://nightlies.apache.org/flink/flink-docs-master/api/python/reference/pyflink.table/descriptors.html We do it like this:
```python
table_descriptor = (
    TableDescriptor.for_connector('kafka')
        .schema(table_schema)
        .option('topic', kafka_topic_name)
        .option('properties.bootstrap.servers', bootstrap_server)
        # other options
        .build()
)
```
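Since the descriptor options are plain strings built in code, the JAAS config can be assembled at runtime from a secret store instead of being hard-coded in SQL. A minimal sketch (the env var names here are hypothetical; substitute your own secret-fetching logic):

```python
import os

def sasl_jaas_config(username: str, password: str) -> str:
    # Build the string normally hard-coded in the DDL's
    # 'properties.sasl.jaas.config' option.
    return (
        "org.apache.kafka.common.security.plain.PlainLoginModule required "
        f'username="{username}" password="{password}";'
    )

# Hypothetical env var names; in practice fetch from your secure store.
jaas = sasl_jaas_config(
    os.environ.get("KAFKA_API_KEY", "xxx"),
    os.environ.get("KAFKA_API_SECRET", "xxx"),
)
# Then pass it as a descriptor option instead of embedding it in SQL:
#   .option('properties.sasl.jaas.config', jaas)
```

This keeps the secrets out of any DDL string that might get logged or checked into version control.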
👍 1
t
ah yeah, cool! Using Scala:
```scala
TableDescriptor.forConnector("kafka")
  .schema(schema)
  .option("topic", ..)
```
Thanks!