# troubleshooting
o
Hi, we are trying to use flink-protobuf support with the Kafka connector. When we define the table in code like this:
CREATE TEMPORARY TABLE users_tnc
(
    userId STRING,
    country STRING,
    acceptedTime TIMESTAMP(3),
    eventMetadata row(eventId STRING, eventTime TIMESTAMP(3)),
    PRIMARY KEY (userId) NOT ENFORCED
) WITH (
  'connector' = 'kafka',
  'topic' = 'usertnc.v1beta1',
  'properties.bootstrap.servers' = 'localhost:9094',
  'properties.group.id' = 'ethanol-bmp',
  'key.format' = 'raw',
  'value.format' = 'raw',
  'format' = 'protobuf',
  'protobuf.message-class-name' = 'com.test.UserTnCChangedEvent',
  'protobuf.ignore-parse-errors' = 'true'
);
we get the following exception: Caused by: org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'protobuf' that implements 'org.apache.flink.table.factories.DeserializationFormatFactory' in the classpath. We are using Flink 1.16 and made sure that the flink-protobuf library is on the classpath (packaged as part of the uber jar). Thanks!
s
Do you have `flink-protobuf` in your dependencies? See https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/connectors/table/formats/protobuf/
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-protobuf</artifactId>
  <version>1.16.1</version>
</dependency>
o
Yes, we have it, and it is packaged in the uber jar. Is that enough, or should we include it as an external library in the /opt/flink/lib folder?
s
Hmm, you should not need to put it in /opt/flink/lib if it’s already in the uber jar. Have you checked https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/dev/configuration/maven/#packaging-the-application or the equivalent page for Gradle, depending on which build tool you use?
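One possible reason the factory is not found even though `flink-protobuf` is inside the uber jar: Flink discovers formats through `META-INF/services` files, and when several jars are shaded together those files can overwrite each other unless they are merged. A minimal sketch of a Maven shade setup that merges them, assuming the project is built with `maven-shade-plugin` (the thread does not say which build tool or plugin is in use, and the plugin version is left out on purpose):

```xml
<!-- Sketch only: assumes maven-shade-plugin builds the uber jar. -->
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <executions>
    <execution>
      <phase>package</phase>
      <goals>
        <goal>shade</goal>
      </goals>
      <configuration>
        <transformers>
          <!-- Merges META-INF/services entries from all bundled jars
               instead of keeping only one jar's copy. -->
          <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
        </transformers>
      </configuration>
    </execution>
  </executions>
</plugin>
```

After rebuilding, the file `META-INF/services/org.apache.flink.table.factories.Factory` inside the uber jar should list the protobuf format factory alongside the Kafka one. If it does not, the `ValidationException` above is the expected result.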
h
I got the same error. Have you solved it?
o
We got past it by adding the library to /opt/flink/lib rather than packaging it in the uber jar, but now we are getting a different error 🙂
h
By adding `flink-protobuf` to flink/lib?
Now I am facing a different error:
Caused by: java.lang.ClassCastException: class com.google.protobuf.Descriptors$Descriptor cannot be cast to class com.google.protobuf.Descriptors$Descriptor (com.google.protobuf.Descriptors$Descriptor is in unnamed module of loader org.apache.flink.util.ChildFirstClassLoader @3fc9504b; com.google.protobuf.Descriptors$Descriptor is in unnamed module of loader 'app')
s
It depends on how you set up your Flink cluster; you need to put the application jar in the right place. In general you should avoid dynamic classloading, which is a common cause of `X cannot be cast to X` exceptions, and put the uber jar directly in flink/lib. See https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/ops/debugging/debugging_classloading/#avoiding-dynamic-classloading-for-user-code
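The `ClassCastException` above names two loaders (`ChildFirstClassLoader` and `'app'`), which suggests that the protobuf classes exist twice: once in the uber jar and once in flink/lib. An alternative to moving the whole uber jar is to keep a single copy by not bundling what the cluster already provides. A hedged sketch, assuming `flink-protobuf` and a matching `protobuf-java` jar are both in flink/lib (`${protobuf.version}` is a hypothetical property, not something from this thread):

```xml
<!-- Sketch only: assumes these jars are already present in flink/lib. -->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-protobuf</artifactId>
  <version>1.16.1</version>
  <!-- provided: available at compile time, not packed into the uber jar -->
  <scope>provided</scope>
</dependency>
<dependency>
  <groupId>com.google.protobuf</groupId>
  <artifactId>protobuf-java</artifactId>
  <!-- Hypothetical property; should match the protobuf-java jar in flink/lib. -->
  <version>${protobuf.version}</version>
  <scope>provided</scope>
</dependency>
```

With this layout the generated message classes stay in the uber jar, while `com.google.protobuf.*` is resolved from a single location. Whether this fits depends on how the cluster is deployed.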
h
Very curious: when I set `classloader.resolve-order: parent-first`, the jar sometimes runs with no exception and sometimes fails. In the output below I ran the jar multiple times, and the exception occurs from time to time.
Caused by: org.apache.flink.formats.protobuf.PbCodegenException: java.lang.IllegalArgumentException: get javaProto.autox.metric_event.MetricEventTest descriptors error!
        at org.apache.flink.formats.protobuf.deserialize.ProtoToRowConverter.<init>(ProtoToRowConverter.java:124)
        at org.apache.flink.formats.protobuf.deserialize.PbRowDataDeserializationSchema.open(PbRowDataDeserializationSchema.java:64)
        at org.apache.flink.connector.file.table.DeserializationSchemaAdapter.createDeserialization(DeserializationSchemaAdapter.java:64)
        ... 17 more
Caused by: java.lang.IllegalArgumentException: get javaProto.autox.metric_event.MetricEventTest descriptors error!
        at org.apache.flink.formats.protobuf.util.PbFormatUtils.getDescriptor(PbFormatUtils.java:126)
        at org.apache.flink.formats.protobuf.deserialize.ProtoToRowConverter.<init>(ProtoToRowConverter.java:62)
        ... 19 more
Caused by: java.lang.ClassCastException: class com.google.protobuf.Descriptors$Descriptor cannot be cast to class com.google.protobuf.Descriptors$Descriptor (com.google.protobuf.Descriptors$Descriptor is in unnamed module of loader org.apache.flink.util.ChildFirstClassLoader @7604fe85; com.google.protobuf.Descriptors$Descriptor is in unnamed module of loader 'app')
        at org.apache.flink.formats.protobuf.util.PbFormatUtils.getDescriptor(PbFormatUtils.java:123)
        ... 20 more
~/ $ ~/Downloads/flink-1.17.0/bin/flink run target/import-1.0-SNAPSHOT.jar --input_file test.txt
Job has been submitted with JobID bdbbcc8342f7bdc8a017bc325f83ab81
Empty set