# troubleshooting
Hi team! I was using the Flink Table API. I wrote a SQL query and checked that it works by printing the results. Then I added an `upsert-kafka` sink connector (I originally had an error that the `kafka` connector doesn't work with a certain operator, so, following someone else's problem in this channel, I switched the connector to `upsert-kafka`). After I added this connector I get this error message:
```
Caused by: org.apache.flink.runtime.client.DuplicateJobSubmissionException: Job has already been submitted.
    at org.apache.flink.runtime.client.DuplicateJobSubmissionException.ofGloballyTerminated(DuplicateJobSubmissionException.java:35)
    at org.apache.flink.runtime.dispatcher.Dispatcher.submitJob(Dispatcher.java:512)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
    at java.base/java.lang.reflect.Method.invoke(Unknown Source)
```
Any ideas how to fix it? I couldn't find a similar problem in this troubleshooting channel.

UPD: the reason was that Kafka was unhealthy.
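In case it helps others hitting the same exception: `DuplicateJobSubmissionException.ofGloballyTerminated` means the Dispatcher already knows a job with the same job ID that reached a globally terminal state. A minimal sketch for checking what the cluster already has, assuming the JobManager REST endpoint is at `localhost:8081` (hypothetical address, adjust to your deployment):

```python
import json
from urllib.request import urlopen

# Hypothetical JobManager REST address -- adjust to your deployment.
FLINK_REST = 'http://localhost:8081'

# /jobs/overview lists all jobs the Dispatcher knows about.
with urlopen(f'{FLINK_REST}/jobs/overview') as resp:
    overview = json.load(resp)

for job in overview['jobs']:
    # A previous run that reached a globally terminal state but kept the same
    # job ID is what triggers DuplicateJobSubmissionException on resubmission.
    print(job['jid'], job['name'], job['state'])
```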
The TableDescriptor configuration we use (it works perfectly with the plain `kafka` connector as a sink):
```python
env.create_temporary_table(
    sink_name,
    TableDescriptor.for_connector('upsert-kafka')  # TODO: the only change vs. the kafka sink
    .schema(table_schema)
    .option('topic', self.kafka_topic_name)
    .option('properties.bootstrap.servers', self.bootstrap_server)
    .option('key.format', 'avro-confluent')
    .option('key.avro-confluent.schema-registry.url', self.schema_registry_url)
    .option('value.avro-confluent.schema-registry.url', self.schema_registry_url)
    .option('value.format', 'avro-confluent')
    .option('key.fields-prefix', 'KEY_')
    .option('value.fields-include', 'EXCEPT_KEY')
    .build()
)
```
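For context, this is roughly how we then feed the sink; `result_table` is a placeholder for the table our SQL query produces:

```python
# result_table is the Table our SQL query produces (verified earlier with print);
# execute_insert() submits the job that writes it into the registered sink.
# wait() blocks until the job terminates, so for detached streaming jobs skip it.
result_table = env.sql_query("SELECT ...")  # actual query elided
result_table.execute_insert(sink_name).wait()
```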
The table schema includes a primary key:
```python
.primary_key('KEY_COUNTRY_CODE', 'KEY_GLOBAL_ENTITY_ID', 'KEY_CITY_ID', 'KEY_window_end')
```
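For reference, a sketch of how that primary key sits in the full schema builder. The column types and the value column are assumptions on my side; the point is that `upsert-kafka` derives the Kafka message key from the PRIMARY KEY columns, which must be declared NOT NULL:

```python
from pyflink.table import DataTypes, Schema

# Sketch only: real column types may differ. upsert-kafka takes the message key
# from the PRIMARY KEY, so no 'key.fields' option is set on the descriptor.
table_schema = (
    Schema.new_builder()
    .column('KEY_COUNTRY_CODE', DataTypes.STRING(nullable=False))
    .column('KEY_GLOBAL_ENTITY_ID', DataTypes.STRING(nullable=False))
    .column('KEY_CITY_ID', DataTypes.INT(nullable=False))
    .column('KEY_window_end', DataTypes.TIMESTAMP(precision=3, nullable=False))
    .column('ORDER_COUNT', DataTypes.BIGINT())  # hypothetical value column
    .primary_key('KEY_COUNTRY_CODE', 'KEY_GLOBAL_ENTITY_ID', 'KEY_CITY_ID', 'KEY_window_end')
    .build()
)
```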
Previously we used this configuration to stream to Kafka with the plain `kafka` connector:
```python
env.create_temporary_table(
    sink_name,
    TableDescriptor.for_connector('kafka')
    .schema(table_schema)
    .option('topic', self.kafka_topic_name)
    .option('properties.bootstrap.servers', self.bootstrap_server)
    .option('key.format', 'avro-confluent')
    .option('key.fields', ";".join(key_fields))
    .option('key.avro-confluent.schema-registry.url', self.schema_registry_url)
    .option('value.avro-confluent.schema-registry.url', self.schema_registry_url)
    .option('value.format', 'avro-confluent')
    .option('properties.allow.auto.create.topics', 'true')
    .option('properties.auto.create.topics.enable', 'true')
    .build()
)
```
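The functional difference between the two descriptors is how the message key is declared: the plain `kafka` sink needs an explicit `key.fields` option, while `upsert-kafka` derives the key from the primary key. A sketch of what the joined `key_fields` value looks like, using the column names from our primary key:

```python
# Explicit key columns for the plain kafka connector's 'key.fields' option;
# with upsert-kafka this option is dropped in favour of the PRIMARY KEY.
key_fields = ['KEY_COUNTRY_CODE', 'KEY_GLOBAL_ENTITY_ID', 'KEY_CITY_ID', 'KEY_window_end']
print(";".join(key_fields))  # value passed to .option('key.fields', ...)
```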
With the `print` connector the execution graph looks like this: