Krish Narukulla
09/29/2022, 5:01 AM
Dan Dubois
09/29/2022, 8:44 AM
I have a SourceFunction -> DiscardingSink job where the source function produces a never-ending stream of short random strings. After a variable period of time my Task Manager quits with a 137 error and no stack trace. I can consistently reproduce this on my Linux machine but not on my Mac.
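(A minimal sketch of the kind of job described, purely illustrative and not the poster's actual code: an unbounded SourceFunction emitting short random strings into a DiscardingSink. For what it's worth, exit code 137 is 128 + SIGKILL, which on Linux usually points at the kernel OOM killer or a container memory limit rather than a Java exception, which would also explain the missing stack trace.)
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import java.util.UUID;

public class RandomStringsJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.addSource(new SourceFunction<String>() {
            private volatile boolean running = true;

            @Override
            public void run(SourceContext<String> ctx) {
                // Unthrottled busy loop: emits as fast as the runtime allows.
                while (running) {
                    synchronized (ctx.getCheckpointLock()) {
                        ctx.collect(UUID.randomUUID().toString().substring(0, 8));
                    }
                }
            }

            @Override
            public void cancel() {
                running = false;
            }
        }).addSink(new DiscardingSink<>());
        env.execute("random-strings");
    }
}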
I thought it might have been because of the unrelenting busy loop in SourceFunction continually spitting out data, but I am not sure. I tried using the Source API with a SingleThreadMultiplexSourceReaderBase, which I thought worked, but eventually after several hours I got the same issue.
Any advice on what is going on here and how I can mitigate it would be greatly appreciated.
Abhinav sharma
09/29/2022, 12:39 PM
Vincent canuel
09/29/2022, 2:18 PM
I am trying to set an env variable through the podTemplate, but this is not working (when I describe the running pod the env is missing). Here is my deployment:
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: flink01
spec:
  image: flink01:1
  flinkVersion: v1_15
  podTemplate:
    spec:
      containers:
        - name: flink-main-container
          volumeMounts:
            - mountPath: /opt/flink/log
              name: flink-logs
          env:
            - name: MY_CUSTOM_ENV
              value: "SUPER"
      volumes:
        - name: flink-logs
          emptyDir: { }
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
  serviceAccount: flink
  jobManager:
    resource:
      memory: "2048m"
      cpu: 1
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1
  job:
    jarURI: local:///opt/flink/usrlib/my.jar
    parallelism: 2
    upgradeMode: stateless
Could you help me on that?
Hartmut
09/29/2022, 4:56 PM
Rashmin Patel
09/29/2022, 5:01 PM
[CL] [WindowOperator] Initializing internalTimerService with wm -9223372036854775808
[CL] [InternalWindowProcessFunction] this.ctx.currentWatermark() at open(): -9223372036854775808
[CL] Opening ProcTimeDeduplicateKeepLastRowFunction
[CL] [ProcTimeDedupeFn] cw -9223372036854775808 for input org.apache.flink.table.data.binary.BinaryRowData@f17bddd6
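(Side note: the value in these log lines is simply Long.MIN_VALUE, the initial watermark an operator reports before it has received any watermark from upstream.)
// -9223372036854775808 == Long.MIN_VALUE, i.e. "no watermark received yet".
System.out.println(Long.MIN_VALUE);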
Can someone help me understand why this could be happening?
Sigh
09/29/2022, 6:21 PM
Prasaanth Neelakandan
09/29/2022, 7:19 PM
2022-09-23 08:36:15,992 WARN org.apache.hadoop.fs.s3a.S3AInstrumentation [] - Closing output stream statistics while data is still marked as pending upload in OutputStreamStatistics{blocksSubmitted=1, blocksInQueue=1, blocksActive=0, blockUploadsCompleted=0, blockUploadsFailed=0, bytesPendingUpload=9759410, bytesUploaded=0, blocksAllocated=1, blocksReleased=1, blocksActivelyAllocated=0, exceptionsInMultipartFinalize=0, transferDuration=0 ms, queueDuration=0 ms, averageQueueTime=0 ms, totalUploadDuration=0 ms, effectiveBandwidth=0.0 bytes/s}
These have also often been accompanied by another exception: java.util.concurrent.CancellationException: null
The root causes seem to be:
org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable [] - VideoQos Stream Reporter (14/40)#0 - asynchronous part of checkpoint 5149 could not be completed.
org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Checkpoint 5149 of job 22c1b49deb67c5388647ac9a79bf87ca expired before completing.
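(Context, hedged: "expired before completing" means the checkpoint hit the configured checkpoint timeout while parts of it, here apparently the asynchronous S3 upload, were still in flight. A sketch of the knobs commonly adjusted for this, to be dropped into the job setup; the values are placeholders, not recommendations:)
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60_000L);                                  // checkpoint interval
env.getCheckpointConfig().setCheckpointTimeout(30 * 60 * 1000L);   // time before a checkpoint "expires"
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30_000L);  // breathing room between checkpoints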
Could anyone help me understand these exceptions, and how to solve these checkpoint failures? Will using the S3 presto plugin help?
I am attaching the detailed stack traces for these exceptions as well as our Flink config settings.
Any guidance would be appreciated 🙂
ding bei
09/30/2022, 5:39 AM
kubernetes.rest-service.exposed.type
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: statemachineflink
spec:
  image: flink:1.15
  flinkVersion: v1_15
  ingress:
    template: "xxx"
    className: "alb"
  flinkConfiguration:
    kubernetes.rest-service.exposed.type: "Loadbalancer"
    containerized.master.env.ENABLE_BUILT_IN_PLUGINS: "flink-s3-fs-presto-1.15.2.jar"
    containerized.taskmanager.env.ENABLE_BUILT_IN_PLUGINS: "flink-s3-fs-presto-1.15.2.jar"
    taskmanager.numberOfTaskSlots: "2"
    state.savepoints.dir: "s3://xxx"
    state.checkpoints.dir: "s3://xxx"
    high-availability: "org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory"
    high-availability.storageDir: "xxx"
    execution.checkpointing.interval: 10s
    state.backend: rocksdb
  jobManager:
    resource:
      memory: "2048m"
      cpu: 1
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1
  podTemplate:
    metadata:
      labels:
        app: statemachineflink
    spec:
      containers:
        - name: flink-main-container
          volumeMounts:
            - mountPath: /flink-data
              name: flink-volume
      volumes:
        - name: flink-volume
          emptyDir: {}
  job:
    jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar
    parallelism: 2
    upgradeMode: savepoint
    state: running
    savepointTriggerNonce: 0
Aqib Mehmood
09/30/2022, 8:21 AM
Nick Pocock
09/30/2022, 10:54 AM
Balazs Varga
09/30/2022, 11:31 AM
We are running a job in yarn-per-job mode with Zookeeper HA. The job has gone into a SUSPENDED state because of a Zookeeper timeout: Client session timed out, have not heard from server in 40019ms for sessionid 0x20a20d1641b67b4, and the cluster is shut down:
org.apache.flink.runtime.dispatcher.MiniDispatcher [] - Shutting down cluster with state SUSPENDED, jobCancelled: false, executionMode: DETACHED
org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - Shutting YarnJobClusterEntrypoint down with application status UNKNOWN. Diagnostics null.
After it starts up again, it does not find the checkpoint handles:
org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore [] - Recovering checkpoints from ZooKeeperStateHandleStore{namespace='flink/application_1661494717285_98085/checkpoints/a636bd043052b707aadb1a7b98fe99d5'}.
org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore [] - Found 0 checkpoints in ZooKeeperStateHandleStore{namespace='flink/application_1661494717285_98085/checkpoints/a636bd043052b707aadb1a7b98fe99d5'}.
I see in the logs of the shut down application that these were called, which seems to have cleared these handles:
org.apache.flink.runtime.highavailability.zookeeper.ZooKeeperHaServices [] - Close and clean up all data for ZooKeeperHaServices.
org.apache.flink.runtime.highavailability.zookeeper.ZooKeeperHaServices [] - Finished cleaning up the high availability data.
As I understood from browsing jiras (e.g. this one) and the docs, HA data, such as checkpoints, should only be cleaned up on a globally terminal state, which would make a lot of sense.
Can someone help me understand why the state cleanup could have happened in my case?
Tommy Gunnarsson
09/30/2022, 11:40 AM
We are looking at the Sync duration during checkpointing. In the docs it says that sync duration is "The duration of the synchronous part of the checkpoint. This includes snapshotting state of the operators and blocks all other activity on the subtask (processing records, firing timers, etc).", which is, we assume, the time it takes to checkpoint all our state for the operators.
What could be taking time here? We are running our job on k8s on nodes with RocksDB backed by local NVMe drives. For starters, we are trying out with a total state of 100GB.
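(Side note, hedged: with the RocksDB backend the synchronous phase mostly covers flushing memtables and taking the local snapshot, while uploading files to the checkpoint storage happens in the asynchronous phase. Incremental checkpoints mainly shrink the async upload rather than the sync part, but they are the commonly tuned setting in this area; a one-line fragment for the job setup, assuming the flink-statebackend-rocksdb dependency is on the classpath:)
// true = incremental checkpoints: only new/changed SST files are included per checkpoint.
env.setStateBackend(new org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend(true));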
Here are some numbers and our config:
Shen Zhu
09/30/2022, 11:21 PM
From the docs: "Subsequently, the accumulate(...) method of the function is called for each input row to update the accumulator. Once all rows have been processed, the getValue(...) method of the function is called to compute and return the final result."
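(For reference, a minimal sketch of the accumulate(...)/getValue(...) lifecycle the quoted passage describes, using a toy count aggregate that is not from this thread. In an unbounded, non-windowed GROUP BY the result is an updating table, so the framework effectively calls getValue(...) to emit a refreshed result as each record is accumulated rather than waiting for "all rows".)
import org.apache.flink.table.functions.AggregateFunction;

public class CountAgg extends AggregateFunction<Long, CountAgg.CountAcc> {

    public static class CountAcc {
        public long count = 0L;
    }

    @Override
    public CountAcc createAccumulator() {
        return new CountAcc();
    }

    // Called once for every input row.
    public void accumulate(CountAcc acc, String value) {
        acc.count += 1;
    }

    // Called whenever the framework needs the current result; in an unbounded
    // group aggregation that happens per update, not only "at the end".
    @Override
    public Long getValue(CountAcc acc) {
        return acc.count;
    }
}
(Registered with e.g. tableEnv.createTemporarySystemFunction("myCount", CountAgg.class) and used in SELECT key, myCount(v) FROM t GROUP BY key, this emits an updated count for every incoming record as a retract/upsert stream.)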
It seems hard to define "all rows have been processed" in a streaming application. Is it possible to have the getValue() method called right after accumulate()? Basically, generate aggregated results as soon as a new record comes in. Thanks!
Jirawech Siwawut
10/01/2022, 3:45 AM
Krish Narukulla
10/01/2022, 6:32 AM
Hartmut
10/01/2022, 11:17 AM
val env = StreamExecutionEnvironment.getExecutionEnvironment()
val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env)
env.enableCheckpointing(5000)
env.checkpointConfig.setCheckpointStorage("file:///Users/xyz/dev/my-flink-app/checkpoints")
env.stateBackend = EmbeddedRocksDBStateBackend(true)
// ...
env.execute("Stream Join Demo"
Krish Narukulla
10/02/2022, 4:25 AM
I am trying to use the gs filesystem scheme. Later I added the below to hive-site.xml:
<property>
  <name>fs.gs.impl</name>
  <value>org.apache.flink.fs.gs.GSFileSystem</value>
</property>
This later resulted in a class-not-found error: `org.apache.flink.fs.gs.GSFileSystem`. I have looked at the Flink cluster jars and found the below:
root@airstream-f69f865f8-xcqfh:/opt/flink# ls plugins/gs-fs-hadoop/
flink-gs-fs-hadoop-1.15.2.jar
root@airstream-f69f865f8-xcqfh:/opt/flink# ls opt/flink-gs-fs-hadoop-1.15.2.jar
opt/flink-gs-fs-hadoop-1.15.2.jar
Ravi-Dev
10/02/2022, 8:28 AM
Hartmut
10/03/2022, 12:13 PM
Rommel
10/03/2022, 5:04 PM
I am trying to install the flink-kubernetes-operator following the link here: https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-1.2/docs/operations/helm/
It says:
Error: failed to download "helm/flink-kubernetes-operator" (hint: running `helm repo update` may help)
So which helm repo should I add first?
Zsombor Chikan
10/04/2022, 1:10 AM
Sumit Nekar
10/04/2022, 8:23 AM
helm upgrade --install --wait flink-kubernetes-operator flink-operator-repo/flink-kubernetes-operator --set watchNamespaces={ns1,ns2}
I see that the flink operator pod is not getting restarted after the helm upgrade.
Sevvy Yusuf
10/04/2022, 10:52 AM
Palani
10/04/2022, 1:10 PM
Hi All,
We are trying to enrich streaming data with look-up data. The look-up data will be mostly static (updated once in a while).
For our streaming application we are using the Flink Table API. To handle this scenario we are trying to use a Flink temporal join.
We have created two tables for our sources as below (the example below is a mocked one).
1. Look up data
CREATE TABLE versioned_rates (
  currency STRING,
  rate DECIMAL(38, 10),
  currency_time TIMESTAMP_LTZ(3) METADATA FROM 'timestamp' VIRTUAL,
  WATERMARK FOR currency_time AS currency_time,
  PRIMARY KEY(currency) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  ...)
2. Streaming data (source streaming data)
CREATE TABLE orders (
  order_id STRING,
  currency STRING,
  amount INT,
  order_time TIMESTAMP_LTZ(3) METADATA FROM 'timestamp' VIRTUAL,
  WATERMARK FOR order_time AS order_time
) WITH (
  'connector' = 'kafka',
  ...)
The join on these tables is as below:
SELECT
  o.order_id,
  o.order_time,
  o.amount * r.rate AS amount,
  r.rate rate,
  r.currency
FROM orders AS o JOIN versioned_rates FOR SYSTEM_TIME AS OF o.order_time r
ON o.currency = r.currency;
The issue we are facing is that we are not getting output from this join if there is no update in the look-up data.
We need the output to be written for every record we receive in the streaming data, even if there is no update in the look-up data.
After doing some reading we understand that this may be related to the watermarks not lining up between the two streams, but we couldn't clearly understand why. Any help on this would be appreciated.
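(Not a definitive answer, but one knob that is often brought up for exactly this pattern, where the versioned side rarely produces records so its watermark never advances and holds the temporal join output back, is the source idle timeout. A hedged fragment for the TableEnvironment setup, with a placeholder value; whether it is acceptable depends on correctness requirements, since it lets event time advance without input from the idle side:)
// After this much inactivity a source is marked idle so its missing watermark no
// longer holds back downstream operators such as the temporal join.
tableEnv.getConfig().getConfiguration()
        .setString("table.exec.source.idle-timeout", "30 s");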
Martijn Visser
10/04/2022, 1:21 PM
All important decisions and conclusions *must be reflected back to the mailing lists.* "If it didn't happen on a mailing list, it didn't happen." - The Apache Mottos.
Ildar Almakaev
10/04/2022, 2:05 PM
I am trying to enrich one stream (logs) with users info using an Event-Time Temporal join with the Table API.
AFAIK, to be able to join those two streams I need to:
1. Define a watermark strategy with .withTimestampAssigner(...) on each source operator (FlinkKafkaConsumer).
2. When converting a DataStream to a Table, define a timestamp column and a watermark based on it in the table's schema.
3. Use the logs' timestamp column in the Event-Time Temporal Join query.
Below is a code snippet for clarity.
I also attached some screenshots from Flink's UI...
Issue:
• When I'm running the app and looking at the Flink UI dashboard, I can't see any watermark info for users at the task level. It shows No Watermark (Watermarks are only available if EventTime is used). I wonder why there is no watermark info for the users pipeline even though I define the watermark strategy.
• Joining logs with users produces no join output between the tables.
Could you please help me figure out what might be wrong here?
I would appreciate any help, comments and recommendation 🙂
// Initialize Flink job environment. Some properties are omitted for readability ...
StreamExecutionEnvironment env = ...
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
tableEnv.getConfig().setLocalTimeZone(ZoneId.of("UTC"));
...
/* 1.1) Read 'users' Kafka topic */
DataStream<UserDto> usersStream = env
        .addSource(new CustomFlinkKafkaConsumer<>(...))
        .uid("source-users").name("Read Users")
        .map(UserRecordMapper::mapToUserDto)
        .uid("map-to-user-model")
        .keyBy(UserDto::getUId);

// 1.2) DataStream to Table
Table tUsers = tableEnv.fromDataStream(usersStream, Schema.newBuilder()
        .primaryKey("u_id")
        .columnByExpression("lastModifiedTs2", "TO_TIMESTAMP_LTZ(lastModifiedTs, 3)")
        .watermark("lastModifiedTs2", "lastModifiedTs2 - interval '1' day")
        .build());
tUsers.printSchema();
tableEnv.createTemporaryView("users", tUsers);
2. Consume logs records from the Kafka topic
/* 2.1) Read 'logs' Kafka topic */
DataStream<LoggingAudit> logAuditStream = env
        .addSource(new CustomFlinkKafkaConsumer<>(...))
        .uid("source-logs").name("Read Logs")
        .filter(l -> Objects.nonNull(l.getUserId()))
        .keyBy(LoggingAudit::getUserId);
DataStream<LoggingAuditDto> transformedLogAudit = logAuditStream
        .map(new LogAuditTransformer())
        .uid("transform-log-audit").name("Transform Log Audit");

// 2.2) DataStream to Table
Schema logAuditTableSchema = Schema.newBuilder()
        .columnByExpression("insertDateTs2", "TO_TIMESTAMP_LTZ(insertDateTs, 3)")
        .watermark("insertDateTs2", "insertDateTs2 - interval '1' day")
        .build();
Table logAuditTable = tableEnv.fromDataStream(transformedLogAudit, logAuditTableSchema);
logAuditTable.printSchema();
tableEnv.createTemporaryView("logs", logAuditTable);
3. Enrich logs with users data using Event Time Temporal Join
tableEnv.sqlQuery("select logs.id, logs.action, users.u_id as userId " +
                "from logs " +
                "join users FOR SYSTEM_TIME AS OF logs.insertDateTs2 " +
                "on logs.userId = users.u_id")
        .execute()
        .print();
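(For comparison with step 1 of the list above, a hedged sketch of assigning the watermark on the DataStream itself before fromDataStream(...); field and getter names such as getLastModifiedTs are assumptions, not taken from the thread. A watermark that is only declared in the Table schema is generated during the table conversion, so the upstream DataStream tasks can legitimately show "No Watermark" in the UI:)
// Sketch only: assign timestamps/watermarks on the DataStream before converting it to a Table.
DataStream<UserDto> usersStream = env
        .addSource(new CustomFlinkKafkaConsumer<>(...))
        .assignTimestampsAndWatermarks(
                WatermarkStrategy
                        .<UserDto>forBoundedOutOfOrderness(Duration.ofDays(1))
                        .withTimestampAssigner((user, ts) -> user.getLastModifiedTs()))
        .keyBy(UserDto::getUId);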
UPD: I updated the code (removed assignTimestampsAndWatermarks from the source operators).
Sumit Nekar
10/04/2022, 2:41 PM
I have configured the following memory settings:
jobmanager.memory.process.size: "1500m"
taskmanager.memory.process.size: "1500m"
taskmanager.memory.jvm-metaspace.size: "256m"
But I see the Flink operator is trying to request the following TM:
Received new TaskManager pod: pipeline-event-dedup-taskmanager-1-10
2022-10-04 143602,610 INFO org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - Requested worker pipeline-event-dedup-taskmanager-1-10 with resource spec WorkerResourceSpec {cpuCores=1.0, taskHeapSize=270.000mb (283115513 bytes), taskOffHeapSize=0 bytes, networkMemSize=105.200mb (110310196 bytes), managedMemSize=420.800mb (441240787 bytes), numSlots=2
}.
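(A hedged reconstruction of where those numbers come from, assuming Flink's default memory fractions: taskmanager.memory.task.off-heap.size simply defaults to 0 bytes unless it is set explicitly, so the 0 is expected rather than an error.)
process size                      1500 MB
- JVM metaspace                    256 MB   (configured)
- JVM overhead                     192 MB   (10% of process size, floored at the 192 MB minimum)
= total Flink memory              1052 MB
  network   = 10% of 1052 MB  ≈   105.2 MB  (matches networkMemSize)
  managed   = 40% of 1052 MB  ≈   420.8 MB  (matches managedMemSize)
  framework heap (default)         128 MB
  framework off-heap (default)     128 MB
  task off-heap (default)            0 MB   (matches taskOffHeapSize)
  task heap = remainder        ≈    270 MB  (matches taskHeapSize)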
I am not understanding why taskOffHeapSize is being set to 0 bytes here.
Appreciate your help.
Nithin kharvi
10/04/2022, 3:19 PM