# troubleshooting
Hi there, I'm currently running an aggregation job using the following:
• Scala 2.12
• Flink 1.18
• Azure Kubernetes
• Flink Kubernetes Operator v1.5.0

I've configured:
• taskmanager.memory.process.size = 50GB*
• Parallelism and task slots = 7

* I've allocated a much larger amount of memory than my application actually requires. I also haven't noticed any issue in the Flink UI, where memory utilization typically doesn't go beyond 60%-70%.

The problem I'm facing is that, after processing a certain amount of data, the TaskManager gets terminated and a new one is spawned in its place. I've thoroughly checked for application errors, but there don't seem to be any. I've also monitored the application using metrics and a profiler, but nothing unusual stands out. Most of the time, both CPU and memory usage are far from their maximum capacities. Any insights or suggestions would be greatly appreciated.
2023-09-13 12:24:05,472 INFO  org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - Closing TaskExecutor connection apps-hourly-to-monthly-1-taskmanager-1-2 because: Pod terminated, container termination statuses: [flink-main-container(exitCode=137, reason=OOMKilled, message=null)]
2023-09-13 12:24:05,472 INFO  org.apache.flink.runtime.resourcemanager.slotmanager.FineGrainedSlotManager [] - Unregistering task executor 68d33563d019f0a7d3bf2ea5624c39bb from the slot manager.
2023-09-13 12:24:05,472 INFO  org.apache.flink.runtime.resourcemanager.slotmanager.DefaultSlotStatusSyncer [] - Freeing slot cbf7197affafb5aebbbee3fd0df40299.
2023-09-13 12:24:05,472 INFO  org.apache.flink.runtime.resourcemanager.slotmanager.DefaultSlotStatusSyncer [] - Freeing slot 1823c58df4b82e2c39ec62697650e81a.
2023-09-13 12:24:05,472 INFO  org.apache.flink.runtime.resourcemanager.slotmanager.DefaultSlotStatusSyncer [] - Freeing slot 0ec007acb8f12f0b7fde7f71b194186b.
2023-09-13 12:24:05,472 INFO  org.apache.flink.runtime.resourcemanager.slotmanager.DefaultSlotStatusSyncer [] - Freeing slot 9fa76180f51c85d679abba73afe7211b.
2023-09-13 12:24:05,472 INFO  org.apache.flink.runtime.resourcemanager.slotmanager.DefaultSlotStatusSyncer [] - Freeing slot 61cb14160735e184aaf7f2e5e4c66673.
2023-09-13 12:24:05,472 INFO  org.apache.flink.runtime.resourcemanager.slotmanager.DefaultSlotStatusSyncer [] - Freeing slot 7681f34846444ce3db5afcd4b0ce1f45.
2023-09-13 12:24:05,476 WARN  akka.remote.transport.netty.NettyTransport                   [] - Remote connection to [null] failed with java.net.ConnectException: Connection refused: /10.244.20.6:6122
2023-09-13 12:24:05,476 WARN  akka.remote.ReliableDeliverySupervisor                       [] - Association with remote system [akka.tcp://flink@10.244.20.6:6122] has failed, address is now gated for [50] ms. Reason: [Association failed with [akka.tcp://flink@10.244.20.6:6122]] Caused by: [java.net.ConnectException: Connection refused: /10.244.20.6:6122]
2023-09-13 12:24:05,538 INFO  org.apache.flink.runtime.resourcemanager.slotmanager.FineGrainedSlotManager [] - Matching resource requirements against available resources.
2023-09-13 12:24:05,563 INFO  org.apache.flink.kubernetes.KubernetesResourceManagerDriver  [] - Pod apps-hourly-to-monthly-1-taskmanager-1-3 is created.
2023-09-13 12:24:06,182 WARN  akka.remote.transport.netty.NettyTransport                   [] - Remote connection to [null] failed with java.net.ConnectException: Connection refused: /10.244.20.6:6122
2023-09-13 12:24:06,182 WARN  akka.remote.ReliableDeliverySupervisor                       [] - Association with remote system [akka.tcp://flink@10.244.20.6:6122] has failed, address is now gated for [50] ms. Reason: [Association failed with [akka.tcp://flink@10.244.20.6:6122]] Caused by: [java.net.ConnectException: Connection refused: /10.244.20.6:6122]
2023-09-13 12:24:07,479 INFO  org.apache.flink.kubernetes.KubernetesResourceManagerDriver  [] - Received new TaskManager pod: apps-hourly-to-monthly-1-taskmanager-1-3
2023-09-13 12:24:07,479 INFO  org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - Requested worker apps-hourly-to-monthly-1-taskmanager-1-3 with resource spec WorkerResourceSpec {cpuCores=6.0, taskHeapSize=48.438gb (52009369600 bytes), taskOffHeapSize=0 bytes, networkMemSize=64.000mb (67108864 bytes), managedMemSize=0 bytes, numSlots=6}.
2023-09-13 12:24:16,188 WARN  akka.remote.transport.netty.NettyTransport                   [] - Remote connection to [null] failed with java.net.NoRouteToHostException: No route to host
2023-09-13 12:24:16,188 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - TaskManager with id apps-hourly-to-monthly-1-taskmanager-1-2 is no longer reachable.
2023-09-13 12:24:16,188 WARN  akka.remote.ReliableDeliverySupervisor                       [] - Association with remote system [akka.tcp://flink@10.244.20.6:6122] has failed, address is now gated for [50] ms. Reason: [Association failed with [akka.tcp://flink@10.244.20.6:6122]] Caused by: [java.net.NoRouteToHostException: No route to host]
2023-09-13 12:24:16,189 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - Disconnect TaskExecutor apps-hourly-to-monthly-1-taskmanager-1-2 because: TaskManager with id apps-hourly-to-monthly-1-taskmanager-1-2 is no longer reachable.
2023-09-13 12:24:16,190 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - FilterLateEvents: AppStats-1m (5/6) (b0101ecb008dc1b9eb9635369098fd46_de6ded48c316165d19830eb2295c855c_4_1) switched from RUNNING to FAILED on apps-hourly-to-monthly-1-taskmanager-1-2 @ 10.244.20.6 (dataPort=36967).
org.apache.flink.runtime.jobmaster.JobMasterException: TaskManager with id apps-hourly-to-monthly-1-taskmanager-1-2 is no longer reachable.
	at org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.notifyTargetUnreachable(JobMaster.java:1462) ~[flink-dist-1.18-SNAPSHOT.jar:1.18-SNAPSHOT]
	at org.apache.flink.runtime.heartbeat.DefaultHeartbeatMonitor.reportHeartbeatRpcFailure(DefaultHeartbeatMonitor.java:126) ~[flink-dist-1.18-SNAPSHOT.jar:1.18-SNAPSHOT]
	at org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl.runIfHeartbeatMonitorExists(HeartbeatManagerImpl.java:275) ~[flink-dist-1.18-SNAPSHOT.jar:1.18-SNAPSHOT]
	at org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl.reportHeartbeatTargetUnreachable(HeartbeatManagerImpl.java:267) ~[flink-dist-1.18-SNAPSHOT.jar:1.18-SNAPSHOT]
	at org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl.handleHeartbeatRpcFailure(HeartbeatManagerImpl.java:262) ~[flink-dist-1.18-SNAPSHOT.jar:1.18-SNAPSHOT]
	at org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl.lambda$handleHeartbeatRpc$0(HeartbeatManagerImpl.java:248) ~[flink-dist-1.18-SNAPSHOT.jar:1.18-SNAPSHOT]
	at java.util.concurrent.CompletableFuture.uniWhenComplete(Unknown Source) ~[?:?]
	at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(Unknown Source) ~[?:?]
	at java.util.concurrent.CompletableFuture$Completion.run(Unknown Source) ~[?:?]
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRunAsync$4(AkkaRpcActor.java:453) ~[flink-rpc-akka_735a4268-6705-4d88-8472-d498de57bbe4.jar:1.18-SNAPSHOT]
	at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68) ~[flink-rpc-akka_735a4268-6705-4d88-8472-d498de57bbe4.jar:1.18-SNAPSHOT]
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:453) ~[flink-rpc-akka_735a4268-6705-4d88-8472-d498de57bbe4.jar:1.18-SNAPSHOT]
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:218) ~[flink-rpc-akka_735a4268-6705-4d88-8472-d498de57bbe4.jar:1.18-SNAPSHOT]
	at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:84) ~[flink-rpc-akka_735a4268-6705-4d88-8472-d498de57bbe4.jar:1.18-SNAPSHOT]
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:168) ~[flink-rpc-akka_735a4268-6705-4d88-8472-d498de57bbe4.jar:1.18-SNAPSHOT]
	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24) [flink-rpc-akka_735a4268-6705-4d88-8472-d498de57bbe4.jar:1.18-SNAPSHOT]
	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20) [flink-rpc-akka_735a4268-6705-4d88-8472-d498de57bbe4.jar:1.18-SNAPSHOT]
	at scala.PartialFunction.applyOrElse(PartialFunction.scala:127) [flink-rpc-akka_735a4268-6705-4d88-8472-d498de57bbe4.jar:1.18-SNAPSHOT]
	at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126) [flink-rpc-akka_735a4268-6705-4d88-8472-d498de57bbe4.jar:1.18-SNAPSHOT]
	at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20) [flink-rpc-akka_735a4268-6705-4d88-8472-d498de57bbe4.jar:1.18-SNAPSHOT]
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175) [flink-rpc-akka_735a4268-6705-4d88-8472-d498de57bbe4.jar:1.18-SNAPSHOT]
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176) [flink-rpc-akka_735a4268-6705-4d88-8472-d498de57bbe4.jar:1.18-SNAPSHOT]
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176) [flink-rpc-akka_735a4268-6705-4d88-8472-d498de57bbe4.jar:1.18-SNAPSHOT]
	at akka.actor.Actor.aroundReceive(Actor.scala:537) [flink-rpc-akka_735a4268-6705-4d88-8472-d498de57bbe4.jar:1.18-SNAPSHOT]
	at akka.actor.Actor.aroundReceive$(Actor.scala:535) [flink-rpc-akka_735a4268-6705-4d88-8472-d498de57bbe4.jar:1.18-SNAPSHOT]
	at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220) [flink-rpc-akka_735a4268-6705-4d88-8472-d498de57bbe4.jar:1.18-SNAPSHOT]
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:579) [flink-rpc-akka_735a4268-6705-4d88-8472-d498de57bbe4.jar:1.18-SNAPSHOT]
	at akka.actor.ActorCell.invoke(ActorCell.scala:547) [flink-rpc-akka_735a4268-6705-4d88-8472-d498de57bbe4.jar:1.18-SNAPSHOT]
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270) [flink-rpc-akka_735a4268-6705-4d88-8472-d498de57bbe4.jar:1.18-SNAPSHOT]
	at akka.dispatch.Mailbox.run(Mailbox.scala:231) [flink-rpc-akka_735a4268-6705-4d88-8472-d498de57bbe4.jar:1.18-SNAPSHOT]
	at akka.dispatch.Mailbox.exec(Mailbox.scala:243) [flink-rpc-akka_735a4268-6705-4d88-8472-d498de57bbe4.jar:1.18-SNAPSHOT]
	at java.util.concurrent.ForkJoinTask.doExec(Unknown Source) [?:?]
	at java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(Unknown Source) [?:?]
	at java.util.concurrent.ForkJoinPool.scan(Unknown Source) [?:?]
	at java.util.concurrent.ForkJoinPool.runWorker(Unknown Source) [?:?]
	at java.util.concurrent.ForkJoinWorkerThread.run(Unknown Source) [?:?]
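
One possible way to read the numbers in the log above: the pod was ended with exitCode=137 and reason=OOMKilled, which is the container exceeding its memory limit, not a Java heap error. The WorkerResourceSpec line shows almost all of the 50GB going to task heap. The sketch below does only that arithmetic. It assumes Flink reads "50GB" as 50 GiB, and it assumes the stock defaults for framework heap (128 MiB), framework off-heap (128 MiB) and metaspace (256 MiB); none of those three values appear in the post, so treat them as guesses to be checked against the actual configuration.

```python
MIB = 1024 ** 2
GIB = 1024 ** 3

# From the post (assumption: Flink parses "50GB" as 50 GiB).
PROCESS_SIZE = 50 * GIB

# Copied from the WorkerResourceSpec log line.
TASK_HEAP = 52_009_369_600
TASK_OFF_HEAP = 0
NETWORK = 67_108_864
MANAGED = 0

# Assumed Flink defaults; the post does not state these.
FRAMEWORK_HEAP = 128 * MIB
FRAMEWORK_OFF_HEAP = 128 * MIB
METASPACE = 256 * MIB


def jvm_overhead_budget() -> int:
    """Bytes of the process size left after every explicitly sized pool."""
    sized = (TASK_HEAP + TASK_OFF_HEAP + NETWORK + MANAGED
             + FRAMEWORK_HEAP + FRAMEWORK_OFF_HEAP + METASPACE)
    return PROCESS_SIZE - sized


def heap_share() -> float:
    """Fraction of the process size handed to the JVM heap."""
    return (TASK_HEAP + FRAMEWORK_HEAP) / PROCESS_SIZE


if __name__ == "__main__":
    print(f"JVM overhead budget: {jvm_overhead_budget() // MIB} MiB")
    print(f"Heap share of container: {heap_share():.1%}")
```

Under those assumptions, roughly 1 GiB is left for everything outside the sized pools (thread stacks, native libraries, allocator overhead), while the heap takes about 97% of the container. If that reading is right, the 60%-70% utilization seen in the Flink UI would not rule out the container limit being hit by native memory, and lowering the heap or raising taskmanager.memory.jvm-overhead.max might be worth trying.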