Haim Ari
07/10/2022, 11:56 AM
Is it possible to specify imagePullSecrets in kind: FlinkDeployment? I’d like to use a private custom Flink image (using the Operator).
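
(For reference, a minimal sketch of one way this could look — the operator's FlinkDeployment accepts a standard Kubernetes pod template, and Flink also exposes the kubernetes.container.image.pull-secrets option; the secret, image, and metadata names below are placeholders, not from the thread.)

apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: my-flink-job                              # placeholder
spec:
  image: registry.example.com/flink-custom:1.15   # placeholder private image
  flinkConfiguration:
    kubernetes.container.image.pull-secrets: my-registry-secret  # Flink config option
  podTemplate:
    spec:
      imagePullSecrets:                           # standard pod-level alternative
        - name: my-registry-secret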

Jaya Ananthram
07/11/2022, 1:08 PM
Could not find a file system implementation for scheme 's3p'. Is there any preferred way to do it from the IDE?
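
(A sketch of a commonly used workaround, not from the thread: inside the IDE, Flink's plugin loading for flink-s3-fs-presto is not active, so one option is to put that artifact on the classpath and register the filesystems by hand before building the job. The credential keys are assumptions and depend on the setup.)

import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileSystem;

// Register s3/s3p implementations found on the classpath, since the plugin
// mechanism that normally loads them does not run inside the IDE.
Configuration conf = new Configuration();
conf.setString("s3.access-key", "...");   // assumption: static credentials;
conf.setString("s3.secret-key", "...");   // the default AWS provider chain also works
FileSystem.initialize(conf, null);        // null PluginManager -> classpath loading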

Jaya Ananthram
07/12/2022, 11:40 AM
But there are some configurations that cannot be set, because Flink will override them, e.g. 'key.deserializer' and 'value.deserializer'. If I am fine with converting my custom format to the Row format using my own custom deserialiser logic, is there a way to apply this somehow? Or is there any workaround to use a custom format in the Table API?
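
(One workaround sometimes suggested for this — a sketch under assumptions, not from the thread: do the custom decoding in the DataStream API, where the DeserializationSchema is fully yours, then bridge into the Table API. KafkaSource is assumed as the connector; MyRowDeserializer, the broker, and the topic are hypothetical, and env/tableEnv are the usual stream and table environments.)

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.api.Table;
import org.apache.flink.types.Row;

// MyRowDeserializer is a hypothetical DeserializationSchema<Row> carrying the
// custom format logic; its getProducedType() should return Row type info so
// the derived table schema has named columns instead of one RAW column.
KafkaSource<Row> source = KafkaSource.<Row>builder()
        .setBootstrapServers("broker:9092")               // placeholder
        .setTopics("my-topic")                            // placeholder
        .setValueOnlyDeserializer(new MyRowDeserializer())
        .build();

DataStream<Row> rows =
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "custom-format-source");
Table table = tableEnv.fromDataStream(rows);              // continue in the Table API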

czchen
07/12/2022, 3:23 PM
Is it possible to set ownerReferences in the flink-operator jobmanager deployment? We use Argo CD to manage FlinkDeployment, and Argo CD needs ownerReferences to display child resources properly.
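
(For reference, the shape of the block Argo CD looks for on a child resource — a generic Kubernetes sketch with placeholder name/uid; whether the operator can add it for you isn't answered here.)

metadata:
  ownerReferences:
    - apiVersion: flink.apache.org/v1beta1
      kind: FlinkDeployment
      name: my-flink-job       # placeholder: the owning CR's name
      uid: <uid-of-the-cr>     # must match the CR's actual UID
      controller: true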

Aqib Mehmood
07/13/2022, 4:59 AM
org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint tolerable failure threshold
and
org.apache.flink.runtime.checkpoint.CheckpointException: Checkpoint expired before completing.
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator$CheckpointCanceller.run(CheckpointCoordinator.java:2139) [flink-dist-1.15.0.jar:1.15.0]
at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) [?:?]
at java.util.concurrent.FutureTask.run(Unknown Source) [?:?]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source) [?:?]
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) [?:?]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) [?:?]
at java.lang.Thread.run(Unknown Source) [?:?]
The job runs fine for a while and then this happens. I've tried increasing the checkpoint timeout limit but that does not seem to help.
I'm not sure what other steps we can take to address this, as it is causing many discontinuities in our job.
This is our current configuration for this job:
StreamExecutionEnvironment streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment();
streamExecEnv.enableCheckpointing(1000);                                  // trigger a checkpoint every second
streamExecEnv.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000);  // wait at least 1s after a checkpoint completes
streamExecEnv.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
streamExecEnv.getCheckpointConfig().setCheckpointTimeout(180000);         // expire checkpoints after 3 minutes
streamExecEnv.getCheckpointConfig().setTolerableCheckpointFailureNumber(10);
streamExecEnv.getCheckpointConfig().setCheckpointStorage(
        new FileSystemCheckpointStorage("file:///opt/flink/checkpoints/"));
streamExecEnv.getCheckpointConfig().setExternalizedCheckpointCleanup(
        ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
streamExecEnv.disableOperatorChaining();
streamExecEnv.setParallelism(1);
streamExecEnv.setRestartStrategy(RestartStrategies.fixedDelayRestart(
        10,                              // number of restart attempts
        Time.of(15, TimeUnit.SECONDS))); // delay between attempts
Looking forward to your responses. TIA
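
(A sketch of commonly tried mitigations, not from the thread: with enableCheckpointing(1000) plus a 1s min-pause, a new checkpoint starts almost continuously, so under backpressure checkpoints can pile up and expire. Widening the cadence and enabling unaligned checkpoints are the usual first steps; the values below are placeholders to tune.)

streamExecEnv.enableCheckpointing(60000);                          // e.g. once a minute
streamExecEnv.getCheckpointConfig().setMinPauseBetweenCheckpoints(30000);
streamExecEnv.getCheckpointConfig().enableUnalignedCheckpoints();  // lets barriers overtake in-flight data (Flink 1.11+)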

Adesh Dsilva
07/14/2022, 9:09 AM
input/
    202207130945/
        data.avro
        data.done
    202207130950/
        data.avro
        data.done
    ...
    202207140945/
        data.avro
        data.done
I want to read all data.avro files for 2022071309**. I see that AvroInputFormat takes only a single path, and if I try setFilePaths() it throws:
FileInputFormats with multiple paths are not supported yet.
So I tried the Table API using partitionedBy:
tableEnv.createTable("AvroSourceTable", TableDescriptor.forConnector("filesystem")
        .schema(schema)
        .option("path", "input")
        .partitionedBy("pt") // defined as .column("pt", DataTypes.CHAR(12))
        .format(FormatDescriptor.forFormat("avro").build())
        .build());
and then:
tableEnv.from("AvroSourceTable").where($("pt").isEqual("202207130945"));
But it gives this exception while writing to the table: Partition keys are: [pt], incomplete partition spec: {}
Am I doing this wrong? Is there an easy way to read multiple directories in Flink?
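
(Two observations that may help, neither confirmed by the thread: first, the filesystem connector's partition discovery expects Hive-style key=value directory names, i.e. input/pt=202207130945/ rather than input/202207130945/, which would explain the empty partition spec. Second, with the old AvroInputFormat a workaround is to build one source per directory and union the streams; MyRecord, env, and the expanded path list below are placeholders.)

import java.util.List;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.avro.AvroInputFormat;
import org.apache.flink.streaming.api.datastream.DataStream;

// One AvroInputFormat per matching directory, unioned into a single stream.
List<String> dirs = List.of("input/202207130945", "input/202207130950"); // expand 2022071309** yourself
DataStream<MyRecord> all = null;
for (String dir : dirs) {
    DataStream<MyRecord> s =
            env.createInput(new AvroInputFormat<>(new Path(dir), MyRecord.class));
    all = (all == null) ? s : all.union(s);
}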