licho
04/10/2023, 8:11 AM
Sameer Chandra
04/10/2023, 8:39 AM
Bharath Reddy
04/10/2023, 9:15 AM
piby 180
04/10/2023, 11:25 AM
from pyflink.table import EnvironmentSettings, TableEnvironment
t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
source_ddl = f"""
    CREATE TABLE source_table (
        user_id STRING,
        order_amount DOUBLE,
        ts TIMESTAMP(3),
        WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
    ) WITH (
        'connector' = 'datagen',
        'rows-per-second' = '10'
    )
"""
sink_ddl = f"""
    CREATE TABLE sink_table (
        user_id STRING,
        order_amount DOUBLE,
        ts TIMESTAMP(3),
        dt STRING
    ) PARTITIONED BY (dt) WITH (
        'connector' = 'filesystem',
        'path' = 's3a://<bucket-name>/flink/data',
        'format' = 'parquet',
        'auto-compaction' = 'true',
        'partition.time-extractor.timestamp-pattern' = '$dt',
        'sink.rolling-policy.file-size' = '1MB',
        'sink.rolling-policy.rollover-interval' = '60s',
        'sink.partition-commit.delay' = '0s',
        'sink.partition-commit.trigger' = 'partition-time',
        'sink.partition-commit.policy.kind' = 'success-file'
    );
"""
t_env.execute_sql(source_ddl)
t_env.execute_sql(sink_ddl)
statement_set = t_env.create_statement_set()
statement_set.add_insert_sql("INSERT INTO sink_table SELECT user_id, order_amount, ts, DATE_FORMAT(ts, 'yyyy/MM/dd/HH/mm') as dt FROM source_table")
statement_set.execute().wait()
The code runs indefinitely, but there are no files on S3. Here are the logs:
2023-04-10 11:20:45,212 INFO org.apache.hadoop.io.compress.CodecPool [] - Got brand-new compressor [.snappy]
2023-04-10 11:20:45,211 INFO org.apache.hadoop.io.compress.CodecPool [] - Got brand-new compressor [.snappy]
2023-04-10 11:21:01,046 INFO org.apache.hadoop.io.compress.CodecPool [] - Got brand-new compressor [.snappy]
2023-04-10 11:21:01,047 INFO org.apache.hadoop.io.compress.CodecPool [] - Got brand-new compressor [.snappy]
2023-04-10 11:22:00,964 INFO org.apache.hadoop.io.compress.CodecPool [] - Got brand-new compressor [.snappy]
2023-04-10 11:22:00,999 INFO org.apache.hadoop.io.compress.CodecPool [] - Got brand-new compressor [.snappy]
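Worth noting about the snippet above: in streaming mode the filesystem sink only finalizes (commits) its in-progress part files when a checkpoint completes, and the snippet above never enables checkpointing, so files may never become visible on S3. A minimal sketch of enabling checkpointing on the same t_env (the interval below is an illustrative value, not a recommendation):
# Sketch only: periodic checkpoints let the filesystem sink move in-progress
# part files to finished, visible files.
t_env.get_config().set("execution.checkpointing.mode", "EXACTLY_ONCE")
t_env.get_config().set("execution.checkpointing.interval", "1min")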
mohammadreza khedri
04/10/2023, 11:43 AM
raise Py4JJavaError(
py4j.protocol.Py4JJavaError: An error occurred while calling o0.execute.
: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
.
.
.
Caused by: java.lang.RuntimeException: Failed to create stage bundle factory! INFO:root:Initializing Python harness
This is my question on the StackOverflow website:
https://stackoverflow.com/questions/75976664/reading-from-kafka-with-pyflink-not-working
piby 180
04/10/2023, 2:00 PM
'sink.partition-commit.trigger'='partition-time':
`Caused by: java.time.format.DateTimeParseException: Text '2023/04/10/13/49' could not be parsed at index 4`
Looks like it is not able to commit the partition. When I use 'sink.partition-commit.trigger'='process-time', it works and I can see empty _SUCCESS files being committed.
4. Is it better to use the DataStream API for an S3 sink? From the documentation, it looks like the DataStream API has the ability to write a file prefix and suffix:
https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/filesystem/
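For reference, a rough PyFlink sketch of the DataStream API file sink with a custom part-file prefix and suffix, assuming Flink 1.16+ module paths and a placeholder bucket name (the stream contents are only illustrative):
from pyflink.common.serialization import Encoder
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.file_system import FileSink, OutputFileConfig, RollingPolicy

# Sketch only: write a small string stream to S3 with a custom part-file
# prefix/suffix; the path and names are placeholders.
env = StreamExecutionEnvironment.get_execution_environment()
ds = env.from_collection(["a", "b", "c"])

file_config = OutputFileConfig.builder() \
    .with_part_prefix("orders") \
    .with_part_suffix(".txt") \
    .build()

sink = FileSink.for_row_format("s3a://<bucket-name>/flink/data", Encoder.simple_string_encoder()) \
    .with_rolling_policy(RollingPolicy.default_rolling_policy()) \
    .with_output_file_config(file_config) \
    .build()

ds.sink_to(sink)
env.execute("file-sink-sketch")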
Here is my code:
from pyflink.table import EnvironmentSettings, TableEnvironment
t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
t_env.get_config().set("restart-strategy.type", "fixed-delay")
t_env.get_config().set("restart-strategy.fixed-delay.attempts", "3")
t_env.get_config().set("restart-strategy.fixed-delay.delay", "30s")
jar_list = """
file:///home/ubuntu/environment/flink/lib/flink-sql-connector-kafka-1.17.0.jar;
file:///home/ubuntu/environment/flink/lib/flink-sql-parquet-1.17.0.jar;
file:///home/ubuntu/environment/flink/lib/flink-connector-files-1.17.0.jar;
file:///home/ubuntu/environment/flink/lib/flink-s3-fs-hadoop-1.17.0.jar;
file:///home/ubuntu/environment/flink/lib/flink-s3-fs-presto-1.17.0.jar;
file:///home/ubuntu/environment/flink/lib/hadoop-mapreduce-client-core-3.3.5.jar
"""
t_env.get_config().set("pipeline.jars", jar_list)
t_env.get_config().set("pipeline.classpaths", jar_list)
# set the checkpoint mode to EXACTLY_ONCE
t_env.get_config().set("execution.checkpointing.mode", "EXACTLY_ONCE")
t_env.get_config().set("execution.checkpointing.interval", "1min")
# set the statebackend type to "rocksdb", other available options are "filesystem" and "jobmanager"
# you can also set the full qualified Java class name of the StateBackendFactory to this option
# e.g. org.apache.flink.contrib.streaming.state.RocksDBStateBackendFactory
t_env.get_config().set("state.backend.type", "rocksdb")
# set the checkpoint directory, which is required by the RocksDB statebackend
t_env.get_config().set("state.checkpoints.dir", "file:///tmp/checkpoints/")
source_ddl = f"""
    CREATE TABLE source_table (
        user_id STRING,
        order_amount DOUBLE,
        ts TIMESTAMP(3),
        WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
    ) WITH (
        'connector' = 'datagen',
        'rows-per-second' = '10'
    )
"""
sink_ddl = f"""
    CREATE TABLE sink_table (
        user_id STRING,
        order_amount DOUBLE,
        ts TIMESTAMP(3),
        dt VARCHAR
    ) PARTITIONED BY (dt) WITH (
        'connector' = 'filesystem',
        'path' = 's3a://<bucket_name>/flink/data',
        'format' = 'parquet',
        'auto-compaction' = 'true',
        'sink.rolling-policy.file-size' = '1MB',
        'sink.rolling-policy.rollover-interval' = '60s',
        'sink.partition-commit.delay' = '0s',
        'partition.time-extractor.timestamp-pattern' = '$dt',
        'sink.partition-commit.trigger' = 'partition-time',
        'sink.partition-commit.policy.kind' = 'success-file'
    );
"""
t_env.execute_sql(source_ddl)
t_env.execute_sql(sink_ddl)
statement_set = t_env.create_statement_set()
statement_set.add_insert_sql("INSERT INTO sink_table SELECT user_id, order_amount, ts, DATE_FORMAT(ts, 'yyyy/MM/dd/HH/mm') as dt FROM source_table")
statement_set.execute().wait()
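On the DateTimeParseException mentioned above: the default partition time extractor expects the string produced by partition.time-extractor.timestamp-pattern to parse as yyyy-MM-dd HH:mm:ss, so a dt value like 2023/04/10/13/49 fails at index 4 (the first '/'). A hedged sketch of one way around it, reusing the tables above and formatting dt to match that shape (minute precision here is only an example); the other documented option is separate year/month/day/hour partition columns with a pattern like '$year-$month-$day $hour:00:00':
# Sketch only: make '$dt' already look like 'yyyy-MM-dd HH:mm:ss' so the
# default partition time extractor can parse it.
statement_set = t_env.create_statement_set()
statement_set.add_insert_sql(
    "INSERT INTO sink_table "
    "SELECT user_id, order_amount, ts, DATE_FORMAT(ts, 'yyyy-MM-dd HH:mm:00') AS dt "
    "FROM source_table"
)
statement_set.execute().wait()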
Max Dubinin
04/10/2023, 6:48 PM
I have a KinesisSource (created with CREATE TABLE..), which is then filtered twice by SELECT x,y,z FROM KinesisSource WHERE .. queries. Each of those select streams is inserted into a separate sink (also created with CREATE TABLE...). I run this setup in a statementSet.
KinesisSource -> SELECT * where name = 'A' -> Sink 'A'
              -> SELECT * where name = 'B' -> Sink 'B'
1. Does anyone know why I see two tasks with the name "Sink: end"? It doesn't seem that these are the two sinks I created, because their bytes/records sent value is always the same (records sent of the first row divided by two).
2. The "records sent" value keeps increasing although I don't send anything. Can anyone explain why? (This happens in a FlinkDeployment but not in a local cluster setup.)
3. Is there a way to configure the job so I can see the source, the filters, and the sinks in the graph + their correct data? (See the config sketch below.)
4. Am I doing this wrong?
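On question 3: one way to make the source, the filters, and the sinks appear as separate vertices (with their own metrics) in the job graph is to disable operator chaining. A minimal sketch, assuming a Table API environment like the PyFlink snippets earlier in the thread; note that disabling chaining adds some per-record overhead:
# Sketch only: without chaining, Flink no longer fuses operators into single
# tasks, so each one shows up separately in the web UI.
t_env.get_config().set("pipeline.operator-chaining", "false")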
Mali
04/10/2023, 7:16 PM
fs.s3a.aws.credentials.provider: com.amazonaws.auth.WebIdentityTokenCredentialsProvider
fs.s3a.path.style.access: 'true'
fs.s3a.endpoint: s3.<region>.amazonaws.com
Note: I enabled HA.
But I am getting the following errors:
-> Caused by: org.apache.flink.util.FlinkException: Could not create the ha services from the instantiated HighAvailabilityServicesFactory org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory.
-> Caused by: java.nio.file.AccessDeniedException: s3://<bucket_name>/flink/recovery/test/blob: org.apache.hadoop.fs.s3a.auth.NoAuthWithAWSException: No AWS Credentials provided by WebIdentityTokenCredentialsProvider : com.amazonaws.SdkClientException: Unable to execute HTTP request: sts.<region>.amazonaws.com
What should I do? (Flink version is 1.16.0)
Jakub Janowski
04/11/2023, 8:16 AM
Nikola Stanisavljevic
04/11/2023, 9:24 AM
1.17.0. This error appears.
If I try the plugin of version 1.16.1 with Flink 1.17 it works fine. So there seems to be a problem with this jar file build and its classes.
Amenreet Singh Sodhi
04/11/2023, 11:56 AM
WARN org.apache.flink.runtime.checkpoint.CheckpointFailureManager [] - Failed to trigger or complete checkpoint 1 for job 000000006e6b13320000000000000000. (0 consecutive failed attempts so far)
org.apache.flink.runtime.checkpoint.CheckpointException: Failure to finalize checkpoint.
Caused by: java.io.IOException: Target file file:/opt/flink/pm/checkpoint/000000006e6b13320000000000000000/chk-1/_metadata already exists.
at org.apache.flink.runtime.state.filesystem.FsCheckpointMetadataOutputStream.getOutputStreamWrapper(FsCheckpointMetadataOutputStream.java:168)
Each time it finds that the folder is already present for the checkpoint it is about to create, it throws an error and the job restarts. How can this be handled gracefully, or is there any way the Flink job can rewrite the metadata file in this folder and so avoid the restart? Thanks in advance!
chendan
04/11/2023, 1:35 PM
Alex Brekken
04/11/2023, 1:38 PM
ReadWriteOnce, this isn't surprising. If I use hostPath instead of a PVC, then everything works fine, though I doubt that's the recommended approach..?
Why are both pods trying to use the same PVC? Do I need to have a PVC with ReadWriteMany? (I hope not..) Thanks for any help!
Virender Bhargav
04/11/2023, 3:50 PM
DataStream<OperatorState> existingOperatorStates = newOperatorStates.getExecutionEnvironment().fromCollection(existingOperators).name("existingOperatorStates");
existingOperatorStates.flatMap(new StatePathExtractor()).setParallelism(1).addSink(new OutputFormatSinkFunction(new FileCopyFunction(path)));
finalOperatorStates = newOperatorStates.union(new DataStream[]{existingOperatorStates});
Thijs van de Poll
04/11/2023, 4:18 PM
Guruguha Marur Sreenivasa
04/11/2023, 4:55 PM
The MapState we use is empty on restart and it starts to build up again. We are saving checkpoints on S3. Below is my checkpointing config:
this.env.enableCheckpointing(30000, CheckpointingMode.EXACTLY_ONCE);
this.env.setStateBackend(new HashMapStateBackend());
final CheckpointConfig config = env.getCheckpointConfig();
config.setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
config.setCheckpointStorage(checkPointBucket);
config.setTolerableCheckpointFailureNumber(applicationConfiguration.getCheckpointFailureTolerance());
3. Are there ways to optimize the pipelines so there is less data transfer across task managers as streaming data progresses to further stages in the pipeline?
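A note on the empty MapState after restart: a retained (externalized) checkpoint is only used when the new submission is explicitly pointed at it; a plain resubmission starts with empty state. A rough sketch of passing the restore path through configuration, shown in PyFlink for consistency with the other snippets in this thread (the checkpoint path is a placeholder):
from pyflink.common import Configuration
from pyflink.datastream import StreamExecutionEnvironment

# Sketch only: point the new job at a retained checkpoint so keyed state such
# as MapState is restored instead of starting empty.
conf = Configuration()
conf.set_string("execution.savepoint.path", "s3://<checkpoint-bucket>/<job-id>/chk-42")
env = StreamExecutionEnvironment.get_execution_environment(conf)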
James Watkins
04/11/2023, 10:16 PM
dependencies {
...
implementation 'com.ververica:flink-sql-connector-postgres-cdc:2.2.1'
This is the error I'm getting:
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'postgres-cdc' that implements 'org.apache.flink.table.factories.DynamicTableFactory' in the classpath.
I will add more details in-thread for the steps I have taken. Any help would be much appreciated.
Amir Hossein Sharifzadeh
04/12/2023, 4:50 AM
Table raw_table = tableEnv.sqlQuery(data_query);
DataStream<Row> join_stream = tableEnv.toDataStream(raw_table);
join_stream.process(new EMPADProcessor()).setParallelism(4);
My Processor class:
public class EMPADProcessor extends ProcessFunction<Row, String> {

    public EMPADProcessor() {
    }

    @Override
    public void processElement(Row row, ProcessFunction<Row, String>.Context context, Collector<String> collector) {
        try {
            int chunk_id = Integer.parseInt("" + row.getField(0));
            ...............
I want to stop/kill the process when chunk_id=100.
abhishek sidana
04/12/2023, 7:08 AM
Mali
04/12/2023, 7:23 AM
INFO org.apache.flink.runtime.blob.FileSystemBlobStore [] - Creating highly available BLOB storage directory at s3://<my_path>/blob
My flink.conf is:
s3.access-key: <my_access_key>
s3.secret-key: <my_secret_key>
s3.path.style.access: 'true'
s3.endpoint: s3.<my_region>.amazonaws.com
#fs.s3a.impl: org.apache.hadoop.fs.s3a.S3AFileSystem
#fs.s3a.aws.credentials.provider: org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider
I tried to change "s3." to "fs.s3a." but nothing changed. Does anyone have an idea about that?
Iain Dixon
04/12/2023, 10:17 AM
Chirag Dewan
04/12/2023, 11:21 AM
Miguel Ángel Fernández Fernández
04/12/2023, 11:47 AM
chunilal kukreja
04/12/2023, 2:47 PM
WARN org.apache.flink.runtime.checkpoint.CheckpointFailureManager [] - Failed to trigger or complete checkpoint 1 for job 000000006e6b13320000000000000000. (0 consecutive failed attempts so far)
org.apache.flink.runtime.checkpoint.CheckpointException: Failure to finalize checkpoint.
Caused by: java.io.IOException: Target file file:/opt/flink/pm/checkpoint/000000006e6b13320000000000000000/chk-1/_metadata already exists.
at org.apache.flink.runtime.state.filesystem.FsCheckpointMetadataOutputStream.getOutputStreamWrapper(FsCheckpointMetadataOutputStream.java:168)
Expectation: ideally it should either skip this folder name and use another, or overwrite the content of the existing folder.
Can someone help me understand whether this is expected behaviour, or whether there is a workaround available?
Jirawech Siwawut
04/12/2023, 4:29 PM
org.apache.flink.runtime.checkpoint.CheckpointException: Failure to finalize checkpoint.
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.finalizeCheckpoint(CheckpointCoordinator.java:1346) ~[flink-runtime-1.15.1.jar:1.15.1]
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:1241) ~[flink-runtime-1.15.1.jar:1.15.1]
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveAcknowledgeMessage(CheckpointCoordinator.java:1133) ~[flink-runtime-1.15.1.jar:1.15.1]
at org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$acknowledgeCheckpoint$1(ExecutionGraphHandler.java:89) ~[flink-runtime-1.15.1.jar:1.15.1]
at org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$processCheckpointCoordinatorMessage$3(ExecutionGraphHandler.java:119) ~[flink-runtime-1.15.1.jar:1.15.1]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_301]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_301]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_301]
Caused by: com.amazonaws.services.s3.model.AmazonS3Exception: The specified bucket does not exist (Service: Amazon S3; Status Code: 404; Error Code: NoSuchBucket;
Here is my setup
flink-conf.yaml
s3.access-key: accces_key
s3.secret-key: secret_key
s3.endpoint: myendpoint.com
---
env.setStateBackend(new FsStateBackend("s3://mybucket/checkpoint"));
I found from the debug log that it is still connecting to the default S3 endpoint.
Bhupendra Yadav
04/12/2023, 5:14 PM
Hunter Medney
04/12/2023, 7:32 PM
Jeesmon Jacob
04/12/2023, 9:22 PM
execution.savepoint.ignore-unclaimed-state: true
but no luck. We are on Flink 1.15. Thanks!
2023-04-12 21:04:57,615 WARN org.apache.flink.streaming.api.operators.BackendRestorerProcedure [] - Exception while restoring operator state backend for CoBroadcastWithNonKeyedOperator_fda18ba392f9dc50769c0bd716347531_(1/8) from alternative (1/1), will
retry while more alternatives are available.
org.apache.flink.runtime.state.BackendBuildingException: Failed when trying to restore operator state backend
at org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(DefaultOperatorStateBackendBuilder.java:83) ~[flink-dist-1.15.3.jar:1.15.3]
at org.apache.flink.runtime.state.hashmap.HashMapStateBackend.createOperatorStateBackend(HashMapStateBackend.java:160) ~[flink-dist-1.15.3.jar:1.15.3]
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$operatorStateBackend$0(StreamTaskStateInitializerImpl.java:277) ~[flink-dist-1.15.3.jar:1.15.3]
at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168) ~[flink-dist-1.15.3.jar:1.15.3]
at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135) ~[flink-dist-1.15.3.jar:1.15.3]
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.operatorStateBackend(StreamTaskStateInitializerImpl.java:286) ~[flink-dist-1.15.3.jar:1.15.3]
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:174) ~[flink-dist-1.15.3.jar:1.15.3]
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:265) ~[flink-dist-1.15.3.jar:1.15.3]
at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106) ~[flink-dist-1.15.3.jar:1.15.3]
at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:703) ~[flink-dist-1.15.3.jar:1.15.3]
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55) ~[flink-dist-1.15.3.jar:1.15.3]
at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:679) ~[flink-dist-1.15.3.jar:1.15.3]
at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:646) ~[flink-dist-1.15.3.jar:1.15.3]
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948) ~[flink-dist-1.15.3.jar:1.15.3]
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:917) ~[flink-dist-1.15.3.jar:1.15.3]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741) ~[flink-dist-1.15.3.jar:1.15.3]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563) ~[flink-dist-1.15.3.jar:1.15.3]
at java.lang.Thread.run(Unknown Source) ~[?:?]
Slackbot
04/13/2023, 12:46 AM