# Troubleshooting
Flink SQL client Dockerfile => container
###############################################################################
#  Licensed to the Apache Software Foundation (ASF) under one
#  or more contributor license agreements.  See the NOTICE file
#  distributed with this work for additional information
#  regarding copyright ownership.  The ASF licenses this file
#  to you under the Apache License, Version 2.0 (the
#  "License"); you may not use this file except in compliance
#  with the License.  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.
###############################################################################


###############################################################################
# SQL CLI - inspired by https://github.com/wuchong/flink-sql-demo/tree/v1.11-EN/sql-client
###############################################################################

#FROM flink:1.18.1-scala_2.12-java11
FROM arm64v8/flink:1.18.1-scala_2.12-java11

# Copy CLI scripts and create the lib folder
COPY sql-client/bin/* /opt/sql-client/
RUN mkdir -p /opt/sql-client/lib

# Download connector libraries
RUN wget -P /opt/sql-client/lib/ https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-elasticsearch7/3.0.1-1.17/flink-sql-connector-elasticsearch7-3.0.1-1.17.jar; \
    wget -P /opt/sql-client/lib/ https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka/3.2.0-1.18/flink-sql-connector-kafka-3.2.0-1.18.jar; \
    wget -P /opt/sql-client/lib/ https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-avro-confluent-registry/1.18.1/flink-sql-avro-confluent-registry-1.18.1.jar; \
    wget -P /opt/sql-client/lib/ https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-avro/1.18.1/flink-sql-avro-1.18.1.jar; \
    wget -P /opt/sql-client/lib/ https://repo.maven.apache.org/maven2/org/apache/flink/flink-connector-jdbc/3.2.0-1.18/flink-connector-jdbc-3.2.0-1.18.jar; \
    wget -P /opt/sql-client/lib/ https://repo.maven.apache.org/maven2/org/apache/flink/flink-connector-kafka/3.2.0-1.18/flink-connector-kafka-3.2.0-1.18.jar; \
    wget -P /opt/sql-client/lib/ https://repo.maven.apache.org/maven2/org/apache/flink/flink-json/1.18.1/flink-json-1.18.1.jar; \
    wget -P /opt/sql-client/lib/ https://github.com/knaufk/flink-faker/releases/download/v0.5.3/flink-faker-0.5.3.jar; \
    wget -P /opt/sql-client/lib/ https://jdbc.postgresql.org/download/postgresql-42.5.4.jar

 
# Copy configuration
COPY sql-client/conf/* /opt/flink/conf/

WORKDIR /opt/sql-client
ENV SQL_CLIENT_HOME /opt/sql-client

USER root
COPY sql-client/docker-entrypoint.sh /
RUN chmod +x /docker-entrypoint.sh
ENTRYPOINT ["/docker-entrypoint.sh"]
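
The docker-entrypoint.sh script itself isn't shown here, but in the wuchong demo this Dockerfile credits, the entrypoint typically starts the SQL client with the downloaded connector jars on its classpath via the -l option. A minimal sketch under that assumption (the actual script may differ):

#!/usr/bin/env bash
# Hypothetical entrypoint sketch: start the Flink SQL client with the connector
# jars from /opt/sql-client/lib on its classpath.
# FLINK_HOME is /opt/flink in the base image; SQL_CLIENT_HOME is set in the Dockerfile above.
exec ${FLINK_HOME}/bin/sql-client.sh embedded -l ${SQL_CLIENT_HOME}/lib "$@"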
Flink SQL
-- Create the Flink tables and then attempt the insert, which is failing on 'connector' = 'kafka'
CREATE TABLE avro_salesbaskets_x (
    `invoiceNumber` STRING,
    `saleDateTime_Ltz` STRING,
    `saleTimestamp_Epoc` STRING,
    `terminalPoint` STRING,
    `nett` DOUBLE,
    `vat` DOUBLE,
    `total` DOUBLE,
    `store` row<`id` STRING, `name` STRING>,
    `clerk` row<`id` STRING, `name` STRING, `surname` STRING>,
    `basketItems` array<row<`id` STRING, `name` STRING, `brand` STRING, `category` STRING, `price` DOUBLE, `quantity` INT>>,
    `saleTimestamp_WM` AS TO_TIMESTAMP(FROM_UNIXTIME(CAST(`saleTimestamp_Epoc` AS BIGINT) / 1000)),
    WATERMARK FOR `saleTimestamp_WM` AS `saleTimestamp_WM`
) WITH (
    'connector' = 'kafka',
    'topic' = 'avro_salesbaskets',
    'properties.bootstrap.servers' = 'broker:29092',
    'properties.group.id' = 'testGroup',
    'scan.startup.mode' = 'earliest-offset',
    'value.format' = 'avro-confluent',
    'value.avro-confluent.schema-registry.url' = 'http://schema-registry:8081',
    'value.fields-include' = 'ALL'
);

-- pull (INPUT) the avro_salespayments topic into Flink (avro_salespayments_x)
CREATE TABLE avro_salespayments_x (
    `invoiceNumber` STRING,
    `payDateTime_Ltz` STRING,
    `payTimestamp_Epoc` STRING,
    `paid` DOUBLE,
    `finTransactionId` STRING,
    `payTimestamp_WM` AS TO_TIMESTAMP(FROM_UNIXTIME(CAST(`payTimestamp_Epoc` AS BIGINT) / 1000)),
    WATERMARK FOR `payTimestamp_WM` AS `payTimestamp_WM`
) WITH (
    'connector' = 'kafka',
    'topic' = 'avro_salespayments',
    'properties.bootstrap.servers' = 'broker:29092',
    'properties.group.id' = 'testGroup',
    'scan.startup.mode' = 'earliest-offset',
    'value.format' = 'avro-confluent',
    'value.avro-confluent.url' = 'http://schema-registry:8081',
    'value.avro-confluent.properties.use.latest.version' = 'true',
    'value.fields-include' = 'ALL'
);


-- Our avro_salescompleted_x (OUTPUT) table, which will push values to the CP Kafka topic.
-- https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/formats/avro-confluent/
CREATE TABLE avro_salescompleted_x (
    `invoiceNumber` STRING,
    `saleDateTime_Ltz` STRING,
    `saleTimestamp_Epoc` STRING,
    `terminalPoint` STRING,
    `nett` DOUBLE,
    `vat` DOUBLE,
    `total` DOUBLE,
    `store` row<`id` STRING, `name` STRING>,
    `clerk` row<`id` STRING, `name` STRING, `surname` STRING>,
    `basketItems` array<row<`id` STRING, `name` STRING, `brand` STRING, `category` STRING, `price` DOUBLE, `quantity` INT>>,     
    `payDateTime_Ltz` STRING,
    `payTimestamp_Epoc` STRING,
    `paid` DOUBLE,
    `finTransactionId` STRING,
    `payTimestamp_WM` AS TO_TIMESTAMP(FROM_UNIXTIME(CAST(`payTimestamp_Epoc` AS BIGINT) / 1000)),
    `saleTimestamp_WM` AS TO_TIMESTAMP(FROM_UNIXTIME(CAST(`saleTimestamp_Epoc` AS BIGINT) / 1000)),
    WATERMARK FOR `saleTimestamp_WM` AS `saleTimestamp_WM`
) WITH (
    'connector' = 'kafka',
    'topic' = 'avro_salescompleted_x',
    'properties.bootstrap.servers' = 'broker:29092',
    'properties.group.id' = 'testGroup',
    'scan.startup.mode' = 'earliest-offset',
    'value.format' = 'avro-confluent',
    'value.avro-confluent.schema-registry.url' = 'http://schema-registry:8081',
    'value.fields-include' = 'ALL'
);

-- The field names in the SELECT are case-sensitive and need to match the previous CREATE TABLE statements, which in turn match the definitions in the struct/avro sections.
INSERT INTO avro_salescompleted_x
SELECT
        b.invoiceNumber,
        b.saleDateTime_Ltz,
        b.saleTimestamp_Epoc,
        b.terminalPoint,
        b.nett,
        b.vat,
        b.total,
        b.store,
        b.clerk,    
        b.basketItems,        
        a.payDateTime_Ltz,
        a.payTimestamp_Epoc,
        a.paid,
        a.finTransactionId
    FROM 
        avro_salespayments_x a,
        avro_salesbaskets_x b
    WHERE a.invoiceNumber = b.invoiceNumber
    AND a.payTimestamp_WM > b.saleTimestamp_WM 
    AND b.saleTimestamp_WM > (a.payTimestamp_WM - INTERVAL '1' HOUR);
-- Error
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'kafka' that implements 'org.apache.flink.table.factories.DynamicTableFactory' in the classpath.

Available factory identifiers are:

blackhole
datagen
filesystem
print
python-input-format
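
The list of available factories contains only Flink's built-in connectors, which usually means the kafka connector jar is not on the SQL client's classpath, even though the Dockerfile downloads it to /opt/sql-client/lib. As a quick check (the container name sql-client below is an assumption; substitute the actual service/container name), one can confirm the jars are present and start the client with that directory explicitly:

# Hypothetical container name; adjust to the actual compose service/container.
# Verify the connector jars actually landed in the image:
docker exec -it sql-client ls /opt/sql-client/lib
# Start the SQL client with those jars on its classpath:
docker exec -it sql-client /opt/flink/bin/sql-client.sh embedded -l /opt/sql-client/lib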