Felix Angell
04/03/2023, 2:39 PM
(
    env.add_source(
        kafka_consumer,
        ...
    )
    .uid("some id")
    .name("some id")
    .set_parallelism(20)
    .set_max_parallelism(20)
    .assign_timestamps_and_watermarks(watermark)
    .uid("some id")
    .name("some id")
    .set_parallelism(20)
    .set_max_parallelism(20)
)
Felix Angell
04/03/2023, 2:41 PM
Felix Angell
04/03/2023, 3:29 PM
'WriteBatch has wrong count', which is apparently thrown when 'verifyChecksum' fails... not sure what this does, but it happens frequently.
Felix Angell
04/03/2023, 3:42 PM
org.apache.flink.runtime.taskmanager.AsynchronousException: Caught exception while processing timer.
at org.apache.flink.streaming.runtime.tasks.StreamTask$StreamTaskAsyncExceptionHandler.handleAsyncException(StreamTask.java:1535)
at org.apache.flink.streaming.runtime.tasks.StreamTask.handleAsyncException(StreamTask.java:1510)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1650)
at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$deferCallbackToMailbox$21(StreamTask.java:1639)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:338)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:324)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:953)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:932)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:746)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:568)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: TimerException{java.lang.RuntimeException: Error while waiting for BeamPythonFunctionRunner flush}
... 14 more
Caused by: java.lang.RuntimeException: Error while waiting for BeamPythonFunctionRunner flush
at org.apache.flink.streaming.api.operators.python.AbstractExternalPythonFunctionOperator.invokeFinishBundle(AbstractExternalPythonFunctionOperator.java:106)
at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.checkInvokeFinishBundleByTime(AbstractPythonFunctionOperator.java:299)
at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.lambda$open$0(AbstractPythonFunctionOperator.java:115)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1648)
... 13 more
Caused by: java.lang.RuntimeException: Failed to close remote bundle
at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.finishBundle(BeamPythonFunctionRunner.java:382)
at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.flush(BeamPythonFunctionRunner.java:366)
at org.apache.flink.streaming.api.operators.python.AbstractExternalPythonFunctionOperator.lambda$invokeFinishBundle$0(AbstractExternalPythonFunctionOperator.java:85)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
... 1 more
Caused by: java.util.concurrent.ExecutionException: java.lang.RuntimeException: Error received from SDK harness for instruction 192: Traceback (most recent call last):
File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 289, in _execute
response = task()
File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 362, in <lambda>
lambda: self.create_worker().do_instruction(request), request)
File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 606, in do_instruction
return getattr(self, request_type)(
File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 644, in process_bundle
bundle_processor.process_bundle(instruction_id))
File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 996, in process_bundle
self.ops[element.transform_id].process_timer(
File "/usr/local/lib/python3.8/site-packages/pyflink/fn_execution/beam/beam_operations_slow.py", line 171, in process_timer
self._output_processor.process_outputs(
File "/usr/local/lib/python3.8/site-packages/pyflink/fn_execution/beam/beam_operations_slow.py", line 63, in process_outputs
self._consumer.process(windowed_value.with_value(results))
File "/usr/local/lib/python3.8/site-packages/pyflink/fn_execution/beam/beam_operations_slow.py", line 131, in process
for value in o.value:
File "/usr/local/lib/python3.8/site-packages/pyflink/fn_execution/datastream/input_handler.py", line 81, in process_timer
yield from _emit_results(
File "/usr/local/lib/python3.8/site-packages/pyflink/fn_execution/datastream/input_handler.py", line 101, in _emit_results
for result in results:
File "/usr/local/lib/python3.8/site-packages/pyflink/fn_execution/datastream/input_handler.py", line 90, in _on_event_time
yield from self._on_event_time_func(timestamp, key, namespace)
File "/tmp/python-dist-5f75f61c-925c-4fc0-b994-e1b897165b0b/python-files/61a6d42e-8f12-4693-90e6-8e4d8ff83925_code/61a6d42e-8f12-4693-90e6-8e4d8ff83925_code/enrichers/session/func_enrich.py", line 145, in on_timer
events = self.events.get(event_received_at_millis)
File "/usr/local/lib/python3.8/site-packages/pyflink/fn_execution/state_impl.py", line 915, in get
return self.get_internal_state().get(key)
File "/usr/local/lib/python3.8/site-packages/pyflink/fn_execution/state_impl.py", line 772, in get
exists, value = self._map_state_handler.blocking_get(
File "/usr/local/lib/python3.8/site-packages/pyflink/fn_execution/state_impl.py", line 418, in blocking_get
cached_map_state.put(map_key, (exists, value))
File "/usr/local/lib/python3.8/site-packages/pyflink/fn_execution/state_impl.py", line 319, in put
super(CachedMapState, self).put(key, exists_and_value)
File "/usr/local/lib/python3.8/site-packages/pyflink/fn_execution/state_impl.py", line 68, in put
self._on_evict(name, value)
File "/usr/local/lib/python3.8/site-packages/pyflink/fn_execution/state_impl.py", line 305, in on_evict
self._cached_keys.remove(key)
KeyError: 1680536326836
at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395)
at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1999)
at org.apache.beam.sdk.util.MoreFutures.get(MoreFutures.java:60)
at org.apache.beam.runners.fnexecution.control.SdkHarnessClient$BundleProcessor$ActiveBundle.close(SdkHarnessClient.java:504)
at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory$1.close(DefaultJobBundleFactory.java:555)
at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.finishBundle(BeamPythonFunctionRunner.java:380)
... 7 more
Caused by: java.lang.RuntimeException: Error received from SDK harness for instruction 192: Traceback (most recent call last):
File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 289, in _execute
response = task()
File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 362, in <lambda>
lambda: self.create_worker().do_instruction(request), request)
File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 606, in do_instruction
return getattr(self, request_type)(
File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 644, in process_bundle
bundle_processor.process_bundle(instruction_id))
File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 996, in process_bundle
self.ops[element.transform_id].process_timer(
File "/usr/local/lib/python3.8/site-packages/pyflink/fn_execution/beam/beam_operations_slow.py", line 171, in process_timer
self._output_processor.process_outputs(
File "/usr/local/lib/python3.8/site-packages/pyflink/fn_execution/beam/beam_operations_slow.py", line 63, in process_outputs
self._consumer.process(windowed_value.with_value(results))
File "/usr/local/lib/python3.8/site-packages/pyflink/fn_execution/beam/beam_operations_slow.py", line 131, in process
for value in o.value:
File "/usr/local/lib/python3.8/site-packages/pyflink/fn_execution/datastream/input_handler.py", line 81, in process_timer
yield from _emit_results(
File "/usr/local/lib/python3.8/site-packages/pyflink/fn_execution/datastream/input_handler.py", line 101, in _emit_results
for result in results:
File "/usr/local/lib/python3.8/site-packages/pyflink/fn_execution/datastream/input_handler.py", line 90, in _on_event_time
yield from self._on_event_time_func(timestamp, key, namespace)
File "/tmp/python-dist-5f75f61c-925c-4fc0-b994-e1b897165b0b/python-files/61a6d42e-8f12-4693-90e6-8e4d8ff83925_code/61a6d42e-8f12-4693-90e6-8e4d8ff83925_code/enrichers/session/func_enrich.py", line 145, in on_timer
events = self.events.get(event_received_at_millis)
File "/usr/local/lib/python3.8/site-packages/pyflink/fn_execution/state_impl.py", line 915, in get
return self.get_internal_state().get(key)
File "/usr/local/lib/python3.8/site-packages/pyflink/fn_execution/state_impl.py", line 772, in get
exists, value = self._map_state_handler.blocking_get(
File "/usr/local/lib/python3.8/site-packages/pyflink/fn_execution/state_impl.py", line 418, in blocking_get
cached_map_state.put(map_key, (exists, value))
File "/usr/local/lib/python3.8/site-packages/pyflink/fn_execution/state_impl.py", line 319, in put
super(CachedMapState, self).put(key, exists_and_value)
File "/usr/local/lib/python3.8/site-packages/pyflink/fn_execution/state_impl.py", line 68, in put
self._on_evict(name, value)
File "/usr/local/lib/python3.8/site-packages/pyflink/fn_execution/state_impl.py", line 305, in on_evict
self._cached_keys.remove(key)
KeyError: 1680536326836
at org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:180)
at org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:160)
at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onMessage(ServerCalls.java:251)
at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.ForwardingServerCallListener.onMessage(ForwardingServerCallListener.java:33)
at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.Contexts$ContextualizedServerCallListener.onMessage(Contexts.java:76)
at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailableInternal(ServerCallImpl.java:309)
at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailable(ServerCallImpl.java:292)
at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1MessagesAvailable.runInContext(ServerImpl.java:782)
at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
... 3 more
this one, where there is a KeyError from RocksDB
Hong Teoh
04/03/2023, 4:05 PM
env object!
Also, we recommend setting max_parallelism to something high rather than 20: your app will not be able to scale beyond the max parallelism when restoring from the same snapshot.
(
    env
    .set_parallelism(20)
    .set_max_parallelism(32768)
    .add_source(
        kafka_consumer,
        ...
    )
    .uid("some id")
    .name("some id")
    .set_parallelism(20)
    .assign_timestamps_and_watermarks(watermark)
    .uid("some id")
    .name("some id")
)
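The max parallelism advice can be illustrated with a pure-Python sketch of Flink's key-group arithmetic, assuming it follows the formulas in Flink's KeyGroupRangeAssignment class: keyed state is hashed into max_parallelism key groups, each subtask owns a contiguous range of them, and a snapshot cannot be spread over more subtasks than it has key groups. The function names are hypothetical, and Python's hash() stands in for the murmur hash Flink applies to the key's hashCode().

```python
def key_group_for_key(key, max_parallelism: int) -> int:
    """Key group a key falls into.

    Simplified (hypothetical helper): Flink murmur-hashes key.hashCode();
    Python's built-in hash() stands in for that here.
    """
    return hash(key) % max_parallelism


def key_group_range(max_parallelism: int, parallelism: int, subtask: int) -> range:
    """Contiguous block of key groups owned by one subtask."""
    if not 0 < parallelism <= max_parallelism:
        raise ValueError("max parallelism must not be smaller than parallelism")
    start = (subtask * max_parallelism + parallelism - 1) // parallelism
    end = ((subtask + 1) * max_parallelism - 1) // parallelism  # inclusive
    return range(start, end + 1)


def subtask_for_key_group(max_parallelism: int, parallelism: int, key_group: int) -> int:
    """Subtask that owns a given key group (inverse of key_group_range)."""
    return key_group * parallelism // max_parallelism


if __name__ == "__main__":
    # Max parallelism 20 at parallelism 20: one key group per subtask, no headroom.
    print([len(key_group_range(20, 20, i)) for i in range(20)])
    try:
        key_group_range(20, 60, 0)  # try to spread that snapshot over 60 subtasks
    except ValueError as err:
        print("rejected:", err)
    # Max parallelism 32768: 60 subtasks each get 546 or 547 key groups.
    sizes = [len(key_group_range(32768, 60, i)) for i in range(60)]
    print(min(sizes), max(sizes), sum(sizes))  # 546 547 32768
```

With 32768 (Flink's upper bound for max parallelism) the same snapshot leaves room to rescale well past 60; the trade-off, as far as we know, is some extra per-key-group bookkeeping in the state backend.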
Hong Teoh
04/03/2023, 4:08 PM
Felix Angell
04/03/2023, 4:10 PM
Felix Angell
04/03/2023, 4:10 PM
2023-04-03 15:28:03
org.apache.flink.util.FlinkRuntimeException: Exception while refilling store from iterator.
at org.apache.flink.contrib.streaming.state.RocksDBCachingPriorityQueueSet.checkRefillCacheFromStore(RocksDBCachingPriorityQueueSet.java:340)
at org.apache.flink.contrib.streaming.state.RocksDBCachingPriorityQueueSet.peek(RocksDBCachingPriorityQueueSet.java:137)
at org.apache.flink.contrib.streaming.state.RocksDBCachingPriorityQueueSet.peek(RocksDBCachingPriorityQueueSet.java:58)
at org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue$InternalPriorityQueueComparator.comparePriority(KeyGroupPartitionedPriorityQueue.java:285)
at org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue$InternalPriorityQueueComparator.comparePriority(KeyGroupPartitionedPriorityQueue.java:271)
at org.apache.flink.runtime.state.heap.HeapPriorityQueue.isElementPriorityLessThen(HeapPriorityQueue.java:161)
at org.apache.flink.runtime.state.heap.HeapPriorityQueue.siftDown(HeapPriorityQueue.java:141)
at org.apache.flink.runtime.state.heap.HeapPriorityQueue.adjustElementAtIndex(HeapPriorityQueue.java:107)
at org.apache.flink.runtime.state.heap.HeapPriorityQueue.adjustModifiedElement(HeapPriorityQueue.java:69)
at org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.poll(KeyGroupPartitionedPriorityQueue.java:102)
at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.advanceWatermark(InternalTimerServiceImpl.java:301)
at org.apache.flink.streaming.api.operators.InternalTimeServiceManagerImpl.advanceWatermark(InternalTimeServiceManagerImpl.java:180)
at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.processWatermark(AbstractPythonFunctionOperator.java:187)
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitWatermark(OneInputStreamTask.java:239)
at org.apache.flink.streaming.runtime.watermarkstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:200)
at org.apache.flink.streaming.runtime.watermarkstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:105)
at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:136)
at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:519)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:953)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:932)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:746)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:568)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.flink.util.FlinkRuntimeException: org.rocksdb.RocksDBException: WriteBatch has wrong count
at org.apache.flink.contrib.streaming.state.RocksDBCachingPriorityQueueSet.flushWriteBatch(RocksDBCachingPriorityQueueSet.java:314)
at org.apache.flink.contrib.streaming.state.RocksDBCachingPriorityQueueSet.orderedBytesIterator(RocksDBCachingPriorityQueueSet.java:304)
at org.apache.flink.contrib.streaming.state.RocksDBCachingPriorityQueueSet.checkRefillCacheFromStore(RocksDBCachingPriorityQueueSet.java:336)
... 27 more
Caused by: org.rocksdb.RocksDBException: WriteBatch has wrong count
at org.rocksdb.RocksDB.write0(Native Method)
at org.rocksdb.RocksDB.write(RocksDB.java:1784)
at org.apache.flink.contrib.streaming.state.RocksDBWriteBatchWrapper.flush(RocksDBWriteBatchWrapper.java:112)
at org.apache.flink.contrib.streaming.state.RocksDBCachingPriorityQueueSet.flushWriteBatch(RocksDBCachingPriorityQueueSet.java:312)
... 29 more
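As for what 'WriteBatch has wrong count' checks: as we understand RocksDB's documented WriteBatch encoding, a batch starts with a 12-byte header (a fixed64 sequence number and a fixed32 record count) followed by the records, and the reader reports corruption when the records it actually walks do not add up to the count in the header. The sketch below is a simplified model of that check (put and delete records only), not RocksDB's code; encode_batch, iterate and the declared_count override are hypothetical names, the last one used to fake a header that disagrees with the body.

```python
import struct

# Record type tags; as we understand it these match RocksDB's ValueType values.
K_TYPE_DELETION = 0x0
K_TYPE_VALUE = 0x1
HEADER = struct.Struct("<QI")  # fixed64 sequence number + fixed32 count, little-endian


def put_varint32(n: int) -> bytes:
    """Encode n as a little-endian base-128 varint."""
    out = bytearray()
    while n >= 0x80:
        out.append((n & 0x7F) | 0x80)
        n >>= 7
    out.append(n)
    return bytes(out)


def get_varint32(buf: bytes, pos: int):
    """Decode a varint at pos; return (value, position after it)."""
    result, shift = 0, 0
    while True:
        byte = buf[pos]
        pos += 1
        result |= (byte & 0x7F) << shift
        if byte < 0x80:
            return result, pos
        shift += 7


def encode_batch(seq, records, declared_count=None) -> bytes:
    """records: list of (type, key, value); declared_count fakes a bad header."""
    body = bytearray()
    for kind, key, value in records:
        body += bytes([kind]) + put_varint32(len(key)) + key
        if kind == K_TYPE_VALUE:
            body += put_varint32(len(value)) + value
    count = len(records) if declared_count is None else declared_count
    return HEADER.pack(seq, count) + bytes(body)


def iterate(rep: bytes):
    """Walk every record, then compare how many were found with the header."""
    _seq, count = HEADER.unpack_from(rep, 0)
    pos, ops = HEADER.size, []
    while pos < len(rep):
        kind = rep[pos]
        pos += 1
        klen, pos = get_varint32(rep, pos)
        key, pos = rep[pos:pos + klen], pos + klen
        if kind == K_TYPE_VALUE:
            vlen, pos = get_varint32(rep, pos)
            ops.append(("put", key, rep[pos:pos + vlen]))
            pos += vlen
        elif kind == K_TYPE_DELETION:
            ops.append(("delete", key))
        else:
            return f"Corruption: unknown record type {kind}", ops
    if len(ops) != count:
        return "Corruption: WriteBatch has wrong count", ops
    return "OK", ops


records = [(K_TYPE_VALUE, b"timer-1", b"v"), (K_TYPE_DELETION, b"timer-0", b"")]
print(iterate(encode_batch(1, records))[0])                    # OK
print(iterate(encode_batch(1, records, declared_count=3))[0])  # Corruption: WriteBatch has wrong count
```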
Hong Teoh
04/03/2023, 4:11 PM
> Does this mean that setting the parallelism on env would apply a max of 20 across every operator? In which case I don’t think this would help with our processing when sinking.
Setting the parallelism on env will set the default for all operators; then we can set the sink specifically to 60, e.g.:
(
    env
    .set_parallelism(20)
    .set_max_parallelism(32768)
    .add_source(
        kafka_consumer,
        ...
    )
    .uid("some id")
    .name("some id")
    .set_parallelism(20)
    .assign_timestamps_and_watermarks(watermark)
    .uid("some id")
    .name("some id")
    .add_sink(xx)
    .set_parallelism(60)
)
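Hong's explanation amounts to a simple precedence rule, sketched below in plain Python: an operator's own set_parallelism wins, otherwise the env-level set_parallelism applies, and only if neither is set does the cluster's parallelism.default from flink-conf.yaml (1 unless configured) kick in. The Operator class and resolve_parallelism function are hypothetical; they model the rule, not PyFlink's internals.

```python
from dataclasses import dataclass
from typing import Dict, List, Optional


@dataclass
class Operator:
    name: str
    parallelism: Optional[int] = None  # set only if .set_parallelism(...) was called


def resolve_parallelism(operators: List[Operator],
                        env_parallelism: Optional[int],
                        config_default: int = 1) -> Dict[str, int]:
    """Operator setting > env.set_parallelism(...) > parallelism.default."""
    job_default = env_parallelism if env_parallelism is not None else config_default
    return {
        op.name: op.parallelism if op.parallelism is not None else job_default
        for op in operators
    }


# Shape of the snippet above: only the source and the sink set their own value.
pipeline = [Operator("Source", 20), Operator("Watermarks"), Operator("Sink", 60)]
print(resolve_parallelism(pipeline, env_parallelism=20))
# {'Source': 20, 'Watermarks': 20, 'Sink': 60}
print(resolve_parallelism(pipeline, env_parallelism=None))
# {'Source': 20, 'Watermarks': 1, 'Sink': 60}
```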
Felix Angell
04/03/2023, 4:13 PM
Felix Angell
04/03/2023, 4:14 PM
Hong Teoh
04/03/2023, 4:15 PM
> how come the new ‘extract-timestamp’ operator has suddenly appeared though, and where does it infer the parallelism from?
Hmm, I’m not sure here; probably the default parallelism (parallelism.default) in flink-conf.yaml?
Hong Teoh
04/03/2023, 4:16 PM
> we have made no code changes beyond upgrading to 1.15 and the operator graph is very different
🤔 hmmm ok… I guess this depends on your setup. I know Beam has changed the method of job graph construction between Flink versions…
Felix Angell
04/03/2023, 4:19 PM
Felix Angell
04/03/2023, 4:19 PM
Hong Teoh
04/03/2023, 4:20 PM
> so could this mean it has always worked like this under the hood and now it’s just made obvious on the graph?
It depends on what you mean by “very different”. Do you have examples?
Hong Teoh
04/03/2023, 4:21 PM
Duc Anh Khu
04/03/2023, 4:23 PM
hi, do you know if we need to set_parallelism on assign_timestamps_and_watermarks?
Hong Teoh
04/03/2023, 4:32 PM
> hi, do you know if we need to set_parallelism on assign_timestamps_and_watermarks?
Definitely worth trying, but I think this might be a gap in the Python API… See here: https://github.com/apache/flink/blame/bee3e9442fa2749f0b2e21d79949b5410fa422e5/flink-python/pyflink/datastream/data_stream.py#L689-L701
assign_timestamps_and_watermarks seems to be composed of multiple ProcessFunctions. Extract-Timestamp is a ProcessFunction, followed by Remove-Timestamp, so I don’t think you can specify the parallelism of Extract-Timestamp.
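To make the point about assign_timestamps_and_watermarks concrete, here is a toy model in plain Python, assuming the structure described above (an Extract-Timestamp step followed by a Remove-Timestamp step): a composite method adds several nodes to the graph but hands back only the last one, so a chained .set_parallelism(...) reaches that last node while the earlier one keeps the job default. Graph, MiniStream and build are hypothetical stand-ins, not PyFlink classes.

```python
from typing import Dict, List


class Graph:
    """Toy job graph: nodes in creation order, plus a job-wide default parallelism."""

    def __init__(self, default_parallelism: int):
        self.default_parallelism = default_parallelism
        self.nodes: List[dict] = []

    def add(self, name: str) -> dict:
        node = {"name": name, "parallelism": None}
        self.nodes.append(node)
        return node

    def resolved(self) -> Dict[str, int]:
        # Nodes without an explicit setting fall back to the job default.
        return {
            n["name"]: (n["parallelism"] if n["parallelism"] is not None
                        else self.default_parallelism)
            for n in self.nodes
        }


class MiniStream:
    """Fluent handle to the most recently added node, loosely like a DataStream."""

    def __init__(self, graph: Graph, node: dict):
        self._graph = graph
        self._node = node

    def _then(self, name: str) -> "MiniStream":
        return MiniStream(self._graph, self._graph.add(name))

    def set_parallelism(self, parallelism: int) -> "MiniStream":
        self._node["parallelism"] = parallelism  # affects this node only
        return self

    def assign_timestamps_and_watermarks(self) -> "MiniStream":
        # Composite call: two nodes are added, only the last one is returned.
        return self._then("Extract-Timestamp")._then("Remove-Timestamp")


def build(default_parallelism: int) -> Dict[str, int]:
    graph = Graph(default_parallelism)
    source = MiniStream(graph, graph.add("Source")).set_parallelism(20)
    source.assign_timestamps_and_watermarks().set_parallelism(20)
    return graph.resolved()


print(build(default_parallelism=1))
# {'Source': 20, 'Extract-Timestamp': 1, 'Remove-Timestamp': 20}
print(build(default_parallelism=20))
# {'Source': 20, 'Extract-Timestamp': 20, 'Remove-Timestamp': 20}
```

Under these assumptions, an env-level set_parallelism(20) reaches Extract-Timestamp because it changes the default every unset node falls back to, whereas a trailing .set_parallelism(20) only reaches Remove-Timestamp.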