# troubleshooting
t
Hi All! I am running my Flink app on AWS KDA. There is no problem when running with one KPU, but when I try to run my app with more than one KPU, I am facing this error:
```
java.lang.IllegalArgumentException: key group from 0 to 6 does not contain 120
    at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:160)
    at org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.globalKeyGroupToLocalIndex(KeyGroupPartitionedPriorityQueue.java:191)
    at org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.computeKeyGroupIndex(KeyGroupPartitionedPriorityQueue.java:186)
    at org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.getKeyGroupSubHeapForElement(KeyGroupPartitionedPriorityQueue.java:179)
    at org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.add(KeyGroupPartitionedPriorityQueue.java:114)
    at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.registerEventTimeTimer(InternalTimerServiceImpl.java:233)
    at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$Context.registerEventTimeTimer(WindowOperator.java:922)
```
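For context on what the error is complaining about: Flink routes every record to a key group derived from the key's hashCode(), and each parallel task owns a contiguous range of key groups (the "0 to 6" in the message). The sketch below is a simplification, not Flink's exact internals (the real logic lives in KeyGroupRangeAssignment and additionally runs the hash through a murmur hash):

```java
public class KeyGroupSketch {
    // Simplified sketch of Flink's key-group assignment; the real
    // implementation additionally applies a murmur hash to hashCode()
    // before taking the modulo.
    static int assignToKeyGroup(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    public static void main(String[] args) {
        int maxParallelism = 128;
        // If hashCode() returns a different value when the key is
        // recomputed on another task, or after a serialization round-trip,
        // the computed group can fall outside the range owned by the local
        // task -- which is exactly the IllegalArgumentException above.
        System.out.println(assignToKeyGroup("some-key", maxParallelism));
    }
}
```

With one KPU a single task owns all key groups, so a shifting hash goes unnoticed; with more than one KPU the ranges are split and the mismatch surfaces.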
Please let me know if you have any idea what's going on here. I have been stuck on this for a few days and have literally found no clue.
h
Hey, this is not specific to KDA, but probably some issue with the hashing you are using. How are you calculating the hash for your keyBy? Is it deterministic or random?
t
I think I am using a deterministic one:
```java
.keyBy(new KeySelector<Tuple3<String, RateRule, Long>, Tuple2<String, RateRule>>() {
    @Override
    public Tuple2<String, RateRule> getKey(Tuple3<String, RateRule, Long> value) throws Exception {
        return new Tuple2<>(value.f0, value.f1);
    }
})
```
h
Hm, how is the hashCode of RateRule calculated?
t
I used the default one generated by IntelliJ:
```java
@Override
public int hashCode() {
    int result = blockKind != null ? blockKind.hashCode() : 0;
    result = 31 * result + (county != null ? county.hashCode() : 0);
    result = 31 * result + (trueClientIp != null ? trueClientIp.hashCode() : 0);
    result = 31 * result + (upmid != null ? upmid.hashCode() : 0);
    result = 31 * result + (limit != null ? limit.hashCode() : 0);
    result = 31 * result + (windowSize != null ? windowSize.hashCode() : 0);
    result = 31 * result + (startTime != null ? startTime.hashCode() : 0);
    result = 31 * result + (expiration != null ? expiration.hashCode() : 0);
    result = 31 * result + (ruleState != null ? ruleState.hashCode() : 0);
    return result;
}
```
h
If I am not mistaken, we need to make sure the calculation of the hashCode is deterministic, even across a serialization/deserialization round-trip. I can think of checking the following:
- Are any of the fields used for hashCode transient?
- Are the types of any of the fields used in the hashCode calculation non-deterministic (i.e. types that fall back to the identity-based Object.hashCode())?
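To illustrate that second point: Java types that don't override hashCode() inherit the identity hash, which is stable within one JVM run but not across restarts or serialization boundaries. A minimal demo of two common offenders (the field types here are assumptions for illustration, not taken from the thread's RateRule):

```java
import java.util.Arrays;

public class HashPitfalls {
    enum BlockKind { IP, USER }

    public static void main(String[] args) {
        // Arrays inherit Object.hashCode(): two arrays with equal contents
        // hash by identity, not by content.
        int[] a = {1, 2, 3};
        int[] b = {1, 2, 3};
        System.out.println(a.hashCode() == b.hashCode());             // identity-based, not content-based
        System.out.println(Arrays.hashCode(a) == Arrays.hashCode(b)); // content-based: true

        // Enum.hashCode() is also the identity hash: stable within one JVM,
        // but it can change across JVM restarts. When the hash must survive
        // a serialization boundary (as a Flink key does), a stable surrogate
        // such as name() is safer.
        System.out.println(BlockKind.IP.name().hashCode()); // deterministic everywhere
    }
}
```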
d
I have seen this error before. Essentially, as @Hong Teoh says, the key had changed due to additional fields being added to the POJO.
πŸ™ŒπŸ» 1
πŸ™πŸ» 1
t
@Hong Teoh Thank you so much, I have been literally losing my mind over this bug. 🙏 I rewrote the hashCode and now all is good.
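For anyone hitting the same issue, a deterministic rewrite might look like the sketch below. This uses only a hypothetical subset of RateRule's fields; the point is that Objects.hash over value-based types such as String and Long is fully specified by the JDK, so the result survives serialization round-trips and JVM restarts:

```java
import java.util.Objects;

public class RateRuleHash {
    // Hypothetical subset of the RateRule fields mentioned in the thread.
    String county;
    String trueClientIp;
    Long limit;
    Long windowSize;

    @Override
    public int hashCode() {
        // Objects.hash combines the fields' own hashCodes and is null-safe;
        // String and Long define content-based hashes, so equal field values
        // always produce the same result.
        return Objects.hash(county, trueClientIp, limit, windowSize);
    }
}
```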