Shahid Chohan
07/12/2023, 12:56 AM
Eugenio Gastelum
07/12/2023, 1:20 AM
.wait() function on a Table). But I get several Java errors, and this one below seems to be the main problem. It looks like it happens frequently to other people, since I've seen it across Stack Overflow:
Caused by: java.lang.RuntimeException: Failed to create stage bundle factory!
INFO:root:Initializing Python harness: C:\Users\eugen\anaconda3\lib\site-packages\pyflink\fn_execution\beam\beam_boot.py --id=6-1 --provision_endpoint=localhost:53973
INFO:root:Starting up Python harness in loopback mode.
I've seen that one solution is to set the option `_python_worker_execution_mode` to `process`: https://stackoverflow.com/a/72037218/9588300
However, I've set it and still get the same error. This is how I've set it up in my .py file:
import os

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

# Directory containing this script (assumed definition; not shown in the original snippet)
CURRENT_DIR = os.path.dirname(os.path.abspath(__file__))

env = StreamExecutionEnvironment.get_execution_environment()
env.add_jars("file:///" + CURRENT_DIR + "/lib/flink-sql-connector-kinesis-1.15.2.jar")
table_env = StreamTableEnvironment.create(stream_execution_environment=env)
table_env.get_config().set("pipeline.jars", "file:///" + CURRENT_DIR + "/lib/flink-sql-connector-kinesis-1.15.2.jar")
table_env.get_config().set("table.exec.sink.not-null-enforcer", "DROP")
table_env.get_config().set("_python_worker_execution_mode", "process")
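Separately from the harness error (and not confirmed as its cause), the snippet builds JAR URIs as `"file:///" + CURRENT_DIR + "/lib/..."`. If `CURRENT_DIR` is a native Windows path (likely, given the `C:\Users\eugen\...` paths in the log), the result mixes backslashes and forward slashes. A minimal standard-library sketch that builds a well-formed `file:` URI instead; `jar_uri` is a hypothetical helper, and the PyFlink calls are shown only as comments:

```python
import os
from pathlib import Path


def jar_uri(*path_parts: str) -> str:
    """Return a well-formed file: URI for the JAR at os.path.join(*path_parts).

    On Windows this yields e.g. file:///C:/Users/... with forward slashes,
    instead of mixing backslashes from the directory into the URI.
    """
    # as_uri() requires an absolute path, hence os.path.abspath
    return Path(os.path.abspath(os.path.join(*path_parts))).as_uri()


# Hypothetical usage with the names from the snippet above (PyFlink not imported here):
# jar = jar_uri(CURRENT_DIR, "lib", "flink-sql-connector-kinesis-1.15.2.jar")
# env.add_jars(jar)
# table_env.get_config().set("pipeline.jars", jar)
print(jar_uri("lib", "flink-sql-connector-kinesis-1.15.2.jar"))
```

Using `pathlib` leaves separator handling and percent-encoding to the standard library rather than to string concatenation.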
Did I set it wrong?
Mukesh Kumar
07/12/2023, 4:57 AM
Pappu Yadav
07/12/2023, 5:03 AM
Marco Villalobos
07/12/2023, 6:08 AM
TumblingEventTimeWindows.of(Time.milliseconds(1))
Sameer Chandra
07/12/2023, 8:08 AM
Shubham Saxena
07/12/2023, 9:30 AM
Thread.currentThread().getContextClassLoader() class loader in your implementation."
In this case, our dependencies, such as the Jersey client and Hadoop, use the thread's context class loader to initialise some classes, which we have no control over. Has anyone faced this issue? Isn't this limitation too restrictive, since it would be very hard to reimplement your dependencies?
kingsathurthi
07/12/2023, 9:55 AM
No dirty JobResults can be restored
2023-07-12 09:29:47,171 INFO org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - Starting the resource manager.
2023-07-12 09:29:47,171 INFO org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess [] - Stopping SessionDispatcherLeaderProcess.
2023-07-12 09:29:47,173 INFO org.apache.flink.runtime.jobmanager.DefaultJobGraphStore [] - Stopping DefaultJobGraphStore.
2023-07-12 09:29:47,174 ERROR org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - Fatal error occurred in the cluster entrypoint.
java.util.concurrent.CompletionException: java.lang.IllegalStateException: The base directory of the JobResultStore isn't accessible. No dirty JobResults can be restored.
at java.util.concurrent.CompletableFuture.encodeThrowable(Unknown Source) ~[?:?]
at java.util.concurrent.CompletableFuture.completeThrowable(Unknown Source) [?:?]
at java.util.concurrent.CompletableFuture$AsyncSupply.run(Unknown Source) [?:?]
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) [?:?]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) [?:?]
at java.lang.Thread.run(Unknown Source) [?:?]
Caused by: java.lang.IllegalStateException: The base directory of the JobResultStore isn't accessible. No dirty JobResults can be restored.
at org.apache.flink.util.Preconditions.checkState(Preconditions.java:193) ~[flink-dist-1.16.2.jar:1.16.2]
at org.apache.flink.runtime.highavailability.FileSystemJobResultStore.getDirtyResultsInternal(FileSystemJobResultStore.java:182) ~[flink-dist-1.16.2.jar:1.16.2]
at org.apache.flink.runtime.highavailability.AbstractThreadsafeJobResultStore.withReadLock(AbstractThreadsafeJobResultStore.java:118) ~[flink-dist-1.16.2.jar:1.16.2]
at org.apache.flink.runtime.highavailability.AbstractThreadsafeJobResultStore.getDirtyResults(AbstractThreadsafeJobResultStore.java:100) ~[flink-dist-1.16.2.jar:1.16.2]
at org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.getDirtyJobResults(SessionDispatcherLeaderProcess.java:194) ~[flink-dist-1.16.2.jar:1.16.2]
at org.apache.flink.runtime.dispatcher.runner.AbstractDispatcherLeaderProcess.supplyUnsynchronizedIfRunning(AbstractDispatcherLeaderProcess.java:198) ~[flink-dist-1.16.2.jar:1.16.2]
at org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.getDirtyJobResultsIfRunning(SessionDispatcherLeaderProcess.java:188) ~[flink-dist-1.16.2.jar:1.16.2]
... 4 more
2023-07-12 09:29:47,262 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - Shutting KubernetesApplicationClusterEntrypoint down with application status UNKNOWN. Diagnostics Cluster entrypoint has been closed externally..
2023-07-12 09:29:47,269 INFO org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint [] - Shutting down rest endpoint.
2023-07-12 09:29:47,270 INFO org.apache.flink.runtime.blob.BlobServer [] - Stopped BLOB server at 0.0.0.0:6124
2023-07-12 09:29:48,074 INFO akka.remote.RemoteActorRefProvider$RemotingTerminator [] - Shutting down remote daemon.
2023-07-12 09:29:48,075 INFO akka.remote.RemoteActorRefProvider$RemotingTerminator [] - Shutting down remote daemon.
2023-07-12 09:29:48,076 INFO akka.remote.RemoteActorRefProvider$RemotingTerminator [] - Remote daemon shut down; proceeding with flushing remote transports.
2023-07-12 09:29:48,076 INFO akka.remote.RemoteActorRefProvider$RemotingTerminator [] - Remote daemon shut down; proceeding with flushing remote transports.
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.jboss.netty.util.internal.ByteBufferUtil (file:/tmp/flink-rpc-akka_4b66f1f1-01f0-4003-843c-386e04719e19.jar) to method java.nio.DirectByteBuffer.cleaner()
WARNING: Please consider reporting this to the maintainers of org.jboss.netty.util.internal.ByteBufferUtil
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
2023-07-12 09:29:48,164 INFO akka.remote.RemoteActorRefProvider$RemotingTerminator [] - Remoting shut down.
2023-07-12 09:29:48,164 INFO akka.remote.RemoteActorRefProvider$RemotingTerminator [] - Remoting shut down.
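The `IllegalStateException` is raised while the dispatcher looks up dirty job results in the JobResultStore's base directory, which is set by `job-result-store.storage-path` and, per the Flink configuration docs, defaults to a location under `high-availability.storageDir`. If that directory lives on a locally mounted volume (an assumption; the HA configuration isn't shown), a quick check from a container that mounts the same volume and runs as the same user as the JobManager can show whether it exists and is usable. A minimal stdlib sketch; `check_dir_access` is a hypothetical helper and cannot check object stores such as `s3://`:

```python
import os
import sys
from urllib.parse import urlparse


def check_dir_access(location: str) -> list[str]:
    """List problems that would stop this process from using `location` as a directory.

    Accepts a plain path or a file:// URI; object-store URIs are rejected.
    """
    parsed = urlparse(location)
    if parsed.scheme not in ("", "file"):
        raise ValueError(f"not a local path: {location}")
    path = parsed.path if parsed.scheme == "file" else location
    if not os.path.exists(path):
        return [f"{path} does not exist"]
    if not os.path.isdir(path):
        return [f"{path} is not a directory"]
    problems = []
    # Listing a directory needs read + execute; creating result files needs write
    if not os.access(path, os.R_OK | os.X_OK):
        problems.append(f"{path} cannot be listed by uid {os.getuid()}")
    if not os.access(path, os.W_OK):
        problems.append(f"{path} is not writable by uid {os.getuid()}")
    return problems


if __name__ == "__main__":
    # e.g. python3 check_dir.py file:///path/to/job-result-store
    for loc in sys.argv[1:]:
        print(loc, check_dir_access(loc) or "OK")
```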
Keyur Makwana
07/12/2023, 10:19 AM
Tiphanie Dousset
07/12/2023, 1:10 PM
CREATE TABLE `MyTable` (
`object_ID` INTEGER PRIMARY KEY NOT ENFORCED,
`Path` VARCHAR,
`Status` VARCHAR NOT NULL
) WITH (
'connector' = 'upsert-kafka',
'value.format' = 'json',
'topic' = 'my-topic',
'key.format' = 'json',
'properties.security.protocol' = 'SASL_SSL',
'properties.sasl.mechanism' = 'PLAIN',
'properties.bootstrap.servers' = 'xxxx',
'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.plain.PlainLoginModule required username="xxxx" password="yyyy";'
);
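The `properties.sasl.jaas.config` value above follows the JAAS format `<LoginModule class> required key="value" ...;`, and the class named there must be loadable by the Kafka client the connector uses. The same format applies to the SCRAM mechanisms with `ScramLoginModule`. A small sketch that builds the option for either mechanism and quotes it for the `WITH` clause (single quotes doubled, as in standard SQL). `jaas_config`, `sql_string_literal` and the `class_prefix` parameter are hypothetical, and the sketch does not settle which class name is right for a given connector JAR:

```python
LOGIN_MODULES = {
    "PLAIN": "org.apache.kafka.common.security.plain.PlainLoginModule",
    "SCRAM-SHA-256": "org.apache.kafka.common.security.scram.ScramLoginModule",
    "SCRAM-SHA-512": "org.apache.kafka.common.security.scram.ScramLoginModule",
}


def jaas_config(mechanism: str, username: str, password: str, class_prefix: str = "") -> str:
    """Build a value for the 'properties.sasl.jaas.config' table option.

    class_prefix is prepended to the login module class name, for setups where
    the Kafka client classes are relocated (hypothetical; depends on the JAR used).
    """
    if mechanism not in LOGIN_MODULES:
        raise ValueError(f"unsupported SASL mechanism: {mechanism}")
    for value in (username, password):
        if '"' in value:
            # Avoids guessing at JAAS escaping rules
            raise ValueError("double quotes in credentials are not handled by this sketch")
    module = class_prefix + LOGIN_MODULES[mechanism]
    return f'{module} required username="{username}" password="{password}";'


def sql_string_literal(value: str) -> str:
    """Quote a value for a Flink SQL WITH clause by doubling single quotes."""
    return "'" + value.replace("'", "''") + "'"


print(f"'properties.sasl.jaas.config' = {sql_string_literal(jaas_config('PLAIN', 'xxxx', 'yyyy'))}")
```

For SCRAM, `'properties.sasl.mechanism'` would be set to the matching `SCRAM-SHA-256` or `SCRAM-SHA-512` value.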
My pod complains that it can't find the "PlainLoginModule".
I found this JIRA issue: https://issues.apache.org/jira/browse/FLINK-31361
But using org.apache.flink.kafka.shaded.org.apache.kafka.common.security.plain.PlainLoginModule instead of org.apache.kafka.common.security.plain.PlainLoginModule doesn't solve the issue.
What is the correct name?
Am I missing something else?
(I've tried the workaround suggested in the link above, but that leads to other errors 🙃)
Neha
07/12/2023, 1:52 PM
watermarkStrategy = WatermarkStrategy.forMonotonousTimestamps()
.withIdleness(Duration.ofSeconds(60))
.withTimestampAssigner((event, ts) -> System.currentTimeMillis());
I am getting the following Exception:
Caused by: java.lang.NoSuchMethodException: <className>.$deserializeLambda$(java.lang.invoke.SerializedLambda)
at java.lang.Class.getDeclaredMethod(Class.java:2158)
at java.lang.invoke.SerializedLambda$1.run(SerializedLambda.java:224)
at java.lang.invoke.SerializedLambda$1.run(SerializedLambda.java:221)
at java.security.AccessController.doPrivileged(Native Method)
at java.lang.invoke.SerializedLambda.readResolve(SerializedLambda.java:221)
at sun.reflect.GeneratedMethodAccessor18.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1274)
... 30 more
Can somebody please point me to the correct way to resolve this?
Tudor Pavel
07/12/2023, 3:13 PM
Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot load user class: org.apache.flink.table.runtime.operators.python.table.PythonTableFunctionOperator
ClassLoader info: URL ClassLoader:
file: '/tmp/tm_172.25.0.5:44065-a2ef4a/blobStorage/job_c5da7b3563558ec66d5e773659c8abe1/blob_p-18b059e5f10b72a375b507c7f72c8ab9931306f9-ae41178318456ae52391027abd82d3de' (valid JAR)
Class not resolvable through given classloader.
Many debugging hours later I've narrowed it down to a simple reproducible example:
import json
import os

from pyflink.table import DataTypes, EnvironmentSettings, TableEnvironment
from pyflink.table.udf import udtf

t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())

# It fails if I uncomment this
# kafka_connector_path = os.path.abspath('jars/flink-sql-connector-kafka-1.17.1.jar')
# t_env.get_config().set("pipeline.jars", f"file://{kafka_connector_path}")

# define the source
table = t_env.from_elements(
    elements=[
        (1, '{"name": "Flink", "tel": 123, "addr": {"country": "Germany", "city": "Berlin"}}'),
        (2, '{"name": "hello", "tel": 135, "addr": {"country": "China", "city": "Shanghai"}}'),
        (3, '{"name": "world", "tel": 124, "addr": {"country": "USA", "city": "NewYork"}}'),
        (4, '{"name": "PyFlink", "tel": 32, "addr": {"country": "China", "city": "Hangzhou"}}')
    ],
    schema=['id', 'data'])


# UDTF that emits one (name, tel, country) row per JSON document
@udtf(result_types=[DataTypes.STRING(), DataTypes.INT(), DataTypes.STRING()])
def parse_data(data: str):
    json_data = json.loads(data)
    yield json_data['name'], json_data['tel'], json_data['addr']['country']


t_env.create_temporary_function('parse_data', parse_data)

# execute sql statement
t_env.execute_sql(
    """
    SELECT *
    FROM %s, LATERAL TABLE(parse_data(`data`)) t(name, tel, country)
    """ % table
).print()
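Since the failure is about loading `PythonTableFunctionOperator` on the TaskManager, it can help to rule out the UDTF logic itself. Without the `@udtf` decorator, `parse_data` is just a Python generator, so its body can be exercised locally with the standard library only (a sketch, not part of the original report):

```python
import json


def parse_data(data: str):
    """Same body as the @udtf above, without the PyFlink decorator."""
    json_data = json.loads(data)
    yield json_data['name'], json_data['tel'], json_data['addr']['country']


rows = list(parse_data('{"name": "Flink", "tel": 123, "addr": {"country": "Germany", "city": "Berlin"}}'))
print(rows)  # [('Flink', 123, 'Germany')]
```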
If I uncomment the `pipeline.jars` part, it fails when I run it like this:
flink run -py basic.py
However, I discovered that it works if I pass the connector from the CLI instead of setting it in code:
flink run -py basic.py --jarfile jars/flink-sql-connector-kafka-1.17.1.jar
After further digging, I found that the difference is that there are 2 JARs on the TaskManager when it works:
root@58c941f39b15:/opt/flink# ls -lah /tmp/tm_172.25.0.5\:33231-ecc45a/blobStorage/job_2de88f8af859fccdc956e62cc32c4a88/
total 37M
drwxr-xr-x 2 flink flink 4.0K Jul 12 14:24 .
drwxr-xr-x 40 flink flink 4.0K Jul 12 14:24 ..
-rw-r--r-- 1 flink flink 5.4M Jul 12 14:24 blob_p-18b059e5f10b72a375b507c7f72c8ab9931306f9-d01b99c7749267191b1d6da2dfe3a8bc
-rw-r--r-- 1 flink flink 32M Jul 12 14:24 blob_p-275820cb9b5e36c9f3e1e5483e93d0b808fe257e-5f41622e51ea7098f251bcfc3285b1bb
And just 1 JAR when it doesn't:
root@58c941f39b15:/opt/flink# ls -lah /tmp/tm_172.25.0.5\:33231-ecc45a/blobStorage/job_af184fc448ee510ac4ebbe92c7e7d893/
total 5.4M
drwxr-xr-x 2 flink flink 4.0K Jul 12 14:52 .
drwxr-xr-x 22 flink flink 4.0K Jul 12 14:52 ..
-rw-r--r-- 1 flink flink 5.4M Jul 12 14:52 blob_p-18b059e5f10b72a375b507c7f72c8ab9931306f9-1fc054bb0990f0378d86166b1edd63ea
I figured out that the 5.4M JAR is the Kafka connector from my project, while the 32M one is flink-python-1.17.1.jar, which I think should get auto-uploaded.
So, as a workaround, I can copy the flink-python JAR into my project subfolder and specify both dependencies in code, and then it works:
kafka_connector_path = os.path.abspath('jars/flink-sql-connector-kafka-1.17.1.jar')
flink_python_path = os.path.abspath('jars/flink-python-1.17.1.jar')
t_env.get_config().set("pipeline.jars", f"file://{kafka_connector_path};file://{flink_python_path}")
But since I haven't seen this documented anywhere, I wonder if it might be a bug. It looks as if specifying `pipeline.jars` in code overrides the auto-upload of the flink-python JAR to each TaskManager when running `flink run --python`.
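If setting `pipeline.jars` in code does replace what `flink run` would otherwise ship, one hedged alternative to copying the JAR is to append to the existing value rather than overwrite it. Whether the CLI-provided entries are visible to the script through the table config is an assumption not verified here. The merge itself is plain string handling over the `;`-separated list; `merge_pipeline_jars` is a hypothetical helper, and the PyFlink calls are shown only as comments:

```python
def merge_pipeline_jars(existing: str, *additional: str) -> str:
    """Append JAR URIs to a ';'-separated 'pipeline.jars' value.

    Keeps the existing entries first, preserves order and drops duplicates
    and empty entries.
    """
    merged: list[str] = []
    for uri in [*existing.split(";"), *additional]:
        uri = uri.strip()
        if uri and uri not in merged:
            merged.append(uri)
    return ";".join(merged)


# Hypothetical PyFlink usage (assumption: the value set by `flink run` is
# visible through the table config at this point):
# current = t_env.get_config().get_configuration().get_string("pipeline.jars", "")
# t_env.get_config().set("pipeline.jars",
#                        merge_pipeline_jars(current, f"file://{kafka_connector_path}"))
print(merge_pipeline_jars("file:///opt/a.jar", "file:///opt/b.jar", "file:///opt/a.jar"))
```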
Any clarity on this behavior would be much appreciated, thanks! 🙇
Tsering
07/12/2023, 3:23 PM
No LoginModule found for org.apache.kafka.common.security.scram.ScramLoginModule
Can someone please help me with this?
Kireet Agrawal
07/12/2023, 5:40 PM
2023-07-12 15:05:22,500 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: Kinesis Data Stream -> watermarking_strategy_v0 -> Flat Map (1/16) (10fa6513f35966426dfd36576bff2bae_2878b9ab6776d58d3ac8f74bba6fe037_0_3879) switched from DEPLOYING to FAILED on eventlogger-pipeline-v0-taskmanager-18-1 @ ip-10-1-10-12.us-east-2.compute.internal (dataPort=42471).
java.lang.IllegalStateException: The library registration references a different set of library BLOBs than previous registrations for this job:
old:[p-143a36af2df48da99d11f33fe47fff0418ca5511-f37b157bf869ea7e830900ee42ceb4dd]
new:[p-143a36af2df48da99d11f33fe47fff0418ca5511-ba29eb7b159adeb0e4cba1e5b379399f]
at org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager$ResolvedClassLoader.verifyClassLoader(BlobLibraryCacheManager.java:419) ~[flink-dist-1.16.1.jar:1.16.1]
at org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager$ResolvedClassLoader.access$500(BlobLibraryCacheManager.java:359) ~[flink-dist-1.16.1.jar:1.16.1]
at org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager$LibraryCacheEntry.getOrResolveClassLoader(BlobLibraryCacheManager.java:235) ~[flink-dist-1.16.1.jar:1.16.1]
at org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager$LibraryCacheEntry.access$1100(BlobLibraryCacheManager.java:202) ~[flink-dist-1.16.1.jar:1.16.1]
at org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager$DefaultClassLoaderLease.getOrResolveClassLoader(BlobLibraryCacheManager.java:336) ~[flink-dist-1.16.1.jar:1.16.1]
at org.apache.flink.runtime.taskmanager.Task.createUserCodeClassloader(Task.java:1024) ~[flink-dist-1.16.1.jar:1.16.1]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:612) ~[flink-dist-1.16.1.jar:1.16.1]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550) ~[flink-dist-1.16.1.jar:1.16.1]
at java.lang.Thread.run(Unknown Source) ~[?:?]
20 tasks will be restarted to recover the failed task 10fa6513f35966426dfd36576bff2bae_2878b9ab6776d58d3ac8f74bba6fe037_0_3879.
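A closer look at the two keys in the exception: assuming a permanent BLOB key is printed as `p-`, a 40-hex-digit SHA-1 of the BLOB's content, and a 32-hex-digit random suffix (a reading of Flink's `BlobKey` class, not something stated in the message), `old` and `new` share the content digest and differ only in the random part. That would suggest the same JAR content registered as two separate uploads. A small sketch that splits the keys under that assumption:

```python
import re

# Assumed layout of a BLOB key as printed in the log: type ("p" = permanent),
# SHA-1 of the BLOB content (40 hex chars), random ID (32 hex chars)
BLOB_KEY = re.compile(r"^(?P<type>[pt])-(?P<digest>[0-9a-f]{40})-(?P<random>[0-9a-f]{32})$")


def parse_blob_key(key: str) -> dict[str, str]:
    """Split a BLOB key string into its type, content digest and random suffix."""
    match = BLOB_KEY.match(key)
    if match is None:
        raise ValueError(f"unrecognised BLOB key: {key}")
    return match.groupdict()


old = parse_blob_key("p-143a36af2df48da99d11f33fe47fff0418ca5511-f37b157bf869ea7e830900ee42ceb4dd")
new = parse_blob_key("p-143a36af2df48da99d11f33fe47fff0418ca5511-ba29eb7b159adeb0e4cba1e5b379399f")
print(old["digest"] == new["digest"])  # True: same content digest
print(old["random"] == new["random"])  # False: different random suffix
```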
I can see that the hash old:[p-143a36af2df48da99d11f33fe47fff0418ca5511-f37b157bf869ea7e830900ee42ceb4dd] is referenced in relation to KinesisDataFetcher, but I'm not sure where the new one would be coming from. Any ideas?
Jalil Alchy
07/12/2023, 6:02 PM
Kevin Lam
07/12/2023, 6:34 PM
Slackbot
07/13/2023, 4:33 AM
Rajat Ahuja
07/13/2023, 9:05 AM
The program finished with the following exception:
org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Failure executing: GET at: https://10.43.0.1/api/v1/namespaces/apl-edm-streaming-svcs-l3-dev1/services/session-seven-deployment-only-example-rest. Message: Forbidden!Configured service account doesn't have access. Service account may have been revoked. services "session-seven-deployment-only-example-rest" is forbidden: User "system:serviceaccount:apl-edm-streaming-svcs-l3-dev1:flink" cannot get resource "services" in API group "" in the namespace "apl-edm-streaming-svcs-l3-dev1".
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:105)
at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:851)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:245)
at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1095)
at org.apache.flink.client.cli.CliFrontend.lambda$mainInternal$9(CliFrontend.java:1189)
at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
at org.apache.flink.client.cli.CliFrontend.mainInternal(CliFrontend.java:1189)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1157)
Caused by: org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.client.KubernetesClientException: Failure executing: GET at: https://10.43.0.1/api/v1/namespaces/apl-edm-streaming-svcs-l3-dev1/services/session-seven-deployment-only-example-rest. Message: Forbidden!Configured service account doesn't have access. Service account may have been revoked. services "session-seven-deployment-only-example-rest" is forbidden: User "system:serviceaccount:apl-edm-streaming-svcs-l3-dev1:flink" cannot get resource "services" in API group "" in the namespace "apl-edm-streaming-svcs-l3-dev1".
Permissions:
kubectl get serviceaccounts
NAME SECRETS AGE
flink 1 154d
flink-operator 1 154d
kubectl get roles
NAME CREATED AT
flink 2023-02-08T18:52:29Z
flink-operator 2023-07-11T20:51:24Z
MU-C02CF4WYMD6R:seven rxahuja$ kubectl describe role flink
Name: flink
Labels: app.kubernetes.io/managed-by=Helm
app.kubernetes.io/name=flink-kubernetes-operator
app.kubernetes.io/version=1.5.0
helm.sh/chart=flink-kubernetes-operator-1.5.0
Annotations: helm.sh/resource-policy: keep
meta.helm.sh/release-name: flink-kubernetes-operator
meta.helm.sh/release-namespace: apl-edm-streaming-svcs-l3-dev1
PolicyRule:
Resources Non-Resource URLs Resource Names Verbs
--------- ----------------- -------------- -----
configmaps [] [] [*]
pods [] [] [*]
deployments.apps/finalizers [] [] [*]
deployments.apps [] [] [*]
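Reading the `flink` role against the error: the `flink` service account is refused `get` on `services` (core API group `""`), and this role has no `services` row, whereas the `flink-operator` role below does. Which role is actually bound to the `flink` service account isn't shown (`kubectl get rolebindings` would tell), so the following is only a sketch of that reading: it parses the PolicyRule table text and checks a resource/verb pair. `parse_policy_rules` and `allows` are hypothetical helpers that ignore resource-name restrictions and API-group subtleties:

```python
import re

# PolicyRule section of `kubectl describe role flink`, copied from the message
FLINK_ROLE = """\
PolicyRule:
  Resources                    Non-Resource URLs  Resource Names  Verbs
  ---------                    -----------------  --------------  -----
  configmaps                   []                 []              [*]
  pods                         []                 []              [*]
  deployments.apps/finalizers  []                 []              [*]
  deployments.apps             []                 []              [*]
"""


def parse_policy_rules(describe_output: str) -> dict[str, set[str]]:
    """Map each resource in a `kubectl describe role` PolicyRule table to its verbs."""
    rules: dict[str, set[str]] = {}
    in_table = False
    for line in describe_output.splitlines():
        line = line.strip()
        if line.startswith("---"):
            in_table = True  # data rows follow the dashed separator
            continue
        if not in_table or not line:
            continue
        columns = re.findall(r"\[([^\]]*)\]", line)
        if not columns:
            break  # first non-table line ends the PolicyRule section
        resource = line.split()[0]
        rules[resource] = set(columns[-1].split())  # last [...] column holds the verbs
    return rules


def allows(rules: dict[str, set[str]], resource: str, verb: str) -> bool:
    """True if the parsed rules grant `verb` on `resource` (a '*' verb grants all)."""
    verbs = rules.get(resource, set())
    return "*" in verbs or verb in verbs


rules = parse_policy_rules(FLINK_ROLE)
print(allows(rules, "services", "get"))  # False: no 'services' row in the flink role
print(allows(rules, "pods", "get"))      # True
```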
MU-C02CF4WYMD6R:seven rxahuja$ kubectl describe role flink-operator
Name: flink-operator
Labels: app.kubernetes.io/managed-by=Helm
app.kubernetes.io/name=flink-kubernetes-operator
app.kubernetes.io/version=1.6-SNAPSHOT
helm.sh/chart=flink-kubernetes-operator-1.6-SNAPSHOT
Annotations: <none>
PolicyRule:
Resources Non-Resource URLs Resource Names Verbs
--------- ----------------- -------------- -----
configmaps [] [] [*]
deployments [] [] [*]
events [] [] [*]
flinkdeployments.flink.apache.org [] [] [*]
pods [] [] [*]
replicasets [] [] [*]
services [] [] [*]
deployments.apps/finalizers [] [] [*]
deployments.apps [] [] [*]
Mohammad Saif Malek
07/13/2023, 10:37 AM
Jashwanth S J
07/13/2023, 11:02 AM
Alex Bryant
07/13/2023, 12:20 PM
2023-07-13 22:07:27 2023-07-13 12:07:27,393 INFO org.apache.flink.runtime.io.network.netty.NettyClient [] - Transport type 'auto': using EPOLL.
2023-07-13 22:07:27 2023-07-13 12:07:27,394 INFO org.apache.flink.runtime.io.network.netty.NettyClient [] - Successful initialization (took 34 ms).
2023-07-13 22:07:27 2023-07-13 12:07:27,397 INFO org.apache.flink.runtime.io.network.netty.NettyServer [] - Transport type 'auto': using EPOLL.
2023-07-13 22:07:27 2023-07-13 12:07:27,419 INFO org.apache.flink.runtime.io.network.netty.NettyServer [] - Successful initialization (took 23 ms). Listening on SocketAddress /0.0.0.0:42927.
2023-07-13 22:07:27 2023-07-13 12:07:27,420 INFO org.apache.flink.runtime.taskexecutor.KvStateService [] - Starting the kvState service and its components.
2023-07-13 22:07:27 2023-07-13 12:07:27,451 INFO org.apache.flink.runtime.rpc.akka.AkkaRpcService [] - Starting RPC endpoint for org.apache.flink.runtime.taskexecutor.TaskExecutor at akka://flink/user/rpc/taskmanager_0 .
2023-07-13 22:07:27 2023-07-13 12:07:27,461 INFO org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Start job leader service.
2023-07-13 22:07:27 2023-07-13 12:07:27,464 INFO org.apache.flink.runtime.filecache.FileCache [] - User file cache uses directory /tmp/flink-dist-cache-6ae1b4c5-61be-4d22-a17e-7044a879417f
2023-07-13 22:07:27 2023-07-13 12:07:27,465 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Connecting to ResourceManager akka.tcp://flink@localhost:6123/user/rpc/resourcemanager_*(00000000000000000000000000000000).
2023-07-13 22:07:27 2023-07-13 12:07:27,510 WARN akka.remote.ReliableDeliverySupervisor [] - Association with remote system [akka.tcp://flink@localhost:6123] has failed, address is now gated for [50] ms. Reason: [Association failed with [akka.tcp://flink@localhost:6123]] Caused by: [java.net.ConnectException: Connection refused: localhost/127.0.0.1:6123]
2023-07-13 22:07:27 2023-07-13 12:07:27,511 WARN akka.remote.transport.netty.NettyTransport [] - Remote connection to [null] failed with java.net.ConnectException: Connection refused: localhost/127.0.0.1:6123
2023-07-13 22:07:27 2023-07-13 12:07:27,516 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Could not resolve ResourceManager address akka.tcp://flink@localhost:6123/user/rpc/resourcemanager_*, retrying in 10000 ms: Could not connect to rpc endpoint under address akka.tcp://flink@localhost:6123/user/rpc/resourcemanager_*.
2023-07-13 22:07:37 2023-07-13 12:07:37,541 WARN akka.remote.transport.netty.NettyTransport [] - Remote connection to [null] failed with java.net.ConnectException: Connection refused: localhost/127.0.0.1:6123
2023-07-13 22:07:37 2023-07-13 12:07:37,542 WARN akka.remote.ReliableDeliverySupervisor [] - Association with remote system [akka.tcp://flink@localhost:6123] has failed, address is now gated for [50] ms. Reason: [Association failed with [akka.tcp://flink@localhost:6123]] Caused by: [java.net.ConnectException: Connection refused: localhost/127.0.0.1:6123]
2023-07-13 22:07:37 2023-07-13 12:07:37,543 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Could not resolve ResourceManager address akka.tcp://flink@localhost:6123/user/rpc/resourcemanager_*, retrying in 10000 ms: Could not connect to rpc endpoint under address akka.tcp://flink@localhost:6123/user/rpc/resourcemanager_*.
The JobManager seems to start fine and is doing what we expect, listening to a Kafka topic:
...
2023-07-13 22:17:40 2023-07-13 12:17:40,409 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Using failover strategy org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy@6d61af7a for Kafka Streaming Job for topic: dummy_topic (cddea62acc79eb3bfe533bb4e14642f2).
2023-07-13 22:17:40 2023-07-13 12:17:40,416 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Starting execution of job 'Kafka Streaming Job for topic: dummy_topic' (cddea62acc79eb3bfe533bb4e14642f2) under job master id 00000000000000000000000000000000.
2023-07-13 22:17:40 2023-07-13 12:17:40,418 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Starting scheduling with scheduling strategy [org.apache.flink.runtime.scheduler.strategy.PipelinedRegionSchedulingStrategy]
2023-07-13 22:17:40 2023-07-13 12:17:40,418 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Job Kafka Streaming Job for topic: dummy_topic (cddea62acc79eb3bfe533bb4e14642f2) switched from state CREATED to RUNNING.
2023-07-13 22:17:40 2023-07-13 12:17:40,420 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: Kafka Source -> Sink: Print to Std. Out (1/1) (8a7317ac5692db1fd2fcedd39324f25a_cbc357ccb763df2852fee8c4fc7d55f2_0_0) switched from CREATED to SCHEDULED.
2023-07-13 22:17:40 2023-07-13 12:17:40,428 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Connecting to ResourceManager akka.tcp://flink@localhost:6123/user/rpc/resourcemanager_*(00000000000000000000000000000000)
2023-07-13 22:17:40 2023-07-13 12:17:40,431 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Resolved ResourceManager address, beginning registration
2023-07-13 22:17:40 2023-07-13 12:17:40,432 INFO org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] - Registering job manager 00000000000000000000000000000000@akka.tcp://flink@localhost:6123/user/rpc/jobmanager_2 for job cddea62acc79eb3bfe533bb4e14642f2.
2023-07-13 22:17:40 2023-07-13 12:17:40,434 INFO org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] - Registered job manager 00000000000000000000000000000000@akka.tcp://flink@localhost:6123/user/rpc/jobmanager_2 for job cddea62acc79eb3bfe533bb4e14642f2.
2023-07-13 22:17:40 2023-07-13 12:17:40,438 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - JobManager successfully registered at ResourceManager, leader id: 00000000000000000000000000000000.
2023-07-13 22:17:40 2023-07-13 12:17:40,439 INFO org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager [] - Received resource requirements from job cddea62acc79eb3bfe533bb4e14642f2: [ResourceRequirement{resourceProfile=ResourceProfile{UNKNOWN}, numberOfRequiredSlots=1}]
2023-07-13 22:17:40 2023-07-13 12:17:40,474 INFO org.apache.flink.StreamingJob [] - Flink streaming job for Kafka topic 'dummy_topic' started
Kafka is running fine, as I can see messages coming into that topic. Our docker-compose file for Flink is as follows:
version: "3.3"
services:
  jobmanager:
    restart: always
    image: deeprecognition.azurecr.io/runtime-flink_jobmanager1:${BUILD_NUMBER}
    container_name: runtime-flink_jobmanager1
    expose:
      - "${JOB_MANAGER_PORT}"
    ports:
      - "${JOB_MANAGER_PORT}:${JOB_MANAGER_PORT}"
    command: jobmanager
    dns:
      - ${ENVIRONMENT_DNS}
    environment:
      - FQDN_SUFFIX=${FQDN_SUFFIX}
      - JOB_MANAGER_RPC_ADDRESS=${JOB_MANAGER_RPC_ADDRESS}
      - JOB_MANAGER_RPC_PORT=${JOB_MANAGER_RPC_PORT}
      - KAFKA_HOST=${KAFKA_HOST}
      - KAFKA_PORT=${KAFKA_PORT}
      - KAFKA_TOPIC=${KAFKA_TOPIC}
      - VM_AGENT_NAME=${VM_AGENT_NAME}
  taskmanager:
    restart: always
    image: deeprecognition.azurecr.io/runtime-flink_taskmanager1:${BUILD_NUMBER}
    container_name: runtime-flink_taskmanager1
    depends_on:
      - jobmanager
    expose:
      - "${TASK_MANAGER_PORT}"
    ports:
      - "${TASK_MANAGER_PORT}:${TASK_MANAGER_PORT}"
    command: taskmanager
    dns:
      - ${ENVIRONMENT_DNS}
    environment:
      - JOB_MANAGER_RPC_ADDRESS=${JOB_MANAGER_RPC_ADDRESS}
      - JOB_MANAGER_RPC_PORT=${JOB_MANAGER_RPC_PORT}
      - RESTART_STRATEGY_FAILURE_RATE_DELAY_BETWEEN_ATTEMPTS=30000
      - RESTART_STRATEGY_FAILURE_RATE_FAILURE_RATE_INTERVAL=60000
      - RESTART_STRATEGY_FAILURE_RATE_MAX_FAILURES_PER_INTERVAL=3
      - RESTART_STRATEGY=failureRate
      - TASK_MANAGER_MEMORY_PROCESS_SIZE=4096m # TODO: adjust this value according to requirements per environment
      - TASK_MANAGER_NUMBER_OF_TASK_SLOTS=${TASK_MANAGER_NUMBER_OF_TASK_SLOTS}
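The TaskManager log shows it trying to reach the ResourceManager at `localhost:6123`, so inside the TaskManager container `jobmanager.rpc.address` appears to resolve to `localhost`, even though the compose file passes `JOB_MANAGER_RPC_ADDRESS` (whose value isn't shown). Whether the custom images translate that variable into Flink configuration isn't visible here. With the official Flink Docker images, Flink options can be passed through the `FLINK_PROPERTIES` environment variable, one `key: value` per line. A minimal sketch that renders such a value; `flink_properties` is a hypothetical helper, and using the compose service name `jobmanager` as the address is an assumption that relies on Docker's service-name DNS (which the custom `dns:` setting may affect):

```python
def flink_properties(options: dict[str, str]) -> str:
    """Render Flink options as a FLINK_PROPERTIES value: one 'key: value' per line."""
    return "\n".join(f"{key}: {value}" for key, value in options.items())


props = flink_properties({
    # Assumption: "jobmanager" (the compose service name) resolves from the
    # taskmanager container via Docker's embedded DNS.
    "jobmanager.rpc.address": "jobmanager",
    # 6123 is the port the TaskManager log above is already trying
    "jobmanager.rpc.port": "6123",
})
print(props)
```

The rendered string would go into the `environment` section of both services as `FLINK_PROPERTIES`, which only takes effect if the custom images keep the official image's entrypoint behaviour.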
Chris Tabakakis
07/13/2023, 2:45 PM
(7c47ca8b3c53e4646c5f02c6e329a650_6b68f1bbaa75cb9b23c5012ad11acdd0_1_0) switched from RUNNING to FAILED with failure cause: java.lang.Exception: The data preparation for task 'CHAIN GroupReduce (GroupReduce at run(TriangleEnumerator.java:72)) -> Combine (Reduce at run(TriangleEnumerator.java:74))' , caused an error: java.util.concurrent.ExecutionException: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: Connection unexpectedly closed by remote task manager '10.0.1.72/10.0.1.72:45235 [ 10.0.1.72:35991-49e4c4 ] '. This might indicate that the remote task manager was lost.
I've been struggling to figure out what the cause of this might be. At first I thought it might be a problem with Hadoop, but I tried a simpler program that reads the same data and just writes it to another file in HDFS, and it worked fine. I have the log from the TaskManager that didn't complete the first task; it's set to loglevel=debug, because I couldn't find anything useful at info level. I'm appending it here. I still think this might be related to Hadoop, because I'm noticing some weird things, such as:
• At the beginning, it states hadoop version = 2.8.3, which is not the case: my system is running Hadoop 3.3.5, and I don't think I've set the version as an option anywhere. I've also placed the corresponding Hadoop 3.3.5 JAR files in the lib folder.
• At various points, a WARN-level message states Could not find Hadoop configuration via any of the supported methods (Flink configuration, environment variables). However, looking at the source code, I can see that it should only display this message after displaying various debug messages while looking for the Hadoop configuration on my system. This indicates that it can't even find the environment variables that I've set, which is concerning.
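The second observation can be checked from outside Flink. Assuming (from a reading of Flink's `HadoopUtils`, not verified here) that the debug lines before that WARN are only emitted for locations derived from `HADOOP_HOME` and `HADOOP_CONF_DIR`, their absence would mean the TaskManager process does not see those variables, for example because they are set in an interactive shell profile rather than in the environment that launches the TaskManager. A stdlib sketch that reports what a process actually sees; `hadoop_conf_candidates` and `found_site_files` are hypothetical helpers approximating that lookup:

```python
import os


def hadoop_conf_candidates(env: dict[str, str]) -> list[str]:
    """Directories that HADOOP_HOME / HADOOP_CONF_DIR point at.

    Approximation (assumption) of the environment-variable part of Flink's
    Hadoop configuration lookup: $HADOOP_HOME/conf, $HADOOP_HOME/etc/hadoop
    and $HADOOP_CONF_DIR.
    """
    candidates = []
    hadoop_home = env.get("HADOOP_HOME")
    if hadoop_home:
        candidates += [os.path.join(hadoop_home, "conf"), os.path.join(hadoop_home, "etc", "hadoop")]
    hadoop_conf_dir = env.get("HADOOP_CONF_DIR")
    if hadoop_conf_dir:
        candidates.append(hadoop_conf_dir)
    return candidates


def found_site_files(candidates: list[str]) -> dict[str, list[str]]:
    """For each candidate directory, the site files that actually exist in it."""
    return {
        directory: [name for name in ("core-site.xml", "hdfs-site.xml")
                    if os.path.isfile(os.path.join(directory, name))]
        for directory in candidates
    }


if __name__ == "__main__":
    # Run with the same user and environment that start the TaskManager
    candidates = hadoop_conf_candidates(dict(os.environ))
    if not candidates:
        print("Neither HADOOP_HOME nor HADOOP_CONF_DIR is set in this environment")
    for directory, files in found_site_files(candidates).items():
        print(directory, files or "no core-site.xml/hdfs-site.xml found")
```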
I appreciate any help I can get, and will gladly provide additional info if asked.
Ravi Nishant
07/13/2023, 10:24 PM
/opt/lib to /opt/flink/lib. In the FlinkDeployment custom resource, I then reference the job like below:
job:
  jarURI: local:///opt/flink/lib/myjob-all.jar
  parallelism: 4
  upgradeMode: 'savepoint'
  state: 'running'
In this case, I see the following exception, and the JobManager pod goes into a failed state:
flink-main-container Exception in thread "main" java.lang.VerifyError: Instruction type does not match stack map
flink-main-container Exception Details:
flink-main-container Location:
flink-main-container org/apache/flink/util/FlinkUserCodeClassLoaders.wrapWithSafetyNet(Lorg/apache/flink/util/FlinkUserCodeClassLoader;Z)Lorg/apache/flink/util/MutableURLClassLoader; @20: areturn
flink-main-container Reason:
flink-main-container Type 'org/apache/flink/util/FlinkUserCodeClassLoader' (current frame, stack[0]) is not assignable to 'org/apache/flink/util/MutableURLClassLoader' (stack map, stack[0])
flink-main-container Current Frame:
flink-main-container bci: @20
flink-main-container flags: { }
flink-main-container locals: { 'org/apache/flink/util/FlinkUserCodeClassLoader', integer }
flink-main-container stack: { 'org/apache/flink/util/FlinkUserCodeClassLoader' }
flink-main-container Stackmap Frame:
flink-main-container bci: @20
flink-main-container flags: { }
flink-main-container locals: { 'org/apache/flink/util/FlinkUserCodeClassLoader', integer }
flink-main-container stack: { 'org/apache/flink/util/MutableURLClassLoader' }
flink-main-container Bytecode:
flink-main-container 0000000: 1b99 0012 bb00 1d59 2a2a b600 1eb7 001f
flink-main-container 0000010: a700 042a b0
flink-main-container Stackmap Table:
flink-main-container same_frame(@19)
flink-main-container same_locals_1_stack_item_frame(@20,Object[#84])
flink-main-container
flink-main-container at org.apache.flink.client.ClientUtils.buildUserCodeClassLoader(ClientUtils.java:63)
flink-main-container at org.apache.flink.client.program.PackagedProgram.<init>(PackagedProgram.java:145)
flink-main-container at org.apache.flink.client.program.PackagedProgram.<init>(PackagedProgram.java:65)
flink-main-container at org.apache.flink.client.program.PackagedProgram$Builder.build(PackagedProgram.java:691)
flink-main-container at org.apache.flink.client.program.DefaultPackagedProgramRetriever.getPackagedProgram(DefaultPackagedProgramRetriever.java:213)
flink-main-container at org.apache.flink.kubernetes.entrypoint.KubernetesApplicationClusterEntrypoint.getPackagedProgram(KubernetesApplicationClusterEntrypoint.java:100)
flink-main-container at org.apache.flink.kubernetes.entrypoint.KubernetesApplicationClusterEntrypoint.main(KubernetesApplicationClusterEntrypoint.java:70)
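One possible explanation (an assumption, not confirmed in the thread): the `VerifyError` concerns `FlinkUserCodeClassLoader` and `MutableURLClassLoader`, both Flink classes. JARs in `/opt/flink/lib` end up on the JVM classpath next to `flink-dist`, so if `myjob-all.jar` bundles its own copies of Flink core classes, two versions could clash. A quick stdlib check of what the fat JAR contains; `bundled_flink_classes` is a hypothetical helper:

```python
import sys
import zipfile


def bundled_flink_classes(jar, prefix: str = "org/apache/flink/") -> list[str]:
    """List .class entries under `prefix` inside a JAR (JARs are ZIP archives).

    `jar` may be a path or a binary file object.
    """
    with zipfile.ZipFile(jar) as archive:
        return [name for name in archive.namelist()
                if name.startswith(prefix) and name.endswith(".class")]


if __name__ == "__main__":
    # e.g. python3 scan_jar.py /opt/flink/lib/myjob-all.jar
    for jar_path in sys.argv[1:]:
        classes = bundled_flink_classes(jar_path)
        print(f"{jar_path}: {len(classes)} Flink classes bundled")
        for name in classes[:10]:
            print("  " + name)
```

Connector classes (for example under `org/apache/flink/connector/`) are normally expected in a fat JAR; copies of core packages such as `org/apache/flink/util/` would be the suspicious part, which the `prefix` parameter can narrow down to.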
However, if I reference the job from /opt/flink, everything works fine.
Keyur Makwana
07/14/2023, 6:25 AM
Raghunadh Nittala
07/14/2023, 6:43 AM
java.lang.AssertionError
at org.apache.calcite.rel.metadata.MetadataDef.<init>(MetadataDef.java:48)
at org.apache.calcite.rel.metadata.MetadataDef.of(MetadataDef.java:64)
at org.apache.calcite.rel.metadata.BuiltInMetadata$PercentageOriginalRows.<clinit>(BuiltInMetadata.java:345)
at org.apache.calcite.rel.metadata.RelMdPercentageOriginalRows$RelMdPercentageOriginalRowsHandler.getDef(RelMdPercentageOriginalRows.java:231)
at org.apache.calcite.rel.metadata.ReflectiveRelMetadataProvider.reflectiveSource(ReflectiveRelMetadataProvider.java:134)
at org.apache.calcite.rel.metadata.RelMdPercentageOriginalRows.<clinit>(RelMdPercentageOriginalRows.java:42)
at org.apache.calcite.rel.metadata.DefaultRelMetadataProvider.<init>(DefaultRelMetadataProvider.java:42)
at org.apache.calcite.rel.metadata.DefaultRelMetadataProvider.<clinit>(DefaultRelMetadataProvider.java:28)
at org.apache.calcite.plan.RelOptCluster.<init>(RelOptCluster.java:97)
at org.apache.calcite.plan.RelOptCluster.create(RelOptCluster.java:106)
at org.apache.flink.table.planner.calcite.FlinkRelOptClusterFactory$.create(FlinkRelOptClusterFactory.scala:36)
at org.apache.flink.table.planner.calcite.FlinkRelOptClusterFactory.create(FlinkRelOptClusterFactory.scala)
at org.apache.flink.table.planner.delegation.PlannerContext.<init>(PlannerContext.java:132)
at org.apache.flink.table.planner.delegation.PlannerBase.<init>(PlannerBase.scala:121)
at org.apache.flink.table.planner.delegation.StreamPlanner.<init>(StreamPlanner.scala:65)
at org.apache.flink.table.planner.delegation.DefaultPlannerFactory.create(DefaultPlannerFactory.java:65)
at org.apache.flink.table.factories.PlannerFactoryUtil.createPlanner(PlannerFactoryUtil.java:58)
at org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.create(StreamTableEnvironmentImpl.java:127)
at org.apache.flink.table.api.bridge.java.StreamTableEnvironment.create(StreamTableEnvironment.java:122)
at org.apache.flink.table.api.bridge.java.StreamTableEnvironment.create(StreamTableEnvironment.java:94)
jaiprasad
07/14/2023, 11:57 AM
2023-07-14 10:56:56.204 [flink-akka.actor.default-dispatcher-17] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Custom Source -> Map (7/15) (73095e8cac2266887f4b7a0fc109c199) switched from DEPLOYING to FAILED on 10.233.118.126:43710-2c7cb4 @ 10.233.118.126 (dataPort=34133).
org.apache.flink.runtime.rpc.exceptions.RecipientUnreachableException: Could not send message [RemoteRpcInvocation(null.submitTask(TaskDeploymentDescriptor, JobMasterId, Time))] from sender [Actor[akka://flink/temp/taskmanager_0$+rod]] to recipient [Actor[akka.tcp://flink@10.233.118.126:43710/user/rpc/taskmanager_0#-1148004411]], because the recipient is unreachable. This can either mean that the recipient has been terminated or that the remote RpcService is currently not reachable.
at org.apache.flink.runtime.rpc.akka.DeadLettersActor.handleDeadLetter(DeadLettersActor.java:61)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)
at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at akka.actor.Actor.aroundReceive(Actor.scala:537)
at akka.actor.Actor.aroundReceive$(Actor.scala:535)
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580)
at akka.actor.ActorCell.invoke(ActorCell.scala:548)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
at akka.dispatch.Mailbox.run(Mailbox.scala:231)
at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
at java.base/java.util.concurrent.ForkJoinTask.doExec(Unknown Source)
at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(Unknown Source)
at java.base/java.util.concurrent.ForkJoinPool.scan(Unknown Source)
at java.base/java.util.concurrent.ForkJoinPool.runWorker(Unknown Source)
at java.base/java.util.concurrent.ForkJoinWorkerThread.run(Unknown Source)
Rajat Ahuja
07/14/2023, 2:11 PM
Caused by: org.apache.flink.runtime.rest.util.RestClientException: 413 Request Entity Too Large. Try to raise [rest.client.max-content-length]
at org.apache.flink.runtime.rest.RestClient$ClientHandler.channelRead0(RestClient.java:615) ~[flink-dist-1.17.0.jar:1.17.0]
at org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99) ~[flink-dist-1.17.0.jar:1.17.0]
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[flink-dist-1.17.0.jar:1.17.0]
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[flink-dist-1.17.0.jar:1.17.0]
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[flink-dist-1.17.0.jar:1.17.0]
at org.apache.flink.shaded.netty4.io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286) ~[flink-dist-1.17.0.jar:1.17.0]
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[flink-dist-1.17.0.jar:1.17.0]
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[flink-dist-1.17.0.jar:1.17.0]
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[flink-dist-1.17.0.jar:1.17.0]
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: session-seven-deployment-only-example
spec:
  image: flink:1.17
  flinkVersion: v1_17
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "10"
    rest.server.max-content-length: "1209715200"
    rest.client.max-content-length: "1209715200"
  serviceAccount: flink
  jobManager:
    resource:
      memory: "2048m"
      cpu: 1
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
  annotations:
    kubernetes.io/ingress.class: k8-cps-dev
    nginx.ingress.kubernetes.io/client-body-buffer-size: 500M
  name: my-docker-app-ingress-3
spec:
  rules:
    - host: flink-sql-gateway.usb.cloud.bank-dns.com
      http:
        paths:
          - backend:
              service:
                name: flink-seven-sql-gateway-svc
                port:
                  number: 8086
            path: /
            pathType: Prefix
Gateway
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-seven-sql-gateway
spec:
  replicas: 1
  selector:
    matchLabels:
      app: flink-seven-sql-gateway
  template:
    metadata:
      labels:
        app: flink-seven-sql-gateway
    spec:
      containers:
        - name: flink-seven-sql-gateway
          image: flink:1.17.0
          ports:
            - containerPort: 8086
          command: ["/bin/sh", "-c"]
          args:
            - "/opt/flink/bin/sql-gateway.sh start-foreground -Dsql-gateway.endpoint.rest.address=localhost -Dsql-gateway.endpoint.rest.port=8086"
          volumeMounts:
            - name: flink-conf
              mountPath: /opt/flink/conf
      volumes:
        - name: flink-conf
          configMap:
            name: flink-conf
Gateway Service
apiVersion: v1
kind: Service
metadata:
  name: flink-seven-sql-gateway-svc
spec:
  selector:
    app: flink-seven-sql-gateway
  ports:
    - name: flink-seven-sql-gateway
      port: 8086
      targetPort: 8086
  type: ClusterIP
Ingresses
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
  annotations:
    kubernetes.io/ingress.class: k8-cps-dev
    nginx.ingress.kubernetes.io/client-body-buffer-size: 500M
  name: ingress-3
spec:
  rules:
    - host: flink-sql-gateway.usb.cloud.bank-dns.com
      http:
        paths:
          - backend:
              service:
                name: flink-seven-sql-gateway-svc
                port:
                  number: 8086
            path: /
            pathType: Prefix
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
  annotations:
    kubernetes.io/ingress.class: k8-cps-dev
    nginx.ingress.kubernetes.io/client-body-buffer-size: 500M
  name: ingress-2
spec:
  rules:
    - host: flink-prod-dns.usb.cloud.bank-dns.com
      http:
        paths:
          - backend:
              service:
                name: session-seven-deployment-only-example-rest
                port:
                  number: 8081
            path: /
            pathType: Prefix
Updated Flink conf used as ConfigMap
rest.address: flink-prod-dns.usb.cloud.bank-dns.com
rest.port: 80
jobmanager.bind-host: flink-prod-dns.usb.cloud.bank-dns.com
rest.server.max-content-length: 1409715200
rest.client.max-content-length: 1409715200
Sylvia Lin
07/14/2023, 5:28 PM
Daniel Packard
07/14/2023, 6:21 PM
kafka-consumer-groups.sh CLI, I'm getting reports of 0 active consumers 🤔
Is this something I should expect in my metrics/reporting?
Dave Sugden
07/14/2023, 6:34 PM
1.17.1, using SQL, we have issues completing checkpoints.
We’ve tried with unaligned checkpoints both enabled and disabled, and it's not a timeout issue: the checkpoint fails before the configured checkpoint timeout is reached.
The weird/interesting thing is that, even with unaligned checkpoints, several of our operators (JOINs) are back-pressured but never even acknowledge the checkpoint…
Has anyone witnessed anything similar?
thnx!