# troubleshooting
f
Hey Flink friends, we've recently been moving over to 1.15 PyFlink and have noticed that a new operator has been introduced in the graph that seems to have a different parallelism than expected (see screenshot), even though in our code we enforce a max parallelism of 20. The chain is something along the lines of:
```python
(
    env.add_source(
        kafka_consumer,
        ...
    )
    .uid("some id")
    .name("some id")
    .set_parallelism(20)
    .set_max_parallelism(20)
    .assign_timestamps_and_watermarks(watermark)
    .uid("some id")
    .name("some id")
    .set_parallelism(20)
    .set_max_parallelism(20)
)
```
Any ideas where this new op has come from and how its parallelism can be configured? There is no obvious way to do this that I can see in the docs, and I can't find much on Google.
We're also seeing errors where our RocksDB state throws `WriteBatch has wrong count`, which is apparently thrown when `verifyChecksum` fails ... not sure what that means, but it happens frequently.
And some logs such as:
```
org.apache.flink.runtime.taskmanager.AsynchronousException: Caught exception while processing timer.
	at org.apache.flink.streaming.runtime.tasks.StreamTask$StreamTaskAsyncExceptionHandler.handleAsyncException(StreamTask.java:1535)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.handleAsyncException(StreamTask.java:1510)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1650)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$deferCallbackToMailbox$21(StreamTask.java:1639)
	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
	at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:338)
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:324)
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753)
	at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:953)
	at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:932)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:746)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:568)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: TimerException{java.lang.RuntimeException: Error while waiting for BeamPythonFunctionRunner flush}
	... 14 more
Caused by: java.lang.RuntimeException: Error while waiting for BeamPythonFunctionRunner flush
	at org.apache.flink.streaming.api.operators.python.AbstractExternalPythonFunctionOperator.invokeFinishBundle(AbstractExternalPythonFunctionOperator.java:106)
	at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.checkInvokeFinishBundleByTime(AbstractPythonFunctionOperator.java:299)
	at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.lambda$open$0(AbstractPythonFunctionOperator.java:115)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1648)
	... 13 more
Caused by: java.lang.RuntimeException: Failed to close remote bundle
	at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.finishBundle(BeamPythonFunctionRunner.java:382)
	at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.flush(BeamPythonFunctionRunner.java:366)
	at org.apache.flink.streaming.api.operators.python.AbstractExternalPythonFunctionOperator.lambda$invokeFinishBundle$0(AbstractExternalPythonFunctionOperator.java:85)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	... 1 more
Caused by: java.util.concurrent.ExecutionException: java.lang.RuntimeException: Error received from SDK harness for instruction 192: Traceback (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 289, in _execute
    response = task()
  File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 362, in <lambda>
    lambda: self.create_worker().do_instruction(request), request)
  File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 606, in do_instruction
    return getattr(self, request_type)(
  File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 644, in process_bundle
    bundle_processor.process_bundle(instruction_id))
  File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 996, in process_bundle
    self.ops[element.transform_id].process_timer(
  File "/usr/local/lib/python3.8/site-packages/pyflink/fn_execution/beam/beam_operations_slow.py", line 171, in process_timer
    self._output_processor.process_outputs(
  File "/usr/local/lib/python3.8/site-packages/pyflink/fn_execution/beam/beam_operations_slow.py", line 63, in process_outputs
    self._consumer.process(windowed_value.with_value(results))
  File "/usr/local/lib/python3.8/site-packages/pyflink/fn_execution/beam/beam_operations_slow.py", line 131, in process
    for value in o.value:
  File "/usr/local/lib/python3.8/site-packages/pyflink/fn_execution/datastream/input_handler.py", line 81, in process_timer
    yield from _emit_results(
  File "/usr/local/lib/python3.8/site-packages/pyflink/fn_execution/datastream/input_handler.py", line 101, in _emit_results
    for result in results:
  File "/usr/local/lib/python3.8/site-packages/pyflink/fn_execution/datastream/input_handler.py", line 90, in _on_event_time
    yield from self._on_event_time_func(timestamp, key, namespace)
  File "/tmp/python-dist-5f75f61c-925c-4fc0-b994-e1b897165b0b/python-files/61a6d42e-8f12-4693-90e6-8e4d8ff83925_code/61a6d42e-8f12-4693-90e6-8e4d8ff83925_code/enrichers/session/func_enrich.py", line 145, in on_timer
    events = self.events.get(event_received_at_millis)
  File "/usr/local/lib/python3.8/site-packages/pyflink/fn_execution/state_impl.py", line 915, in get
    return self.get_internal_state().get(key)
  File "/usr/local/lib/python3.8/site-packages/pyflink/fn_execution/state_impl.py", line 772, in get
    exists, value = self._map_state_handler.blocking_get(
  File "/usr/local/lib/python3.8/site-packages/pyflink/fn_execution/state_impl.py", line 418, in blocking_get
    cached_map_state.put(map_key, (exists, value))
  File "/usr/local/lib/python3.8/site-packages/pyflink/fn_execution/state_impl.py", line 319, in put
    super(CachedMapState, self).put(key, exists_and_value)
  File "/usr/local/lib/python3.8/site-packages/pyflink/fn_execution/state_impl.py", line 68, in put
    self._on_evict(name, value)
  File "/usr/local/lib/python3.8/site-packages/pyflink/fn_execution/state_impl.py", line 305, in on_evict
    self._cached_keys.remove(key)
KeyError: 1680536326836

	at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395)
	at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1999)
	at org.apache.beam.sdk.util.MoreFutures.get(MoreFutures.java:60)
	at org.apache.beam.runners.fnexecution.control.SdkHarnessClient$BundleProcessor$ActiveBundle.close(SdkHarnessClient.java:504)
	at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory$1.close(DefaultJobBundleFactory.java:555)
	at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.finishBundle(BeamPythonFunctionRunner.java:380)
	... 7 more
Caused by: java.lang.RuntimeException: Error received from SDK harness for instruction 192: Traceback (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 289, in _execute
    response = task()
  File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 362, in <lambda>
    lambda: self.create_worker().do_instruction(request), request)
  File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 606, in do_instruction
    return getattr(self, request_type)(
  File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 644, in process_bundle
    bundle_processor.process_bundle(instruction_id))
  File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 996, in process_bundle
    self.ops[element.transform_id].process_timer(
  File "/usr/local/lib/python3.8/site-packages/pyflink/fn_execution/beam/beam_operations_slow.py", line 171, in process_timer
    self._output_processor.process_outputs(
  File "/usr/local/lib/python3.8/site-packages/pyflink/fn_execution/beam/beam_operations_slow.py", line 63, in process_outputs
    self._consumer.process(windowed_value.with_value(results))
  File "/usr/local/lib/python3.8/site-packages/pyflink/fn_execution/beam/beam_operations_slow.py", line 131, in process
    for value in o.value:
  File "/usr/local/lib/python3.8/site-packages/pyflink/fn_execution/datastream/input_handler.py", line 81, in process_timer
    yield from _emit_results(
  File "/usr/local/lib/python3.8/site-packages/pyflink/fn_execution/datastream/input_handler.py", line 101, in _emit_results
    for result in results:
  File "/usr/local/lib/python3.8/site-packages/pyflink/fn_execution/datastream/input_handler.py", line 90, in _on_event_time
    yield from self._on_event_time_func(timestamp, key, namespace)
  File "/tmp/python-dist-5f75f61c-925c-4fc0-b994-e1b897165b0b/python-files/61a6d42e-8f12-4693-90e6-8e4d8ff83925_code/61a6d42e-8f12-4693-90e6-8e4d8ff83925_code/enrichers/session/func_enrich.py", line 145, in on_timer
    events = self.events.get(event_received_at_millis)
  File "/usr/local/lib/python3.8/site-packages/pyflink/fn_execution/state_impl.py", line 915, in get
    return self.get_internal_state().get(key)
  File "/usr/local/lib/python3.8/site-packages/pyflink/fn_execution/state_impl.py", line 772, in get
    exists, value = self._map_state_handler.blocking_get(
  File "/usr/local/lib/python3.8/site-packages/pyflink/fn_execution/state_impl.py", line 418, in blocking_get
    cached_map_state.put(map_key, (exists, value))
  File "/usr/local/lib/python3.8/site-packages/pyflink/fn_execution/state_impl.py", line 319, in put
    super(CachedMapState, self).put(key, exists_and_value)
  File "/usr/local/lib/python3.8/site-packages/pyflink/fn_execution/state_impl.py", line 68, in put
    self._on_evict(name, value)
  File "/usr/local/lib/python3.8/site-packages/pyflink/fn_execution/state_impl.py", line 305, in on_evict
    self._cached_keys.remove(key)
KeyError: 1680536326836

	at org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:180)
	at org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:160)
	at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onMessage(ServerCalls.java:251)
	at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.ForwardingServerCallListener.onMessage(ForwardingServerCallListener.java:33)
	at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.Contexts$ContextualizedServerCallListener.onMessage(Contexts.java:76)
	at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailableInternal(ServerCallImpl.java:309)
	at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailable(ServerCallImpl.java:292)
	at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1MessagesAvailable.runInContext(ServerImpl.java:782)
	at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
	at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
	... 3 more
```
This is the one where we get a `KeyError` from the RocksDB-backed state.
h
Re the parallelism, try setting it directly on the `env` object! Also, we recommend setting `max_parallelism` to something high rather than 20. Your app will not be able to scale beyond the max parallelism with the same snapshot.
```python
(
    env
    .set_parallelism(20)
    .set_max_parallelism(32768)
    .add_source(
        kafka_consumer,
        ...
    )
    .uid("some id")
    .name("some id")
    .set_parallelism(20)
    .assign_timestamps_and_watermarks(watermark)
    .uid("some id")
    .name("some id")
)
```
Re: the RocksDB state issue, mind including the stack trace for “WriteBatch has wrong count”?
f
We set the parallelism only on certain operators to throttle the source, so we have our source at parallelism 20 but our sink at 60. Does this mean that setting the parallelism on `env` would apply 20 across every operator? In which case I don't think this would help with processing when sinking.
Here's the stack trace too, btw:
```
2023-04-03 15:28:03
org.apache.flink.util.FlinkRuntimeException: Exception while refilling store from iterator.
	at org.apache.flink.contrib.streaming.state.RocksDBCachingPriorityQueueSet.checkRefillCacheFromStore(RocksDBCachingPriorityQueueSet.java:340)
	at org.apache.flink.contrib.streaming.state.RocksDBCachingPriorityQueueSet.peek(RocksDBCachingPriorityQueueSet.java:137)
	at org.apache.flink.contrib.streaming.state.RocksDBCachingPriorityQueueSet.peek(RocksDBCachingPriorityQueueSet.java:58)
	at org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue$InternalPriorityQueueComparator.comparePriority(KeyGroupPartitionedPriorityQueue.java:285)
	at org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue$InternalPriorityQueueComparator.comparePriority(KeyGroupPartitionedPriorityQueue.java:271)
	at org.apache.flink.runtime.state.heap.HeapPriorityQueue.isElementPriorityLessThen(HeapPriorityQueue.java:161)
	at org.apache.flink.runtime.state.heap.HeapPriorityQueue.siftDown(HeapPriorityQueue.java:141)
	at org.apache.flink.runtime.state.heap.HeapPriorityQueue.adjustElementAtIndex(HeapPriorityQueue.java:107)
	at org.apache.flink.runtime.state.heap.HeapPriorityQueue.adjustModifiedElement(HeapPriorityQueue.java:69)
	at org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.poll(KeyGroupPartitionedPriorityQueue.java:102)
	at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.advanceWatermark(InternalTimerServiceImpl.java:301)
	at org.apache.flink.streaming.api.operators.InternalTimeServiceManagerImpl.advanceWatermark(InternalTimeServiceManagerImpl.java:180)
	at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.processWatermark(AbstractPythonFunctionOperator.java:187)
	at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitWatermark(OneInputStreamTask.java:239)
	at org.apache.flink.streaming.runtime.watermarkstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:200)
	at org.apache.flink.streaming.runtime.watermarkstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:105)
	at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:136)
	at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
	at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:519)
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753)
	at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:953)
	at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:932)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:746)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:568)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.flink.util.FlinkRuntimeException: org.rocksdb.RocksDBException: WriteBatch has wrong count
	at org.apache.flink.contrib.streaming.state.RocksDBCachingPriorityQueueSet.flushWriteBatch(RocksDBCachingPriorityQueueSet.java:314)
	at org.apache.flink.contrib.streaming.state.RocksDBCachingPriorityQueueSet.orderedBytesIterator(RocksDBCachingPriorityQueueSet.java:304)
	at org.apache.flink.contrib.streaming.state.RocksDBCachingPriorityQueueSet.checkRefillCacheFromStore(RocksDBCachingPriorityQueueSet.java:336)
	... 27 more
Caused by: org.rocksdb.RocksDBException: WriteBatch has wrong count
	at org.rocksdb.RocksDB.write0(Native Method)
	at org.rocksdb.RocksDB.write(RocksDB.java:1784)
	at org.apache.flink.contrib.streaming.state.RocksDBWriteBatchWrapper.flush(RocksDBWriteBatchWrapper.java:112)
	at org.apache.flink.contrib.streaming.state.RocksDBCachingPriorityQueueSet.flushWriteBatch(RocksDBCachingPriorityQueueSet.java:312)
	... 29 more
```
h
> Does this mean that setting the parallelism on `env` would apply 20 across every operator? In which case I don't think this would help with processing when sinking.
Setting the parallelism on `env` will set the default for all operators; then we can set the sink specifically to 60, e.g.:
```python
(
    env
    .set_parallelism(20)
    .set_max_parallelism(32768)
    .add_source(
        kafka_consumer,
        ...
    )
    .uid("some id")
    .name("some id")
    .set_parallelism(20)
    .assign_timestamps_and_watermarks(watermark)
    .uid("some id")
    .name("some id")
    .add_sink(xx)
    .set_parallelism(60)
)
```
f
How come the new 'extract-timestamp' operator has suddenly appeared though, and where does it infer its parallelism from 🤔 Our watermarking before was set to 20, but now it appears to have been broken into two operators with parallelism 20 and 60 respectively.
We have made no code changes beyond upgrading to 1.15, and the operator graph is very different.
h
> How come the new 'extract-timestamp' operator has suddenly appeared though, and where does it infer its parallelism from?
Hmm, I'm not sure here - probably the default parallelism in `flink-config`?
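Something like this is what I mean (just a sketch using standard PyFlink APIs, nothing specific to your job):
```python
# Sketch: operators that don't get an explicit parallelism fall back to the
# environment-level setting, which in turn falls back to parallelism.default
# in flink-conf.yaml.
from pyflink.datastream import StreamExecutionEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(20)         # default for any operator you don't configure explicitly
env.set_max_parallelism(32768)  # key-group upper bound; caps how far the job can rescale

print(env.get_parallelism())      # -> 20
print(env.get_max_parallelism())  # -> 32768
```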
> We have made no code changes beyond upgrading to 1.15, and the operator graph is very different.
🤔 hmmm ok… I guess this depends on your setup. I know Beam has changed the method of job graph construction between Flink versions…
f
So could this mean it has always worked like this under the hood and now it's just made obvious on the graph?
cc @Duc Anh Khu
h
> So could this mean it has always worked like this under the hood and now it's just made obvious on the graph?
It depends on what you mean by “very different”, do you have examples?
The Beam change I'm referring to is here: https://beam.apache.org/blog/beam-2.25.0/#highlights - specifically, "Read transform for Java runners has been reworked to use Splittable DoFn".
d
Hi, do you know if we need to `set_parallelism` on `assign_timestamps_and_watermarks`?
h
> Hi, do you know if we need to `set_parallelism` on `assign_timestamps_and_watermarks`?
Definitely worth trying, but I think this might be a gap in the Python API… See here: https://github.com/apache/flink/blame/bee3e9442fa2749f0b2e21d79949b5410fa422e5/flink-python/pyflink/datastream/data_stream.py#L689-L701
`assign_timestamps_and_watermarks` seems to be composed of multiple ProcessFunctions: `Extract-Timestamp` is a ProcessFunction, then followed by `Remove-Timestamp`, so I don't think you can specify the parallelism of `Extract-Timestamp`.
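If you want to experiment, something like this might be worth a try (rough sketch only; I suspect the parallelism would only stick to the last internal operator rather than `Extract-Timestamp`):
```python
# Rough sketch, not verified on 1.15. kafka_consumer and watermark are the same
# objects as in the snippet above. Setting parallelism on the stream returned by
# assign_timestamps_and_watermarks may only apply to the last internal operator
# (Remove-Timestamp), not to Extract-Timestamp itself.
watermarked = (
    env.add_source(kafka_consumer)
    .set_parallelism(20)
    .assign_timestamps_and_watermarks(watermark)
    .set_parallelism(20)
    .uid("some id")
    .name("some id")
)
```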