Hi Team, happy Friday! We are trying to run simple...
# troubleshooting
s
Hi Team, happy Friday! We are trying to run simple queries on a Confluent Kafka topic. We are able to successfully connect to the topic and run
SELECT COUNT(*)
queries. But, when we try to run
SELECT *
queries we get records with all
null
fields. The Kafka topic uses Confluent Schema registry to hold the schemas. I was wondering if the fields are not being deserialized properly. Any help would be greatly appreciated! Query:
Copy code
CREATE TABLE account
(
    `ACCT_COND_ID`      INT,
    `ACCT_ID`           INT,
    `ACCT_COND_NM`      STRING,
    `ACCT_COND_STRT_TS` TIMESTAMP(3),
    `ACCT_COND_STOP_TS` TIMESTAMP(3),
    `SRC_ACCT_ID`       STRING,
    `UPDATE_TS`         TIMESTAMP(3),
    `SRC_ACCT_ID2`      STRING,
    `SRC_SYS_CD`        STRING,
    `SRC_ACCT_SYS_CD`   STRING
) WITH (
      'connector' = 'kafka',
      'topic' = '**********',
      'properties.bootstrap.servers' = '**********',
      'key.format' = 'avro-confluent',
      'key.fields' = 'ACCT_COND_ID',
      'key.avro-confluent.url' = 'https://**********',
      'key.avro-confluent.basic-auth.credentials-source' = 'USER_INFO',
      'key.avro-confluent.basic-auth.user-info' = '**********',
      'value.format' = 'avro-confluent',
      'value.avro-confluent.url' = 'https://**********',
      'value.avro-confluent.basic-auth.credentials-source' = 'USER_INFO',
      'value.avro-confluent.basic-auth.user-info' = '**********',
      'scan.startup.mode' = 'earliest-offset',
      'properties.security.protocol' = 'SASL_SSL',
      'properties.sasl.jaas.config' = '**********'
      );

SELECT *
FROM account
LIMIT 10;
Output: See attached pic
βœ… 1
s
This is very interesting πŸ™‚ What Flink version do you use? Also, could you share the Avro schema for this topic?
s
I am using Flink 1.15.2
Your question made me remember that the schema is using
ROW
datatype. Changing the Query to this fix it for me
Copy code
CREATE TABLE account
(
    accountconditiondata ROW(ACCT_COND_ID INT,
        ACCT_ID INT,
        ACCT_COND_NM STRING,
        ACCT_COND_STRT_TS TIMESTAMP (3),
        ACCT_COND_STOP_TS TIMESTAMP (3),
        SRC_ACCT_ID STRING,
        UPDATE_TS TIMESTAMP (3),
        SRC_ACCT_ID2 STRING,
        SRC_SYS_CD STRING,
        SRC_ACCT_SYS_CD STRING)
) WITH (
      'connector' = 'kafka',
      'topic' = '**********',
      'properties.bootstrap.servers' = '**********',
      'value.format' = 'avro-confluent',
      'value.avro-confluent.url' = 'https://**********',
      'value.avro-confluent.basic-auth.credentials-source' = 'USER_INFO',
      'value.avro-confluent.basic-auth.user-info' = '**********',
      'scan.startup.mode' = 'earliest-offset',
      'properties.security.protocol' = 'SASL_SSL',
      'properties.sasl.jaas.config' = '**********'
      );
πŸ™Œ 1