Jaehyeon Kim
02/28/2023, 2:11 AM
pyflink.util.exceptions.TableException: org.apache.flink.table.api.TableException: Table sink 'default_catalog.default_database.output' doesn't support consuming update changes which is produced by node PythonGroupAggregate(groupBy=[k], select=[k, *org.apache.flink.table.functions.python.PythonAggregateFunction$82f60ecbb5d8eed484a7f66dd2ee2921*(k, v) AS TMP_0])
Below shows the print sink output.
# input messages
a:a
b:hello
b:world
b:hi
# print
1> +I[a, 1, 1, a]
1> +I[b, 2, 10, hello world]
1> -U[b, 2, 10, hello world]
1> +U[b, 3, 12, hello world hi]
The script and full error messages are attached. Can you please guide me how to fix it?
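The error means the grouped aggregation emits an updating changelog (+I/-U/+U, as visible in the print output above), while the 'output' sink only accepts inserts. A minimal, hypothetical PyFlink sketch of one option, with placeholder topic and broker names (not taken from the attached script), is to declare the sink with an upsert-capable connector such as upsert-kafka and a primary key:

# Hypothetical sketch, not the attached script: an upsert-capable sink table
# that can consume the -U/+U changes produced by the group aggregation.
from pyflink.table import EnvironmentSettings, TableEnvironment

t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())

t_env.execute_sql("""
    CREATE TABLE output (
        k   STRING,
        agg STRING,
        PRIMARY KEY (k) NOT ENFORCED
    ) WITH (
        'connector' = 'upsert-kafka',                        -- accepts update changes
        'topic' = 'output-topic',                            -- placeholder
        'properties.bootstrap.servers' = 'localhost:9092',   -- placeholder
        'key.format' = 'json',
        'value.format' = 'json'
    )
""")

# The aggregation itself can stay unchanged, e.g.:
# t_env.execute_sql("INSERT INTO output SELECT k, my_agg(v) FROM input GROUP BY k").wait()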
ConradJam
02/28/2023, 6:00 AM
Slackbot
02/28/2023, 10:08 AM
Lucas Alcântara Freire
02/28/2023, 11:50 AM
2023-02-28 11:48:55,553 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] {} - Starting shutdown of shard consumer threads and AWS SDK resources of subtask 0 ...
java.lang.InterruptedException: null
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:2014) ~[?:1.8.0_362]
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2088) ~[?:1.8.0_362]
at java.util.concurrent.ThreadPoolExecutor.awaitTermination(ThreadPoolExecutor.java:1475) ~[?:1.8.0_362]
at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.awaitTermination(KinesisDataFetcher.java:875) ~[cdp-flink-jobs.jar:?]
at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.runFetcher(KinesisDataFetcher.java:760) ~[cdp-flink-jobs.jar:?]
at org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.run(FlinkKinesisConsumer.java:392) ~[cdp-flink-jobs.jar:?]
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110) ~[flink-dist-1.16.1.jar:1.16.1]
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:67) ~[flink-dist-1.16.1.jar:1.16.1]
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:333) ~[flink-dist-1.16.1.jar:1.16.1]
2023-02-28 11:48:55,553 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] {} - Starting shutdown of shard consumer threads and AWS SDK resources of subtask 3 ...
Nithin kharvi
02/28/2023, 12:04 PM
André Casimiro
02/28/2023, 2:36 PM
Amir Hossein Sharifzadeh
02/28/2023, 5:43 PM
Roman Bohdan
02/28/2023, 6:16 PM
Kevin Lam
02/28/2023, 6:29 PM
* When HA is enabled the last-state upgrade mode may fall back to the savepoint behaviour in cases where the job is in an unhealthy state.
I'm wondering about the unhealthy state, because I'm observing an issue where the operator won't take a new savepoint when savepointTriggerNonce is updated, or when a FlinkDeployment spec is changed and the upgradeMode is set to savepoint. Any help is appreciated!
Jasmin Redzepovic
02/28/2023, 6:31 PM
Flink 1.15.2 and Flink Kubernetes Operator 1.2.0. After deploying a batch job, it completed successfully (job state: FINISHED), but the JobManager is still running. Is there a way to automatically clean up resources after batch job execution?
Yaroslav Bezruchenko
02/28/2023, 7:09 PM
Chloe He
02/28/2023, 9:00 PM
Sofya T. Irwin
02/28/2023, 9:48 PM
from the jobmanager:
:29:59.172 [kafka-admin-client-thread | flink.sqlclient-enumerator-admin-client] ERROR org.apache.kafka.common.utils.KafkaThread - Uncaught exception in thread 'kafka-admin-client-thread | flink.sqlclient-enumerator-admin-client':
java.lang.NoClassDefFoundError: Could not initialize class org.apache.kafka.common.message.ApiMessageType
at org.apache.kafka.common.protocol.ApiKeys.requestHeaderVersion(ApiKeys.java:341) ~[?:?]
at org.apache.kafka.clients.ClientRequest.makeHeader(ClientRequest.java:93) ~[?:?]
...
at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1196) ~[?:?]
at java.lang.Thread.run(Thread.java:829) [?:?]
Caused by: java.lang.ClassNotFoundException: org.apache.kafka.common.message.AlterReplicaLogDirsRequestData$AlterReplicaLogDir
I’m using this api: https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/ops/rest_api/#jars-jarid-run
When I run the same job by logging into the container and executing it manually it runs ok.
I’m trying to understand why running it manually works OK but running it via the REST API does not.
Jagan Nalla
02/28/2023, 11:49 PM
Xiaosheng Wu
03/01/2023, 2:22 AM
timeout even for the keyspaces lookup, which had not happened before. It almost looks like the thread pool is exhausted or the cluster blocked any outbound traffic. The external endpoint should be able to handle at least some traffic, any insights? Thanks!
Abhilash S.R
03/01/2023, 2:49 AM
org.apache.flink.runtime.rpc.exceptions.FencingTokenException: Fencing token mismatch: Ignoring message LocalFencedMessage(af00754d9d0dafaccdb1770d2e3f480e, LocalRpcInvocation(requestResourceOverview(Time))) because the fencing token af00754d9d0dafaccdb1770d2e3f480e did not match the expected fencing token 8b714cf5eb5af40851ea06f1df8d4124.
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:88)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
chunilal kukreja
03/01/2023, 6:50 AM
2023-03-01 06:44:12,773 WARN org.apache.flink.connector.kafka.source.reader.KafkaSourceReader [] - Failed to commit consumer offsets for checkpoint 6
org.apache.kafka.clients.consumer.RetriableCommitFailedException: Offset commit failed with a retriable exception. You should retry committing the latest consumed offsets.
Caused by: org.apache.kafka.common.errors.CoordinatorNotAvailableException: The coordinator is not available.
We have deployed the Flink cluster in our k8s using Helm charts, and after every helm delete followed by helm install we get this error. What should be done? Please suggest.
Ari Huttunen
03/01/2023, 7:53 AM
Ravi Teja
03/01/2023, 12:05 PM
--- "{production_reviews: [{"content": "some_content", "rating": 2},{"content": "some_content", "rating": 4}]}"
select product_reviews from product_table;
Please suggest any way we can join this array and run an aggregate on the rating field.
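One commonly used pattern, shown here only as a hedged sketch (t_env is an existing TableEnvironment; product_id and the row schema of product_reviews are assumptions, not taken from the thread), is to flatten the array with CROSS JOIN UNNEST and aggregate over the exploded rows:

# Hypothetical sketch: unnest the review array and aggregate the rating field.
# Assumes product_reviews is declared as ARRAY<ROW<content STRING, rating INT>>.
avg_ratings = t_env.sql_query("""
    SELECT product_id, AVG(rating) AS avg_rating, COUNT(*) AS review_count
    FROM product_table
    CROSS JOIN UNNEST(product_reviews) AS r (content, rating)
    GROUP BY product_id
""")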
sharad mishra
03/01/2023, 12:34 PM
I’m using the exactly once delivery guarantee in my program.
I got the following exception in my application, along with duplicate data in the Kafka sink.
I’m curious to know what might have caused this exception, along with the duplicate data, although the delivery guarantee is set to exactly once?
org.apache.flink.util.FlinkRuntimeException: Failed to send data to Kafka dcn_audit_log_events_unique_stg-6@-1 with FlinkKafkaInternalProducer{transactionalId='unique-6-1222', inTransaction=true, closed=false}
at org.apache.flink.connector.kafka.sink.KafkaWriter$WriterCallback.throwException(KafkaWriter.java:436)
at org.apache.flink.connector.kafka.sink.KafkaWriter$WriterCallback.lambda$onCompletion$0(KafkaWriter.java:417)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMail(MailboxProcessor.java:398)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:367)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:352)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:229)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:831)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:780)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:914)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.kafka.common.errors.InvalidProducerEpochException: Producer attempted to produce with an old epoch.
Here is the snapshot of FlinkKafkaConsumer properties from taskmanager logs:
acks = -1
batch.size = 16384
bootstrap.servers = [10.7.111.32:9092]
buffer.memory = 33554432
client.dns.lookup = use_all_dns_ips
client.id = producer-duplicate-13-402
compression.type = none
connections.max.idle.ms = 540000
delivery.timeout.ms = 120000
enable.idempotence = true
interceptor.classes = []
key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
linger.ms = 0
max.block.ms = 60000
max.in.flight.requests.per.connection = 5
max.request.size = 1048576
metadata.max.age.ms = 300000
metadata.max.idle.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retries = 2147483647
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.connect.timeout.ms = null
sasl.login.read.timeout.ms = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.login.retry.backoff.max.ms = 10000
sasl.login.retry.backoff.ms = 100
sasl.mechanism = GSSAPI
sasl.oauthbearer.clock.skew.seconds = 30
sasl.oauthbearer.expected.audience = null
sasl.oauthbearer.expected.issuer = null
sasl.oauthbearer.jwks.endpoint.refresh.ms = 3600000
sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms = 10000
sasl.oauthbearer.jwks.endpoint.retry.backoff.ms = 100
sasl.oauthbearer.jwks.endpoint.url = null
sasl.oauthbearer.scope.claim.name = scope
sasl.oauthbearer.sub.claim.name = sub
sasl.oauthbearer.token.endpoint.url = null
security.protocol = PLAINTEXT
security.providers = null
send.buffer.bytes = 131072
socket.connection.setup.timeout.max.ms = 30000
socket.connection.setup.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2]
ssl.endpoint.identification.algorithm = https
ssl.engine.factory.class = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.certificate.chain = null
ssl.keystore.key = null
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLSv1.2
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.certificates = null
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
transaction.timeout.ms = 600000
transactional.id = duplicate-13-402
value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
also FlinkKafkaProducer properties snapshot:
org.apache.kafka.clients.producer.ProducerConfig - ProducerConfig values:
acks = -1
batch.size = 16384
bootstrap.servers = [10.7.111.32:9092]
buffer.memory = 33554432
client.dns.lookup = use_all_dns_ips
client.id = producer-duplicate-4-402
compression.type = none
connections.max.idle.ms = 540000
delivery.timeout.ms = 120000
enable.idempotence = true
interceptor.classes = []
key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
linger.ms = 0
max.block.ms = 60000
max.in.flight.requests.per.connection = 5
max.request.size = 1048576
metadata.max.age.ms = 300000
metadata.max.idle.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retries = 2147483647
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.connect.timeout.ms = null
sasl.login.read.timeout.ms = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.login.retry.backoff.max.ms = 10000
sasl.login.retry.backoff.ms = 100
sasl.mechanism = GSSAPI
sasl.oauthbearer.clock.skew.seconds = 30
sasl.oauthbearer.expected.audience = null
sasl.oauthbearer.expected.issuer = null
sasl.oauthbearer.jwks.endpoint.refresh.ms = 3600000
sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms = 10000
sasl.oauthbearer.jwks.endpoint.retry.backoff.ms = 100
sasl.oauthbearer.jwks.endpoint.url = null
sasl.oauthbearer.scope.claim.name = scope
sasl.oauthbearer.sub.claim.name = sub
sasl.oauthbearer.token.endpoint.url = null
security.protocol = PLAINTEXT
security.providers = null
send.buffer.bytes = 131072
socket.connection.setup.timeout.max.ms = 30000
socket.connection.setup.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2]
ssl.endpoint.identification.algorithm = https
ssl.engine.factory.class = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.certificate.chain = null
ssl.keystore.key = null
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLSv1.2
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.certificates = null
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
transaction.timeout.ms = 600000
transactional.id = duplicate-4-402
value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
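For context, a hedged note rather than a confirmed diagnosis: with EXACTLY_ONCE sinks, InvalidProducerEpochException is often reported when a Kafka transaction is aborted or times out between checkpoints, fencing the producer's epoch; duplicates can then be visible to consumers that do not read with isolation.level=read_committed. A common mitigation is keeping the checkpoint interval well below transaction.timeout.ms (which itself must not exceed the broker's transaction.max.timeout.ms). A minimal PyFlink-style sketch of such a sink, reusing the broker, topic, and transactional-id prefix seen above but otherwise hypothetical:

# Hypothetical sketch, not the original job: an exactly-once KafkaSink whose Kafka
# transaction timeout comfortably exceeds the checkpoint interval.
from pyflink.common.serialization import SimpleStringSchema
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.base import DeliveryGuarantee
from pyflink.datastream.connectors.kafka import KafkaSink, KafkaRecordSerializationSchema

env = StreamExecutionEnvironment.get_execution_environment()
env.enable_checkpointing(60000)  # checkpoint every 60 s, far below the transaction timeout

sink = (
    KafkaSink.builder()
    .set_bootstrap_servers("10.7.111.32:9092")
    .set_record_serializer(
        KafkaRecordSerializationSchema.builder()
        .set_topic("dcn_audit_log_events_unique_stg")
        .set_value_serialization_schema(SimpleStringSchema())
        .build()
    )
    .set_delivery_guarantee(DeliveryGuarantee.EXACTLY_ONCE)
    .set_transactional_id_prefix("unique")
    .set_property("transaction.timeout.ms", "900000")  # e.g. 15 min, <= broker transaction.max.timeout.ms
    .build()
)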
Raefaldhi Amartya Junior
03/01/2023, 2:07 PM
Could not find any factories that implement 'org.apache.flink.table.delegation.ExecutorFactory' in the classpath.
Any idea?
Siddhesh Kalgaonkar
03/01/2023, 2:55 PM
Hunter Medney
03/01/2023, 6:03 PM
Yaroslav Bezruchenko
03/01/2023, 6:03 PM
chunilal kukreja
03/01/2023, 7:21 PM
Herat Acharya
03/01/2023, 7:35 PM
Master Async Operation -> based on the data from the master async operation, conditionally run one or more Async operations -> combine these async operations and emit the result into a single sink
Tony Wang
03/01/2023, 9:27 PM
Prithvi Dammalapati
03/01/2023, 9:39 PM
Sergio Sainz
03/01/2023, 11:40 PM
Table res = getTableEnvironment().sqlQuery(query);
StreamTableEnvironment stEnv = (StreamTableEnvironment) getTableEnvironment();
DataStream dataStream = stEnv.toDataStream(res);
dataStream.getTransformation().setSlotSharingGroup(ssgA);
dataStream.getExecutionEnvironment().registerSlotSharingGroup(ssgA);
res = stEnv.fromDataStream(dataStream);
Mohit Aggarwal
03/02/2023, 5:51 AM