Abolfazl Ghahremani
03/27/2023, 8:56 AM
Michael Helmling
03/27/2023, 9:09 AM
java.lang.ArrayIndexOutOfBoundsException: Index -2147483648 out of bounds for length 5
at org.apache.flink.streaming.runtime.watermarkstatus.HeapPriorityQueue.removeInternal(HeapPriorityQueue.java:155)
at org.apache.flink.streaming.runtime.watermarkstatus.HeapPriorityQueue.remove(HeapPriorityQueue.java:100)
at org.apache.flink.streaming.runtime.watermarkstatus.StatusWatermarkValve$InputChannelStatus.removeFrom(StatusWatermarkValve.java:300)
at org.apache.flink.streaming.runtime.watermarkstatus.StatusWatermarkValve$InputChannelStatus.access$200(StatusWatermarkValve.java:266)
at org.apache.flink.streaming.runtime.watermarkstatus.StatusWatermarkValve.markWatermarkUnaligned(StatusWatermarkValve.java:222)
at org.apache.flink.streaming.runtime.watermarkstatus.StatusWatermarkValve.inputWatermarkStatus(StatusWatermarkValve.java:140)
at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:153)
at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
at org.apache.flink.streaming.runtime.io.StreamMultipleInputProcessor.processInput(StreamMultipleInputProcessor.java:85)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:550)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:839)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:788)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:931)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
at java.base/java.lang.Thread.run(Unknown Source)
This is a simple job (one Kafka input, one Kafka output, stateless) with watermark alignment disabled. Is this a bug?
Tsering
03/27/2023, 10:15 AM
Zhiyu Tian
03/27/2023, 11:00 AM
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: statemachine
  labels:
    type: flink-native-kubernetes
spec:
  image: zhiyut/statemachine:2.0
  # image: flink:1.16
  flinkVersion: v1_16
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
  serviceAccount: flink
  podTemplate:
    apiVersion: v1
    kind: Pod
    metadata:
      name: pod-template
    spec:
      containers:
        # Do not change the main container name
        - name: flink-main-container
          volumeMounts:
            - mountPath: /opt/flink/log
              name: flink-logs
          env:
            - name: MT_TOKEN
              value: <###>
      volumes:
        - name: flink-logs
          emptyDir: { }
  jobManager:
    resource:
      memory: "2048m"
      cpu: 1
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1
  job:
    jarURI: local:///opt/flink/usrlib/StateMachineExample.jar
    entryClass: org.apache.flink.streaming.examples.statemachine.StateMachineExample
    parallelism: 2
    upgradeMode: stateless
Jashwanth S J
03/27/2023, 11:37 AM
高志翔
03/27/2023, 12:29 PM
Christophe Bornet
03/27/2023, 12:54 PM
flink-avro deserializes to SQL tables using the position in the Avro record.
Since with Avro we have the schema and the field names, is there a possibility to map by field names instead?
The current converter leads to issues when the schema evolves.
Tsering
03/27/2023, 3:33 PM
Abolfazl Ghahremani
03/27/2023, 7:13 PM
craig fan
03/28/2023, 3:52 AM
org.apache.flink.runtime.io.network.partition.PartitionNotFoundException: Partition 512d1b5e6860de484fc7792beb466764#0@0b1dc40d062b680b0258b6b4cb90041d not found.
at org.apache.flink.runtime.io.network.partition.ResultPartitionManager.createSubpartitionView(ResultPartitionManager.java:70)
at org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel.requestSubpartition(LocalInputChannel.java:135)
at org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel$1.run(LocalInputChannel.java:185)
at java.base/java.util.TimerThread.mainLoop(Unknown Source)
at java.base/java.util.TimerThread.run(Unknown Source)
My Flink job is on 1.15.
Deepyaman Datta
03/28/2023, 4:54 AM
pyiceberg, and I have a configuration file like:
# ~/.pyiceberg.yaml
catalog:
  default:
    uri: http://localhost:8189
    s3.endpoint: http://localhost:9100
    s3.access-key-id: admin
    s3.secret-access-key: password
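For what it's worth, a hedged sketch of how these same settings might be expressed with PyIceberg's environment-variable convention; the exact naming scheme (a PYICEBERG_ prefix, double underscores between nesting levels, and dashes in property names written as underscores) is an assumption to verify against the PyIceberg configuration docs:

```shell
# Hypothetical env-var equivalents of the YAML above; verify the exact
# mapping (prefix and "__" separators) against the PyIceberg docs.
export PYICEBERG_CATALOG__DEFAULT__URI=http://localhost:8189
export PYICEBERG_CATALOG__DEFAULT__S3__ENDPOINT=http://localhost:9100
export PYICEBERG_CATALOG__DEFAULT__S3__ACCESS_KEY_ID=admin
export PYICEBERG_CATALOG__DEFAULT__S3__SECRET_ACCESS_KEY=password
```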
I actually wanted to use environment variables instead, but is it possible to set the S3-related keys using the pyiceberg environment variable syntax?
Ashwin Kolhatkar
03/28/2023, 6:07 AM
FROM apache/flink:1.16.0-scala_2.12
RUN mkdir -p /opt/flink/plugins/flink-s3-fs-hadoop
RUN mv /opt/flink/opt/flink-s3-fs-hadoop-*.jar /opt/flink/plugins/flink-s3-fs-hadoop/.
When I try to submit a job (using the new Delta Lake read connector), here is some of the code to give an idea:
DeltaSource<RowData> deviceDeltaTableStream = DeltaSource
        .forContinuousRowData(
            new Path("s3a://my-delta-table-location"),
            new Configuration())
        .columnNames("id", "state")
        .build();
Since the Delta Lake files are stored in S3, I need to be able to access them from there. But when I submit this job, I get this error:
org.apache.flink.runtime.rest.handler.RestHandlerException: Could not execute application.
at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$1(JarRunHandler.java:113)
at java.base/java.util.concurrent.CompletableFuture.uniHandle(Unknown Source)
at java.base/java.util.concurrent.CompletableFuture$UniHandle.tryFire(Unknown Source)
at java.base/java.util.concurrent.CompletableFuture.postComplete(Unknown Source)
at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(Unknown Source)
at java.base/java.lang.Thread.run(Unknown Source)
Caused by: java.util.concurrent.CompletionException: org.apache.flink.util.FlinkRuntimeException: Could not execute application.
at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(Unknown Source)
at java.base/java.util.concurrent.CompletableFuture.completeThrowable(Unknown Source)
... 2 more
Caused by: org.apache.flink.util.FlinkRuntimeException: Could not execute application.
at org.apache.flink.client.deployment.application.DetachedApplicationRunner.tryExecuteJobs(DetachedApplicationRunner.java:88)
at org.apache.flink.client.deployment.application.DetachedApplicationRunner.run(DetachedApplicationRunner.java:70)
at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$0(JarRunHandler.java:107)
... 2 more
Caused by: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3a.S3AFileSystem not found
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:98)
at org.apache.flink.client.deployment.application.DetachedApplicationRunner.tryExecuteJobs(DetachedApplicationRunner.java:84)
... 4 more
Caused by: java.lang.RuntimeException: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3a.S3AFileSystem not found
at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2592)
at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:3320)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3352)
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:124)
at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3403)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3371)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:477)
at org.apache.hadoop.fs.Path.getFileSystem(Path.java:361)
at io.delta.standalone.internal.DeltaLogImpl$.apply(DeltaLogImpl.scala:260)
at io.delta.standalone.internal.DeltaLogImpl$.forTable(DeltaLogImpl.scala:241)
at io.delta.standalone.internal.DeltaLogImpl.forTable(DeltaLogImpl.scala)
at io.delta.standalone.DeltaLog.forTable(DeltaLog.java:164)
What could be going wrong here?
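One possibility (an assumption based on the stack trace, not confirmed): the Delta standalone reader resolves s3a:// paths through Hadoop's own FileSystem loading, which does not see jars installed via Flink's plugins/ mechanism, so S3AFileSystem would also need to be on the main classpath. A hedged sketch of that workaround, placing hadoop-aws and the AWS SDK bundle in lib/ (the version numbers are placeholders and must match your Hadoop version):

```dockerfile
FROM apache/flink:1.16.0-scala_2.12
# Put the S3A filesystem classes on the main classpath so that non-Flink
# code (e.g. the Delta standalone reader) can load them too. The exact
# hadoop-aws / aws-java-sdk-bundle versions must match your Hadoop version.
RUN wget -P /opt/flink/lib/ \
      https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/3.3.2/hadoop-aws-3.3.2.jar && \
    wget -P /opt/flink/lib/ \
      https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-bundle/1.11.1026/aws-java-sdk-bundle-1.11.1026.jar
```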
Also, attaching jobmanager logs in thread.
Jasper Dunker
03/28/2023, 7:46 AM
Yubin Li
03/28/2023, 8:04 AM
Caused by: org.apache.flink.util.FlinkException: An OperatorEvent from an OperatorCoordinator to a task was lost. Triggering task failover to ensure consistency. Event: '[NoMoreSplitEvent]', targetTask: Source: paimon-f2b6dae9-74c2-40bc-895e-a616a551f409.default.ts_table -> Calc(select=[dt, k, v], where=[(dt < _UTF-16LE'2023-01-17')]) -> NotNullEnforcer(fields=[dt, k]) -> TableToDataSteam(type=ROW<`dt` STRING NOT NULL, `k` INT NOT NULL, `v` INT> NOT NULL, rowtime=false) -> Map (1/1) - execution #0
... 33 more
Caused by: org.apache.flink.runtime.operators.coordination.TaskNotRunningException: Task is not running, but in state FINISHED
at org.apache.flink.runtime.taskmanager.Task.deliverOperatorEvent(Task.java:1475)
at org.apache.flink.runtime.taskexecutor.TaskExecutor.sendOperatorEventToTask(TaskExecutor.java:1249)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRpcInvocation$1(AkkaRpcActor.java:316)
at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:314)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:217)
... 21 more
Abolfazl Ghahremani
03/28/2023, 8:11 AM
Mali
03/28/2023, 9:44 AM
Yemson Rose
03/28/2023, 1:49 PM
Yemson Rose
03/28/2023, 1:50 PM
Liad Shachoach
03/28/2023, 4:01 PM
Dheeraj Panangat
03/28/2023, 4:59 PM
Caused by: java.lang.ClassCastException: class org.apache.flink.table.data.vector.heap.HeapBytesVector cannot be cast to class org.apache.flink.table.data.vector.LongColumnVector (org.apache.flink.table.data.vector.heap.HeapBytesVector and org.apache.flink.table.data.vector.LongColumnVector are in unnamed module of loader 'app')
at org.apache.flink.table.data.vector.VectorizedColumnBatch.getLong(VectorizedColumnBatch.java:86) ~[flink-table_2.12-1.14.6.jar:1.14.6]
Getting this when executing:
tableEnv.createStatementSet()
    .addInsertSql("stmt1")
    .addInsertSql("stmt2")
    .addInsertSql("stmt3")
    .addInsertSql("stmt4")
    .execute();
Vignesh Venkataraman
03/28/2023, 7:04 PM
Users can now leverage the Java API from any Scala version
I'm finding it a little hard to understand the above message; please let me know if it's possible to use Flink with Scala. And I would highly appreciate it if the community could also share some Scala-Flink tutorials.
Lydian Lee
03/29/2023, 6:38 AM
HighAvailabilityServicesFactory to help me choose which JobManager to use. However, when it comes to TaskManagers, is there a way to specify that only TMs in the same region as the JM will be used to run the job? Thanks
Ron Ben Arosh
03/29/2023, 8:14 AM
claim
via flink-conf (running Flink 1.16 as a standalone application; checkpoints are stored in an S3 bucket).
Whenever Flink loads a checkpoint, a new checkpoint dir is created in addition to the existing one, so I think the default mode is being used (and not claim).
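For reference, a hedged sketch of the flink-conf entry that selects the restore mode in Flink 1.15+; the key name and values are worth double-checking against the Flink documentation for the exact version in use:

```yaml
# flink-conf.yaml: controls ownership of the restored checkpoint/savepoint.
# Possible values (per Flink 1.15+ docs): NO_CLAIM (default), CLAIM, LEGACY.
execution.savepoint-restore-mode: CLAIM
```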
Any assistance will be appreciated 🙂
Mali
03/29/2023, 11:18 AM
Slackbot
03/29/2023, 12:07 PM
Jashwanth S J
03/29/2023, 12:22 PM
Jashwanth S J
03/29/2023, 12:24 PM
Ha Long Do
03/29/2023, 2:18 PM
flink_taskmanager.1.e7sxy43lsb49@workernode.gcp | 2023-03-29 13:50:34,061 INFO org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Start job leader service.
flink_taskmanager.1.e7sxy43lsb49@workernode.gcp | 2023-03-29 13:50:34,066 INFO org.apache.flink.runtime.filecache.FileCache [] - User file cache uses directory /tmp/flink-dist-cache-705fdca5-c285-4875-9b68-556ccd1b56c3
flink_taskmanager.1.e7sxy43lsb49@workernode.gcp | 2023-03-29 13:50:34,073 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Connecting to ResourceManager <akka.tcp://flink@jobmanager:6123/user/rpc/resourcemanager_*(00000000000000000000000000000000)>.
flink_taskmanager.1.e7sxy43lsb49@workernode.gcp | 2023-03-29 13:50:34,420 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Resolved ResourceManager address, beginning registration
flink_taskmanager.1.e7sxy43lsb49@workernode.gcp | 2023-03-29 13:55:34,086 ERROR org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Fatal error occurred in TaskExecutor <akka.tcp://flink@10.0.1.14:6127/user/rpc/taskmanager_0>.
flink_taskmanager.1.e7sxy43lsb49@workernode.gcp | org.apache.flink.runtime.taskexecutor.exceptions.RegistrationTimeoutException: Could not register at the ResourceManager within the specified maximum registration duration PT5M. This indicates a problem with this instance. Terminating now.
About firewall rules: these instances are deployed in the same VPC and the same subnet, so I suppose they can communicate without any trouble. In fact, I can ping or curl from both instances.
Before deploying to Google Cloud, I had successfully deployed the same setup on my private Microstack cloud without any problem. Here is the docker-compose file that I used with docker stack deploy:
version: '3.8'
services:
  jobmanager:
    image: halo93/fixed-ports-flink-docker:1.16.1-scala_2.12-java11-custom
    deploy:
      replicas: 1
      placement:
        constraints: [node.hostname == managernode.gcp]
    ports:
      - "8081:8081"
      - "6123:6123"
      - "6124:6124"
      - "6125:6125"
    command: jobmanager
    environment:
      - FLINK_PROPERTIES=${FLINK_PROPERTIES}
    networks:
      - flink-network
  taskmanager:
    image: halo93/fixed-ports-flink-docker:1.16.1-scala_2.12-java11-custom
    deploy:
      replicas: 1
      placement:
        constraints: [node.hostname == workernode.gcp]
    depends_on:
      - jobmanager
    ports:
      - "6121:6121"
      - "6122:6122"
      - "6126:6126"
      - "6127:6127"
      - "6128:6128"
      - "5005:5005/udp"
    command:
      - taskmanager
    environment:
      - FLINK_PROPERTIES=${FLINK_PROPERTIES}
    networks:
      - flink-network
networks:
  flink-network:
    driver: overlay
    attachable: true
FLINK_PROPERTIES
FLINK_PROPERTIES=$'\njobmanager.rpc.address: jobmanager\nparallelism.default: 2\n'
I am using a customized flink docker image to fix taskmanager.data.port and taskmanager.rpc.port to 6126 and 6127.
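The port pinning described above might be sketched in flink-conf as follows; note that the metrics key is an assumption on my part, since Flink's internal metrics query service picks a random port by default, which can break metrics across Swarm nodes where only published ports are reachable (verify the key name against the Flink configuration docs for your version):

```yaml
# flink-conf.yaml (sketch): fixed ports for a Docker Swarm overlay network
jobmanager.rpc.address: jobmanager
taskmanager.rpc.port: 6127
taskmanager.data.port: 6126
# Assumed fix for metrics: pin the internal metrics query service port
# (random by default) to one of the published ports.
metrics.internal.query-service.port: 6128
```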
I have tried changing jobmanager.rpc.address to the private IP and to the zonal DNS, and the ResourceManager can then register the taskmanager. However, by doing so, flink-metrics stops working. Does anyone know how to fix this issue? I would really appreciate it.
czchen
03/29/2023, 2:44 PM
Reme Ajayi
03/29/2023, 3:35 PM