George Leonard
07/20/2024, 7:29 PM

###############################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
###############################################################################
###############################################################################
# SQL CLI - inspired by https://github.com/wuchong/flink-sql-demo/tree/v1.11-EN/sql-client
###############################################################################
#FROM flink:1.18.1-scala_2.12-java11
FROM arm64v8/flink:1.18.1-scala_2.12-java11
# Copy the CLI scripts and create the connector lib folder
COPY sql-client/bin/* /opt/sql-client/
RUN mkdir -p /opt/sql-client/lib
# Download connector libraries
RUN wget -P /opt/sql-client/lib/ https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-elasticsearch7/3.0.1-1.17/flink-sql-connector-elasticsearch7-3.0.1-1.17.jar; \
    wget -P /opt/sql-client/lib/ https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka/3.2.0-1.18/flink-sql-connector-kafka-3.2.0-1.18.jar; \
    wget -P /opt/sql-client/lib/ https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-avro-confluent-registry/1.18.1/flink-sql-avro-confluent-registry-1.18.1.jar; \
    wget -P /opt/sql-client/lib/ https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-avro/1.18.1/flink-sql-avro-1.18.1.jar; \
    wget -P /opt/sql-client/lib/ https://repo.maven.apache.org/maven2/org/apache/flink/flink-connector-jdbc/3.2.0-1.18/flink-connector-jdbc-3.2.0-1.18.jar; \
    wget -P /opt/sql-client/lib/ https://repo.maven.apache.org/maven2/org/apache/flink/flink-connector-kafka/3.2.0-1.18/flink-connector-kafka-3.2.0-1.18.jar; \
    wget -P /opt/sql-client/lib/ https://repo.maven.apache.org/maven2/org/apache/flink/flink-json/1.18.1/flink-json-1.18.1.jar; \
    wget -P /opt/sql-client/lib/ https://github.com/knaufk/flink-faker/releases/download/v0.5.3/flink-faker-0.5.3.jar; \
    wget -P /opt/sql-client/lib/ https://jdbc.postgresql.org/download/postgresql-42.5.4.jar;
# Copy configuration
COPY sql-client/conf/* /opt/flink/conf/
WORKDIR /opt/sql-client
ENV SQL_CLIENT_HOME /opt/sql-client
USER root
COPY sql-client/docker-entrypoint.sh /
RUN chmod +x /docker-entrypoint.sh
ENTRYPOINT ["/docker-entrypoint.sh"]
Flink SQL
-- Create the Flink tables and then attempt the insert, which is failing on 'connector' = 'kafka'.
-- Pull (INPUT) the avro_salesbaskets topic into Flink (avro_salesbaskets_x).
CREATE TABLE avro_salesbaskets_x (
`invoiceNumber` STRING,
`saleDateTime_Ltz` STRING,
`saleTimestamp_Epoc` STRING,
`terminalPoint` STRING,
`nett` DOUBLE,
`vat` DOUBLE,
`total` DOUBLE,
`store` row<`id` STRING, `name` STRING>,
`clerk` row<`id` STRING, `name` STRING, `surname` STRING>,
`basketItems` array<row<`id` STRING, `name` STRING, `brand` STRING, `category` STRING, `price` DOUBLE, `quantity` INT>>,
`saleTimestamp_WM` AS TO_TIMESTAMP(FROM_UNIXTIME(CAST(`saleTimestamp_Epoc` AS BIGINT) / 1000)),
WATERMARK FOR `saleTimestamp_WM` AS `saleTimestamp_WM`
) WITH (
'connector' = 'kafka',
'topic' = 'avro_salesbaskets',
'properties.bootstrap.servers' = 'broker:29092',
'properties.group.id' = 'testGroup',
'scan.startup.mode' = 'earliest-offset',
'value.format' = 'avro-confluent',
'value.avro-confluent.schema-registry.url' = 'http://schema-registry:8081',
'value.fields-include' = 'ALL'
);
-- pull (INPUT) the avro_salespayments topic into Flink (avro_salespayments_x)
CREATE TABLE avro_salespayments_x (
`invoiceNumber` STRING,
`payDateTime_Ltz` STRING,
`payTimestamp_Epoc` STRING,
`paid` DOUBLE,
`finTransactionId` STRING,
`payTimestamp_WM` AS TO_TIMESTAMP(FROM_UNIXTIME(CAST(`payTimestamp_Epoc` AS BIGINT) / 1000)),
WATERMARK FOR `payTimestamp_WM` AS `payTimestamp_WM`
) WITH (
'connector' = 'kafka',
'topic' = 'avro_salespayments',
'properties.bootstrap.servers' = 'broker:29092',
'properties.group.id' = 'testGroup',
'scan.startup.mode' = 'earliest-offset',
'value.format' = 'avro-confluent',
'value.avro-confluent.url' = 'http://schema-registry:8081',
'value.avro-confluent.properties.use.latest.version' = 'true',
'value.fields-include' = 'ALL'
);
-- Our avro_salescompleted_x (OUTPUT) table, which will push values to the Confluent Platform (CP) Kafka topic.
-- https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/formats/avro-confluent/
CREATE TABLE avro_salescompleted_x (
`invoiceNumber` STRING,
`saleDateTime_Ltz` STRING,
`saleTimestamp_Epoc` STRING,
`terminalPoint` STRING,
`nett` DOUBLE,
`vat` DOUBLE,
`total` DOUBLE,
`store` row<`id` STRING, `name` STRING>,
`clerk` row<`id` STRING, `name` STRING, `surname` STRING>,
`basketItems` array<row<`id` STRING, `name` STRING, `brand` STRING, `category` STRING, `price` DOUBLE, `quantity` INT>>,
`payDateTime_Ltz` STRING,
`payTimestamp_Epoc` STRING,
`paid` DOUBLE,
`finTransactionId` STRING,
`payTimestamp_WM` AS TO_TIMESTAMP(FROM_UNIXTIME(CAST(`payTimestamp_Epoc` AS BIGINT) / 1000)),
`saleTimestamp_WM` AS TO_TIMESTAMP(FROM_UNIXTIME(CAST(`saleTimestamp_Epoc` AS BIGINT) / 1000)),
WATERMARK FOR `saleTimestamp_WM` AS `saleTimestamp_WM`
) WITH (
'connector' = 'kafka',
'topic' = 'avro_salescompleted_x',
'properties.bootstrap.servers' = 'broker:29092',
'properties.group.id' = 'testGroup',
'scan.startup.mode' = 'earliest-offset',
'value.format' = 'avro-confluent',
'value.avro-confluent.schema-registry.url' = 'http://schema-registry:8081',
'value.fields-include' = 'ALL'
);
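-- Not part of the original script, but a quick way to exercise a source table on its own:
-- a plain SELECT against one of the Kafka-backed tables streams rows if the connector and
-- Schema Registry settings are usable, and fails with the same factory error as the insert
-- below if the Kafka connector jar is not on the classpath.
SELECT invoiceNumber, saleDateTime_Ltz, total FROM avro_salesbaskets_x;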
-- The fields in the SELECT are case-sensitive; they need to match the previous CREATE TABLE statements, which in turn match the definitions in the struct/Avro schemas.
INSERT INTO avro_salescompleted_x
SELECT
b.invoiceNumber,
b.saleDateTime_Ltz,
b.saleTimestamp_Epoc,
b.terminalPoint,
b.nett,
b.vat,
b.total,
b.store,
b.clerk,
b.basketItems,
a.payDateTime_Ltz,
a.payTimestamp_Epoc,
a.paid,
a.finTransactionId
FROM
avro_salespayments_x a,
avro_salesbaskets_x b
WHERE a.invoiceNumber = b.invoiceNumber
AND a.payTimestamp_WM > b.saleTimestamp_WM
AND b.saleTimestamp_WM > (b.saleTimestamp_WM - INTERVAL '1' HOUR);
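-- Side note on the query itself: the final predicate compares b.saleTimestamp_WM with itself
-- shifted back one hour, so it is always true. If the intent is to bound the join to payments
-- arriving within one hour of the sale, a Flink interval join along these lines would express
-- that (a sketch of the presumed intent with a shortened column list, not the original statement):
SELECT
b.invoiceNumber,
b.total,
a.paid,
a.finTransactionId
FROM avro_salespayments_x a, avro_salesbaskets_x b
WHERE a.invoiceNumber = b.invoiceNumber
AND a.payTimestamp_WM BETWEEN b.saleTimestamp_WM AND b.saleTimestamp_WM + INTERVAL '1' HOUR;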
-- Error
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'kafka' that implements 'org.apache.flink.table.factories.DynamicTableFactory' in the classpath.
Available factory identifiers are:
blackhole
datagen
filesystem
print
python-input-format
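The list of available factories contains only Flink's built-in connectors, which suggests the jars downloaded into /opt/sql-client/lib are not actually on the SQL client's classpath, for example because the entrypoint does not pass that directory to sql-client.sh via -l and the jars are not in /opt/flink/lib. Assuming the jar paths from the Dockerfile above, one way to check and work around this from inside the running client is:
-- List the jars the SQL client session currently has on its classpath;
-- the Kafka connector jar should appear here.
SHOW JARS;
-- If it is missing, add the connector and the Avro/Schema Registry format
-- for the current session (paths assumed from the Dockerfile above).
ADD JAR '/opt/sql-client/lib/flink-sql-connector-kafka-3.2.0-1.18.jar';
ADD JAR '/opt/sql-client/lib/flink-sql-avro-confluent-registry-1.18.1.jar';
A more permanent fix would be to copy the jars into /opt/flink/lib in the Dockerfile, or to start the client with the -l /opt/sql-client/lib library option, so they are picked up automatically.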