# troubleshooting
l
Hi all, when upgrading a job from 1.15 to 1.19 on AWS Managed Flink I am getting a runtime error:
Exception occurred while setting the current key context.
(see thread for full error). The job is consuming messages from Kinesis, processing them, and then producing them to another Kinesis stream. Running locally (in a simplified Flink cluster I have) I don't get the error, but when it runs in AWS it fails. It's quite hard to debug and find where the actual issue is occurring. I have taken a lot of inspiration from AWS's sample code for Flink, so perhaps it could be this line on the Kinesis sink:
setPartitionKeyGenerator(element -> String.valueOf(element.hashCode()))
java.lang.RuntimeException: Exception occurred while setting the current key context.
	at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.setCurrentKey(StreamOperatorStateHandler.java:385)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.setCurrentKey(AbstractStreamOperator.java:520)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.setKeyContextElement(AbstractStreamOperator.java:515)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.setKeyContextElement1(AbstractStreamOperator.java:490)
	at org.apache.flink.streaming.api.operators.OneInputStreamOperator.setKeyContextElement(OneInputStreamOperator.java:36)
	at org.apache.flink.streaming.runtime.io.RecordProcessorUtils.lambda$getRecordProcessor$0(RecordProcessorUtils.java:59)
	at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:238)
	at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:157)
	at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:114)
	at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:579)
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:909)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:858)
	at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:960)
	at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:939)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:753)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:568)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.IllegalArgumentException: Key group 1 is not in KeyGroupRange{startKeyGroup=50, endKeyGroup=51}. Unless you're directly using low level state access APIs, this is most likely caused by non-deterministic shuffle key (hashCode and equals implementation).
	at org.apache.flink.runtime.state.KeyGroupRangeOffsets.newIllegalKeyGroupException(KeyGroupRangeOffsets.java:37)
	at org.apache.flink.runtime.state.InternalKeyContextImpl.setCurrentKeyGroupIndex(InternalKeyContextImpl.java:74)
	at org.apache.flink.runtime.state.AbstractKeyedStateBackend.setCurrentKey(AbstractKeyedStateBackend.java:249)
	at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.setCurrentKey(RocksDBKeyedStateBackend.java:430)
	at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.setCurrentKey(StreamOperatorStateHandler.java:383)
	... 18 more
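For background on the "Caused by" message: each keyed subtask owns a fixed KeyGroupRange, and the key group for a record is derived from the key's hashCode. A minimal sketch of that computation, assuming flink-runtime's KeyGroupRangeAssignment utility (the maxParallelism value here is an assumption; 128 is Flink's default when not configured):

import org.apache.flink.runtime.state.KeyGroupRangeAssignment;

public class KeyGroupSketch {
    public static void main(String[] args) {
        int maxParallelism = 128; // assumption: Flink's default

        // Key group = murmurHash(key.hashCode()) % maxParallelism.
        // String.hashCode() is stable across JVMs, so this prints the same
        // group everywhere -- the record and its keyed state always agree.
        System.out.println(KeyGroupRangeAssignment.assignToKeyGroup("order-42", maxParallelism));

        // Object.hashCode() is identity-based and changes per JVM/run, so the
        // group computed when routing the record can differ from the group the
        // operator computes when setting the key context -- producing exactly
        // "Key group X is not in KeyGroupRange{...}".
        System.out.println(KeyGroupRangeAssignment.assignToKeyGroup(new Object(), maxParallelism));
    }
}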
h
Hi, please ensure that the hashCode implementation is deterministic between JVMs when you do a keyBy in the Flink job graph: https://stackoverflow.com/questions/63677057/hashcode-and-equals-method-for-custom-classes-in-flink
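For reference, a minimal sketch of what "deterministic between JVMs" means for a custom key type (DeviceKey and its fields are hypothetical, purely for illustration):

import java.util.Objects;

// If you keyBy a custom type, base hashCode/equals on stable field values
// only -- never on object identity or anything random -- so every JVM
// computes the same key group for the same record.
public class DeviceKey {
    private final String deviceId;
    private final int region;

    public DeviceKey(String deviceId, int region) {
        this.deviceId = deviceId;
        this.region = region;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof DeviceKey)) return false;
        DeviceKey other = (DeviceKey) o;
        return region == other.region && deviceId.equals(other.deviceId);
    }

    @Override
    public int hashCode() {
        // Deterministic across JVMs: derived purely from field values.
        return Objects.hash(deviceId, region);
    }
}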
The setPartitionKeyGenerator used here will actually not cause this issue, as it is only used for the partition key when writing into the KDS stream. (That key decides which shard the record goes into, and Kinesis guarantees ordering only within a shard.)
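For context, a hedged sketch of where setPartitionKeyGenerator sits in the sink builder (the String payload, region, and stream name are assumptions, not from the thread):

import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kinesis.sink.KinesisStreamsSink;

public class SinkSketch {
    static KinesisStreamsSink<String> buildSink() {
        Properties clientProps = new Properties();
        clientProps.setProperty("aws.region", "eu-west-1"); // assumption

        return KinesisStreamsSink.<String>builder()
                .setKinesisClientProperties(clientProps)
                .setStreamName("output-stream") // hypothetical name
                .setSerializationSchema(new SimpleStringSchema())
                // Only chooses the target KDS shard; evaluated once per record
                // on write, so unlike a keyBy selector it never has to agree
                // with a key group computed elsewhere.
                .setPartitionKeyGenerator(element -> String.valueOf(element.hashCode()))
                .build();
    }
}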
l
okay, elsewhere in the code I do this
KinesisStreamsSink<AmxIdList> sink = createKinesisSink(configuration);

DataStream<AmxIdList> identifierDataStream = buildDataStream
.keyBy(RandomKeySelector.forBuckets(configuration.getProducerConcurrency()))
...
h
Yep, that is probably the important one. RandomKeySelector.xxx needs to be deterministic for each record in the DataStream!
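To illustrate the difference (RandomKeySelector's internals aren't shown in the thread, so the "random" version below is an assumption about what such a selector typically looks like):

import java.util.concurrent.ThreadLocalRandom;
import org.apache.flink.api.java.functions.KeySelector;

public class BucketKeySelectors {

    // Assumed shape of a "random" bucket selector -- NOT safe for keyBy.
    // Flink evaluates the key selector more than once per record (once to
    // route it over the network, again to set the key context on the
    // operator), so a fresh random value on each call makes the two key
    // groups disagree -- the exception seen above.
    static <T> KeySelector<T, Integer> random(int buckets) {
        return element -> ThreadLocalRandom.current().nextInt(buckets);
    }

    // Safe: derived purely from the element, so every evaluation -- on any
    // JVM, before or after a restore -- yields the same bucket for the same
    // record. Math.floorMod keeps the bucket non-negative.
    static <T> KeySelector<T, Integer> deterministic(int buckets) {
        return element -> Math.floorMod(element.hashCode(), buckets);
    }
}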
l
okkkk
I will try something like this
.keyBy(element -> element.hashCode() % configuration.getProducerConcurrency())
Where we set the “producerConcurrency” to a number like 32. Does it make sense to split the elements into a specific number of buckets like this? Is there a principle to determine how large it should be?
this worked 🙂
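One caveat on the % approach above (a side note, not raised in the thread): element.hashCode() can be negative and Java's % preserves the sign, so some buckets come out negative. That is still deterministic, which is why it works, but Math.floorMod keeps every bucket in [0, producerConcurrency):

// Same idea, but Math.floorMod stays non-negative even when
// element.hashCode() is negative (plain % would return e.g. -5).
.keyBy(element -> Math.floorMod(element.hashCode(), configuration.getProducerConcurrency()))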
a
Same issue here. You may also run into it if you change the configuration.getProducerConcurrency() value: the result of keyBy should be deterministic based on the element in the stream.
Also, do you use the keyed stream for anything other than distributing your records? Based on your implementation, you don't perform any logical grouping of records. If this is for scaling purposes, would something like rescale() work better for you? https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/datastream/operators/overview/#rescaling Or shuffle(), for random uniform distribution: https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/datastream/operators/overview/#random-partitioning
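For illustration, a minimal sketch of both alternatives, reusing the names from the snippets earlier in the thread (buildDataStream, AmxIdList, sink):

import org.apache.flink.streaming.api.datastream.DataStream;

// Instead of .keyBy(RandomKeySelector...), repartition without keys.
// rescale(): round-robins each upstream subtask's records over a subset of
// downstream subtasks (cheap; no full network shuffle, no key groups).
DataStream<AmxIdList> rescaled = buildDataStream.rescale();

// shuffle(): uniformly random distribution across ALL downstream subtasks.
DataStream<AmxIdList> shuffled = buildDataStream.shuffle();

// Either stream can feed the Kinesis sink directly -- no keyed state involved.
shuffled.sinkTo(sink);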