Tiansu Yu
11/16/2022, 10:21 AM
I use stream = env.createInput(parquetInputFormat) as source and simply print it out for testing. The pipeline finishes early without reading a single byte from the S3 file. There is no exception. In the taskmanager log, I see that Flink could not find the hadoop classpath. Is this somehow related to Flink not reading the S3 file?
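If Flink truly cannot find Hadoop, that alone can explain the silent stop, since the Parquet input format relies on Hadoop classes; the documented remedy is to export the classpath before starting the cluster (export HADOOP_CLASSPATH=$(hadoop classpath)). As an alternative, here is a minimal sketch of reading the same data through the Table API filesystem connector; the bucket path and column schema are placeholders, and it assumes the S3 filesystem plugin and the parquet format jar are available on the cluster:

from pyflink.table import EnvironmentSettings, TableEnvironment

# Minimal sketch: read Parquet from S3 via the filesystem connector.
# 's3://my-bucket/my-data/' and the two columns are placeholders.
t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
t_env.execute_sql("""
    CREATE TABLE parquet_source (
        id BIGINT,
        payload STRING
    ) WITH (
        'connector' = 'filesystem',
        'path' = 's3://my-bucket/my-data/',
        'format' = 'parquet'
    )
""")
t_env.execute_sql("SELECT * FROM parquet_source").print()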
Maher Turifi
11/16/2022, 12:42 PM

Adrian Chang
11/16/2022, 2:38 PM
I am incrementing custom counter metrics in a FlatMapFunction using Python:
class OccupancyEventFlatMap(FlatMapFunction):
    def open(self, runtime_context: RuntimeContext):
        mg = runtime_context.get_metrics_group()
        self.counter_sum = mg.counter("my_counter_sum")
        self.counter_total = mg.counter("my_counter_total")

    def flat_map(self, value):
        self.counter_sum.inc(10)
        self.counter_total.inc()
I am able to query the metric using the REST API
http://localhost:43491/jobs/9a376e28a1bb022b45c127d75fb1b447/vertices/5239a5f0e3e9cdca6a88500e58b5759e/metrics?get=0.FlatMap.my_counter_sum
[{"id":"0.FlatMap.my_counter_sum","value":"28201"}]
But I don't see any of my custom metrics in Datadog, although I do see all the standard Flink metrics there.
This is my Flink configuration for the Datadog exporter:
# Datadog
metrics.reporter.dghttp.class: org.apache.flink.metrics.datadog.DatadogHttpReporter
metrics.reporter.dghttp.dataCenter: US
metrics.reporter.dghttp.apikey: ${datadog_api_key}
metrics.reporter.dghttp.tags: env:development
# https://docs.datadoghq.com/integrations/flink/#configuration
metrics.scope.jm: flink.jobmanager
metrics.scope.jm.job: flink.jobmanager.job
metrics.scope.tm: flink.taskmanager
metrics.scope.tm.job: flink.taskmanager.job
metrics.scope.task: flink.task
metrics.scope.operator: flink.operator
It's the first time I am trying to send custom metrics from Flink to Datadog.
Am I doing something wrong?
Thanks
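One way to narrow this down is to confirm via the REST API that the counter really is registered at the operator level, and then search Datadog for the scoped name. A minimal sketch using the job and vertex IDs quoted above (the requests library is assumed to be installed):

import requests

# Re-run the REST check from above programmatically; the IDs are the
# ones quoted earlier in this message.
BASE = "http://localhost:43491"
JOB = "9a376e28a1bb022b45c127d75fb1b447"
VERTEX = "5239a5f0e3e9cdca6a88500e58b5759e"

resp = requests.get(
    f"{BASE}/jobs/{JOB}/vertices/{VERTEX}/metrics",
    params={"get": "0.FlatMap.my_counter_sum"},
)
print(resp.json())  # e.g. [{"id": "0.FlatMap.my_counter_sum", "value": "28201"}]

If the counter shows up here, it should reach Datadog under the configured operator scope, i.e. a name like flink.operator.my_counter_sum, so it is worth searching for that exact name rather than for my_counter_sum alone.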
Rommel
11/16/2022, 6:09 PM
flinkConfiguration:
  taskmanager.numberOfTaskSlots: {{ .Values.numOfTaskSlots | quote }}
  jobmanager.rpc.port: "6123"
  blob.server.port: "6124"
  taskmanager.rpc.port: "6122"
  queryable-state.proxy.ports: "6125"
  kubernetes.cluster-id: flink-cluster-{{ .Release.Name }}
  high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
  high-availability.storageDir: {{ .Values.haStorageDir }}
  restart-strategy: fixed-delay
  restart-strategy.fixed-delay.attempts: "10"
  kubernetes.namespace: flink
  metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter
  metrics.reporter.prom.port: "9250"
  state.savepoints.dir: {{ .Values.savepointDir }}-{{ .Release.Name }}
When I try to deploy it, I run into the error admission webhook "flinkoperator.flink.apache.org" denied the request: Forbidden Flink config key: kubernetes.namespace
and also admission webhook "flinkoperator.flink.apache.org" denied the request: Forbidden Flink config key: kubernetes.cluster-id
I can remove kubernetes.namespace, but I thought that in order to use high availability I need to have the cluster-id set up. Can anyone help me with this?
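For context, the operator treats these as reserved keys: it derives kubernetes.namespace and kubernetes.cluster-id from the FlinkDeployment resource itself (its namespace and name), which is why the admission webhook rejects them in flinkConfiguration. Removing both should not break HA, since the operator fills in the cluster-id for the HA services on its own.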
Dimitris Kalouris
11/16/2022, 6:55 PM

Adrian Chang
11/16/2022, 7:53 PM
I have a question about how reporters handle a gauge metric. Does the reporter (the Datadog reporter in my case) report all the samples measured, or only the current value of my gauge metric every X seconds? Is that good practice, or is it better to have 2 counter metrics like time_sum and time_count and then compute time_sum / time_count in Datadog to get the average of how long my UDF takes?
Thanks
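For reference, a gauge is only sampled at each report interval, so the reporter ships the current value and anything that happened between intervals is lost. A minimal sketch of the two-counter alternative (the metric names and the millisecond unit are illustrative choices):

import time

from pyflink.datastream.functions import FlatMapFunction, RuntimeContext

class TimedUdf(FlatMapFunction):
    # Two counters whose ratio gives the average latency of the UDF.
    def open(self, runtime_context: RuntimeContext):
        mg = runtime_context.get_metrics_group()
        self.time_sum = mg.counter("time_sum")      # total elapsed ms
        self.time_count = mg.counter("time_count")  # number of invocations

    def flat_map(self, value):
        start = time.perf_counter()
        result = value  # placeholder for the real work
        self.time_sum.inc(int((time.perf_counter() - start) * 1000))
        self.time_count.inc()
        yield result

In Datadog, time_sum / time_count then gives an average that accounts for every invocation, not just the samples that happened to coincide with report intervals.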
Raghunadh Nittala
11/16/2022, 11:45 PM
We write the output as partitioned Parquet via an INSERT INTO query. Now the job is failing with a java.lang.OutOfMemoryError: Java heap space exception, even though we increased the total memory and the task manager memory. My question is: does the default parquet partitioning store the state in RocksDB (the RocksDB state backend is enabled)?
James McGuire
11/17/2022, 12:44 AM
I am running into an org.codehaus.commons.compiler.CompileException: Line 23, Column 5: Cannot determine simple type name "com" error. The issue seems to be related to deserializing protobuf schemas that use google.protobuf.Timestamp rows. More details in thread.

Aishwarya Raimule
11/17/2022, 7:50 AM
I am counting invalid events in a FilterFunction using a custom metric:

class ValidKafkaEventFilter(FilterFunction):
    def __init__(self):
        self.invalid_counter = None

    def open(self, runtime_context: RuntimeContext):
        self.invalid_counter = (
            runtime_context.get_metrics_group()
            .add_group("kinesisanalytics")
            .counter("InvalidKafkaEventCount")
        )

    def filter(self, row):
        try:
            is_valid = _has_valid_values(row)
            if not is_valid:
                self.invalid_counter.inc(1)
            return is_valid
        except ValueError:
            return False
Now the error I am getting when the filter runs is:

    if not is_valid:
        self.invalid_counter.inc(1)
AttributeError: 'NoneType' object has no attribute 'inc'

This means that the invalid_counter hasn't been initialised. Can someone help me understand the reason for this? Am I missing something?
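The AttributeError means open() was never called on this instance; otherwise invalid_counter could not still be None. That typically happens when the function object is executed outside a real operator lifecycle (for example, invoked directly in a unit test), rather than the counter registration failing. A purely illustrative sketch that makes the failure explicit:

from pyflink.datastream.functions import FilterFunction, RuntimeContext

class ValidKafkaEventFilter(FilterFunction):
    def __init__(self):
        self.invalid_counter = None

    def open(self, runtime_context: RuntimeContext):
        self.invalid_counter = (
            runtime_context.get_metrics_group()
            .add_group("kinesisanalytics")
            .counter("InvalidKafkaEventCount")
        )

    def filter(self, row):
        # Fail loudly if the lifecycle hook was skipped instead of dying
        # later with a bare AttributeError.
        if self.invalid_counter is None:
            raise RuntimeError("open() was never called; is this running inside an operator?")
        try:
            is_valid = _has_valid_values(row)  # helper from the original snippet
            if not is_valid:
                self.invalid_counter.inc(1)
            return is_valid
        except ValueError:
            return False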
Tan Trinh
11/17/2022, 8:51 AM
Caused by: java.util.concurrent.CompletionException: java.lang.IllegalStateException: There is no operator for the state 7d9b7588bc2ff89baed50d7a4558caa4
at java.util.concurrent.CompletableFuture.encodeThrowable(Unknown Source) ~[?:?]
at java.util.concurrent.CompletableFuture.completeThrowable(Unknown Source) ~[?:?]
at java.util.concurrent.CompletableFuture$AsyncSupply.run(Unknown Source) ~[?:?]
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) ~[?:?]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) ~[?:?]
at java.lang.Thread.run(Unknown Source) ~[?:?]
Caused by: java.lang.IllegalStateException: There is no operator for the state 7d9b7588bc2ff89baed50d7a4558caa4
at org.apache.flink.runtime.checkpoint.StateAssignmentOperation.checkStateMappingCompleteness(StateAssignmentOperation.java:733) ~[flink-dist-1.15.2.jar:1.15.2]
at org.apache.flink.runtime.checkpoint.StateAssignmentOperation.assignStates(StateAssignmentOperation.java:98) ~[flink-dist-1.15.2.jar:1.15.2]
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreLatestCheckpointedStateInternal(CheckpointCoordinator.java:1670) ~[flink-dist-1.15.2.jar:1.15.2]
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreInitialCheckpointIfPresent(CheckpointCoordinator.java:1598) ~[flink-dist-1.15.2.jar:1.15.2]
at org.apache.flink.runtime.scheduler.DefaultExecutionGraphFactory.createAndRestoreExecutionGraph(DefaultExecutionGraphFactory.java:177) ~[flink-dist-1.15.2.jar:1.15.2]
at org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:363) ~[flink-dist-1.15.2.jar:1.15.2]
at org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:208) ~[flink-dist-1.15.2.jar:1.15.2]
at org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:191) ~[flink-dist-1.15.2.jar:1.15.2]
at org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:139) ~[flink-dist-1.15.2.jar:1.15.2]
at org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:135) ~[flink-dist-1.15.2.jar:1.15.2]
at org.apache.flink.runtime.jobmaster.DefaultSlotPoolServiceSchedulerFactory.createScheduler(DefaultSlotPoolServiceSchedulerFactory.java:115) ~[flink-dist-1.15.2.jar:1.15.2]
at org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:345) ~[flink-dist-1.15.2.jar:1.15.2]
at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:322) ~[flink-dist-1.15.2.jar:1.15.2]
at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.internalCreateJobMasterService(DefaultJobMasterServiceFactory.java:106) ~[flink-dist-1.15.2.jar:1.15.2]
at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.lambda$createJobMasterService$0(DefaultJobMasterServiceFactory.java:94) ~[flink-dist-1.15.2.jar:1.15.2]
at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedSupplier$4(FunctionUtils.java:112) ~[flink-dist-1.15.2.jar:1.15.2]
at java.util.concurrent.CompletableFuture$AsyncSupply.run(Unknown Source) ~[?:?]
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) ~[?:?]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) ~[?:?]
at java.lang.Thread.run(Unknown Source) ~[?:?]
I think it is stuck at the restore-latest-checkpoint stage, but the statement There is no operator for the state 7d9b7588bc2ff89baed50d7a4558caa4 always appears no matter which Flink cluster I apply it to.
Thanks in advance!
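For context, this IllegalStateException is what Flink raises when the checkpoint being restored contains state for an operator that no longer exists in the job graph, typically because an operator was removed or its uid changed between versions of the job. If dropping that state is acceptable, restoring with --allowNonRestoredState on flink run (or execution.savepoint.ignore-unclaimed-state: true in the configuration) gets past it; assigning stable uid()s to stateful operators avoids the mismatch in the first place.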
11/17/2022, 8:54 AMInternal error occurred: failed calling webhook "<http://flinkoperator.flink.apache.org|flinkoperator.flink.apache.org>": Post "<https://flink-operator-webhook-service.flink.svc:443/validate?timeout=10s>": x509: certificate signed by unknown authority
Volodymyr Meshchanynets
11/17/2022, 4:36 PM

Ari Huttunen
11/17/2022, 4:48 PM

Abel Lamjiri
11/17/2022, 5:44 PM

Max Gurewitz
11/17/2022, 5:58 PM
I have a question about this passage from the Flink docs: "For example, it is impossible to count all elements in a stream, because streams are in general infinite (unbounded). Instead, aggregates on streams (counts, sums, etc), are scoped by windows, such as 'count over the last 5 minutes', or 'sum of the last 100 elements'."
https://nightlies.apache.org/flink/flink-docs-master/docs/concepts/time/
Thank you!
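As a concrete illustration of the window-scoped counting that passage describes, here is a minimal PyFlink sketch; the keys, values, and the 5-minute tumbling window are arbitrary choices:

from pyflink.common.time import Time
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.window import TumblingProcessingTimeWindows

env = StreamExecutionEnvironment.get_execution_environment()

# Count per key over 5-minute windows instead of over the whole
# (unbounded) stream.
counts = (
    env.from_collection([("a", 1), ("b", 1), ("a", 1)])
    .key_by(lambda e: e[0])
    .window(TumblingProcessingTimeWindows.of(Time.minutes(5)))
    .reduce(lambda x, y: (x[0], x[1] + y[1]))
)
counts.print()
env.execute("windowed_count")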
Padraic McAtee
11/17/2022, 7:14 PM

Slackbot
11/17/2022, 11:40 PM

Stephan Weinwurm
11/18/2022, 12:44 AM
We have region as the restart strategy, but the logs reveal this:
Nov 17 10:20:30 org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy [] - Calculating tasks to restart to recover the failed task 31284d56d1e2112b0f20099ee448a6a9_11.
Nov 17 10:20:30 org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy [] - 5650 tasks should be restarted to recover the failed task 31284d56d1e2112b0f20099ee448a6a9_11.
Is there a way to have Flink restart only the one failed task instead of all of them, or to handle these scenarios more gracefully?
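For context: the region strategy restarts the whole pipelined region containing the failed task, and any all-to-all exchange (keyBy, rebalance, broadcast) merges upstream and downstream tasks into one region. A typical fully connected streaming job is therefore a single region, which is why all 5650 tasks restart; only topologies with blocking exchanges or disjoint (embarrassingly parallel) pipelines get smaller regions.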
Nithin Kumar Vokkarla
11/18/2022, 6:31 AM
I am getting a ClusterDeploymentException: Could not create Kubernetes cluster "flink-caldel".
Last time I faced this issue, the taskmanager and jobmanager pods were only created after a long time (about an hour).
This is the Helm template that I used:
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: {{ include "flinkapplication-calculatedelta.fullname" . }}
  namespace: {{ .Release.Namespace }}
  labels:
    environment: {{ .Values.environment }}
spec:
  image: "{{ .Values.image.repository }}:{{ .Values.image.tag }}"
  flinkVersion: v1_15
  {{- with .Values.flinkConfig }}
  flinkConfiguration:
    {{- toYaml . | nindent 4 }}
  {{- end }}
  # taskmanager.numberOfTaskSlots = 1 in values.yaml file under flinkConfig
  podTemplate:
    apiVersion: v1
    kind: Pod
    metadata:
      name: pod-template
      labels:
        environment: {{ .Values.environment }}
    spec:
      serviceAccount: {{ include "flinkapplication-calculatedelta.serviceAccountName" . }}
      nodeSelector:
        ............
      containers:
        # Do not change the main container name
        - name: flink-main-container
          volumeMounts:
            - mountPath: {{ .Values.configFiles.mountPath }}
              name: ......
      volumes:
        - name: ......
          configMap:
            name: {{ .Release.Name }}
      serviceAccount: {{ include "flinkapplication-calculatedelta.serviceAccountName" . }}
  jobManager:
    resource:
      {{- toYaml .Values.jobManagerConfig.resources | nindent 6 }}
    replicas: {{ .Values.jobReplicas }}
  taskManager:
    resource:
      {{- toYaml .Values.taskManagerConfig.resources | nindent 6 }}
  job:
    jarURI: {{ .Values.jarName }} # Note, this jarURI is actually a placeholder
    parallelism: {{ .Values.parallelism }}
    entryClass: {{ .Values.entryClass }}
Can someone help me with this issue?
Thank you

Emily Morgan
11/18/2022, 7:41 AM

Makas Tzavellas
11/18/2022, 7:53 AM
I am running into this exception:
java.lang.UnsupportedOperationException: Could not find type serializer for current type [PojoType<my.org.demo.CustomResult, fields = [col: String, value: String]>].
at org.apache.flink.streaming.api.utils.PythonTypeUtils$TypeInfoToSerializerConverter.typeInfoSerializerConverter(PythonTypeUtils.java:831) ~[?:?]
at org.apache.flink.streaming.api.utils.PythonTypeUtils$TypeInfoToSerializerConverter.lambda$typeInfoSerializerConverter$0(PythonTypeUtils.java:747) ~[?:?]
at java.util.stream.ReferencePipeline$3$1.accept(Unknown Source) ~[?:?]
at java.util.Spliterators$ArraySpliterator.forEachRemaining(Unknown Source) ~[?:?]
at java.util.stream.AbstractPipeline.copyInto(Unknown Source) ~[?:?]
at java.util.stream.AbstractPipeline.wrapAndCopyInto(Unknown Source) ~[?:?]
at java.util.stream.AbstractPipeline.evaluate(Unknown Source) ~[?:?]
at java.util.stream.AbstractPipeline.evaluateToArrayNode(Unknown Source) ~[?:?]
at java.util.stream.ReferencePipeline.toArray(Unknown Source) ~[?:?]
at org.apache.flink.streaming.api.utils.PythonTypeUtils$TypeInfoToSerializerConverter.typeInfoSerializerConverter(PythonTypeUtils.java:748) ~[?:?]
at org.apache.flink.streaming.api.operators.python.process.AbstractExternalOneInputPythonFunctionOperator.open(AbstractExternalOneInputPythonFunctionOperator.java:108) ~[?:?]
at org.apache.flink.streaming.api.operators.python.process.ExternalPythonProcessOperator.open(ExternalPythonProcessOperator.java:64) ~[?:?]
at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:107) ~[flink-dist-1.16.0.jar:1.16.0]
at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:726) ~[flink-dist-1.16.0.jar:1.16.0]
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55) ~[flink-dist-1.16.0.jar:1.16.0]
at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:702) ~[flink-dist-1.16.0.jar:1.16.0]
at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:669) ~[flink-dist-1.16.0.jar:1.16.0]
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935) ~[flink-dist-1.16.0.jar:1.16.0]
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:904) ~[flink-dist-1.16.0.jar:1.16.0]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728) ~[flink-dist-1.16.0.jar:1.16.0]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550) ~[flink-dist-1.16.0.jar:1.16.0]
at java.lang.Thread.run(Unknown Source) ~[?:?]
Appreciate any advice/pointers for resolving this issue.
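For what it's worth, this exception usually means a Python operator received elements typed as a Java POJO, for which PyFlink has no Python serializer. A common workaround is to give the operator an explicitly declared Row type instead. A minimal sketch, where ds is a hypothetical upstream DataStream and the field names mirror the CustomResult fields from the error:

from pyflink.common import Row
from pyflink.common.typeinfo import Types

# ds: hypothetical upstream DataStream carrying CustomResult-shaped records.
# Declare the result type explicitly so PyFlink can pick a serializer
# instead of falling back to the Java POJO type.
result_type = Types.ROW_NAMED(["col", "value"], [Types.STRING(), Types.STRING()])

typed = ds.map(
    lambda r: Row(col=r.col, value=r.value),
    output_type=result_type,
)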
Dan Hill
11/18/2022, 7:53 AM

chunilal kukreja
11/18/2022, 11:46 AM

chunilal kukreja
11/18/2022, 11:51 AM

clen.moras
11/18/2022, 12:25 PM
Could not resolve ResourceManager address akka.tcp://flink@service-name.ns:6123/user/rpc/resourcemanager_0
Here the service name is the headless service created when we set up the cluster in standalone mode.
Any pointers as to what I'm doing wrong would be greatly helpful. Reproducing it does not always give the same effect: it can vary between some TMs failing and sometimes all of them.

Felix Angell
11/18/2022, 4:10 PM

Kyle Ahn
11/18/2022, 8:29 PM
Failed to commit after recovery s3://.../file.parquet with MPU ID j_nxH7JJKegFvFiOEsNcNjKuSuZ2UhnEO4.rkTCXBu.DJ9qx4cgp8Ux87gDnY_vHjEOMpVgApBOzqfqTp8uI20uLITzIabGrHWrWQDyTdZ2EKqzQoFf0ichfi5W7TUgzLed8QCqLqY0NCDJDmEYraCz41mN6wjaQv01WWgxl11Q-. Checking if file was committed before...
This occurred while trying to update a FlinkDeployment in HA mode using the last-state upgradeMode. It seems that validating the last successful checkpoint commit failed, and the job does not start but gets stuck there. Is there a way to skip the failed checkpoint validation after n times?

Tawfik Yasser
11/18/2022, 9:30 PM
What does WatermarkGeneratorSupplier actually do? I didn't understand the attached comment in Flink:
/**
* A supplier for {@link WatermarkGenerator WatermarkGenerators}. The supplier pattern is used to
* avoid having to make {@link WatermarkGenerator} {@link Serializable} for use in API methods.
*
* <p>This interface is {@link Serializable} because the supplier may be shipped to workers during
* distributed execution.
*/
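In plain terms: API methods accept a small serializable supplier, the framework ships that supplier to the workers, and each worker calls it to build its own WatermarkGenerator, so the generator itself never has to be Serializable. A language-agnostic sketch of the same pattern in Python (the names are illustrative, not Flink's API):

import pickle

class MaxTimestampGenerator:
    # Built on the worker; never serialized, so it may hold any state.
    def __init__(self):
        self.max_ts = 0

    def on_event(self, event_ts: int):
        self.max_ts = max(self.max_ts, event_ts)

class MaxTimestampGeneratorSupplier:
    # Only this tiny factory crosses the wire to the workers.
    def create_watermark_generator(self):
        return MaxTimestampGenerator()

# What the framework conceptually does:
shipped = pickle.dumps(MaxTimestampGeneratorSupplier())  # serialize the supplier
generator = pickle.loads(shipped).create_watermark_generator()  # on each worker
generator.on_event(42)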
Rommel
11/18/2022, 11:46 PMxxx-flink-78954c766b-hprhj 1/1 Running 0 2m12s
xxx-flink-taskmanager-1-1 1/1 Running 0 93s
xxx-flink-taskmanager-1-2 1/1 Running 0 93s
xxx-flink-taskmanager-1-3 1/1 Running 0 93s
xxx-flink-taskmanager-1-4 1/1 Running 0 93s
chunilal kukreja
11/19/2022, 9:13 AM