# troubleshooting
s
Flink SQL> CREATE TABLE KafkaTable (
    BU_ORGANIZATION STRING,
    BUS_NAME STRING,
    EMPLOYEE_ID STRING,
    EMP_NAME STRING,
    EVENT_NAME STRING,
    UPDATE_DATE_TIME BIGINT,
    ID BIGINT,
    DOC_NO STRING,
    EXPIRY_DATE BIGINT,
    ts TIMESTAMP(3) METADATA FROM 'timestamp'
) WITH (
    'connector' = 'kafka',
    'topic' = 'test',
    'properties.bootstrap.servers' = 'localhost:9092',
    'properties.group.id' = 'testGroup',
    'scan.startup.mode' = 'earliest-offset',
    'format' = 'avro'
);
[INFO] Execute statement succeed.
Flink SQL> select * from KafkaTable;
[ERROR] Could not execute SQL statement. Reason:
java.lang.ClassNotFoundException: org.apache.avro.LogicalType
Flink SQL> exit;
As shown above, when we create the table and try to fetch data from Kafka, the query fails with java.lang.ClassNotFoundException: org.apache.avro.LogicalType. We have already copied flink-avro-1.19.1.jar into the lib folder of the Apache Flink home.
Apache Flink version: 1.19.1
Java version: openjdk 17.0.12
d
Did you use Maven? Also, is it a “fat” jar deployment?
Probably not related, but I think JDK 11 is still the most stable option for 1.19.1, afaik.
You need to set up the right dependencies:
Copy code
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-avro</artifactId>
        <version>1.19.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.avro</groupId>
        <artifactId>avro</artifactId>
        <version>1.11.3</version> <!-- or whichever Avro version Flink 1.19.1 is built against -->
    </dependency>
</dependencies>
Check which version of Avro is suitable for Flink 1.19.1.
Copy code
mvn dependency:tree
might give you information on whether there is a clash with Avro versions. These can come from other dependencies, shaded jars, etc.
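For example, a quick way to spot which Avro version each dependency pulls in (the groupId filter is just an example pattern to adapt):
Copy code
# full tree of the project's dependencies
mvn dependency:tree
# or narrowed down to Avro artifacts only
mvn dependency:tree -Dincludes=org.apache.avro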
If you inspect the jar file of the dependency and see that it has
Copy code
org.apache.avro.LogicalType
inside, then there are just two possibilities: it’s not on the classpath, or there is a version conflict.
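A quick sketch of that check (assuming the JDK's jar tool is available and $FLINK_HOME points at the Flink installation mentioned above):
Copy code
# list the classes bundled in the jar and look for the missing one
jar tf $FLINK_HOME/lib/flink-avro-1.19.1.jar | grep 'org/apache/avro/LogicalType'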
s
Ok, thank you... let me try, and I will update you.
d
Sure. If you go the “fat” jar route, you can use a plugin like https://maven.apache.org/plugins/maven-shade-plugin/index.html to build it.
It's the easiest way to make sure the jars are there, and it allows exclusions if you need them, based on the dependency tree.
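A minimal sketch of what that could look like in the pom.xml (the plugin version is just an example; exclusions would be adapted to your own dependency tree):
Copy code
<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>3.4.1</version>
            <executions>
                <execution>
                    <!-- build the shaded (fat) jar during mvn package -->
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <!-- exclusions based on mvn dependency:tree output can be configured here -->
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>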
r
@Sameer alwosaby Have you ever succeeded in connecting to Kafka? I currently have the same error and can't get it working, not even able to connect to a schemaless topic. I guess it's a problem with the classpath...
d
@René how are you building your jars? Are you creating a “fat” jar? Also, what do you see from mvn dependency:tree?
r
I build an image with these jars in it:
Copy code
FROM flink:1.17.2

# Add additional connectors and libs not included in the default Flink image

ADD ./flink_1.17.2/flink-clients-1.17.2.jar /opt/flink/lib/
ADD ./flink_1.17.2/flink-avro-confluent-registry-1.17.2.jar /opt/flink/lib/
ADD ./flink_1.17.2/flink-sql-avro-confluent-registry-1.17.2.jar /opt/flink/lib/
ADD ./flink_1.17.2/flink-connector-kafka-3.1.0-1.17.jar /opt/flink/lib/
ADD ./flink_1.17.2/flink-sql-connector-kafka-3.1.0-1.17.jar /opt/flink/lib/
ADD ./flink_1.17.2/flink-connector-jdbc-3.1.0-1.17.jar /opt/flink/lib/
ADD ojdbc8-19.21.0.0.jar /opt/flink/lib/
Am I missing anything?
I don't build any jars myself.
d
I am not that familiar with Avro; can you please run
Copy code
jar tf flink-sql-avro-confluent-registry-1.17.2.jar | grep LogicalType.class
It’s always good to inspect the jar to make sure it’s really there.
r
The jar command isn't available in the container. I've also tried changing the owner of the added jar files from root to flink or to user 1001, but that didn't change anything.
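As a workaround when the jar tool is missing, the archive can be listed with another tool that may be present in the image (unzip here is an assumption; the same check can also be run on the host before building the image, since a jar is just a zip archive):
Copy code
# list the jar's entries and look for the class in question
unzip -l /opt/flink/lib/flink-sql-avro-confluent-registry-1.17.2.jar | grep LogicalType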
The cause is not the Avro lib; I can't even read a Kafka topic without any schema. In that case I get:
java.lang.ClassNotFoundException: org.apache.kafka.clients.consumer.OffsetResetStrategy
It seems that the classes cannot be loaded ...
d
you are using Java 11?
r
Yes
Could it be related to the security context of the pod running on OpenShift?
After further analysis, it seems that the jars are loaded correctly into the pod, but we are missing some more dependent jars or classes. Can anyone tell me which jars we need in the pod to be able to connect to Kafka?
d
To be certain, you could enter the pod using kubectl and list the files in the directory, if you have that access.
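Something along these lines (pod name and namespace are placeholders):
Copy code
kubectl exec -it <flink-taskmanager-pod> -n <namespace> -- ls -l /opt/flink/lib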
If you are on Flink 1.17.x you will likely need flink-connector-base.jar alongside the Kafka connector jar, with corresponding versions.
and that’s in addition to the Kafka client libraries themselves
In the end, producing one “fat” jar with the Maven Shade plugin might be the easiest way to manage dependencies; even just running mvn dependency:tree is worth it to check for dependency issues.
You can always package up the fat jar and load it just as you are loading all the jars now. I think Maven can easily be installed and used for both the dependency checks and the fat jar packaging.
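A sketch of the image built that way (the bundle jar name is hypothetical and stands for a shaded jar containing the connector, format, and client dependencies):
Copy code
FROM flink:1.17.2

# one shaded jar instead of many individual connector/format/client jars
ADD ./target/flink-sql-deps-bundle-1.0.jar /opt/flink/lib/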
r
Thanks for your advice. I'll try to put the missing flink-connector-base.jar into the container. The Kafka connector and client libs are already in it.
I am progressing slowly: besides the above-mentioned jars, I had to add the kafka-clients jar. So far so good. Now I'm confronted with this error:
Copy code
Caused by: java.lang.ClassNotFoundException: org.apache.kafka.clients.consumer.ConsumerRecord
That class should actually be in the kafka-clients jar, but somehow it isn't found. I tried versions 3.1 and 3.7 (which we currently run on our Kafka), but no success. Any idea?
I finally got it working 😅 Reading from Kafka topics without a schema works, but when I use the avro-confluent format I only get NULL values back. What could be the cause of that?
Solved it as well 👍
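For reference, a minimal sketch of a table using the avro-confluent format (topic, bootstrap servers, and the Schema Registry URL are placeholders; the column names and types have to match the Avro schema registered for the topic):
Copy code
CREATE TABLE AvroTable (
    ID BIGINT,
    DOC_NO STRING
) WITH (
    'connector' = 'kafka',
    'topic' = 'test',
    'properties.bootstrap.servers' = 'localhost:9092',
    'properties.group.id' = 'testGroup',
    'scan.startup.mode' = 'earliest-offset',
    'format' = 'avro-confluent',
    'avro-confluent.url' = 'http://schema-registry:8081'
);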
s
Thank you all. Currently I use the JSON format instead of the Avro format, and it is working properly. Before, I was using the avro-confluent format.
d
Switching from Avro often brings about good things
With Parquet, JSON, and so many other formats available, I don't see myself going back to Avro.