Sumit Khaitan
05/02/2023, 6:14 AM
Felix Angell
05/02/2023, 9:24 AM
Gianluca Cacace
05/02/2023, 9:30 AM
Christophe Bornet
05/02/2023, 11:16 AM
Andrew Otto
05/02/2023, 3:13 PM
taskmanager.memory.process.size. I guess in minicluster mode those options don't apply, because the TM and JM are in the same process? Do I need to set e.g. -Xmx manually?
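For reference, a minimal sketch (not from the thread, assuming Flink 1.15+) of passing memory options to a local/MiniCluster environment. In this mode everything runs inside the client JVM, so the total heap is still governed by that JVM's -Xmx; the Configuration only tunes Flink-internal budgets such as managed and network memory.

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class MiniClusterMemorySketch {
    public static void main(String[] args) throws Exception {
        // Tune Flink-internal memory budgets for the embedded MiniCluster.
        // The JVM heap itself must still be set via -Xmx on the launching JVM.
        Configuration conf = new Configuration();
        conf.set(TaskManagerOptions.MANAGED_MEMORY_SIZE, MemorySize.parse("512m"));
        conf.set(TaskManagerOptions.NETWORK_MEMORY_MIN, MemorySize.parse("64m"));
        conf.set(TaskManagerOptions.NETWORK_MEMORY_MAX, MemorySize.parse("64m"));

        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironment(conf);

        env.fromElements(1, 2, 3).print();
        env.execute("minicluster-memory-sketch");
    }
}
```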
Олег Спица
05/02/2023, 3:38 PM
Is there a way to use OneInputStreamOperatorTestHarness with the new FileSink instead of the deprecated StreamingFileSink? StreamingFileSink implements SinkFunction and can be used as a parameter for a OneInputStreamOperator, but the new FileSink isn't suitable here.
Felix Angell
05/02/2023, 4:02 PM
Nathanael England
05/02/2023, 4:49 PM
2023-05-02 16:31:49,152 ERROR /usr/local/lib/python3.8/dist-packages/pyflink/fn_execution/beam/beam_sdk_worker_main.py:35 [] - Traceback (most recent call last):
File "/usr/local/lib/python3.8/dist-packages/apache_beam/runners/worker/sdk_worker.py", line 287, in _execute
response = task()
File "/usr/local/lib/python3.8/dist-packages/apache_beam/runners/worker/sdk_worker.py", line 360, in <lambda>
lambda: self.create_worker().do_instruction(request), request)
File "/usr/local/lib/python3.8/dist-packages/apache_beam/runners/worker/sdk_worker.py", line 596, in do_instruction
return getattr(self, request_type)(
File "/usr/local/lib/python3.8/dist-packages/apache_beam/runners/worker/sdk_worker.py", line 634, in process_bundle
bundle_processor.process_bundle(instruction_id))
File "/usr/local/lib/python3.8/dist-packages/apache_beam/runners/worker/bundle_processor.py", line 1003, in process_bundle
input_op_by_transform_id[element.transform_id].process_encoded(
File "/usr/local/lib/python3.8/dist-packages/apache_beam/runners/worker/bundle_processor.py", line 227, in process_encoded
self.output(decoded_value)
File "apache_beam/runners/worker/operations.py", line 526, in apache_beam.runners.worker.operations.Operation.output
File "apache_beam/runners/worker/operations.py", line 528, in apache_beam.runners.worker.operations.Operation.output
File "apache_beam/runners/worker/operations.py", line 237, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
File "apache_beam/runners/worker/operations.py", line 240, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 169, in pyflink.fn_execution.beam.beam_operations_fast.FunctionOperation.process
File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 194, in pyflink.fn_execution.beam.beam_operations_fast.FunctionOperation.process
File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 104, in pyflink.fn_execution.beam.beam_operations_fast.IntermediateOutputProcessor.process_outputs
File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 169, in pyflink.fn_execution.beam.beam_operations_fast.FunctionOperation.process
File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 194, in pyflink.fn_execution.beam.beam_operations_fast.FunctionOperation.process
File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 92, in pyflink.fn_execution.beam.beam_operations_fast.NetworkOutputProcessor.process_outputs
File "pyflink/fn_execution/beam/beam_coder_impl_fast.pyx", line 101, in pyflink.fn_execution.beam.beam_coder_impl_fast.FlinkLengthPrefixCoderBeamWrapper.encode_to_stream
File "pyflink/fn_execution/coder_impl_fast.pyx", line 273, in pyflink.fn_execution.coder_impl_fast.IterableCoderImpl.encode_to_stream
File "pyflink/fn_execution/coder_impl_fast.pyx", line 401, in pyflink.fn_execution.coder_impl_fast.RowCoderImpl.encode_to_stream
File "pyflink/fn_execution/coder_impl_fast.pyx", line 391, in pyflink.fn_execution.coder_impl_fast.RowCoderImpl.encode_to_stream
AttributeError: 'dict' object has no attribute 'get_fields_by_names'
Zhiyu Tian
05/03/2023, 3:46 AM
org.apache.parquet.format.PageHeader: Required field 'uncompressed_page_size' was not found in serialized data.
## Run environment:
Hadoop 2.9.1
Flink Operator: 1.16.1
Flink version: 1.16
## Investigation
I searched the error https://github.com/trinodb/trino/issues/2256, but it did not help.
## Full stack:
51120139836 at 10485760; previously tried [DatanodeInfoWithStorage[/BN2/10/308/09/[260310b0515206a00645be620]10010,DS-07f646f6-dcbf-4b98-8580-1adc1911da0b,DISK]].
2023-05-03 032615,221 INFO org.apache.hadoop.hdfs.DFSClient [] - Spawning 1 hedged read to DatanodeInfoWithStorage[/BN2/10/307/08/[260310b0515205900645bedcd]10010,DS-59d240da-da8d-47a0-a61b-cb12af056f81,DISK] for BP-1520273214-10.27.194.109-1568248216274:blk_52162250811_51120139836 at 61865984; previously tried [DatanodeInfoWithStorage[/BN2/10/308/09/[260310b0515206a00645be620]10010,DS-07f646f6-dcbf-4b98-8580-1adc1911da0b,DISK], DatanodeInfoWithStorage[/BN2/10/306/14/[260310b0515204f00645beb6e]10010,DS-1c533525-e37e-4671-88e5-ca79ad3f28a3,DISK]].
2023-05-03 032615,223 ERROR org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager [] - Received uncaught exception.
java.lang.RuntimeException: SplitFetcher thread 0 received unexpected exception while polling the records
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:150) ~[flink-connector-files-1.16.0.jar:1.16.0]
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:105) [flink-connector-files-1.16.0.jar:1.16.0]
at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) [?:?]
at java.util.concurrent.FutureTask.run(Unknown Source) [?:?]
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) [?:?]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) [?:?]
at java.lang.Thread.run(Unknown Source) [?:?]
Caused by: java.io.IOException: can not read class org.apache.parquet.format.PageHeader: Required field 'uncompressed_page_size' was not found in serialized data! Struct: org.apache.parquet.format.PageHeader$PageHeaderStandardScheme@112523e7
at org.apache.parquet.format.Util.read(Util.java:365) ~[blob_p-876fb9c1decabe8fc7463988e0d4e13b165e8c61-1aceb131dcac9755d058c2b83d5cfd1a:?]
at org.apache.parquet.format.Util.readPageHeader(Util.java:132) ~[blob_p-876fb9c1decabe8fc7463988e0d4e13b165e8c61-1aceb131dcac9755d058c2b83d5cfd1a:?]
at org.apache.parquet.hadoop.ParquetFileReader$Chunk.readPageHeader(ParquetFileReader.java:1382) ~[blob_p-876fb9c1decabe8fc7463988e0d4e13b165e8c61-1aceb131dcac9755d058c2b83d5cfd1a:?]
at org.apache.parquet.hadoop.ParquetFileReader$Chunk.readAllPages(ParquetFileReader.java:1429) ~[blob_p-876fb9c1decabe8fc7463988e0d4e13b165e8c61-1aceb131dcac9755d058c2b83d5cfd1a:?]
at org.apache.parquet.hadoop.ParquetFileReader$Chunk.readAllPages(ParquetFileReader.java:1402) ~[blob_p-876fb9c1decabe8fc7463988e0d4e13b165e8c61-1aceb131dcac9755d058c2b83d5cfd1a:?]
at org.apache.parquet.hadoop.ParquetFileReader.readChunkPages(ParquetFileReader.java:1023) ~[blob_p-876fb9c1decabe8fc7463988e0d4e13b165e8c61-1aceb131dcac9755d058c2b83d5cfd1a:?]
at org.apache.parquet.hadoop.ParquetFileReader.readNextRowGroup(ParquetFileReader.java:928) ~[blob_p-876fb9c1decabe8fc7463988e0d4e13b165e8c61-1aceb131dcac9755d058c2b83d5cfd1a:?]
at org.apache.flink.formats.parquet.ParquetVectorizedInputFormat$ParquetReader.readNextRowGroup(ParquetVectorizedInputFormat.java:396) ~[blob_p-876fb9c1decabe8fc7463988e0d4e13b165e8c61-1aceb131dcac9755d058c2b83d5cfd1a:?]
at org.apache.flink.formats.parquet.ParquetVectorizedInputFormat$ParquetReader.nextBatch(ParquetVectorizedInputFormat.java:378) ~[blob_p-876fb9c1decabe8fc7463988e0d4e13b165e8c61-1aceb131dcac9755d058c2b83d5cfd1a:?]
at org.apache.flink.formats.parquet.ParquetVectorizedInputFormat$ParquetReader.readBatch(ParquetVectorizedInputFormat.java:355) ~[blob_p-876fb9c1decabe8fc7463988e0d4e13b165e8c61-1aceb131dcac9755d058c2b83d5cfd1a:?]
at org.apache.flink.connector.file.src.impl.FileSourceSplitReader.fetch(FileSourceSplitReader.java:67) ~[flink-connector-files-1.16.0.jar:1.16.0]
at org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58) ~[flink-connector-files-1.16.0.jar:1.16.0]
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:142) ~[flink-connector-files-1.16.0.jar:1.16.0]
... 6 more
Caused by: shaded.parquet.org.apache.thrift.protocol.TProtocolException: Required field 'uncompressed_page_size' was not found in serialized data! Struct: org.apache.parquet.format.PageHeader$PageHeaderStandardScheme@112523e7
at org.apache.parquet.format.PageHeader$PageHeaderStandardScheme.read(PageHeader.java:1108) ~[blob_p-876fb9c1decabe8fc7463988e0d4e13b165e8c61-1aceb131dcac9755d058c2b83d5cfd1a:?]
at org.apache.parquet.format.PageHeader$PageHeaderStandardScheme.read(PageHeader.java:1019) ~[blob_p-876fb9c1decabe8fc7463988e0d4e13b165e8c61-1aceb131dcac9755d058c2b83d5cfd1a:?]
at org.apache.parquet.format.PageHeader.read(PageHeader.java:896) ~[blob_p-876fb9c1decabe8fc7463988e0d4e13b165e8c61-1aceb131dcac9755d058c2b83d5cfd1a:?]
at org.apache.parquet.format.Util.read(Util.java:362) ~[blob_p-876fb9c1decabe8fc7463988e0d4e13b165e8c61-1aceb131dcac9755d058c2b83d5cfd1a:?]
at org.apache.parquet.format.Util.readPageHeader(Util.java:132) ~[blob_p-876fb9c1decabe8fc7463988e0d4e13b165e8c61-1aceb131dcac9755d058c2b83d5cfd1a:?]
at org.apache.parquet.hadoop.ParquetFileReader$Chunk.readPageHeader(ParquetFileReader.java:1382) ~[blob_p-876fb9c1decabe8fc7463988e0d4e13b165e8c61-1aceb131dcac9755d058c2b83d5cfd1a:?]
at org.apache.parquet.hadoop.ParquetFileReader$Chunk.readAllPages(ParquetFileReader.java:1429) ~[blob_p-876fb9c1decabe8fc7463988e0d4e13b165e8c61-1aceb131dcac9755d058c2b83d5cfd1a:?]
at org.apache.parquet.hadoop.ParquetFileReader$Chunk.readAllPages(ParquetFileReader.java:1402) ~[blob_p-876fb9c1decabe8fc7463988e0d4e13b165e8c61-1aceb131dcac9755d058c2b83d5cfd1a:?]
at org.apache.parquet.hadoop.ParquetFileReader.readChunkPages(ParquetFileReader.java:1023) ~[blob_p-876fb9c1decabe8fc7463988e0d4e13b165e8c61-1aceb131dcac9755d058c2b83d5cfd1a:?]
at org.apache.parquet.hadoop.ParquetFileReader.readNextRowGroup(ParquetFileReader.java:928) ~[blob_p-876fb9c1decabe8fc7463988e0d4e13b165e8c61-1aceb131dcac9755d058c2b83d5cfd1a:?]
at org.apache.flink.formats.parquet.ParquetVectorizedInputFormat$ParquetReader.readNextRowGroup(ParquetVectorizedInputFormat.java:396) ~[blob_p-876fb9c1decabe8fc7463988e0d4e13b165e8c61-1aceb131dcac9755d058c2b83d5cfd1a:?]
at org.apache.flink.formats.parquet.ParquetVectorizedInputFormat$ParquetReader.nextBatch(ParquetVectorizedInputFormat.java:378) ~[blob_p-876fb9c1decabe8fc7463988e0d4e13b165e8c61-1aceb131dcac9755d058c2b83d5cfd1a:?]
at org.apache.flink.formats.parquet.ParquetVectorizedInputFormat$ParquetReader.readBatch(ParquetVectorizedInputFormat.java:355) ~[blob_p-876fb9c1decabe8fc7463988e0d4e13b165e8c61-1aceb131dcac9755d058c2b83d5cfd1a:?]
at org.apache.flink.connector.file.src.impl.FileSourceSplitReader.fetch(FileSourceSplitReader.java:67) ~[flink-connector-files-1.16.0.jar:1.16.0]
at org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58) ~[flink-connector-files-1.16.0.jar:1.16.0]
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:142) ~[flink-connector-files-1.16.0.jar:1.16.0]
... 6 more
2023-05-03 03:26:15,225 DEBUG org.apache.flink.streaming.runtime.tasks.StreamTask [] - Cleanup StreamTask (operators closed: false, cancelled: false)
Sumit Khaitan
05/03/2023, 4:20 AM
Bhupendra Yadav
05/03/2023, 5:41 AM
org.apache.flink.runtime.rest.handler.RestHandlerException: Jar file /<jar-dir>/ce239a09-463f-4667-8f81-9cd6a76f5454_my-jar.jar does not exist\n\ta ...
Our understanding is that this happens because the jar is uploaded to one JM's local filesystem by the Flink operator (our web.upload.dir is a k8s emptyDir at the moment), and if the job submit request goes to the other JM, it fails because the jar doesn't exist there.
One way to fix this is to use a Persistent Volume mounted at both JMs' jar directory (web.upload.dir).
I have a few questions:
1. We are wondering if there is any easier way to fix it?
2. Does the operator download the jar and upload it to the JM, or does the operator pass the jarURI to the JM so that the JM downloads the jar itself?
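A minimal sketch of the Persistent Volume idea mentioned above, assuming the flink-kubernetes-operator FlinkDeployment CRD and a ReadWriteMany-capable PVC; the claim name, mount path, and replica count are illustrative, not from the thread.

```yaml
# Sketch only: share web.upload.dir across the JobManager replicas via an RWX PVC.
spec:
  flinkConfiguration:
    web.upload.dir: /opt/flink/jar-upload
  jobManager:
    replicas: 2
    podTemplate:
      spec:
        containers:
          - name: flink-main-container
            volumeMounts:
              - name: jar-upload
                mountPath: /opt/flink/jar-upload
        volumes:
          - name: jar-upload
            persistentVolumeClaim:
              claimName: flink-jar-upload   # must support ReadWriteMany
```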
Amenreet Singh Sodhi
05/03/2023, 6:46 AM
org.apache.kafka.clients.consumer.RetriableCommitFailedException: Offset commit failed with a retriable exception. You should retry committing the latest consumed offsets.
Caused by: org.apache.kafka.common.errors.CoordinatorNotAvailableException: The coordinator is not available.
The same issue is discussed in this ticket:
https://issues.apache.org/jira/browse/FLINK-27962?page=com.atlassian.jira.plugin.system.issuetabpanels%3Aall-tabpanel
but it's an old ticket. Can anyone share the latest ticket for this issue, or, if it has been resolved in version 1.16.0, please share the details. Thanks.
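For context, and not specific to that ticket: offset commits back to Kafka are best-effort, because Flink's exactly-once guarantees rest on the offsets stored in its checkpoints rather than on the consumer group's committed offsets, so this warning by itself should not affect correctness. A minimal KafkaSource sketch follows; broker, topic, and group names are illustrative assumptions.

```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class KafkaOffsetCommitSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000); // offsets are committed on completed checkpoints

        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("broker-1:9092")    // assumption
                .setTopics("events")                     // assumption
                .setGroupId("my-consumer-group")         // assumption
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                // set to "false" to stop committing to Kafka entirely and
                // silence the retriable-commit warnings:
                .setProperty("commit.offsets.on.checkpoint", "true")
                .build();

        env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka-source")
                .print();

        env.execute("kafka-offset-commit-sketch");
    }
}
```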
Rashmin Patel
05/03/2023, 9:03 AM
I am using the state-processor-api, which is ported to the DataStream API since 1.16.0.
My bootstrap job program was written with the DataSet API and used to complete in 30 minutes, but after migrating it to the DataStream API and running it in BATCH execution mode, I am seeing a significant performance degradation: the same program now takes ~4x as long.
I am providing the same resources to both programs.
Has anyone faced such an issue?
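For reference, a minimal sketch (class, job name, and data are illustrative) of the migration being described: running a DataStream program with batch semantics via RuntimeExecutionMode.BATCH, which uses blocking shuffles instead of the old DataSet runtime.

```java
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class BootstrapBatchSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Batch semantics: blocking shuffles, no checkpoints,
        // results emitted once all input is processed.
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);

        env.fromElements(1, 2, 3, 4)
           .keyBy(value -> value % 2, Types.INT)
           .reduce(Integer::sum)
           .print();

        env.execute("bootstrap-batch-sketch");
    }
}
```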
05/03/2023, 12:15 PMmode: standalone
) to be able to use Flink in the reactive mode (scheduler-mode: reactive
).
My task manager pod has my modifications (/opt/flink/usrlib/
jars) but my job manager pod seems to have the same content as the default flink image. Do you know what could lead to this?
My deployment is
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: {{ template "flink-application-mode.name" . }}
spec:
  image: "{{ .Values.imageRepository }}/{{ .Values.imageName }}:{{ .Values.imageTag }}"
  imagePullPolicy: IfNotPresent
  serviceAccount: {{ .Values.serviceAccountName }}
  flinkVersion: v1_16
  podTemplate:
    spec:
      containers:
        - name: flink-main-container
          env:
            - name: APPLICATION_CREDENTIALS
              value: /secrets/credentials.json
          volumeMounts:
            ...
      volumes:
        ...
  flinkConfiguration:
    scheduler-mode: reactive
  jobManager:
    replicas: 1
    resource:
      {{- toYaml .Values.resources.jobmanager.requests | nindent 6 }}
  taskManager:
    replicas: {{ .Values.replicaCount }}
    resource:
      {{- toYaml .Values.resources.taskmanager.requests | nindent 6 }}
  mode: standalone
  job:
    jarURI: local:///opt/flink/usrlib/custom.jar
    entryClass: {{ required "A valid .Values.jobmanager.entryClass entry required!" .Values.jobmanager.entryClass }}
    parallelism: 2
    upgradeMode: savepoint # last-state OR stateless
Felix Angell
05/03/2023, 4:54 PM
Adam Augusta
05/03/2023, 8:48 PM
Matthew Kerian
05/04/2023, 5:37 AM
Otto Remse
05/04/2023, 8:27 AM
Whether I query flink_taskmanager_job_task_operator_numRecordsOut{task_name=~"Source:.*"} or flink_taskmanager_job_task_numRecordsOut{task_name=~"Source:.*"}, they're always zero. Which metric name is the Flink dashboard using for the value that is visible in the picture?
Slackbot
05/04/2023, 2:04 PM
Dylan Fontana
05/04/2023, 2:51 PM
shubham gupta
05/04/2023, 4:18 PM
Viswa Teja Kuncham
05/04/2023, 6:08 PM
Maroš Mamrák
05/04/2023, 6:09 PM
David Christle
05/05/2023, 12:11 AM
It seems the rowtime is not being propagated in the toDataStream conversion, at least in batch execution mode. The core logic of what I'm doing is in the attached snippet. At a high level, the code extracts a field from a POJO in a DataStream, converts to the Table API, performs a Window TVF, and converts back to a DataStream<Tuple<>>. Then, that DataStream is converted back into a Table, where another Window TVF is applied, and the output is converted back to a different Tuple DataStream. I realize there's no need to convert the first table back to a DataStream, but the point is that the toDataStream conversion isn't, apparently, propagating the rowtime on the first Table.
When this logic runs in batch execution mode, the error is: org.apache.flink.util.FlinkRuntimeException: Could not find timestamp in DataStream API record. Make sure that timestamps have been assigned before and the event-time characteristic is enabled.
From inserting some .shuffle calls to split the operators and narrow down the error source, it looks like the first phase of the 2nd window TVF is where it happens, i.e. this stage fails:
[40]:TableSourceScan(table=[[*anonymous_datastream_source$6*]], fields=[f0, f1, rowtime])
+- [41]:Calc(select=[f0, f1, CAST(rowtime AS TIMESTAMP_LTZ(3) *ROWTIME*) AS rowtime])
+- [42]:WindowTableFunction(window=[HOP(time_col=[rowtime], size=[86400000 ms], slide=[1 h])])
+- [43]:Calc(select=[f0, window_start, window_end, window_time AS rowtime, f1])
+- [44]:LocalHashAggregate(groupBy=[f0, window_start, window_end, rowtime], select=[f0, window_start, window_end, rowtime, Partial_SUM(f1) AS sum$0])
The earlier stage looks like this:
[37]:HashAggregate(isMerge=[true], groupBy=[f0, window_start, window_end, rowtime], select=[f0, window_start, window_end, rowtime, Final_COUNT(count1$0) AS f3])
+- [38]:Calc(select=[f0, f3, rowtime])
+- [39]:ConstraintEnforcer[NotNullEnforcer(fields=[f3, rowtime])]
+- TableToDataSteam(type=ROW<`f0` INT, `f3` BIGINT NOT NULL, `rowtime` TIMESTAMP_LTZ(3) NOT NULL> NOT NULL, rowtime=false)
Not sure if it's relevant, but I thought the rowtime=false was unusual. I expected the first Window TVF to propagate a time attribute. The documentation says window_time is a time attribute, and shows how it's used in a subsequent TVF as the rowtime. So, I'd expect it to propagate via toDataStream, too.
Zhang Zhao
05/05/2023, 1:48 AM
Exception in thread "main" java.lang.RuntimeException: Failed to fetch next result
at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:109)
at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:80)
at org.apache.flink.table.api.internal.TableResultImpl$CloseableRowIteratorWrapper.hasNext(TableResultImpl.java:370)
at org.apache.flink.table.utils.PrintUtils.printAsTableauForm(PrintUtils.java:152)
at org.apache.flink.table.api.internal.TableResultImpl.print(TableResultImpl.java:160)
at com.zjlab.shuzihu.MysqlToEs_online.main(MysqlToEs_online.java:204)
Caused by: java.io.IOException: Failed to fetch job execution result
at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:177)
at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.next(CollectResultFetcher.java:120)
at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:106)
... 5 more
Caused by: java.util.concurrent.ExecutionException: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1928)
at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:175)
... 7 more
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
at org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$3(MiniClusterJobClient.java:137)
at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
at java.util.concurrent.CompletableFuture.uniApplyStage(CompletableFuture.java:628)
at java.util.concurrent.CompletableFuture.thenApply(CompletableFuture.java:1996)
at org.apache.flink.runtime.minicluster.MiniClusterJobClient.getJobExecutionResult(MiniClusterJobClient.java:134)
at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:174)
... 7 more
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138)
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82)
at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:252)
at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:242)
at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:233)
at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:684)
at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:79)
at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:444)
at sun.reflect.GeneratedMethodAccessor16.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRpcInvocation$1(AkkaRpcActor.java:316)
at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:314)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:217)
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:78)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:163)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)
at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
at akka.actor.Actor.aroundReceive(Actor.scala:537)
at akka.actor.Actor.aroundReceive$(Actor.scala:535)
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580)
at akka.actor.ActorCell.invoke(ActorCell.scala:548)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
at akka.dispatch.Mailbox.run(Mailbox.scala:231)
at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1067)
at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1703)
at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:172)
Caused by: java.lang.IllegalArgumentException: Invalid range: [+INF..{116,128,0,0,0,0,0,2,66,95,114,223,255,255,255,255,255,255,249})
at org.apache.flink.shaded.guava30.com.google.common.collect.Range.<init>(Range.java:358)
at org.apache.flink.shaded.guava30.com.google.common.collect.Range.create(Range.java:156)
at org.apache.flink.shaded.guava30.com.google.common.collect.Range.intersection(Range.java:558)
at org.tikv.cdc.CDCClient.overlapWithRegion(CDCClient.java:235)
at org.tikv.cdc.CDCClient.addRegions(CDCClient.java:185)
at org.tikv.cdc.CDCClient.applyKeyRange(CDCClient.java:178)
at org.tikv.cdc.CDCClient.start(CDCClient.java:98)
at com.ververica.cdc.connectors.tidb.TiKVRichParallelSourceFunction.run(TiKVRichParallelSourceFunction.java:165)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:67)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:323)
Dheeraj Panangat
05/05/2023, 5:26 AM
Nitin Bansal
05/05/2023, 11:30 AM
TypeError: Argument 'input_stream' has incorrect type (expected pyflink.fn_execution.stream_fast.LengthPrefixInputStream, got BeamInputStream)
It comes when I add a streaming source to the environment.
Is there any workaround for this error?
Vladimir Tiukhtin
05/05/2023, 11:57 AM
Thijs van de Poll
05/05/2023, 4:02 PM
Hussain Abbas
05/05/2023, 4:18 PM
We are running Flink with the reactive scheduler and standalone mode. We are using HPA to scale the task managers up and down based on metrics, while Flink takes care of vertical scaling.
Our pipeline consists of Kinesis as the source and Dynamo as the sink. There are a few issues we are seeing right now:
1. We are getting failing checkpoints with unaligned checkpoints, which causes taskmanager restarts.
2. The task manager restarts under high load; we are not sure what is causing it, though, as we cannot see any exceptions.
We have a very high-load environment (10-12k events/s). Do you think there is any room for improvement? Are we handling it correctly?
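For reference, a minimal sketch (values are illustrative, not a recommendation for this particular workload) of the checkpoint knobs usually touched when checkpoints fail under backpressure: unaligned checkpoints with an aligned-checkpoint timeout, a longer checkpoint timeout, and a tolerated number of checkpoint failures.

```java
import java.time.Duration;

import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointTuningSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.enableCheckpointing(60_000, CheckpointingMode.EXACTLY_ONCE);

        CheckpointConfig cc = env.getCheckpointConfig();
        // Switch to unaligned checkpoints only after barriers have been
        // stuck behind backpressure for a while.
        cc.enableUnalignedCheckpoints(true);
        cc.setAlignedCheckpointTimeout(Duration.ofSeconds(30));
        // Give checkpoints more time before they are declared failed, and
        // don't fail the job on a single expired checkpoint.
        cc.setCheckpointTimeout(10 * 60_000);
        cc.setTolerableCheckpointFailureNumber(3);
        cc.setMinPauseBetweenCheckpoints(30_000);

        env.fromElements("a", "b", "c").print();
        env.execute("checkpoint-tuning-sketch");
    }
}
```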