Nishant Goenka
06/27/2023, 1:06 PM
WARN org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher [] - Committing offsets to Kafka failed. This does not compromise Flink's checkpoints.
org.apache.kafka.clients.consumer.RetriableCommitFailedException: Offset commit failed with a retriable exception. You should retry committing the latest consumed offsets.
Caused by: org.apache.kafka.common.errors.CoordinatorNotAvailableException: The coordinator is not available.
Slackbot
06/27/2023, 5:14 PM
Ishan
06/27/2023, 5:16 PM
jdbc? or thrift service is the only path?
Also, in general, when running the Flink SQL client on a local machine, is it common practice to have a metastore thrift service running locally as well? How do people get around it if the HMS is not accessible from the local instance of the Flink SQL client?
Ilya Sterin
06/27/2023, 5:25 PM
java.util.concurrent.CompletionException: java.util.concurrent.CompletionException: org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Could not acquire the minimum required resources.
at org.apache.flink.runtime.scheduler.DefaultExecutionDeployer.lambda$assignResource$4(DefaultExecutionDeployer.java:227)
at java.base/java.util.concurrent.CompletableFuture.uniHandle(Unknown Source)
at java.base/java.util.concurrent.CompletableFuture$UniHandle.tryFire(Unknown Source)
at java.base/java.util.concurrent.CompletableFuture.postComplete(Unknown Source)
at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(Unknown Source)
at org.apache.flink.runtime.jobmaster.slotpool.PendingRequest.failRequest(PendingRequest.java:88)
at org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolBridge.cancelPendingRequests(DeclarativeSlotPoolBridge.java:185)
at org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolBridge.failPendingRequests(DeclarativeSlotPoolBridge.java:408)
at org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolBridge.notifyNotEnoughResourcesAvailable(DeclarativeSlotPoolBridge.java:396)
at org.apache.flink.runtime.jobmaster.JobMaster.notifyNotEnoughResourcesAvailable(JobMaster.java:887)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
at java.base/java.lang.reflect.Method.invoke(Unknown Source)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRpcInvocation$0(AkkaRpcActor.java:301)
at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:300)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:222)
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:84)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:168)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)
at scala.PartialFunction.applyOrElse(PartialFunction.scala:127)
at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126)
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
at akka.actor.Actor.aroundReceive(Actor.scala:537)
at akka.actor.Actor.aroundReceive$(Actor.scala:535)
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:579)
at akka.actor.ActorCell.invoke(ActorCell.scala:547)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
at akka.dispatch.Mailbox.run(Mailbox.scala:231)
at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
at java.base/java.util.concurrent.ForkJoinTask.doExec(Unknown Source)
at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(Unknown Source)
at java.base/java.util.concurrent.ForkJoinPool.scan(Unknown Source)
at java.base/java.util.concurrent.ForkJoinPool.runWorker(Unknown Source)
at java.base/java.util.concurrent.ForkJoinWorkerThread.run(Unknown Source)
Caused by: java.util.concurrent.CompletionException: org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Could not acquire the minimum required resources.
at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(Unknown Source)
at java.base/java.util.concurrent.CompletableFuture.completeThrowable(Unknown Source)
at java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(Unknown Source)
... 38 more
Caused by: org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Could not acquire the minimum required resources.
Slackbot
06/27/2023, 5:26 PM
Liza M
06/27/2023, 6:04 PM
Bharathkrishna G M
06/27/2023, 7:04 PM
Control the Guh
06/28/2023, 12:47 AM
Neha
06/28/2023, 10:18 AM
Jirawech Siwawut
06/28/2023, 12:23 PM
Nathanael England
06/28/2023, 4:43 PM
NodePort so I can look at the UI in a web browser?
dp api
06/29/2023, 7:50 AM
Hussain Abbas
06/29/2023, 9:30 AM
Slackbot
06/29/2023, 10:44 AM
Иван Борисов
06/29/2023, 11:02 AM
topic1: {'data': {'temp':25.2, 'sensore_name': 'T1', 'timestamp': 123123131}, 'compare_with': 'T2'}
topic2: {'data': {'temp':28, 'sensore_name': 'T2', 'timestamp': 53543543}, 'compare_with': 'T1'}
topic3: {'data': {'temp':32, 'sensore_name': 'T3', 'timestamp': 6757575}, 'compare_with': 'T2'}
topic4: {'data': {'temp':12, 'sensore_name': 'T3', 'timestamp': 67856222}, 'compare_with': 'T1'}
I need to compute T1.data.temp - T2.data.temp, comparing each reading with EXACTLY the last measurement of the other sensor (given in compare_with), because measurements can arrive at different frequencies: T1 sends 1 message per second, T2 1 message per 5 seconds, T3 3 messages per second. Then I need to calculate the AVG of this difference over a 1-hour window and, if the difference is greater than the AVG, raise an alarm somewhere. I don't understand how to do this.
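One way to reason about this logic before mapping it onto Flink operators is a plain-Python sketch. It is not Flink code and not a definitive design: `SensorComparator`, `on_message` and `WINDOW_SECONDS` are hypothetical names, timestamps are assumed to be in seconds and to arrive in order, and the average is assumed to be taken over the earlier differences still inside the window.

```python
from collections import deque
from dataclasses import dataclass, field

WINDOW_SECONDS = 3600  # the 1-hour window from the question (assumes timestamps in seconds)


@dataclass
class SensorComparator:
    """Keeps the latest temperature per sensor and a rolling window of differences."""

    latest_temp: dict = field(default_factory=dict)  # sensor name -> last seen temperature
    diffs: dict = field(default_factory=dict)        # sensor name -> deque of (timestamp, diff)

    def on_message(self, msg):
        data = msg["data"]
        name, temp, ts = data["sensore_name"], data["temp"], data["timestamp"]
        # Remember this reading so other sensors can compare against it later.
        self.latest_temp[name] = temp

        other = msg["compare_with"]
        if other not in self.latest_temp:
            return None  # the other sensor has not reported yet

        # Difference against exactly the last measurement of the other sensor.
        diff = temp - self.latest_temp[other]

        window = self.diffs.setdefault(name, deque())
        # Evict differences that fell out of the 1-hour window.
        while window and window[0][0] <= ts - WINDOW_SECONDS:
            window.popleft()

        # Average of the earlier differences still in the window (None if there are none).
        avg = sum(d for _, d in window) / len(window) if window else None
        window.append((ts, diff))

        alarm = avg is not None and diff > avg
        return diff, avg, alarm
```

In a Flink job the two dictionaries would presumably correspond to state kept per sensor; that mapping is left open here.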
Flink 1.17.1, Java 17
Iat Chong Chan
06/29/2023, 11:24 AM
Chase Diem
06/29/2023, 12:28 PM
setMetricsCredentialsProvider, but the KinesisConfigUtil doesn't utilize it.
Is it possible to set them anywhere, or would this require some sort of code change or PR?
Hussain Abbas
06/29/2023, 3:03 PM
Faisal A. Siddiqui
06/29/2023, 5:25 PM
table1 inner join table2 ... inner join table10
vs
table10 inner join table9 inner join table8... inner join table1
where the volume of data increases from table1... table10?
Lukasz Krawiec
06/29/2023, 7:57 PM
forBoundedOutOfOrderness(Duration.ofSeconds(5))
• There is no backpressure, and the kafka consumer lag for both topics is consistently close to 0
• Checkpoints are stored in s3 and are taken every 60s, checkpoint duration is < 1s
• Running Flink 1.14
I'm looking for some pointers on how to debug the problem and, in general, what could explain this behavior.
Antonio Varela
06/30/2023, 6:31 AM
@TypeInfo(MyPojoTypeInfoFactory.class)
public class MyPojo {
    private List<OtherPojo> listOfOtherPojos;
    ...
}
Can someone please point me to an example of how to accomplish this?
I’m thinking of switching to Avro serialization because of this, but I’m aware that Kryo must be avoided. Any help will be appreciated. Thank you in advance.
Dheeraj Panangat
06/30/2023, 11:18 AM
job_<jobid>_op_KeyedProcessOperator_<operatorid>__<taskIndex>_<totalParallelism>__uuid_<randomuuid>/
Here the folder name changes every time the job restarts. Also the folders seem to be deleted after restarts.
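To see which parts of such a folder name differ between two runs, the name can be split into its components. This is a minimal sketch that assumes only the pattern quoted above; `DIR_PATTERN` and `parse_state_dir` are hypothetical names, not part of Flink.

```python
import re

# Regex built directly from the quoted pattern:
# job_<jobid>_op_<operator>_<operatorid>__<taskIndex>_<totalParallelism>__uuid_<randomuuid>/
DIR_PATTERN = re.compile(
    r"^job_(?P<job_id>[^_]+)"
    r"_op_(?P<operator>.+)_(?P<operator_id>[^_]+)"
    r"__(?P<task_index>\d+)_(?P<total_parallelism>\d+)"
    r"__uuid_(?P<uuid>[^/]+)/?$"
)


def parse_state_dir(name):
    """Return the named components of a folder name, or None if it does not match."""
    match = DIR_PATTERN.match(name)
    return match.groupdict() if match else None
```

Comparing the parsed dictionaries for two restarts shows which fields changed; the pattern itself marks the uuid part as random.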
How do I make sure the state is retained, so that the job can query it after a restart?
Appreciate any inputs on this.
Thanks
Oscar Perez
06/30/2023, 12:46 PM
Marquis C
06/30/2023, 1:46 PM
Sumit Nekar
06/30/2023, 2:05 PM
This is usually caused by: 1) Akka failed sending the message silently, due to problems like oversized payload or serialization failures. In that case, you should find detailed error information in the logs. 2) The recipient needs more time for responding, due to problems like slow machines or network jitters. In that case, you can try to increase akka.ask.timeout.
I tried increasing the akka.framesize and akka.ask.timeout values. This resulted in the following exception:
org.apache.flink.runtime.io.network.partition.PartitionNotFoundException: Partition c188f40cdaf8401fef36bf7e6122cd32#3@d7e4ba19d49c4286666eb145f9cc4295_6c80f82e38c6db0aca2cd65eb9ddef48_3_0 not found.
How can the job be recovered in such cases? One way was to cancel the job and redeploy. This worked, but for a stateful job it might not be helpful. Please suggest if there are any better options.
Flink version : 1.17.1
Using flink k8s operator.
Chris Tabakakis
06/30/2023, 3:42 PM
Caroline McKee
06/30/2023, 6:09 PM
class CalculateValueDiff(TableFunction):
    def eval(self, ids, grouped_values, is_t1s):
        t1_players = [id for id, is_t1 in zip(ids, is_t1s) if is_t1]
        t2_players = [id for id, is_t1 in zip(ids, is_t1s) if not is_t1]
        for t1_player in t1_players:
            for t2_player in t2_players:
                value_diff = abs(grouped_values[ids.index(t1_player)] - grouped_values[ids.index(t2_player)])
                yield Row(value_diff, t1_player, t2_player)
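As a side check, the pairing logic in eval can be exercised outside PyFlink. The following is a minimal sketch, not the UDTF itself: `value_diffs` is a hypothetical plain-Python helper that keeps each id together with its value instead of looking it up with `ids.index`, which returns only the first match when an id occurs more than once. It does not address the encoder error.

```python
def value_diffs(ids, grouped_values, is_t1s):
    """Yield (abs diff, t1 id, t2 id) for every T1/T2 pair in one group."""
    # Keep each id paired with its own value so no index lookup is needed.
    t1 = [(i, v) for i, v, flag in zip(ids, grouped_values, is_t1s) if flag]
    t2 = [(i, v) for i, v, flag in zip(ids, grouped_values, is_t1s) if not flag]
    for t1_id, t1_value in t1:
        for t2_id, t2_value in t2:
            yield abs(t1_value - t2_value), t1_id, t2_id


# A generator is lazy; list() materializes all of its rows at once.
rows = list(value_diffs([1, 2, 3], [10.0, 4.0, 7.5], [True, False, False]))
```

Wrapping the call in list() is only a way to inspect every yielded row in a plain Python session.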
Once I register this UDTF as calculate_value_diff (along with custom collect_list_<T> aggregators for different data types; those are working fine), I run this SQL query:
res_table = t_env.sql_query("""
    SELECT
        message_time,
        calculate_value_diff(
            collect_list_int(id),
            collect_list_float(value),
            collect_list_bool(is_t1)
        ) as value_diffs
    FROM InputTable
    GROUP BY message_time
""")
res_stream = t_env.to_changelog_stream(res_table)
The issue I'm having is that the output of the UDTF is technically a generator, not a row. So when I call t_env.to_changelog_stream(res_table), I get the error:
File "pyflink/fn_execution/beam/beam_coder_impl_fast.pyx", line 101, in pyflink.fn_execution.beam.beam_coder_impl_fast.FlinkLengthPrefixCoderBeamWrapper.encode_to_stream
File "pyflink/fn_execution/coder_impl_fast.pyx", line 296, in pyflink.fn_execution.coder_impl_fast.ValueCoderImpl.encode_to_stream
File "pyflink/fn_execution/coder_impl_fast.pyx", line 341, in pyflink.fn_execution.coder_impl_fast.FlattenRowCoderImpl.encode_to_stream
File "pyflink/fn_execution/coder_impl_fast.pyx", line 391, in pyflink.fn_execution.coder_impl_fast.RowCoderImpl.encode_to_stream
AttributeError: 'generator' object has no attribute 'get_fields_by_names'
It seems that the stream converter expects a row (it works fine if I replace the yield with return in the UDTF, but then I'm obviously only getting the first output value). Not sure how to proceed from here... What really needs to happen is that the pyflink.fn_execution.coder_impl_fast.RowCoderImpl.encode_to_stream function is adjusted to iterate through the generator if the value is a generator (related: it seems like this exact same issue was fixed in `commit 62f3c9c`: https://github.com/apache/flink/commit/62f3c9c9dbacf27e297dd2a64788a4d26efccf27, which appears in release-1.13, but then the fix was lost in subsequent versions). Is there an easy way around this, or would it be possible to get a fix pushed?
kiran kumar
07/02/2023, 7:04 AM
INFO org.apache.flink.runtime.taskmanager.Task [] - Freeing task resources for SlidingEventTimeWindows -> Process (2/2)#38618 (59bbe8ba8d5d7045ee7d2aacec66e7c8_98a01397958c5196eb88cd4117e1b3ad_1_38618).
INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Un-registering task and sending final execution state FAILED to JobManager for task SlidingEventTimeWindows -> Process (2/2)#38618 59bbe8ba8d5d7045ee7d2aacec66e7c8_98a01397958c5196eb88cd4117e1b3ad_1_38618.
2023-07-02 06:31:07,254 INFO org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Activate slot 9780cdcf42351473e6f636930c2c4d94.
2023-07-02 06:31:07,256 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Received task Source: Kafka CFTransactions -> Map -> Filter -> Timestamps/Watermarks -> Map -> Process (2/2)#38619 (59bbe8ba8d5d7045ee7d2aacec66e7c8_b5aa0fa462c5d45eec0c56cc48b37e5e_1_38619), deploy into slot with allocation id 9780cdcf42351473e6f636930c2c4d94.
2023-07-02 06:31:07,257 INFO org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Activate slot 9780cdcf42351473e6f636930c2c4d94.
2023-07-02 06:31:07,257 INFO org.apache.flink.runtime.taskmanager.Task [] - Source: Kafka CFTransactions -> Map -> Filter -> Timestamps/Watermarks -> Map -> Process (2/2)#38619 (59bbe8ba8d5d7045ee7d2aacec66e7c8_b5aa0fa462c5d45eec0c56cc48b37e5e_1_38619) switched from CREATED to DEPLOYING.
2023-07-02 06:31:07,257 INFO org.apache.flink.runtime.taskmanager.Task [] - Loading JAR files for task Source: Kafka CFTransactions -> Map -> Filter -> Timestamps/Watermarks -> Map -> Process (2/2)#38619 (59bbe8ba8d5d7045ee7d2aacec66e7c8_b5aa0fa462c5d45eec0c56cc48b37e5e_1_38619) [DEPLOYING].
2023-07-02 06:31:07,257 WARN org.apache.flink.runtime.taskmanager.Task [] - Source: Kafka CFTransactions -> Map -> Filter -> Timestamps/Watermarks -> Map -> Process (2/2)#38619 (59bbe8ba8d5d7045ee7d2aacec66e7c8_b5aa0fa462c5d45eec0c56cc48b37e5e_1_38619) switched from DEPLOYING to FAILED with failure cause: java.lang.IllegalStateException: The library registration references a different set of library BLOBs than previous registrations for this job:
old:[p-954843a2fbeba5331142de719765e1b5e15d2a52-d769e111f8329e90b17813ba49062540]
new:[p-954843a2fbeba5331142de719765e1b5e15d2a52-d8de14d66657f4955f18011970565412]
at org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager$ResolvedClassLoader.verifyClassLoader(BlobLibraryCacheManager.java:419)
at org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager$ResolvedClassLoader.access$500(BlobLibraryCacheManager.java:359)
at org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager$LibraryCacheEntry.getOrResolveClassLoader(BlobLibraryCacheManager.java:235)
at org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager$LibraryCacheEntry.access$1100(BlobLibraryCacheManager.java:202)
at org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager$DefaultClassLoaderLease.getOrResolveClassLoader(BlobLibraryCacheManager.java:336)
at org.apache.flink.runtime.taskmanager.Task.createUserCodeClassloader(Task.java:1024)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:612)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
at java.base/java.lang.Thread.run(Unknown Source)
dp api
07/02/2023, 10:49 AM
env_settings = EnvironmentSettings.in_streaming_mode()
t_env = TableEnvironment.create(env_settings)
t_env.get_config().set("pipeline.jars", "directory_path_to_jar_file")
t_env.get_config().set("table.exec.source.idle-timeout", "1000")
When I write a SQL SELECT query and order by a timestamp (which is converted using TO_TIMESTAMP_LTZ), I get this error:
"Sort on a non-time-attribute field is not supported"
I searched online and found this - FLINK-18401
It suggested that I enable table.exec.non-temporal-sort.enabled=true which is false by default.
However I cannot find this in the docs and do not know how to configure this.
Any help would be greatly appreciated. 🙏
Keyur Makwana
07/02/2023, 1:38 PM