Jasmin Redzepovic
03/07/2023, 12:06 AM
Flink 1.15.2 and Flink k8s operator 1.2.0.
Error: switched from INITIALIZING to FAILED with failure cause: java.lang.NullPointerException: Initial Segment may not be null
I have implemented a Window Top-N job that uses a Kafka topic as a source and another Kafka topic as a sink. When running the job locally in the IDE, everything works well and results are output to the Kafka sink. However, when I deploy the job to the Flink Kubernetes Operator, I get an "Initial Segment may not be null" error.
Detailed stack trace in thread 🧵
Pratyush Sharma
03/07/2023, 2:17 AM
INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - deduplicate (193/360) (ccd789f9a61f9a361926927ae4faa43d) switched from INITIALIZING to FAILED on (dataPort=6121).
[2023-03-03 09:00:03.121693] org.apache.flink.util.SerializedThrowable: Exception while creating StreamOperatorStateContext.
[2023-03-03 09:00:03.121733] at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:254) ~[s3_ha_store_deploy.jar:?]
[2023-03-03 09:00:03.121829] at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:272)
[2023-03-03 09:00:03.121869] at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:441)
~~~
[2023-03-03 09:00:03.123298] Can't access /1197217.sst: IO error: while stat a file for size: /pay/flink/tmp/job_e3f516c4bdd0854e35d31b90fa9f3296_op_KeyedProcessOperator_62787344f3050171e45f898702868550__193_360__uuid_f62ff828-56a7-4a12-8abb-670f4ccc65ba/db/1197217.sst: No such file or directory
[2023-03-03 09:00:03.123347] Can't access /1197223.sst: IO error: while stat a file for size: /pay/flink/tmp/job_e3f516c4bdd0854e35d31b90fa9f3296_op_KeyedProcessOperator_62787344f3050171e45f898702868550__193_360__uuid_f62ff828-56a7-4a12-8abb-670f4ccc65ba/db/1197223.sst: No such file or directory
[2023-03-03 09:00:03.123394] Can't access /1197237.sst: IO error: while stat a file for size: /pay/flink/tmp/job_e3f516c4bdd0854e35d31b90fa9f3296_op_KeyedProcessOperator_62787344f3050171e45f898702868550__193_360__uuid_f62ff828-56a7-4a12-8abb-670f4ccc65ba/db/1197237.sst: No such file or directory
[2023-03-03 09:00:03.123441] Can't access /1163418.sst: IO error: while stat a file for size: /pay/flink/tmp/job_e3f516c4bdd0854e35d31b90fa9f3296_op_KeyedProcessOperator_62787344f3050171e45f898702868550__193_360__uuid_f62ff828-56a7-4a12-8abb-670f4ccc65ba/db/1163418.sst: No such file or directory
[2023-03-03 09:00:03.123514] Can't access /1118503.sst: IO error: while stat a file for size: /pay/flink/tmp/job_e3f516c4bdd0854e35d31b90fa9f3296_op_KeyedProcessOperator_62787344f3050171e45f898702868550__193_360__uuid_f62ff828-56a7-4a12-8abb-670f4ccc65ba/db/1118503.sst: No such file or directory
Shutdown hook threw the following error:
[2023-03-03 04:50:12.071855] WARN org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager [] - Exception while deleting local state directory for allocation id fa09ca77ce735c8e7383rbe61abbf07.
[2023-03-03 04:50:12.071921] java.nio.file.DirectoryNotEmptyException: /pay/flink/taskmanager-state/localState/aid_fa09ca77ce73995c8e7383be61abbf07/jid_fa98b0d9e39594b63269faf0fe829f/vtx_62787344f3050171e45f8987068550_sti_192/chk_71201
[2023-03-03 04:50:12.071972] at sun.nio.fs.UnixFileSystemProvider.implDelete(UnixFileSystemProvider.java:242) ~[?:1.8.0_352]
[2023-03-03 04:50:12.072021] at sun.nio.fs.AbstractFileSystemProvider.deleteIfExists(AbstractFileSystemProvider.java:108) ~[?:1.8.0_352]
[2023-03-03 04:50:12.072073] at java.nio.file.Files.deleteIfExists(Files.java:1165) ~[?:1.8.0_352]
[2023-03-03 04:50:12.072110] at org.apache.flink.util.FileUtils.deleteDirectoryInternal(FileUtils.java:332) ~[?]
[2023-03-03 04:50:12.072167] at org.apache.flink.util.FileUtils.deleteFileOrDirectoryInternal(FileUtils.java:308) ~[?]
[2023-03-03 04:50:12.072223] at org.apache.flink.util.FileUtils.guardIfNotThreadSafe(FileUtils.java:383) ~[?]
[2023-03-03 04:50:12.072270] at org.apache.flink.util.FileUtils.deleteFileOrDirectory(FileUtils.java:247) ~[?]
[2023-03-03 04:50:12.072330] at org.apache.flink.util.FileUtils.cleanDirectoryInternal(FileUtils.java:361) ~[?]
[2023-03-03 04:50:12.072377] at org.apache.flink.util.FileUtils.deleteDirectoryInternal(FileUtils.java:322) ~[?]
[2023-03-03 04:50:12.072415] at org.apache.flink.util.FileUtils.deleteFileOrDirectoryInternal(FileUtils.java:308) ~[?]
[2023-03-03 04:50:12.072454] at org.apache.flink.util.FileUtils.guardIfNotThreadSafe(FileUtils.java:383) ~[?]
[2023-03-03 04:50:12.072491] at org.apache.flink.util.FileUtils.deleteFileOrDirectory(FileUtils.java:247) ~[?]
[2023-03-03 04:50:12.072531] at org.apache.flink.util.FileUtils.cleanDirectoryInternal(FileUtils.java:361) ~[?]
[2023-03-03 04:50:12.072569] at org.apache.flink.util.FileUtils.deleteDirectoryInternal(FileUtils.java:322) ~[?]
[2023-03-03 04:50:12.072614] at org.apache.flink.util.FileUtils.deleteFileOrDirectoryInternal(FileUtils.java:308) ~[?]
[2023-03-03 04:50:12.072654] at org.apache.flink.util.FileUtils.guardIfNotThreadSafe(FileUtils.java:383) ~[?]
[2023-03-03 04:50:12.072703] at org.apache.flink.util.FileUtils.deleteFileOrDirectory(FileUtils.java:247) ~[?]
[2023-03-03 04:50:12.072752] at org.apache.flink.util.FileUtils.cleanDirectoryInternal(FileUtils.java:361) ~[?]
[2023-03-03 04:50:12.072798] at org.apache.flink.util.FileUtils.deleteDirectoryInternal(FileUtils.java:322) ~[?]
[2023-03-03 04:50:12.072842] at org.apache.flink.util.FileUtils.deleteFileOrDirectoryInternal(FileUtils.java:308) ~[?]
[2023-03-03 04:50:12.072889] at org.apache.flink.util.FileUtils.guardIfNotThreadSafe(FileUtils.java:383) ~[?]
[2023-03-03 04:50:12.072924] at org.apache.flink.util.FileUtils.deleteFileOrDirectory(FileUtils.java:247) ~[?]
[2023-03-03 04:50:12.072970] at org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager.cleanupAllocationBaseDirs(TaskExecutorLocalStateStoresManager.java:289) ~[?]
[2023-03-03 04:50:12.073012] at org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager.shutdown(TaskExecutorLocalStateStoresManager.java:237) ~[?]
[2023-03-03 04:50:12.073049] at org.apache.flink.util.ShutdownHookUtil.lambda$addShutdownHook$0(ShutdownHookUtil.java:39) ~[?]
[2023-03-03 04:50:12.073081] at java.lang.Thread.run(Thread.java:750) [?:1.8.0_352]
kingsathurthi
03/07/2023, 4:11 AM
chunilal kukreja
03/07/2023, 4:32 AM
Lauri Suurväli
03/07/2023, 7:37 AM
Yang LI
03/07/2023, 9:10 AM
Dima Sheludko
03/07/2023, 3:47 PM
s3.access-key: your-access-key
s3.secret-key: your-secret-key
I have a serviceAccount with IAM configured, but with the default configuration I receive a 403 error.
Currently I am trying to use the hive.s3.* options to configure it, but I am still working on that and am not sure it will help.
Vishnu Prashaanth Penu Sathiyamoorthy
03/07/2023, 4:55 PM
Jeesmon Jacob
03/07/2023, 6:55 PM
Superskyyy
03/07/2023, 9:12 PM
Sergio Sainz
03/07/2023, 10:24 PM
Sergio Sainz
03/07/2023, 10:34 PM
Sergio Sainz
03/07/2023, 10:36 PM
RICHARD JOY
03/08/2023, 12:48 AM
sharad mishra
03/08/2023, 1:23 AM
Gerald
03/08/2023, 9:55 AM
CREATE TABLE Person (
Id int,
Firstname string,
Lastname string,
DateOfBirth date
) WITH (
'connector' = 'kafka',
'topic' = 'cdc.FlinkResearch.dbo.Person',
...
'format' = 'debezium-avro-confluent',
'debezium-avro-confluent.url' = '...'
);
CREATE TABLE Address (
Id int,
Street string,
ZipCode string,
City string,
Country string,
PersonId int
) WITH (
'connector' = 'kafka',
'topic' = 'cdc.FlinkResearch.dbo.Address',
...
'format' = 'debezium-avro-confluent',
'debezium-avro-confluent.url' = '...'
);
CREATE TABLE PersonUpdated (
Id int,
Firstname string,
Lastname string,
DateOfBirth date,
Addresses ARRAY<ROW(Street string, City string, ZipCode string, Country string)>,
primary key (Id) not enforced
) WITH (
'connector' = 'upsert-kafka',
'topic' = 'cdc.FlinkResearch.dbo.PersonUpdated',
...
'value.format' = 'avro-confluent',
'value.avro-confluent.url' = '...',
'key.format' = 'avro-confluent',
'key.avro-confluent.url' = '...'
);
The idea is to create PersonUpdated entries for every person, including the person's addresses as a list. We have tried various ways of casting in our (insert) queries, but still no success:
select p.Id, p.Firstname, p.Lastname, p.DateOfBirth, cast((select collect(row(Street, City, ZipCode, Country)) from Address where PersonId = p.Id) as ARRAY<ROW<Street string, City string, ZipCode string, Country string>>) as Addresses from Person p;
[ERROR] Could not execute SQL statement. Reason:
org.apache.calcite.sql.validate.SqlValidatorException: Cast function cannot convert value of type RecordType(VARCHAR(2147483647) EXPR$0, VARCHAR(2147483647) EXPR$1, VARCHAR(2147483647) EXPR$2, VARCHAR(2147483647) EXPR$3) MULTISET to type RecordType(VARCHAR(2147483647) Street, VARCHAR(2147483647) City, VARCHAR(2147483647) ZipCode, VARCHAR(2147483647) Country) ARRAY
Even more specific casts couldn't solve the issue:
select p.Id, p.Firstname, p.Lastname, p.DateOfBirth, cast((select collect(cast(row(Street, City, ZipCode, Country) as ROW<Street string, City string, ZipCode string, Country string>)) from Address where PersonId = p.Id) as ARRAY<ROW<Street string, City string, ZipCode string, Country string>>) as Addresses from Person p;
[ERROR] Could not execute SQL statement. Reason:
org.apache.calcite.sql.validate.SqlValidatorException: Cast function cannot convert value of type RecordType(VARCHAR(2147483647) Street, VARCHAR(2147483647) City, VARCHAR(2147483647) ZipCode, VARCHAR(2147483647) Country) MULTISET to type RecordType(VARCHAR(2147483647) Street, VARCHAR(2147483647) City, VARCHAR(2147483647) ZipCode, VARCHAR(2147483647) Country) ARRAY
Is there a way to cast nested multisets to an array with plain Flink SQL? Or are there alternatives to the collect function that would avoid casting at all?
Ari Huttunen
03/08/2023, 10:15 AM
save_sample_data = f"""
CREATE TABLE save_sample_data
PARTITIONED BY (EVENT_DAY, EVENT_HOUR)
WITH (
'connector' = 'filesystem',
'path' = '{EGRESS_S3_PATH_FOR_SAMPLES}',
'format' = 'parquet'
)
AS SELECT * FROM input_data
WHERE MINUTE(ts) = 1 AND HOUR(ts) = 10;
"""
table_env.execute_sql(save_sample_data)
I tried to put the PARTITIONED BY clause at every location.
I also would have wanted to sample a percentage of the input data.
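One possible workaround, sketched below under assumptions and not taken from the thread: PARTITIONED BY may simply not be accepted in a CREATE TABLE ... AS SELECT statement in this Flink version, so the partitioned sink table can be declared explicitly and filled with a separate INSERT INTO, and a RAND() filter is one simple way to keep only a percentage of rows. The columns ts, EVENT_DAY and EVENT_HOUR and the 1% rate are placeholders for illustration.

# Hedged sketch: explicit partitioned sink table plus INSERT INTO, instead of CTAS.
# Assumes input_data is already registered and exposes ts, EVENT_DAY and EVENT_HOUR;
# streaming-specific rolling/partition-commit options are omitted for brevity.
create_sink = f"""
    CREATE TABLE save_sample_data (
        ts TIMESTAMP(3),
        EVENT_DAY STRING,
        EVENT_HOUR STRING
    ) PARTITIONED BY (EVENT_DAY, EVENT_HOUR) WITH (
        'connector' = 'filesystem',
        'path' = '{EGRESS_S3_PATH_FOR_SAMPLES}',
        'format' = 'parquet'
    )
"""
table_env.execute_sql(create_sink)

# RAND() < 0.01 keeps roughly 1% of the rows that pass the original time filter.
table_env.execute_sql("""
    INSERT INTO save_sample_data
    SELECT ts, EVENT_DAY, EVENT_HOUR
    FROM input_data
    WHERE MINUTE(ts) = 1 AND HOUR(ts) = 10 AND RAND() < 0.01
""")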
Yuval Itzchakov
03/08/2023, 11:16 AM
Yang LI
03/08/2023, 1:36 PM
Mali
03/08/2023, 3:12 PM
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: test
  namespace: flink
spec:
  ingress:
    template: "{{name}}.test.com"
    className: "nginx"
    annotations: {}
  image: <private pyflink repo>
  imagePullPolicy: "Always"
  flinkVersion: v1_16
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "1"
  serviceAccount: flink
  jobManager:
    podTemplate:
      apiVersion: v1
      kind: Pod
      metadata:
        name: pod-template
      spec:
        serviceAccount: flink
        containers:
          # Do not change the main container name
          - name: flink-main-container
            envFrom:
              - secretRef:
                  name: flink-secret
    resource:
      memory: "2048m"
      cpu: 1
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1
    replicas: 1
---
apiVersion: flink.apache.org/v1beta1
kind: FlinkSessionJob
metadata:
  name: rabbitmq-to-kafka
  namespace: flink
spec:
  deploymentName: test
  job:
    jarURI: local:///opt/flink/lib/flink-sql-connector-rabbitmq-1.16.0.jar;local:///opt/flink/lib/flink-sql-connector-kafka-1.16.0.jar
    entryClass: "org.apache.flink.client.python.PythonDriver"
    args: ["-pyclientexec", "/usr/local/bin/python3", "-py", "/opt/flink/usrlib/rabbitmq_to_kafka.py"]
    parallelism: 4
    upgradeMode: stateless
---
apiVersion: flink.apache.org/v1beta1
kind: FlinkSessionJob
metadata:
  name: kafka-to-s3-raw
  namespace: flink
spec:
  deploymentName: test
  job:
    jarURI: local:///opt/flink/lib/flink-sql-connector-rabbitmq-1.16.0.jar;local:///opt/flink/lib/flink-sql-connector-kafka-1.16.0.jar
    entryClass: "org.apache.flink.client.python.PythonDriver"
    args: ["-pyclientexec", "/usr/local/bin/python3", "-py", "/opt/flink/usrlib/kafka_to_s3_raw.py"]
    parallelism: 4
    upgradeMode: stateless
I am getting "resource id: ResourceID{name='kafka-to-s3-raw', namespace='flink'}, version: 6850788} failed.
org.apache.flink.kubernetes.operator.exception.ReconciliationException: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not find a file system implementation for scheme 'local'. The scheme is not directly supported by Flink and no Hadoop file system to support this scheme could be loaded. For a full list of supported file systems, please see https://nightlies.apache.org/flink/flink-docs-stable/ops/filesystems/."
It works well when I put the jarURI inside the deployment. I tried different alternatives but didn't figure it out. Can you help me with this error?
Many thanks.
sharad mishra
03/08/2023, 3:26 PM
I have my log4j configuration in the conf folder for Flink, and I can see my taskmanager.log files are giving output in the expected format.
However, logs which are redirected to taskmanager.err are missing the timestamp field and are not in the expected format.
e.g. one of the taskmanager.err files (missing timestamp):
[main] INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: parallelism.default, 4
[main] INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: taskmanager.numberOfTaskSlots, 2
[main] INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: cluster.evenly-spread-out-slots, false
[main] INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: yarn.application.name, DCN-LogParser-Workflow
[main] INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: yarn.application.queue, default
[main] INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: $internal.application.main, ca.ix.dcn.job.JobRunner
[main] INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: taskmanager.memory.process.size, 8192m
[main] INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: metrics.reporter.prom.port, 9250-9260
[main] INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: execution.savepoint-restore-mode, NO_CLAIM
[main] INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: execution.savepoint.path, <hdfs://blackhole-auditlog/data/dcn-log-parser/checkpointing/073f246a185b81d373d218cd80abefd5/chk-762>
Here is the log4j configuration file that I'm using:
#Define root logger options
log4j.rootLogger=INFO, file, console
#Define console appender
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.Target=System.out
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
#Define rolling file appender
log4j.appender.file=org.apache.log4j.RollingFileAppender
log4j.appender.file.File=${log.file}
log4j.appender.file.Append=true
log4j.appender.file.ImmediateFlush=true
log4j.appender.file.MaxFileSize=10MB
log4j.appender.file.MaxBackupIndex=5
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
What could be causing this?
Amir Hossein Sharifzadeh
03/08/2023, 3:28 PM
Ari Huttunen
03/08/2023, 6:38 PM
@udaf(result_type=DataTypes.FLOAT(), func_type="pandas")
def median_udaf(v):
return v.median()
Unfortunately it doesn't work, due to:
Pandas UDAFs are not supported in streaming mode currently.
Please check the documentation for the set of currently supported SQL features.
What can I do? We currently use 1.16, but I figure 1.17 is just around the corner, so that could be used as well.
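One possible direction, sketched below under assumptions and not taken from the thread: general (non-pandas) Python UDAFs are accepted in streaming mode, so a median can be computed by buffering values in a regular AggregateFunction instead of a pandas one. Class and field names are illustrative, the whole group's values are kept in the accumulator (a ListView-backed accumulator would scale better), and depending on the query a retract method may also be needed.

# Hedged sketch: a general (non-pandas) PyFlink AggregateFunction that buffers
# values in the accumulator and computes the median when the result is emitted.
from pyflink.common import Row
from pyflink.table import DataTypes
from pyflink.table.udf import AggregateFunction, udaf

class MedianAggregateFunction(AggregateFunction):

    def create_accumulator(self):
        # One-field accumulator that holds every value seen so far.
        return Row([])

    def accumulate(self, accumulator, value):
        if value is not None:
            accumulator[0].append(float(value))

    def get_value(self, accumulator):
        values = sorted(accumulator[0])
        if not values:
            return None
        mid = len(values) // 2
        if len(values) % 2 == 1:
            return values[mid]
        return (values[mid - 1] + values[mid]) / 2.0

    def get_accumulator_type(self):
        return DataTypes.ROW(
            [DataTypes.FIELD("values", DataTypes.ARRAY(DataTypes.FLOAT()))])

    def get_result_type(self):
        return DataTypes.FLOAT()

median_udaf = udaf(MedianAggregateFunction())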
Amir Hossein Sharifzadeh
03/09/2023, 12:33 AM
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
I get this error: Exception in thread "main" org.apache.flink.table.api.TableException: Could not instantiate the executor. Make sure a planner module is on the classpath
at org.apache.flink.table.api.bridge.internal.AbstractStreamTableEnvironmentImpl.lookupExecutor(AbstractStreamTableEnvironmentImpl.java:109)
at org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.create(StreamTableEnvironmentImpl.java:101)
at org.apache.flink.table.api.bridge.java.StreamTableEnvironment.create(StreamTableEnvironment.java:122)
at org.apache.flink.table.api.bridge.java.StreamTableEnvironment.create(StreamTableEnvironment.java:94)
my pom.xml also contains:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_2.12</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
</dependency>
where:
<properties>
<flink.version>1.16.0</flink.version>
ęå®čŖ
03/09/2023, 1:12 AM
Zhiyu Tian
03/09/2023, 2:34 AM
sziwei
03/09/2023, 3:49 AM
sziwei
03/09/2023, 3:51 AM
Slackbot
03/09/2023, 7:10 AM
kingsathurthi
03/09/2023, 7:45 AM