Louis Cameron Booth
07/30/2024, 2:01 PM
Exception occurred while setting the current key context.
(see thread for full error).
The job is consuming messages from kinesis, processing them and then producing them to another kinesis stream. Running locally I don’t get the error (in a simplified Flink cluster I have), but when it is running in AWS it fails.
It’s quite hard to debug and find where this actual issue is occurring. I have taken a lot of inspiration from aws’ sample code for flink, so perhaps it could be this line on the kinesis sink: setPartitionKeyGenerator(element -> String.valueOf(element.hashCode()))
Louis Cameron Booth
07/30/2024, 2:01 PM
java.lang.RuntimeException: Exception occurred while setting the current key context.
at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.setCurrentKey(StreamOperatorStateHandler.java:385)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.setCurrentKey(AbstractStreamOperator.java:520)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.setKeyContextElement(AbstractStreamOperator.java:515)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.setKeyContextElement1(AbstractStreamOperator.java:490)
at org.apache.flink.streaming.api.operators.OneInputStreamOperator.setKeyContextElement(OneInputStreamOperator.java:36)
at org.apache.flink.streaming.runtime.io.RecordProcessorUtils.lambda$getRecordProcessor$0(RecordProcessorUtils.java:59)
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:238)
at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:157)
at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:114)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:579)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:909)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:858)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:960)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:939)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:753)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:568)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.IllegalArgumentException: Key group 1 is not in KeyGroupRange{startKeyGroup=50, endKeyGroup=51}. Unless you're directly using low level state access APIs, this is most likely caused by non-deterministic shuffle key (hashCode and equals implementation).
at org.apache.flink.runtime.state.KeyGroupRangeOffsets.newIllegalKeyGroupException(KeyGroupRangeOffsets.java:37)
at org.apache.flink.runtime.state.InternalKeyContextImpl.setCurrentKeyGroupIndex(InternalKeyContextImpl.java:74)
at org.apache.flink.runtime.state.AbstractKeyedStateBackend.setCurrentKey(AbstractKeyedStateBackend.java:249)
at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.setCurrentKey(RocksDBKeyedStateBackend.java:430)
at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.setCurrentKey(StreamOperatorStateHandler.java:383)
... 18 more
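The "non-deterministic shuffle key (hashCode and equals implementation)" hint in the trace is worth taking literally: if the keyed type does not override hashCode, Java falls back to the identity hash, which differs per object instance. A minimal sketch of the difference (class names here are illustrative, not from the job):

```java
import java.util.List;
import java.util.Objects;

public class StableKeys {
    // No hashCode/equals override: Object.hashCode is identity-based, so two
    // structurally equal instances (or the same record deserialized on the
    // receiving side of a shuffle) hash differently -- unusable as a key.
    static class WithoutOverride {
        final List<String> ids;
        WithoutOverride(List<String> ids) { this.ids = ids; }
    }

    // hashCode derived only from the fields: stable across instances and JVMs.
    static class WithOverride {
        final List<String> ids;
        WithOverride(List<String> ids) { this.ids = ids; }
        @Override public boolean equals(Object o) {
            return o instanceof WithOverride && ids.equals(((WithOverride) o).ids);
        }
        @Override public int hashCode() { return Objects.hash(ids); }
    }

    public static void main(String[] args) {
        List<String> ids = List.of("a", "b");
        // Identity hashes of two distinct objects: almost certainly different.
        System.out.println(new WithoutOverride(ids).hashCode() == new WithoutOverride(ids).hashCode());
        // Field-based hashes: always equal for equal content.
        System.out.println(new WithOverride(ids).hashCode() == new WithOverride(ids).hashCode());
    }
}
```

The mechanism behind the error: records are serialized between operators, so the downstream operator recomputes the key on a different object instance; with an identity-based hashCode the recomputed key falls into a different key group than the one the record was routed to.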
Hong Teoh
07/30/2024, 2:09 PM
The hashCode implementation needs to be deterministic between JVMs when you do a keyBy in the Flink job graph.
https://stackoverflow.com/questions/63677057/hashcode-and-equals-method-for-custom-classes-in-flink
Hong Teoh
07/30/2024, 2:11 PM
The setPartitionKeyGenerator used here will actually not cause this issue, as it is only used for the partition key when writing into the KDS stream. (This decides which shard a record goes into, and Kinesis guarantees ordering only within a shard.)
Louis Cameron Booth
07/30/2024, 2:12 PM
KinesisStreamsSink<AmxIdList> sink = createKinesisSink(configuration);
DataStream<AmxIdList> identifierDataStream = buildDataStream
.keyBy(RandomKeySelector.forBuckets(configuration.getProducerConcurrency()))
...
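If the RandomKeySelector in the snippet above picks a bucket at random per evaluation, the key cannot be recomputed consistently downstream. A deterministic variant (illustrative only; forBuckets and the selector class are assumptions modeled on the snippet, not the actual code) would derive the bucket from the element itself:

```java
import java.util.Objects;
import java.util.function.ToIntFunction;

public class DeterministicKeySelector {
    // Hypothetical replacement for RandomKeySelector.forBuckets: the bucket is
    // a pure function of the element, so re-evaluating the selector for the
    // same element (as Flink does on both sides of a shuffle) yields the same
    // key. Math.floorMod keeps the result in [0, buckets) even when the
    // element's hash code is negative.
    static <T> ToIntFunction<T> forBuckets(int buckets) {
        return element -> Math.floorMod(Objects.hashCode(element), buckets);
    }

    public static void main(String[] args) {
        ToIntFunction<String> selector = forBuckets(32);
        // Same element -> same bucket, every time:
        System.out.println(selector.applyAsInt("amx-123") == selector.applyAsInt("amx-123"));
    }
}
```

Note this is only safe if the element type's own hashCode is stable across JVMs (true for String and for POJOs that derive hashCode from their fields).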
Hong Teoh
07/30/2024, 2:12 PM
RandomKeySelector.xxx needs to be deterministic for each record in the DataStream!
Louis Cameron Booth
07/30/2024, 2:12 PM
Louis Cameron Booth
07/30/2024, 2:25 PM
.keyBy(element -> element.hashCode() % configuration.getProducerConcurrency())
Where we set the “producerConcurrency” to a number like 32. Does it make sense to split the elements into a specific number like this? Is there a principle to determine how large it should be?
Louis Cameron Booth
07/30/2024, 3:47 PM
Aleksandr Pilipenko
07/30/2024, 4:07 PM
configuration.getProducerConcurrency()
value.
Result of keyBy should be deterministic based on the element in the stream.
Aleksandr Pilipenko
07/30/2024, 4:28 PM
Would rescale() work better for you? https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/datastream/operators/overview/#rescaling
Or shuffle(), for random uniform distribution: https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/datastream/operators/overview/#random-partitioning
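For reference, the alternatives linked above replace the keyBy entirely: since the goal here is only to spread load across sink subtasks, no key (and hence no key-group consistency) is needed. A fragment of how they would slot into the pipeline (not self-contained; variable names follow the earlier snippet):

```java
// rescale(): round-robin to a subset of downstream subtasks, staying local
// to the task manager where possible (cheaper than a full redistribution).
DataStream<AmxIdList> rescaled = buildDataStream.rescale();

// shuffle(): uniformly random redistribution across all downstream subtasks.
DataStream<AmxIdList> shuffled = buildDataStream.shuffle();

// Neither operator computes a key, so the KeyGroupRange error cannot occur.
rescaled.sinkTo(sink);
```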