raghav tandon
11/24/2022, 10:22 AM
In order to recover submitted jobs, Flink persists metadata and the job artifacts. The HA data will be kept until the respective job either succeeds, is cancelled or fails terminally. Once this happens, all the HA data, including the metadata stored in the HA services, will be deleted.
Does this mean that if the job is restarted (due to a user code exception) and is eventually stopped because restart-strategy: fixed-delay is exhausted, then the HA state in ZK is gone? Is there a way to change this behaviour?
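For context, a minimal sketch of the fixed-delay restart strategy the question refers to, configured programmatically (class name and values are illustrative):

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RestartStrategySketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Retry at most 3 times, waiting 10 seconds between attempts. Once the
        // attempts are exhausted the job fails terminally, which is the point
        // at which Flink deletes the HA data described in the quote above.
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.seconds(10)));
        // ... define and execute the job graph here ...
    }
}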
Alexis Josephides (Contractor)
11/24/2022, 2:47 PM
Streaming File Sink to S3?
tldr: the operator is consistently showing the most backpressure and resource usage, and is causing the Flink app to fully restart.
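For reference, a minimal sketch of the kind of sink being discussed: a row-encoded StreamingFileSink writing to S3. The bucket path is hypothetical, and the flink-s3-fs-hadoop or flink-s3-fs-presto plugin must be installed for the s3:// scheme to resolve:

import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

public class S3SinkSketch {
    static StreamingFileSink<String> buildSink() {
        // Row-encoded part files written under s3://my-bucket/output.
        return StreamingFileSink
                .forRowFormat(new Path("s3://my-bucket/output"),
                        new SimpleStringEncoder<String>("UTF-8"))
                .build();
    }
}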
Thiruvenkadesh Someswaran
11/24/2022, 11:03 PM
ERROR org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - Fatal error occurred in the cluster entrypoint.
java.util.concurrent.CompletionException: java.lang.IllegalStateException: The base directory of the JobResultStore isn't accessible. No dirty JobResults can be restored.
Here is my Flink config:
image: flink:1.15
flinkVersion: v1_15
flinkConfiguration:
taskmanager.numberOfTaskSlots: "2"
state.savepoints.dir: file:///flink-data/savepoints
state.checkpoints.dir: file:///flink-data/checkpoints
high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
high-availability.storageDir: file:///flink-data/ha
rest.port: "9091"
Slackbot
11/25/2022, 3:42 AM
Lee xu
11/25/2022, 3:45 AM
Raghunadh Nittala
11/25/2022, 4:54 AM
org.apache.parquet:parquet-protobuf were required for this and we’ve added the same to our Gradle build (I’ve added them to flinkShadowJar). When I try to run the job using the fat jar, it throws:
Caused by: java.lang.NoClassDefFoundError: org/apache/parquet/proto/ProtoWriteSupport
at org.apache.flink.formats.parquet.protobuf.ParquetProtoWriters.lambda$forType$84425dfe$1(ParquetProtoWriters.java:41)
at org.apache.flink.formats.parquet.ParquetWriterFactory.create(ParquetWriterFactory.java:56)
at org.apache.flink.streaming.api.functions.sink.filesystem.BulkBucketWriter.openNew(BulkBucketWriter.java:76)
....
....
Caused by: java.lang.ClassNotFoundException: org.apache.parquet.proto.ProtoWriteSupport
at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:581)
at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:521)
The job only succeeds when we copy the org.apache.parquet:parquet-protobuf jar into the Flink lib directory. Why is the class not being picked up when it is available in the fat jar (I extracted the fat JAR and could see the class inside)? I tried providing it as compileOnly/runtimeOnly as well, but no luck.
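A quick, hedged diagnostic sketch (not a fix) for this kind of classloading question: checking from inside the job whether the user-code classloader can resolve the class at all:

public class ClassLoadingCheck {
    public static void main(String[] args) throws Exception {
        // Ask the context classloader (the user-code classloader when running
        // inside a Flink job) to resolve the class without initializing it.
        Class<?> clazz = Class.forName(
                "org.apache.parquet.proto.ProtoWriteSupport",
                false,
                Thread.currentThread().getContextClassLoader());
        System.out.println(clazz + " loaded by " + clazz.getClassLoader());
    }
}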
Abhinav sharma
11/25/2022, 11:06 AM
Raghunadh Nittala
11/25/2022, 2:11 PM
Caused by: java.lang.ClassNotFoundException: org.apache.flink.fs.azure.common.hadoop.HadoopRecoverableWriter
at java.net.URLClassLoader.findClass(Unknown Source) ~[?:?]
at java.lang.ClassLoader.loadClass(Unknown Source) ~[?:?]
at org.apache.flink.core.classloading.ComponentClassLoader.loadClassFromComponentOnly(ComponentClassLoader.java:149) ~[flink-dist-1.16.0.jar:1.16.0]
at org.apache.flink.core.classloading.ComponentClassLoader.loadClassFromOwnerFirst(ComponentClassLoader.java:171) ~[flink-dist-1.16.0.jar:1.16.0]
at org.apache.flink.core.classloading.ComponentClassLoader.loadClass(ComponentClassLoader.java:106) ~[flink-dist-1.16.0.jar:1.16.0]
at java.lang.ClassLoader.loadClass(Unknown Source) ~[?:?]
I have copied flink-azure-fs-hadoop.jar to the azure-fs-hadoop plugins folder as mentioned in https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/deployment/filesystems/azure/. While exploring around this, I came across the JIRAs https://issues.apache.org/jira/browse/FLINK-17444 and https://issues.apache.org/jira/browse/FLINK-18568. Now my question is: does FileSink work for Azure? Why am I seeing this exception even though I’ve provided the JAR under plugins?
sharad mishra
11/25/2022, 2:36 PM
ProcessFunctionTestHarnesses cannot be resolved, even after adding the following dependencies:
"org.apache.flink" % "flink-test-utils" % flinkVersion % Test,
"org.apache.flink" % "flink-test-utils-junit" % flinkVersion % Test,
"org.apache.flink" % "flink-runtime" % flinkVersion % Test
Am I missing something?
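For reference, a minimal sketch of typical harness usage, adapted from the pattern in Flink's testing docs (the pass-through function is hypothetical); note the harness classes ship in the flink-streaming-java test-jar (classifier "tests"), which may be the missing dependency:

import java.util.Collections;

import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.ProcessFunctionTestHarnesses;
import org.apache.flink.util.Collector;

public class PassThroughHarnessSketch {
    // A trivial pass-through function, just to exercise the harness.
    static class PassThrough extends ProcessFunction<Integer, Integer> {
        @Override
        public void processElement(Integer value, Context ctx, Collector<Integer> out) {
            out.collect(value);
        }
    }

    public static void main(String[] args) throws Exception {
        OneInputStreamOperatorTestHarness<Integer, Integer> harness =
                ProcessFunctionTestHarnesses.forProcessFunction(new PassThrough());
        harness.open();
        harness.processElement(1, 10L); // value, timestamp
        System.out.println(harness.extractOutputValues().equals(Collections.singletonList(1)));
    }
}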
Matt Butler
11/25/2022, 10:49 PM
import org.apache.flink.dropwizard.metrics.DropwizardMeterWrapper;
import org.apache.flink.metrics.Meter;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.ScalarFunction;

public class TestClass {
    public static class MyTestFunction extends ScalarFunction {
        private static transient Meter meter;

        @Override
        public void open(FunctionContext context) throws Exception {
            // Wrap a Dropwizard meter so it can be registered with Flink's metric group.
            com.codahale.metrics.Meter dropwizardMeter = new com.codahale.metrics.Meter();
            this.meter = context.getMetricGroup()
                    .meter("myMeter", new DropwizardMeterWrapper(dropwizardMeter));
        }

        public String eval(String s) {
            this.meter.markEvent();
            return "Test";
        }
    }

    public static void main(String[] args) throws Exception {
        EnvironmentSettings settings = EnvironmentSettings
                .newInstance()
                .inStreamingMode()
                .build();
        TableEnvironment tableEnvironment = TableEnvironment.create(settings);
        tableEnvironment.createTemporarySystemFunction("MyTestFunction", MyTestFunction.class);
        ....
    }
}
Vishal bharatbhai Vanpariya
11/26/2022, 10:42 AM
kafka-console-producer.sh --broker-list localhost:9092 --topic topic-name --property "parse.key=true" --property "key.separator=:"
key1:value1
key2:value2
key3:value3
Flink code:
keyedstream.sinkTo(
    KafkaSink.<String>builder()
        .setBootstrapServers("localhost:9092")
        .setRecordSerializer(
            KafkaRecordSerializationSchema.builder()
                .setTopic("flink")
                .setValueSerializationSchema(new SimpleStringSchema())
                .build())
        .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
        .build());
Thanks
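Since the console producer above writes keyed records (parse.key=true), a hedged sketch of a KafkaSink that also serializes the record key (assumes String elements; using the element itself as the key is purely illustrative):

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;

public class KeyedKafkaSinkSketch {
    static KafkaSink<String> buildSink() {
        return KafkaSink.<String>builder()
                .setBootstrapServers("localhost:9092")
                .setRecordSerializer(
                        KafkaRecordSerializationSchema.builder()
                                .setTopic("flink")
                                .setValueSerializationSchema(new SimpleStringSchema())
                                // Also serialize a record key; a real job would
                                // derive the key from the element instead.
                                .setKeySerializationSchema(new SimpleStringSchema())
                                .build())
                .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
                .build();
    }
}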
Vishal bharatbhai Vanpariya
11/28/2022, 7:27 AM
Jackwangcs
11/28/2022, 8:45 AM
ding bei
11/28/2022, 2:09 PM
OnCheckpointRollingPolicy, which has the description in the picture below. I tried to extend CheckpointRollingPolicy to override the function shouldRollOnEvent, but it did not work. And I saw a question on Stack Overflow which is exactly the same as the question I have, but it has no answers. So how can I roll files by size using the DataStream API?
https://stackoverflow.com/questions/70288006/flink-filesink-with-bulk-format-to-s3-rolling-policy-how-to-specify-size-time
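For what it's worth, a minimal, untested sketch of a size-based policy along the lines the message describes, extending CheckpointRollingPolicy (the threshold field is illustrative):

import java.io.IOException;

import org.apache.flink.streaming.api.functions.sink.filesystem.PartFileInfo;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.CheckpointRollingPolicy;

public class SizeBasedRollingPolicy<IN, BucketID> extends CheckpointRollingPolicy<IN, BucketID> {
    private final long maxPartSizeBytes;

    public SizeBasedRollingPolicy(long maxPartSizeBytes) {
        this.maxPartSizeBytes = maxPartSizeBytes;
    }

    @Override
    public boolean shouldRollOnEvent(PartFileInfo<BucketID> partFileState, IN element) throws IOException {
        // Roll as soon as the in-progress part file exceeds the size threshold.
        return partFileState.getSize() >= maxPartSizeBytes;
    }

    @Override
    public boolean shouldRollOnProcessingTime(PartFileInfo<BucketID> partFileState, long currentTime) {
        // Size-only policy: never roll on processing time.
        return false;
    }
}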
Donatien Schmitz
11/28/2022, 2:32 PM
Is there a way to access TaskManager information from the JobMaster? I see that there is a TaskManagerTracker used by SlotManagers, but I don't see any access available through the JobMaster. Right now I am using the ResourceManagerGateway, but the Gateway is generated after the creation of the Scheduler, so I'm facing a NullPointerException when I try to call the requestTaskManagerInfo method.
Dylan Fontana
11/28/2022, 3:45 PM
Felix Angell
11/28/2022, 4:37 PM
Aeden Jameson
11/28/2022, 6:03 PM
Karthi Thyagarajan
11/29/2022, 1:12 AM
java.lang.NoClassDefFoundError: com/ctc/wstx/io/InputBootstrapper
at org.apache.flink.formats.parquet.ParquetFileFormatFactory.getParquetConfiguration(ParquetFileFormatFactory.java:92)
at org.apache.flink.formats.parquet.ParquetFileFormatFactory.access$000(ParquetFileFormatFactory.java:52)
at org.apache.flink.formats.parquet.ParquetFileFormatFactory$1.createRuntimeEncoder(ParquetFileFormatFactory.java:80)
at org.apache.flink.formats.parquet.ParquetFileFormatFactory$1.createRuntimeEncoder(ParquetFileFormatFactory.java:74)
at org.apache.flink.connector.file.table.FileSystemTableSink.createWriter(FileSystemTableSink.java:376)
at org.apache.flink.connector.file.table.FileSystemTableSink.createStreamingSink(FileSystemTableSink.java:197)
at org.apache.flink.connector.file.table.FileSystemTableSink.consume(FileSystemTableSink.java:155)
at org.apache.flink.connector.file.table.FileSystemTableSink.lambda$getSinkRuntimeProvider$0(FileSystemTableSink.java:140)
Any suggestions on what I could be missing?
Thanks in advance.
Shira Bodenstein
11/29/2022, 8:20 AM
assignTimestampsAndWatermarks(AssignerWithPeriodicWatermarks<T> timestampAndWatermarkAssigner)
The periodic assigner simply extracted the timestamp from the event.
I have upgraded the Flink version to 1.11.2 from 1.6.3.
This API is deprecated and I'm trying to understand what I should use to replace it.
I tried:
assignTimestampsAndWatermarks(
    WatermarkStrategy.<T>forBoundedOutOfOrderness(Duration.ofHours(valueLatenessEntryHours))
        .withTimestampAssigner(timestampAssigner))
where timestampAssigner is a SerializableTimestampAssigner that extracts the timestamp from the event.
But something seems not to work: I don't see any data fired from the windows and written to the sink.
Any ideas on what could be missing?
Thank you!
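For comparison, a minimal sketch of the replacement API (the event type MyEvent and its getEventTime() getter are hypothetical):

import java.time.Duration;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;

public class WatermarkSketch {
    static DataStream<MyEvent> withWatermarks(DataStream<MyEvent> stream) {
        return stream.assignTimestampsAndWatermarks(
                WatermarkStrategy.<MyEvent>forBoundedOutOfOrderness(Duration.ofHours(1))
                        // Extract the event-time timestamp (milliseconds) from each event.
                        .withTimestampAssigner((event, recordTimestamp) -> event.getEventTime()));
    }

    // Hypothetical event type.
    static class MyEvent {
        long getEventTime() { return 0L; }
    }
}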
monaawang
11/29/2022, 8:59 AM
Kosta Sovaridis
11/29/2022, 1:21 PM
Sumit Nekar
11/29/2022, 2:02 PM
INFO ][flink-flink-test-application-preprod/flink-test-application] >>> Event | Warning | CLUSTERDEPLOYMENTEXCEPTION | The Flink cluster flink-test-application already exists.
2022-11-29 13:55:05,166 o.a.f.k.o.r.ReconciliationUtils [WARN ][flink-flink-test-application-preprod/flink-test-application] Attempt count: 10, last attempt: true
2022-11-29 13:55:05,193 o.a.f.k.o.l.AuditUtils [INFO ][flink-flink-test-application-preprod/flink-test-application] >>> Status | Error | UPGRADING | org.apache.flink.client.deployment.ClusterDeploymentException: The Flink cluster flink-test-application already exists.
2022-11-29 13:55:05,193 i.j.o.p.e.ReconciliationDispatcher [ERROR][flink-flink-test-application-preprod/flink-test-application] Error during event processing ExecutionScope{ resource id: ResourceID{name='flink-test-application', namespace='flink-test-application-ns'}, version: 186818983} failed.
org.apache.flink.kubernetes.operator.exception.ReconciliationException: org.apache.flink.client.deployment.ClusterDeploymentException: The Flink cluster flink-test-application already exists.
at org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController.reconcile(FlinkDeploymentController.java:133)
at org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController.reconcile(FlinkDeploymentController.java:54)
at io.javaoperatorsdk.operator.processing.Controller$1.execute(Controller.java:130)
at io.javaoperatorsdk.operator.processing.Controller$1.execute(Controller.java:88)
at org.apache.flink.kubernetes.operator.metrics.OperatorJosdkMetrics.timeControllerExecution(OperatorJosdkMetrics.java:81)
at io.javaoperatorsdk.operator.processing.Controller.reconcile(Controller.java:87)
at io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.reconcileExecution(ReconciliationDispatcher.java:135)
at io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleReconcile(ReconciliationDispatcher.java:115)
at io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleDispatch(ReconciliationDispatcher.java:86)
at io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleExecution(ReconciliationDispatcher.java:59)
at io.javaoperatorsdk.operator.processing.event.EventProcessor$ReconcilerExecutor.run(EventProcessor.java:395)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.base/java.lang.Thread.run(Unknown Source)
I have not assigned any kubernetes.cluster-id explicitly. Can this happen when the updated job graph is not compatible with the previous one?
Caused by: org.apache.flink.client.deployment.ClusterDeploymentException: The Flink cluster flink-test-application already exists.
at org.apache.flink.kubernetes.KubernetesClusterDescriptor.deployApplicationCluster(KubernetesClusterDescriptor.java:181)
at org.apache.flink.client.deployment.application.cli.ApplicationClusterDeployer.run(ApplicationClusterDeployer.java:67)
at org.apache.flink.kubernetes.operator.service.NativeFlinkService.deployApplicationCluster(NativeFlinkService.java:70)
at org.apache.flink.kubernetes.operator.service.AbstractFlinkService.submitApplicationCluster(AbstractFlinkService.java:177)
at org.apache.flink.kubernetes.operator.reconciler.deployment.ApplicationReconciler.deploy(ApplicationReconciler.java:187)
at org.apache.flink.kubernetes.operator.reconciler.deployment.ApplicationReconciler.deploy(ApplicationReconciler.java:55)
at org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractJobReconciler.restoreJob(AbstractJobReconciler.java:208)
at org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractJobReconciler.reconcileSpecChange(AbstractJobReconciler.java:140)
at org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractFlinkResourceReconciler.reconcile(AbstractFlinkResourceReconciler.java:153)
at org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractFlinkResourceReconciler.reconcile(AbstractFlinkResourceReconciler.java:56)
at org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController.reconcile(FlinkDeploymentController.java:123)
... 13 more
2022-11-29 13:55:05,194 i.j.o.p.e.EventProcessor [ERROR][flink-test-application-ns/flink-test-application] Exhausted retries for ExecutionScope{ resource id: ResourceID{name='flink-test-application', namespace='flink-flink-test-application-ns'}, version: 186818983}
Ali AIT-BACHIR
11/29/2022, 2:15 PM
stumpy that uses numba.
Chase Diem
11/29/2022, 5:13 PM
https://app.datadoghq, however that's an issue because we would like to use a custom endpoint set up by VictoriaMetrics, as noted here: https://docs.victoriametrics.com/#how-to-send-data-from-datadog-agent.
Is there any easy way to override the URL?
Momir Beljic
11/29/2022, 7:25 PM
pyflink with the DataStream API to run a Kinesis Data Analytics application that reads from a Kinesis data stream. I generate some random data locally using the Kinesis data stream process_message_stream, and now I want to read that stream with the PyFlink DataStream API. However, I get this message
py4j.protocol.Py4JJavaError: An error occurred while calling o1.toAppendStream.
: org.apache.flink.table.api.ValidationException: Unable to create a source for reading table 'default_catalog.default_database.process_message_stream'.
when I run this PyFlink code:
# Imports assumed for PyFlink 1.13, matching the connector jar below.
import os

from pyflink.common.serialization import Encoder
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors import StreamingFileSink
from pyflink.table import StreamTableEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(stream_execution_environment=env)
CURRENT_DIR = os.path.dirname(os.path.realpath(__file__))
t_env.get_config().get_configuration().set_string(
    "pipeline.jars",
    "file:///" + CURRENT_DIR + "/lib/flink-sql-connector-kinesis_2.12-1.13.2.jar"  # "/lib/flink-sql-connector-kinesis-1.15.2.jar"
)
t_env.execute_sql("""CREATE TABLE process_message_stream (
        EVENT_DATE_TIME TIMESTAMP(3),
        DEV_ID VARCHAR(6),
        T_ID VARCHAR(6)
    ) PARTITIONED BY (DEV_ID)
    WITH (
        'connector' = 'kinesis',
        'aws.region' = 'eu-west-1',
        'stream' = 'process_message_stream'
    )""")
ds = t_env.to_append_stream(
    t_env.from_path("process_message_stream"),
    Types.ROW([Types.SQL_TIMESTAMP(), Types.STRING(), Types.STRING()]))
ds.add_sink(StreamingFileSink
            .for_row_format('/tmp/output', Encoder.simple_string_encoder())
            .build())
t_env.execute("tutorial_job")
Steven Zhang
11/29/2022, 7:37 PM
RICHARD JOY
11/29/2022, 7:55 PM
Emmanuel Leroy
11/29/2022, 9:11 PM
Connie Yang
11/30/2022, 12:37 AM