# troubleshooting
o
Also, we were wondering if there is any other way of creating a table using upsert-kafka other than using CREATE table statement and reading using the connector upsert-kafka. For normal kafka topics, we can create the stream from the kafka with env.fromSource and then do tableEnv.fromDataStream(). Is there an equivalent of creating an instance of table without having to use the connector and via API ? Thanks!
m
Upsert Kafka is an implementation for the Table API/SQL, while the other things you are mentioning are DataStream API. You mentioned going from DataStream to Table API, but you can also go from Table API to DataStream API. You could also implement a Kafka source in the DataStream API that deals with upsert semantics, but in the DataStream API that means you would have to implement what's basically already done for the Table API.
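To make the bridging between the two APIs concrete, here is a minimal sketch (column names, the inline demo stream, and the Flink Scala bridge APIs `fromChangelogStream`/`toChangelogStream` with `ChangelogMode.upsert()` are illustrative; in a real job the stream would come from `env.fromSource`):

```scala
// Sketch: bridging DataStream <-> Table with upsert semantics.
// All names and the inline demo data are illustrative.
import org.apache.flink.api.common.typeinfo.{TypeInformation, Types}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.{DataTypes, Schema}
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.table.connector.ChangelogMode
import org.apache.flink.types.{Row, RowKind}

object UpsertBridgeSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tableEnv = StreamTableEnvironment.create(env)

    // In practice this would be env.fromSource(kafkaSource, ...);
    // here a tiny inline changelog stands in for the Kafka topic.
    implicit val rowInfo: TypeInformation[Row] =
      Types.ROW_NAMED(Array("userId", "country"), Types.STRING, Types.STRING)
    val payments: DataStream[Row] = env.fromElements(
      Row.ofKind(RowKind.INSERT, "user-1", "NL"),
      Row.ofKind(RowKind.UPDATE_AFTER, "user-1", "DE")) // later value wins per key

    // DataStream -> Table, interpreted as an upsert changelog on the primary key
    val paymentTable = tableEnv.fromChangelogStream(
      payments,
      Schema.newBuilder()
        .column("userId", DataTypes.STRING())
        .column("country", DataTypes.STRING())
        .primaryKey("userId")
        .build(),
      ChangelogMode.upsert())

    // Table -> DataStream again, as a changelog stream
    tableEnv.toChangelogStream(paymentTable).print()
    env.execute()
  }
}
```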
o
Not sure I follow. I think it is best to illustrate it with an example. We can create a table like this:
```scala
tableEnv.executeSql(
    """
    CREATE TEMPORARY TABLE payments
    (
        userId STRING,
        country STRING,
        acceptedTime TIMESTAMP(3),
        eventMetadata ROW<eventId STRING, eventTime TIMESTAMP(3)>,
        PRIMARY KEY (userId) NOT ENFORCED
    ) WITH (
      'connector' = 'kafka',....
```
or alternatively, like this:
```scala
val paymentTable = tableEnv.fromDataStream(
    payment,
    Schema.newBuilder()
        .column(....
```
assuming that the `payment` DataStream already comes from the very same Kafka topic as in the first example; we use `env.fromSource()` for this. My question is whether there is an equivalent method to create a temporary table that emulates the upsert-kafka connector. Thanks!
m
The first example you provided would work for upsert-kafka if you would specify that connector, right?
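For reference, a minimal sketch of what that first DDL might look like with the upsert-kafka connector (topic name, broker address, and the key/value formats are placeholders; upsert-kafka requires a `PRIMARY KEY` and separate `'key.format'`/`'value.format'` options):

```scala
// Sketch only: connection details and formats are placeholders.
tableEnv.executeSql(
    """
    CREATE TEMPORARY TABLE payments
    (
        userId STRING,
        country STRING,
        acceptedTime TIMESTAMP(3),
        PRIMARY KEY (userId) NOT ENFORCED
    ) WITH (
      'connector' = 'upsert-kafka',
      'topic' = 'payments',                              -- placeholder
      'properties.bootstrap.servers' = 'localhost:9092', -- placeholder
      'key.format' = 'raw',                              -- placeholder
      'value.format' = 'json'                            -- placeholder
    )
    """)
```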
o
No, it does not; we are facing multiple issues. One of them is this: `Caused by: org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'protobuf' that implements 'org.apache.flink.table.factories.DeserializationFormatFactory' in the classpath.` Even though we added flink-protobuf to the classpath 🤷
m
I’m not sure if that’s related to upsert-kafka or to the protobuf stuff. Protobuf is quite specific, because it requires compiling the protobuf definitions to Java classes, then compiling and packaging those classes and getting that on the classpath.
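The `ValidationException` above usually means the `flink-protobuf` format module is not visible to the job at runtime. A build sketch, assuming an sbt build (the version string is a placeholder and should match your Flink version):

```scala
// build.sbt sketch -- version is a placeholder; match your Flink version.
// The flink-protobuf module provides the 'protobuf' format factory;
// it must end up in the job JAR (or the cluster's lib/), not only on the
// compile classpath, together with your compiled protobuf message classes.
libraryDependencies += "org.apache.flink" % "flink-protobuf" % "1.17.1"
```

Note the format also needs the compiled message class available, referenced via the `'protobuf.message-class-name'` table option.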