Hi here, I am using a Flink Notebook in AWS (with ...
# troubleshooting
f
Hi here, I am using a Flink Notebook in AWS (with managed Flink). My target is to read a json topic and write it back in an avro topic (I hope this is a valid use case for Flink). I used the AWS template and I was able to read a JSON topic, but now I am trying to read an avro one. I am doing everything using Flink SQL. My code:
Copy code
%flink.ssql(type=update)
DROP TABLE IF EXISTS stock_table;
CREATE TABLE stock_table (
  event_id STRING,
  event_created_ts TIMESTAMP(3),
  app_name string,
  app_version string,
  gpu_id string,
  context_id string,
  tenant_id string,
  page_id string,
  element_id string,
  WATERMARK for event_created_ts as event_created_ts - INTERVAL '15' SECONDS
) WITH (
  'connector' = 'kafka',
  'topic' = 'dev.avro.click',
  'properties.bootstrap.servers' = '<http://xxxxxxxxxxxxxxxxxxxxxxxc3.kafka.eu-central-1.amazonaws.com|xxxxxxxxxxxxxxxxxxxxxxxc3.kafka.eu-central-1.amazonaws.com>',
  'properties.security.protocol' = 'SASL_SSL',
  'properties.sasl.mechanism' = 'AWS_MSK_IAM',
  'properties.sasl.jaas.config' = 'software.amazon.msk.auth.iam.IAMLoginModule required;',
  'properties.sasl.client.callback.handler.class' = 'software.amazon.msk.auth.iam.IAMClientCallbackHandler',
  'properties.group.id' = 'myGroup',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'avro-confluent'
);


SELECT * FROM stock_table;
But I get this error:
Copy code
Caused by: org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'avro-confluent' that implements 'org.apache.flink.table.factories.DeserializationFormatFactory' in the classpath.

Available factory identifiers are:

canal-json
csv
debezium-json
json
maxwell-json
ogg-json
raw
I feel like I am missing something
j
f
then my next question is: how do I add it to the AWS Flink Studio?
j
under configuration tab on MSF studio:
f
damn, thanks a lot @Jeremy Ber πŸ™‚
flink 1
it worked, but I am not able to see anything from that topic. I think the schema is correct. The table is dropped, then recreate. But the select returns no data
and then it shows:
Copy code
TimeoutException: Timed out waiting for a node assignment. Call: describeTopics
j
Hi Francesco, could this be a networking issue?
f
like, I am not able to reach the MSK cluster, I will check with our infra colleague