Sergio Sainz
03/29/2023, 5:52 PM
execution.checkpointing.externalized-checkpoint-retention: DELETE_ON_CANCELLATION
), but the job-id folders still remain:
[sergio@flink-cluster-54f7fc7c6-k6km8 JobCheckpoints]$ ls
01eff17aa2910484b5aeb644bc531172 3a59309ef018541fc0c20856d0d89855 78ff2344dd7ef89f9fbcc9789fc0cd79 a6fd7cec89c0af78c3353d4a46a7d273 dbc957868c08ebeb100d708bbd057593
04ff0abb9e860fc85f0e39d722367c3c 3e09166341615b1b4786efd6745a05d6 79efc000aa29522f0a9598661f485f67 a8c42bfe158abd78ebcb4adb135de61f dc8e04b02c9d8a1bc04b21d2c8f21f74
05f48019475de40230900230c63cfe89 3f9fb467c9af91ef41d527fe92f9b590 7a6ad7407d7120eda635d71cd843916a a8db748c1d329407405387ac82040be4 dfb2df1c25056e920d41c94b659dcdab
09d30bc0aaaaff786994a6a3bb06abd3 455525b76a1c6826b6eaebd5649c5b6b 7b1458424496baaf3d020e9fece525a4 aa2ef9587b2e9c123744e8940a66a287
All folders in the above list, like 01eff17aa2910484b5aeb644bc531172, are empty. Do you know if there is a setting to delete the empty folders?
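If there isn't one, a small out-of-band cleanup along these lines might do (just a sketch; the checkpoint root path is assumed, only the JDK is used):
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.stream.Stream;

public class EmptyCheckpointDirCleaner {
    public static void main(String[] args) throws IOException {
        // Assumed location of the JobCheckpoints root; adjust as needed.
        Path root = Paths.get("/data/JobCheckpoints");
        try (Stream<Path> dirs = Files.list(root)) {
            dirs.filter(Files::isDirectory)
                .filter(EmptyCheckpointDirCleaner::isEmptyDir)
                .forEach(EmptyCheckpointDirCleaner::deleteQuietly);
        }
    }

    private static boolean isEmptyDir(Path dir) {
        try (Stream<Path> entries = Files.list(dir)) {
            return !entries.findFirst().isPresent();
        } catch (IOException e) {
            return false;
        }
    }

    private static void deleteQuietly(Path dir) {
        try {
            Files.delete(dir); // only succeeds while the directory is still empty
        } catch (IOException ignored) {
            // a job may have started writing to it again; skip
        }
    }
}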
Ananth V
03/29/2023, 5:53 PM
Artun Duman
03/29/2023, 6:40 PM
kubectl create clusterrolebinding flink-role-binding-default --clusterrole=edit --serviceaccount=default:default
$FLINK_PATH/bin/kubernetes-session.sh -Dkubernetes.cluster-id=flink-session-cluster -Dkubernetes.rest-service.exposed.type=LoadBalancer -Dkubernetes.namespace=default -Drest.address=0.0.0.0
This works fine and creates the cluster on minikube; I can access the UI if I run minikube tunnel. At this point my services look like this:
✗ kg svc
NAME                         TYPE           CLUSTER-IP      EXTERNAL-IP   PORT(S)             AGE
flink-session-cluster        ClusterIP      None            <none>        6123/TCP,6124/TCP   5s
flink-session-cluster-rest   LoadBalancer   10.100.202.72   127.0.0.1     8081:32409/TCP      5s
kubernetes                   ClusterIP      10.96.0.1       <none>        443/TCP             4d18h
I then create the SQL Gateway with the below yaml file:
apiVersion: apps/v1
kind: Deployment
metadata:
  name: sql-gateway
spec:
  replicas: 1
  selector:
    matchLabels:
      app: sql-gateway
  template:
    metadata:
      labels:
        app: sql-gateway
    spec:
      containers:
        - name: sql-gateway
          image: flink:1.17.0
          ports:
            - containerPort: 8083
          command: ["/bin/sh", "-c"]
          args:
            - "/opt/flink/bin/sql-gateway.sh start-foreground"
          volumeMounts:
            - name: flink-conf
              mountPath: /opt/flink/conf/flink-conf.yaml
      volumes:
        - name: flink-conf
          hostPath:
            path: /host/flink-refinery/k8s/conf/flink-conf.yaml # minikube stuff
            type: File
---
apiVersion: v1
kind: Service
metadata:
  name: sql-gateway
spec:
  selector:
    app: sql-gateway
  ports:
    - name: sql-gateway
      port: 8083
      targetPort: 8083
  type: ClusterIP
And my flink-conf.yaml is:
jobmanager.rpc.address: flink-session-cluster
jobmanager.rpc.port: 6123
rest.address: flink-session-cluster-rest
rest.port: 8081
sql-gateway.endpoint.rest.address: 0.0.0.0
sql-gateway.endpoint.rest.port: 8083
sql-gateway.endpoint.type: rest
I can confirm from the logs that this config is picked up.
2023-03-29 18:33:38,431 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: rest.port, 8081
2023-03-29 18:33:38,431 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: sql-gateway.endpoint.rest.address, sql-gateway
2023-03-29 18:33:38,431 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: sql-gateway.endpoint.type, rest
2023-03-29 18:33:38,431 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: jobmanager.rpc.address, flink-session-cluster
2023-03-29 18:33:38,431 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: sql-gateway.endpoint.rest.port, 8083
2023-03-29 18:33:38,432 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: rest.address, flink-session-cluster-rest
2023-03-29 18:33:38,432 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: jobmanager.rpc.port, 6123
When I create a new Flink pod and run the sql client, however, I get an error from SQL Gateway. These are my pods and services at this point:
✗ kg po
NAME                                     READY   STATUS    RESTARTS   AGE
flink-session-cluster-758bc469cc-lgpps   1/1     Running   0          10m
sql-client-74f44556cf-ls24w              1/1     Running   0          2m29s
sql-gateway-7758c8fdf9-nldvt             1/1     Running   0          2m29s
✗ kg svc
NAME                         TYPE           CLUSTER-IP       EXTERNAL-IP   PORT(S)             AGE
flink-session-cluster        ClusterIP      None             <none>        6123/TCP,6124/TCP   10m
flink-session-cluster-rest   LoadBalancer   10.100.202.72    127.0.0.1     8081:32409/TCP      10m
kubernetes                   ClusterIP      10.96.0.1        <none>        443/TCP             4d18h
sql-gateway                  ClusterIP      10.103.121.161   <none>        8083/TCP            2m30s
And I use sql-client pod to connect to the gateway like this:
✗ kubectl exec -i -t sql-client-74f44556cf-ls24w -- /bin/bash
root@sql-client-74f44556cf-5tv5d:/opt/flink# ./bin/sql-client.sh gateway -e sql-gateway:8083
# At this point I get "Session <id> is opened, and the number..." from SQL Gateway logs, so I can connect to the gateway
Flink SQL> SELECT
> name,
> COUNT(*) AS cnt
> FROM
> (VALUES ('Bob'), ('Alice'), ('Greg'), ('Bob')) AS NameTable(name)
> GROUP BY name;
As soon as I run that test SQL command, I get this error from the gateway's logs:
2023-03-29 18:32:28,748 WARN org.apache.flink.client.program.rest.RestClusterClient [] - Attempt to submit job 'collect' (ccaafce00ee8c95e4b5c037774d885e1) to 'http://0.0.0.0:8081' has failed.
java.util.concurrent.CompletionException: org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: /0.0.0.0:8081
at java.util.concurrent.CompletableFuture.encodeThrowable(Unknown Source) ~[?:?]
at java.util.concurrent.CompletableFuture.completeThrowable(Unknown Source) ~[?:?]
at java.util.concurrent.CompletableFuture$UniCompose.tryFire(Unknown Source) ~[?:?]
at java.util.concurrent.CompletableFuture.postComplete(Unknown Source) ~[?:?]
at java.util.concurrent.CompletableFuture.completeExceptionally(Unknown Source) ~[?:?]
at org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$1(RestClient.java:480) ~[flink-dist-1.17.0.jar:1.17.0]
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:578) [flink-dist-1.17.0.jar:1.17.0]
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:571) [flink-dist-1.17.0.jar:1.17.0]
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:550) [flink-dist-1.17.0.jar:1.17.0]
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:491) [flink-dist-1.17.0.jar:1.17.0]
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:616) [flink-dist-1.17.0.jar:1.17.0]
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.setFailure0(DefaultPromise.java:609) [flink-dist-1.17.0.jar:1.17.0]
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:117) [flink-dist-1.17.0.jar:1.17.0]
at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.fulfillConnectPromise(AbstractNioChannel.java:321) [flink-dist-1.17.0.jar:1.17.0]
at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:337) [flink-dist-1.17.0.jar:1.17.0]
at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:776) [flink-dist-1.17.0.jar:1.17.0]
at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724) [flink-dist-1.17.0.jar:1.17.0]
at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650) [flink-dist-1.17.0.jar:1.17.0]
at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562) [flink-dist-1.17.0.jar:1.17.0]
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997) [flink-dist-1.17.0.jar:1.17.0]
at org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) [flink-dist-1.17.0.jar:1.17.0]
at java.lang.Thread.run(Unknown Source) [?:?]
Caused by: org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: /0.0.0.0:8081
Caused by: java.net.ConnectException: Connection refused
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) ~[?:?]
at sun.nio.ch.SocketChannelImpl.finishConnect(Unknown Source) ~[?:?]
at org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:337) ~[flink-dist-1.17.0.jar:1.17.0]
at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:334) ~[flink-dist-1.17.0.jar:1.17.0]
Now I'm extremely confused: why is it trying to connect to 0.0.0.0:8081? I remember I got this to work at some point, but I'm not sure what was different since I was just quickly experimenting. If anyone has any pointers, I'd really appreciate it! Thanks!
Radu Stoian
03/29/2023, 6:58 PM
myValueState.value(), inside a KeyedProcessFunction, sometimes returns null despite the fact that the logic in the code should guarantee that the object returned by .value() is not null. These nulls are returned rarely, and do not occur again when the app is restarted and run on the same data that it previously failed on. Note: myValueState is of type ValueState<java.time.LocalDateTime>.
More Context
• I am using Flink 1.15.2, hosted on AWS Kinesis Data Analytics; this is where the error occurs
• The error does not occur locally
• RocksDB is used as the state store backend on AWS Kinesis Data Analytics
Code
• Near the top of my process function, I run the first function for every single event to update my minTimestamp value state
• This function, to my mind, should ensure that the value of this value state is never null
• Later on in the code I call minTimestamp.value(), which will return null once in a while
void updateMinTimestamp(LocalDateTime newTimestamp) {
    try {
        final LocalDateTime currentMinTimestamp = minTimestamp.value();
        if (currentMinTimestamp == null || newTimestamp.isBefore(currentMinTimestamp)) {
            minTimestamp.update(newTimestamp);
        }
    } catch (Exception e) {
        throw new FlinkStateAccessException("failed", e);
    }
}
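For reference, minTimestamp is declared roughly like this (simplified sketch; class names and wiring are assumed, not the actual code):
import java.time.LocalDateTime;

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class MinTimestampFunction
        extends KeyedProcessFunction<String, LocalDateTime, LocalDateTime> {

    private transient ValueState<LocalDateTime> minTimestamp;

    @Override
    public void open(Configuration parameters) {
        // Per-key state: every key sees its own minTimestamp value.
        minTimestamp = getRuntimeContext().getState(
                new ValueStateDescriptor<>("minTimestamp", LocalDateTime.class));
    }

    @Override
    public void processElement(LocalDateTime ts, Context ctx, Collector<LocalDateTime> out)
            throws Exception {
        updateMinTimestamp(ts); // runs for every single event (see snippet above)
        out.collect(ts);
    }

    void updateMinTimestamp(LocalDateTime newTimestamp) { /* as above */ }
}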
Any information regarding why this error is occurring, and how to prevent it, is much appreciated.
Gaurav Miglani
03/29/2023, 7:09 PM
AvroSchemaConverter.convertToTypeInfo[Row] for converting it into Flink type info; somehow the table DDL below is getting generated, which fails at the Flink SQL validation step:
CREATE TABLE t1 (
  `pp` MAP<STRING, RAW('java.lang.Object', '...')>
) WITH (
  'connector' = 'kafka',
  'topic' = 'test',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'test',
  'properties.auto.offset.reset' = 'latest',
  'value.format' = 'avro-confluent',
  'scan.topic-partition-discovery.interval' = '60000',
  'value.avro-confluent.url' = 'localhost:9091',
  'scan.startup.mode' = 'group-offsets'
)
The error:
Caused by: org.apache.flink.table.api.ValidationException: Unable to restore the RAW type of class 'java.lang.Object' with serializer snapshot '...'.
at org.apache.flink.table.types.logical.RawType.restore(RawType.java:157)
at org.apache.flink.table.planner.calcite.FlinkTypeFactory.createRawType(FlinkTypeFactory.scala:371)
at org.apache.flink.sql.parser.type.SqlRawTypeNameSpec.deriveType(SqlRawTypeNameSpec.java:60)
at org.apache.calcite.sql.SqlDataTypeSpec.deriveType(SqlDataTypeSpec.java:231)
at org.apache.calcite.sql.SqlDataTypeSpec.deriveType(SqlDataTypeSpec.java:218)
at org.apache.flink.sql.parser.type.SqlMapTypeNameSpec.deriveType(SqlMapTypeNameSpec.java:67)
at org.apache.calcite.sql.SqlDataTypeSpec.deriveType(SqlDataTypeSpec.java:231)
at org.apache.flink.table.planner.operations.MergeTableLikeUtil$SchemaBuilder.collectPhysicalFieldsTypes(MergeTableLikeUtil.java:509)
at org.apache.flink.table.planner.operations.MergeTableLikeUtil$SchemaBuilder.appendDerivedColumns(MergeTableLikeUtil.java:397)
at org.apache.flink.table.planner.operations.MergeTableLikeUtil$SchemaBuilder.access$000(MergeTableLikeUtil.java:209)
at org.apache.flink.table.planner.operations.MergeTableLikeUtil.mergeTables(MergeTableLikeUtil.java:150)
at org.apache.flink.table.planner.operations.SqlCreateTableConverter.createCatalogTable(SqlCreateTableConverter.java:162)
at org.apache.flink.table.planner.operations.SqlCreateTableConverter.convertCreateTable(SqlCreateTableConverter.java:77)
at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertValidatedSqlNode(SqlToOperationConverter.java:309)
at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:262)
at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:106)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:723)
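For comparison, a toy check of what the DataType-based converter produces for a plain string-valued map (sketch; the schema string is made up, not the real one):
import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter;
import org.apache.flink.table.types.DataType;

public class AvroMapTypeCheck {
    public static void main(String[] args) {
        // Toy Avro schema: a record with one map<string, string> field.
        String avroSchema =
                "{\"type\":\"record\",\"name\":\"t\",\"fields\":"
              + "[{\"name\":\"pp\",\"type\":{\"type\":\"map\",\"values\":\"string\"}}]}";
        DataType rowType = AvroSchemaConverter.convertToDataType(avroSchema);
        // Expect roughly ROW<`pp` MAP<STRING, STRING>>; a union/Object-valued
        // map is presumably where the non-restorable RAW type comes from.
        System.out.println(rowType);
    }
}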
Senad Hadžikić
03/30/2023, 12:02 PM
KafkaSource.<...>builder()
    .setProperty("partition.discovery.interval.ms", "10000")
    .setProperty("enable.auto.commit", "true")
    .setProperty("auto.commit.interval.ms", "50")
    .setProperty("auto.offset.reset", "latest")
    .setProperty("auto.offset.reset.strategy", "latest")
    .setProperty("commit.offsets.on.checkpoint", "false")
    .setProperty("client.id", "clientFlink6661")
    .setPartitions(
        new HashSet<>(
            Arrays.asList(
                new TopicPartition("myTopic", 0),
                new TopicPartition("myTopic", 1))))
    .setBootstrapServers(kafkaAddress)
    .setStartingOffsets(OffsetsInitializer.latest())
    .setGroupId("FlinkDestroyer6661")
    .setValueOnlyDeserializer(new MyClassDeserializationSchema())
    .build();
The issue is: when I used DeliveryGuarantee.EXACTLY_ONCE in my KafkaSink, everything worked perfectly, but since I switched to DeliveryGuarantee.AT_LEAST_ONCE, every time I start the Flink app it starts from the first message in the topic, as if OffsetsInitializer.latest() doesn't matter at all. I've tried everything, as you can see in the provided code. Any ideas what could cause the issue?
Also, even though the Flink job consumes messages from the Kafka topic, it shows 0 active consumers and the State is marked as EMPTY.
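For reference, a variant that resumes from the group's committed offsets instead of always jumping to latest (sketch; the deserializer is swapped for a string one to keep it self-contained):
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;

public class CommittedOffsetsSource {
    public static KafkaSource<String> build(String bootstrapServers) {
        return KafkaSource.<String>builder()
                .setBootstrapServers(bootstrapServers)
                .setGroupId("FlinkDestroyer6661")
                // Start from the group's committed offsets; fall back to LATEST
                // only when the group has no committed offsets yet.
                .setStartingOffsets(
                        OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST))
                // Offsets only advance across restarts if they are committed on
                // checkpoints (it is explicitly disabled in the snippet above).
                .setProperty("commit.offsets.on.checkpoint", "true")
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();
    }
}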
Senad Hadžikić
03/30/2023, 12:04 PM
Otto Remse
03/30/2023, 12:26 PM
metrics.reporter.prompushgateway.filter.excludes setting? Is it the same as metrics.reporter.prompushgateway.scope.variables.excludes: task_attempt_id;task_attempt_num, i.e. a semicolon-separated list? The documentation just states that it should be a List<String>, whereas scope.variables.excludes is a String?
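From what I can tell, list-typed options in flink-conf.yaml are written as semicolon-separated strings; a tiny check along these lines (the option key here is made up):
import java.util.List;

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;

public class ListOptionCheck {
    // A stand-in List<String> option, analogous to filter.excludes.
    static final ConfigOption<List<String>> EXCLUDES =
            ConfigOptions.key("demo.filter.excludes")
                    .stringType()
                    .asList()
                    .noDefaultValue();

    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.setString("demo.filter.excludes", "task_attempt_id;task_attempt_num");
        System.out.println(conf.get(EXCLUDES)); // [task_attempt_id, task_attempt_num]
    }
}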
Jashwanth S J
03/30/2023, 12:51 PM
❯ kubectl get FlinkSessionJob -n flink
NAME                         JOB STATUS    LIFECYCLE STATE
basic-session-job-example    RECONCILING   STABLE
basic-session-job-example2   RECONCILING   STABLE
Jashwanth S J
03/30/2023, 12:51 PM
status:
  error: '{"type":"org.apache.flink.kubernetes.operator.exception.MissingSessionJobException","message":"Missing
    Session Job","throwableList":[]}'
Slackbot
03/30/2023, 1:39 PM
Yufei Chen
03/30/2023, 3:57 PM
Leon Xu
03/30/2023, 4:48 PM
Ivan Webber
03/30/2023, 7:28 PM
Guruguha Marur Sreenivasa
03/30/2023, 9:18 PM
CheckpointListener to listen to completed checkpoints? I'm unable to register this with my stream environment. Can someone please help here?
Below is my listener code:
public class CheckpointCoordinator implements CheckpointListener {
    @Override
    public void notifyCheckpointComplete(long checkpointId) throws Exception {
        log.info("Checkpoint {} completed successfully.", checkpointId);
        // add custom logic here
    }

    @Override
    public void notifyCheckpointAborted(long checkpointId) throws Exception {
        CheckpointListener.super.notifyCheckpointAborted(checkpointId);
    }
}
How do I register this?
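A sketch of the pattern that should work, as far as I understand it: CheckpointListener is not registered on the environment; a function that is already part of the pipeline implements it and receives the notifications (names below are made up):
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.CheckpointListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CheckpointAwareMap extends RichMapFunction<String, String>
        implements CheckpointListener {

    private static final Logger LOG = LoggerFactory.getLogger(CheckpointAwareMap.class);

    @Override
    public String map(String value) {
        return value; // pass-through; real per-record logic goes here
    }

    @Override
    public void notifyCheckpointComplete(long checkpointId) {
        LOG.info("Checkpoint {} completed successfully.", checkpointId);
        // add custom logic here
    }

    @Override
    public void notifyCheckpointAborted(long checkpointId) {
        LOG.warn("Checkpoint {} aborted.", checkpointId);
    }
}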
Gil Kirkpatrick
03/30/2023, 9:29 PM
CREATE TABLE FOO (id STRING, name STRING)
WITH (
  'connector'='kafka',
  'topic'='foo',
  'format'='json',
  'properties.bootstrap.servers'='flinktests-kafka-1:29091',
  'properties.group.id'='flink',
  'scan.startup.mode'='earliest-offset'
);
I get the following error from the SQL Client
[ERROR] Could not execute SQL statement. Reason:
2023-03-30 15:23:43 java.lang.ClassCastException: class org.codehaus.janino.CompilerFactory cannot be cast to class org.codehaus.commons.compiler.ICompilerFactory (org.codehaus.janino.CompilerFactory is in unnamed module of loader org.apache.flink.util.ChildFirstClassLoader @4c013fb6; org.codehaus.commons.compiler.ICompilerFactory is in unnamed module of loader 'app')
Any ideas where I should look?
Adam Augusta
03/30/2023, 10:01 PM
Jalil Alchy
03/30/2023, 10:08 PM
org.apache.flink.runtime.checkpoint.CheckpointException: Task has failed.
at org.apache.flink.runtime.messages.checkpoint.SerializedCheckpointException.unwrap(SerializedCheckpointException.java:51)
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveDeclineMessage(CheckpointCoordinator.java:1013)
at org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$declineCheckpoint$2(ExecutionGraphHandler.java:103)
at org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$processCheckpointCoordinatorMessage$3(ExecutionGraphHandler.java:119)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.flink.runtime.checkpoint.CheckpointException: Task name with subtask : Source: postgres_cdc (1/1)#19 Failure reason: Task has failed.
at org.apache.flink.runtime.taskmanager.Task.declineCheckpoint(Task.java:1395)
at org.apache.flink.runtime.taskmanager.Task.lambda$triggerCheckpointBarrier$3(Task.java:1338)
at java.base/java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:930)
at java.base/java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:907)
at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088)
at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$triggerCheckpointAsync$11(StreamTask.java:1094)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93)
at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:338)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:324)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:953)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:932)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:746)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:568)
... 1 more
Caused by: java.util.concurrent.CompletionException: java.lang.Exception: Could not perform checkpoint 38 for operator Source: postgres_cdc (1/1)#19.
at java.base/java.util.concurrent.CompletableFuture.encodeRelay(CompletableFuture.java:367)
at java.base/java.util.concurrent.CompletableFuture.completeRelay(CompletableFuture.java:376)
at java.base/java.util.concurrent.CompletableFuture$UniRelay.tryFire(CompletableFuture.java:1019)
... 15 more
Caused by: java.lang.Exception: Could not perform checkpoint 38 for operator Source: postgres_cdc (1/1)#19.
at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointAsyncInMailbox(StreamTask.java:1138)
at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$triggerCheckpointAsync$11(StreamTask.java:1085)
... 12 more
Caused by: org.apache.flink.runtime.checkpoint.CheckpointException: Could not complete snapshot 38 for operator Source: postgres_cdc (1/1)#19. Failure reason: Checkpoint was declined.
at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:269)
at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:173)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:348)
at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.checkpointStreamOperator(RegularOperatorChain.java:227)
at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.buildOperatorSnapshotFutures(RegularOperatorChain.java:212)
at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.snapshotState(RegularOperatorChain.java:192)
at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.takeSnapshotSync(SubtaskCheckpointCoordinatorImpl.java:647)
at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:320)
at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$12(StreamTask.java:1253)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93)
at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:1241)
at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointAsyncInMailbox(StreamTask.java:1126)
... 13 more
Caused by: org.apache.flink.util.FlinkRuntimeException: Call snapshotState() on closed source, checkpoint failed.
at com.ververica.cdc.debezium.DebeziumSourceFunction.snapshotState(DebeziumSourceFunction.java:307)
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118)
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:87)
Ivan Webber
03/30/2023, 11:36 PM
Echo Lee
03/31/2023, 2:58 AM
on a.value > b.value
Is there a mistake in the description on the official website, or is something wrong with my understanding?
Evaldas Buinauskas
03/31/2023, 6:16 AM
/// HTTP request latency histogram buckets
const HTTP_REQUESTS_HISTOGRAM_BUCKETS_SECONDS: &[f64; 18] = &[
    0.010, 0.020, 0.030, 0.050, 0.060, 0.070, 0.080, 0.090, //
    0.100, 0.150, 0.200, 0.250, 0.300, 0.350, 0.400, 0.450, //
    0.500, // Timeout limit
    0.600, // Catch all
];
lazy_static::lazy_static! {
    /// Tracks request histogram
    pub static ref HTTP_REQ_HISTOGRAM: HistogramVec = register_histogram_vec!(
        "http_request_duration_seconds",
        "The HTTP request latencies in seconds.",
        &[HANDLER_LABEL, PORTAL_LABEL, QUERY_LABEL],
        HTTP_REQUESTS_HISTOGRAM_BUCKETS_SECONDS.to_vec()
    )
    .expect("valid histogram vec metric");
}
🙏
Raghunadh Nittala
03/31/2023, 11:03 AM
ResultProcessor, I'm using Table API and executing a few SQL queries on a Kafka stream using the executeSql() method. In another class called ResultExecutor, I'm using DataStream to process another Kafka stream, using the env.execute() method.
Now, in the Main class, I'm calling ResultExecutor first and then ResultProcessor. While submitting the job, I'm getting an exception: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Cannot have more than one execute() or executeAsync() call in a single environment.
When I reverse the order, I don't see the exception, but only one stream is getting executed. When I comment out the first one, the second one works fine and I'm able to see the operators in the job overview (and vice versa).
Is there any restriction on the environment to execute Table API/DataStream alone? I'm using Flink 1.16. 🙏
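A minimal sketch of one way around this, assuming both pipelines can share a single environment (table names and connectors below are made up): collect the SQL DML into a statement set and attach it to the DataStream graph, so only one execute() is needed.
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamStatementSet;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class CombinedPipelines {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // DDL runs eagerly and does not submit a job.
        tEnv.executeSql("CREATE TABLE src (x INT) WITH ('connector' = 'datagen')");
        tEnv.executeSql("CREATE TABLE snk (x INT) WITH ('connector' = 'blackhole')");

        // Instead of executeSql("INSERT ..."), which submits its own job,
        // attach the SQL pipelines to the DataStream graph.
        StreamStatementSet statements = tEnv.createStatementSet();
        statements.addInsertSql("INSERT INTO snk SELECT x FROM src");
        statements.attachAsDataStream();

        // The DataStream pipeline is built on the same environment.
        env.fromElements(1, 2, 3).print();

        // One job graph, one submission.
        env.execute("combined-job");
    }
}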
Ricardo Correia
03/31/2023, 5:00 PM
FROM apache/flink-kubernetes-operator:1.4.0
ENV FLINK_PLUGINS_DIR=/opt/flink/plugins
COPY flink-s3-fs-hadoop-1.15.4.jar $FLINK_PLUGINS_DIR/s3-fs-hadoop/
COPY flink-s3-fs-presto-1.15.4.jar $FLINK_PLUGINS_DIR/s3-fs-presto/
And I can confirm that the pods of the operator contain these plugins.
The problem is that when I try to reference the JAR file that is in the S3 bucket:
job:
  jarURI: 's3://mybucket/flink-application-15.jar'
In standalone deployment I get the following error in the JM:
Caused by: java.net.MalformedURLException: unknown protocol: s3
In a session job deployment I get the following error in the deployment:
The LocalStreamEnvironment cannot be used when submitting a program through a client, or running in a TestEnvironment context.
Has anyone faced a similar issue or knows why I’m getting this error?
🙏
Varun Sayal
03/31/2023, 5:48 PM
Varun Sayal
03/31/2023, 5:49 PM
Queryable state is currently not supported with TTL
Varun Sayal
03/31/2023, 5:49 PM
Joris Basiglio
03/31/2023, 8:24 PM
RocksDBOptionsFactory) to test things out, but I don't really see anything happening on my state after multiple days of it running.
Nathanael England
03/31/2023, 8:31 PM
KeyedCoProcessFunction in pyflink 1.16.0. When I call ctx.get_current_key() like the docs show, I just get None back instead of my key. Does this work for anyone else?
Amir Hossein Sharifzadeh
04/01/2023, 8:19 PM
Caused by: java.lang.NoClassDefFoundError: org/apache/flink/streaming/api/functions/ProcessFunction
Any idea how to solve this issue? And yes, my pom.xml contains all required dependencies:
<dependency>
  <groupId>commons-io</groupId>
  <artifactId>commons-io</artifactId>
  <version>2.11.0</version>
  <scope>compile</scope>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-streaming-java</artifactId>
  <version>${flink.version}</version>
  <scope>compile</scope>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-clients</artifactId>
  <version>${flink.version}</version>
  <scope>compile</scope>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table-api-java-bridge</artifactId>
  <version>${flink.version}</version>
  <scope>compile</scope>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table-planner-loader</artifactId>
  <version>${flink.version}</version>
  <scope>compile</scope>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table-runtime</artifactId>
  <version>${flink.version}</version>
  <scope>compile</scope>
</dependency>
<!-- Adding jackson dependencies. They must be in the default scope (compile). -->
<dependency>
  <groupId>com.fasterxml.jackson.core</groupId>
  <artifactId>jackson-databind</artifactId>
  <version>${jackson.databind.version}</version>
</dependency>
<dependency>
  <groupId>com.fasterxml.jackson.core</groupId>
  <artifactId>jackson-annotations</artifactId>
  <version>${jackson.version}</version>
</dependency>
<!-- jackson needs jackson-datatype-jsr310 for Java 8 java.time.Instant support -->
<dependency>
  <groupId>com.fasterxml.jackson.datatype</groupId>
  <artifactId>jackson-datatype-jsr310</artifactId>
  <version>${jackson.version}</version>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-kafka</artifactId>
  <version>${flink.version}</version>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-base</artifactId>
  <version>${flink.version}</version>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-json</artifactId>
  <version>${flink.version}</version>
</dependency>
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-clients</artifactId>
  <version>${kafka.version}</version>
</dependency>
where
<properties>
  <flink.version>1.17.0</flink.version>
  <jackson.databind.version>2.14.0</jackson.databind.version>
  <jackson.version>2.14.0</jackson.version>
  <junit.jupiter.version>5.9.1</junit.jupiter.version>
  <kafka.version>3.2.2</kafka.version>
  <log4j.version>2.19.0</log4j.version>
  <maven.compiler.source>${target.java.version}</maven.compiler.source>
  <maven.compiler.target>${target.java.version}</maven.compiler.target>
  <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  <target.java.version>11</target.java.version>
</properties>
Nancy Yang
04/02/2023, 1:11 AM
Caused by: java.nio.file.NoSuchFileException: /tmp/pyflink/3e67a85b-296d-4586-aa5b-b654963e6464/7fc3a4b9-0c8f-4d6a-9178-8cff5e5c58e6/word_count.py
I looked at the PythonDriver.java code; it seems it should either create a soft link for the original word_count.py file under /tmp/pyflink/xxx/yyy/word.py or copy the file over to the /tmp directory. I don't understand why it complained.
Also, the weird thing is that if I change the code to do different things in python_demo.py, the deployment still succeeds and runs the same stream job. It looks to me like the pod was running something from the pod template, not from my python-example.yaml.
Has anybody met the same issue? Or did I miss something? Thanks in advance! 🙏
python-example.yaml
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: python-demo
  namespace: flink-operator
spec:
  image: registry.localhost:5000/flink-python-demo:latest
  flinkVersion: v1_16
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "1"
  serviceAccount: flink
  jobManager:
    resource:
      memory: "2048m"
      cpu: 1
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1
  job:
    jarURI: local:///opt/flink/opt/flink-python-1.16.1.jar # Note, this jarURI is actually a placeholder
    entryClass: "org.apache.flink.client.python.PythonDriver"
    args: ["-pyclientexec", "/usr/local/bin/python3", "-py", "/opt/flink/usrlib/word_count.py"]
    parallelism: 1
    upgradeMode: stateless