Ari Huttunen
02/03/2023, 1:42 PM
In Spark I can compute approximate percentiles with pyspark.sql.functions.percentile_approx(), and if possible, I'd like to do the same in Flink.
It would be enough to be able to calculate this in specific time-based windows, like for 1 hour.
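A minimal sketch of one way to do this in the DataStream API, assuming a stream of Double values (class and method names below are illustrative, not from the thread). It computes an exact nearest-rank percentile per 1-hour event-time window; for a true analogue of percentile_approx() you would fold values into an approximate sketch (e.g. t-digest) inside the same window function:

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class WindowPercentile {

    /** Buffers one window's values and emits the p-th (0..1) percentile, exact. */
    public static class PercentileFn
            extends ProcessWindowFunction<Double, Double, String, TimeWindow> {
        private final double p;

        public PercentileFn(double p) { this.p = p; }

        @Override
        public void process(String key, Context ctx,
                            Iterable<Double> values, Collector<Double> out) {
            List<Double> buf = new ArrayList<>();
            values.forEach(buf::add);
            Collections.sort(buf);
            int idx = (int) Math.ceil(p * buf.size()) - 1; // nearest-rank rule
            out.collect(buf.get(Math.max(0, Math.min(idx, buf.size() - 1))));
        }
    }

    /** p95 per hour; assumes event-time watermarks are assigned upstream. */
    public static DataStream<Double> hourlyP95(DataStream<Double> values) {
        return values
                .keyBy(v -> "all") // or a real key
                .window(TumblingEventTimeWindows.of(Time.hours(1)))
                .process(new PercentileFn(0.95));
    }
}

Buffering the whole window is fine for modest volumes; for large windows an incremental AggregateFunction over a sketch structure avoids holding all values.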
kingsathurthi
02/03/2023, 5:19 PM
[user@server ~]$ oc get flinkdeployments
NAME      JOB STATUS    RECONCILIATION STATUS
kmfcore   RECONCILING   DEPLOYED
and I'm getting the below error in the taskmanager, what could be the issue?
[user@server]$ oc logs kmfcore-taskmanager-1-1 | grep -i fail
2023-02-03 13:31:45,443 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: execution.submit-failed-job-on-application-error, true
2023-02-03 13:31:49,839 INFO org.apache.flink.runtime.net.ConnectionUtils [] - Failed to connect to [/192.168.9.97:6123] from local address [localhost/127.0.0.1] with timeout [100] due to: Invalid argument (connect failed)
Sumit Nekar
02/03/2023, 5:40 PM
INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - JobManager for job 6f180ddd63b2de76c0148a53c092f02f with leader id a7b5b93e86521793b3e017e64695426c lost leadership.
Please suggest if we can tune any parameters to prevent this issue.
JobManager is configured with HA (single job manager) and deployed using the Flink k8s operator (native k8s, Flink 1.13.6).
Jirawech Siwawut
02/04/2023, 7:21 AM
Gerald Schmidt
02/04/2023, 8:16 AM
Siddhesh Kalgaonkar
02/05/2023, 9:29 AM
Quoting the FileSource docs: the enumerator will (depending on the watchType) find the files to be processed, divide them in splits, and assign these splits to the downstream readers. The readers are the ones who will read the actual data. Each split is read by only one reader, while a reader can read multiple splits, one-by-one. So what do you mean by readers here? Is it some kind of interdependent task under TM or something else?
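For context on the terminology (not from the thread): in the FLIP-27 source model the split enumerator runs on the JobManager, while the "readers" are the parallel SourceReader instances hosted by the source subtasks on the TaskManagers; each reader is one parallel instance of the source task. A rough sketch, with the path and format as placeholder assumptions:

import java.time.Duration;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FileSourceRoles {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Enumerator side (JobManager): lists files, cuts them into splits,
        // and hands splits to the readers according to the watch type.
        FileSource<String> source = FileSource
                .forRecordStreamFormat(new TextLineInputFormat(), new Path("/tmp/in"))
                .monitorContinuously(Duration.ofSeconds(10))
                .build();

        // Reader side (TaskManagers): each of the 4 parallel source subtasks
        // hosts one SourceReader; a reader works through its splits one by one.
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "files")
                .setParallelism(4)
                .print();

        env.execute();
    }
}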
Siddhesh Kalgaonkar
02/05/2023, 3:11 PM
Ari Huttunen
02/05/2023, 7:18 PM
Ari Huttunen
02/05/2023, 7:49 PM
Joe Crowley
02/06/2023, 4:38 AM
Tony Yeung
02/06/2023, 8:03 AM
I found the options scan.incremental.snapshot.chunk.size and scan.snapshot.fetch.size, which look similar. Could someone explain the differences and effects of these two options?
https://github.com/ververica/flink-cdc-connectors/blob/release-2.3/docs/content/connectors/mysql-cdc.md#connector-options
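As I read that options table (hedged, worth verifying against the docs): scan.incremental.snapshot.chunk.size is the number of rows that go into each snapshot split when the table is carved up for the parallel initial snapshot, while scan.snapshot.fetch.size is how many rows are pulled per database read while a split is being consumed. In the flink-cdc 2.3 DataStream builder the two appear, to the best of my knowledge, as splitSize and fetchSize (treat the method names as an assumption):

import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;

public class CdcOptionsSketch {
    public static MySqlSource<String> build() {
        // Connection details below are placeholders.
        return MySqlSource.<String>builder()
                .hostname("mysql.example.com")
                .port(3306)
                .databaseList("mydb")
                .tableList("mydb.orders")
                .username("flink")
                .password("secret")
                .splitSize(8096)  // scan.incremental.snapshot.chunk.size: rows per snapshot split
                .fetchSize(1024)  // scan.snapshot.fetch.size: rows per fetch when reading a split
                .deserializer(new JsonDebeziumDeserializationSchema())
                .build();
    }
}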
Tan Trinh
02/06/2023, 9:18 AM
taskmanager.numberOfTaskSlots: "1"
taskmanager.memory.managed.fraction: "0.0"
taskmanager.memory.managed.size: 100m
taskmanager.network.memory.buffer-debloat.enabled: "true"
execution.checkpointing.interval: 1m
execution.checkpointing.tolerable-failed-checkpoints: "3"
execution.checkpointing.unaligned: "true"
state.checkpoints.dir: s3://<>
state.savepoints.dir: s3://<>
high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
high-availability.storageDir: s3://<>
s3.endpoint: http://<>
s3.access-key: <>
s3.secret-key: <>
s3.path.style.access: "true"
kubernetes.operator.reconcile.interval: 1m
metrics.reporters: prom
metrics.reporter.prom.factory.class: org.apache.flink.metrics.prometheus.PrometheusReporterFactory
metrics.reporter.prom.port: "9999"
env.java.opts.taskmanager: "-XX:+UseParallelGC"
I changed the taskmanager.memory.managed.fraction config to 0.0 (meaning managed memory size = 0) because I am not using managed memory (not using RocksDB state).
But the taskmanager always gets deleted with error code 137 (OOM killed):
2023-02-06 07:54:33,801 INFO org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - Worker log2metric-prod-example-taskmanager-1-1 is terminated. Diagnostics: Pod terminated, container termination statuses: [flink-main-container(exitCode=137, reason=OOMKilled, message=null)], pod status: Failed(reason=null, message=null)
2023-02-06 07:54:33,802 INFO org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - Closing TaskExecutor connection <> because: Pod terminated, container termination statuses: [flink-main-container(exitCode=137, reason=OOMKilled, message=null)], pod status: Failed(reason=null, message=null)
When I increase the taskmanager.memory.managed.fraction config to 0.4, the taskmanager pod runs normally.
So, is Flink still using managed memory even though we are not using RocksDB state, causing the pod to use more memory than requested?
Kristian Grimsby
02/06/2023, 9:44 AM
kingsathurthi
02/06/2023, 12:40 PM
apiVersion: flink.apache.org/v1beta1
kind: FlinkSessionJob
metadata:
  name: basic-session-job-only-example-2
spec:
  deploymentName: basic-session-deployment-only-example
  job:
    jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar
    parallelism: 4
    upgradeMode: stateless
Error:
Spec:
  Deployment Name:  basic-session-deployment-only-example
  Job:
    Args:
    Jar URI:       local:///opt/flink/examples/streaming/StateMachineExample.jar
    Parallelism:   4
    State:         running
    Upgrade Mode:  stateless
Status:
  Error: {"type":"org.apache.flink.kubernetes.operator.exception.ReconciliationException","message":"org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not find a file system implementation for scheme 'local'. The scheme is not directly supported by Flink and no Hadoop file system to support this scheme could be loaded. For a full list of supported file systems, please see https://nightlies.apache.org/flink/flink-docs-stable/ops/filesystems/.","throwableList":[{"type":"org.apache.flink.core.fs.UnsupportedFileSystemSchemeException","message":"Could not find a file system implementation for scheme 'local'. The scheme is not directly supported by Flink and no Hadoop file system to support this scheme could be loaded. For a full list of supported file systems, please see https://nightlies.apache.org/flink/flink-docs-stable/ops/filesystems/."},{"type":"org.apache.flink.core.fs.UnsupportedFileSystemSchemeException","message":"Hadoop is not in the classpath/dependencies."}]}
Kosta Sovaridis
02/06/2023, 1:00 PM
Richard Noble
02/06/2023, 3:14 PM
With
HADOOP_OPTS="-Dfs.AbstractFileSystem.s3a.impl=org.apache.hadoop.fs.s3a.S3A -Dfs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem"
I get a "Class org.apache.hadoop.fs.s3a.S3AFileSystem not found." error.
I've tried the plugins in lib, and that worked locally, but not on the cluster.
Does anyone have an example setup?
Tony Wang
02/06/2023, 5:59 PM
env.getConfig().setGlobalJobParameters(params);
env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
env.setParallelism(1);

String path = "/home/ubuntu/flink/flink-examples/flink-examples-streaming/target/csv";
final CsvReaderFormat<Pojo> format = CsvReaderFormat.forPojo(Pojo.class);
File csvfile = new File(path);
final FileSource<Pojo> csv_source =
        FileSource.forRecordStreamFormat(format, Path.fromLocalFile(csvfile))
                .monitorContinuously(Duration.ofMillis(1))
                .build();
DataStream<Pojo> source = env.fromSource(
        csv_source,
        new WatermarkStrategy<Pojo>() {
            @Override
            public WatermarkGenerator<Pojo> createWatermarkGenerator(
                    WatermarkGeneratorSupplier.Context context) {
                return new MyPerElementWatermarkGenerator();
            }
        }.withTimestampAssigner((event, timestamp) -> event.event_time),
        "csv-source");
Slackbot
02/06/2023, 6:01 PM
Krish Narukulla
02/06/2023, 6:57 PM
Nihar Rao
02/06/2023, 10:37 PM
public class FlinkMetric extends RichMapFunction<String, String> {
    public void playWithMetrics() {
        // Gauge
        Gauge<Integer> random = () -> 1;
        this.getRuntimeContext().getMetricGroup().gauge("myGauge", random);
    }

    @Override
    public String map(String value) throws Exception {
        return null;
    }
}
This class is simply called by the main class, and I am running my application as a jar by submitting it through the Flink web UI's submit-job option. I am getting the following error:
java.lang.IllegalStateException: The runtime context has not been initialized yet.
I looked into it and I understand that the runtime context is null. Can you please tell me why the runtime context is not getting set automatically?
Thanks
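For reference (a sketch, not from the thread): getRuntimeContext() is only initialized once Flink deploys the function to a task, so calling playWithMetrics() from the main class runs on the client before that happens. The usual pattern is to register metrics in open():

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Gauge;

public class FlinkMetric extends RichMapFunction<String, String> {

    @Override
    public void open(Configuration parameters) throws Exception {
        // The runtime context is available here (and in map()), not in the
        // constructor or when the object is used directly from main().
        Gauge<Integer> random = () -> 1;
        getRuntimeContext().getMetricGroup().gauge("myGauge", random);
    }

    @Override
    public String map(String value) throws Exception {
        return value;
    }
}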
Mikko Lehtimäki
02/07/2023, 9:08 AM
Mikko Lehtimäki
02/07/2023, 9:45 AM
Can watermarks be assigned like this:
env.from_source(
    source,
    WatermarkStrategy.no_watermarks(),
    "Kafka Source",
)
# Read the json-strings into dictionaries
.map(json.loads)
.assign_timestamps_and_watermarks(
    WatermarkStrategy.for_monotonous_timestamps()
        .with_timestamp_assigner(SensorTimestampAssigner())
)
or must they be added to from_source()? I'd like to load the Kafka payload string into a dictionary before I extract the timestamp. source here is a KafkaSource.
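In general, yes: assign_timestamps_and_watermarks (assignTimestampsAndWatermarks in Java) works on any DataStream, not only directly on from_source, so assigning after the map(json.loads) step is fine. The same shape in the Java API, as a sketch (SensorEvent and its parsing are placeholder assumptions):

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;

public class LateAssignment {
    /** Hypothetical parsed event type. */
    public static class SensorEvent {
        public long timestampMillis;
        public static SensorEvent fromJson(String json) {
            SensorEvent e = new SensorEvent();
            e.timestampMillis = System.currentTimeMillis(); // stand-in for real parsing
            return e;
        }
    }

    public static DataStream<SensorEvent> withTimestamps(DataStream<String> raw) {
        return raw
                .map(SensorEvent::fromJson) // parse the payload first...
                .assignTimestampsAndWatermarks( // ...then extract event time
                        WatermarkStrategy.<SensorEvent>forMonotonousTimestamps()
                                .withTimestampAssigner((e, ts) -> e.timestampMillis));
    }
}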
Mikko Lehtimäki
02/07/2023, 11:47 AM
Yang LI
02/07/2023, 12:53 PM
I have a function using the groupBy API running in a Flink job, and it has different results when I run it in a unit test with a normal JVM context than when I run it in the Flink job (with parallelism 1 and only 1 taskmanager and 1 task slot).
Does anyone know something we should be careful to configure in our Flink cluster so that the computation results are the same in the unit test and at Flink runtime?
Yang LI
02/07/2023, 2:06 PM
Sami Airaksinen
02/07/2023, 2:56 PM
My sink table needs the parquet formatFactory. So, I'm keen to know:
1. Which jar should I use (I'm using TableDescriptor to define my sink table)? flink-parquet or flink-sql-parquet?
2. Should I include it in the cluster's jobmanager and taskmanagers' /opt/flink/lib/ folder, or try to add it via the add_jars(...) method? pyflink-shell doesn't offer a --jars option during launch.
02/07/2023, 4:47 PM
Kevin Lam
02/07/2023, 5:05 PM
Kyle Ahn
02/07/2023, 6:31 PM
Erwin Cabral
02/07/2023, 7:21 PM
I have set
state.checkpoints.dir: hdfs://namenode:port/checkpoints
and I get the following error:
Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not find a file system implementation for scheme 'hdfs'. The scheme is not directly supported by Flink and no Hadoop file system to support this scheme could be loaded.
Any ideas? Thanks in advance.