Elizaveta Batanina
09/21/2023, 4:18 PM
upsert-kafka connector (I had an error that the kafka connector doesn't work with a certain operator, so based on someone else's problem in this channel I changed the connector to upsert-kafka).
After I added this connector I get this error message:
Caused by: org.apache.flink.runtime.client.DuplicateJobSubmissionException: Job has already been submitted.
    at org.apache.flink.runtime.client.DuplicateJobSubmissionException.ofGloballyTerminated(DuplicateJobSubmissionException.java:35)
    at org.apache.flink.runtime.dispatcher.Dispatcher.submitJob(Dispatcher.java:512)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
    at java.base/java.lang.reflect.Method.invoke(Unknown Source)
Any ideas how to fix it? I couldn't find a similar problem in this troubleshooting channel.
UPD: the reason was that Kafka was unhealthy.
Elizaveta Batanina
09/21/2023, 4:20 PM
kafka connector as a sink
env.create_temporary_table(
    sink_name,
    TableDescriptor.for_connector('upsert-kafka')  # TODO changes only for upsert-kafka
    .schema(table_schema)
    .option('topic', self.kafka_topic_name)
    .option('properties.bootstrap.servers', self.bootstrap_server)
    .option('key.format', 'avro-confluent')
    .option('key.avro-confluent.schema-registry.url', self.schema_registry_url)
    .option('value.avro-confluent.schema-registry.url', self.schema_registry_url)
    .option('value.format', 'avro-confluent')
    .option('key.fields-prefix', 'KEY_')
    .option('value.fields-include', 'EXCEPT_KEY')
    .build()
)
The table schema includes a primary key:
    .primary_key('KEY_COUNTRY_CODE', 'KEY_GLOBAL_ENTITY_ID', 'KEY_CITY_ID', 'KEY_window_end')
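For readers following along: with upsert-kafka the key fields come from the primary key, and, as far as the connector documentation describes it, 'key.fields-prefix' is stripped from those column names when the key format's schema is built, while 'value.fields-include' = 'EXCEPT_KEY' keeps the key columns out of the value. The sketch below only mimics that naming rule in plain Python; it does not call Flink. The function name split_key_value_fields and the ORDER_COUNT column are hypothetical, used purely for illustration.

```python
from typing import List, Tuple


def split_key_value_fields(
    columns: List[str],
    primary_key: List[str],
    key_prefix: str = "",
) -> Tuple[List[str], List[str]]:
    """Return (key-format field names, value-format field names).

    Illustrative only: mirrors the documented naming rule for
    'key.fields-prefix' combined with 'value.fields-include' = 'EXCEPT_KEY'.
    """
    missing = [c for c in primary_key if c not in columns]
    if missing:
        raise ValueError(f"primary key columns not in schema: {missing}")

    unprefixed = [c for c in primary_key if not c.startswith(key_prefix)]
    if unprefixed:
        raise ValueError(f"key columns without prefix {key_prefix!r}: {unprefixed}")

    # The prefix is removed for the key format's own field names.
    key_fields = [c[len(key_prefix):] for c in primary_key]
    # EXCEPT_KEY: the value format only sees the non-key columns.
    value_fields = [c for c in columns if c not in primary_key]
    return key_fields, value_fields


if __name__ == "__main__":
    pk = ["KEY_COUNTRY_CODE", "KEY_GLOBAL_ENTITY_ID", "KEY_CITY_ID", "KEY_window_end"]
    # ORDER_COUNT is a hypothetical non-key column, not taken from the thread.
    cols = pk + ["ORDER_COUNT"]
    print(split_key_value_fields(cols, pk, key_prefix="KEY_"))
```

This is mainly useful for checking which field names the key and value schemas in the schema registry are expected to carry.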
Elizaveta Batanina
09/21/2023, 4:26 PM
env.create_temporary_table(
    sink_name,
    TableDescriptor.for_connector('kafka')
    .schema(table_schema)
    .option('topic', self.kafka_topic_name)
    .option('properties.bootstrap.servers', self.bootstrap_server)
    .option('key.format', 'avro-confluent')
    .option('key.fields', ";".join(key_fields))
    .option('key.avro-confluent.schema-registry.url', self.schema_registry_url)
    .option('value.avro-confluent.schema-registry.url', self.schema_registry_url)
    .option('value.format', 'avro-confluent')
    .option('properties.allow.auto.create.topics', 'true')
    .option('properties.auto.create.topics.enable', 'true')
    .build()
)
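Since the root cause in this thread turned out to be an unhealthy Kafka, a quick pre-flight check of the bootstrap servers before submitting the job may save some debugging. The following is a minimal sketch using only the Python standard library; check_bootstrap_servers is a hypothetical helper, and a successful TCP connection only shows that the port is reachable, not that the broker is fully healthy.

```python
import socket
from typing import Dict


def check_bootstrap_servers(bootstrap_servers: str, timeout: float = 3.0) -> Dict[str, bool]:
    """Try a plain TCP connection to every 'host:port' entry.

    Returns a mapping of entry -> True if the connection succeeded.
    This is a reachability check only, not a Kafka protocol handshake.
    """
    results: Dict[str, bool] = {}
    for entry in bootstrap_servers.split(","):
        entry = entry.strip()
        if not entry:
            continue
        host, _, port = entry.rpartition(":")
        try:
            with socket.create_connection((host, int(port)), timeout=timeout):
                results[entry] = True
        except (OSError, ValueError):
            # OSError: refused, unreachable, timed out; ValueError: bad port.
            results[entry] = False
    return results
```

It could be called with the same value that is passed to 'properties.bootstrap.servers', for example check_bootstrap_servers(self.bootstrap_server), and the job submission skipped or retried if any entry comes back False.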
Elizaveta Batanina
09/21/2023, 4:30 PM