Francesco Mucio
09/20/2023, 5:09 PM%flink.ssql(type=update)
DROP TABLE IF EXISTS stock_table;
CREATE TABLE stock_table (
event_id STRING,
event_created_ts TIMESTAMP(3),
app_name string,
app_version string,
gpu_id string,
context_id string,
tenant_id string,
page_id string,
element_id string,
WATERMARK for event_created_ts as event_created_ts - INTERVAL '15' SECONDS
) WITH (
'connector' = 'kafka',
'topic' = 'dev.avro.click',
'properties.bootstrap.servers' = '<http://xxxxxxxxxxxxxxxxxxxxxxxc3.kafka.eu-central-1.amazonaws.com|xxxxxxxxxxxxxxxxxxxxxxxc3.kafka.eu-central-1.amazonaws.com>',
'properties.security.protocol' = 'SASL_SSL',
'properties.sasl.mechanism' = 'AWS_MSK_IAM',
'properties.sasl.jaas.config' = 'software.amazon.msk.auth.iam.IAMLoginModule required;',
'properties.sasl.client.callback.handler.class' = 'software.amazon.msk.auth.iam.IAMClientCallbackHandler',
'properties.group.id' = 'myGroup',
'scan.startup.mode' = 'earliest-offset',
'format' = 'avro-confluent'
);
SELECT * FROM stock_table;
But I get this error:
Caused by: org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'avro-confluent' that implements 'org.apache.flink.table.factories.DeserializationFormatFactory' in the classpath.
Available factory identifiers are:
canal-json
csv
debezium-json
json
maxwell-json
ogg-json
raw
I feel like I am missing somethingJeremy Ber
09/20/2023, 5:46 PMFrancesco Mucio
09/20/2023, 5:56 PMJeremy Ber
09/20/2023, 5:57 PMFrancesco Mucio
09/20/2023, 5:57 PMFrancesco Mucio
09/21/2023, 10:56 AMFrancesco Mucio
09/21/2023, 11:14 AMTimeoutException: Timed out waiting for a node assignment. Call: describeTopics
Jeremy Ber
09/21/2023, 1:15 PMFrancesco Mucio
09/21/2023, 1:26 PM