# troubleshoot
s
Hi! I have noticed the following exception (in 🧵) in the DataHub Metadata Service. A little digging reveals a possible bug here with seeking to negative offsets when the current offset on a partition is 0. I have configured all my backend Kafka topics with 3 partitions. Has anyone else experienced a similar error?
2023-03-03 13:25:20,154 [ThreadPoolTaskExecutor-1] ERROR o.a.k.c.c.i.ConsumerCoordinator:283 - [Consumer clientId=consumer-datahub-duhe-consumer-job-client-gms-2, groupId=datahub-duhe-consumer-job-client-gms] User provided listener org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer$ListenerConsumerRebalanceListener failed on invocation of onPartitionsAssigned for partitions [datahub-upgrade-history-v1-tst-1, datahub-upgrade-history-v1-tst-2, datahub-upgrade-history-v1-tst-0]
java.lang.IllegalArgumentException: seek offset must not be a negative number
        at org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1599)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer$InitialOrIdleSeekCallback.seek(KafkaMessageListenerContainer.java:3075)
        at com.linkedin.metadata.kafka.boot.DataHubUpgradeKafkaListener.lambda$onPartitionsAssigned$1(DataHubUpgradeKafkaListener.java:70)
        at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
        at java.base/java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:177)
        at java.base/java.util.HashMap$EntrySpliterator.forEachRemaining(HashMap.java:1764)
        at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
        at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
        at java.base/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
        at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
        at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
        at java.base/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:497)
        at com.linkedin.metadata.kafka.boot.DataHubUpgradeKafkaListener.onPartitionsAssigned(DataHubUpgradeKafkaListener.java:69)
        at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.onPartitionsAssigned(MessagingMessageListenerAdapter.java:302)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.seekPartitions(KafkaMessageListenerContainer.java:1127)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.access$3800(KafkaMessageListenerContainer.java:518)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer$ListenerConsumerRebalanceListener.onPartitionsAssigned(KafkaMessageListenerContainer.java:2968)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokePartitionsAssigned(ConsumerCoordinator.java:278)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:419)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:439)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:358)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:490)
        at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1275)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1241)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doPoll(KafkaMessageListenerContainer.java:1414)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1251)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1163)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:829)
2023-03-03 13:25:20,157 [ThreadPoolTaskExecutor-1] ERROR o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer:149 - Consumer exception
java.lang.IllegalStateException: This error handler cannot process 'org.apache.kafka.common.KafkaException's; no record information is available
        at org.springframework.kafka.listener.SeekUtils.seekOrRecover(SeekUtils.java:200)
        at org.springframework.kafka.listener.SeekToCurrentErrorHandler.handle(SeekToCurrentErrorHandler.java:112)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.handleConsumerException(KafkaMessageListenerContainer.java:1604)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1212)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.kafka.common.KafkaException: User rebalance callback throws an error
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:422)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:439)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:358)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:490)
        at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1275)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1241)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doPoll(KafkaMessageListenerContainer.java:1414)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1251)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1163)
        ... 5 common frames omitted
Caused by: java.lang.IllegalArgumentException: seek offset must not be a negative number
        at org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1599)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer$InitialOrIdleSeekCallback.seek(KafkaMessageListenerContainer.java:3075)
        at com.linkedin.metadata.kafka.boot.DataHubUpgradeKafkaListener.lambda$onPartitionsAssigned$1(DataHubUpgradeKafkaListener.java:70)
        at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
        at java.base/java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:177)
        at java.base/java.util.HashMap$EntrySpliterator.forEachRemaining(HashMap.java:1764)
        at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
        at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
        at java.base/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
        at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
        at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
        at java.base/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:497)
        at com.linkedin.metadata.kafka.boot.DataHubUpgradeKafkaListener.onPartitionsAssigned(DataHubUpgradeKafkaListener.java:69)
        at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.onPartitionsAssigned(MessagingMessageListenerAdapter.java:302)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.seekPartitions(KafkaMessageListenerContainer.java:1127)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.access$3800(KafkaMessageListenerContainer.java:518)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer$ListenerConsumerRebalanceListener.onPartitionsAssigned(KafkaMessageListenerContainer.java:2968)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokePartitionsAssigned(ConsumerCoordinator.java:278)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:419)
        ... 14 common frames omitted
b
There was a patch for this. First, the upgrade topic was set to 1 partition explicitly. If you’re enforcing more than 1 partition for all topics, then the next release includes logic to prevent the negative offset.
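For anyone who wants to see the idea, here is a minimal sketch of that kind of guard. It assumes the listener seeks each assigned partition to its end offset minus one, so an empty partition (end offset 0) would produce -1. The class name `SeekOffsets`, the helper `safeSeekOffset`, and the end-offset values are hypothetical and for illustration only; this is not the actual patch.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class SeekOffsets {
    // Hypothetical helper: step back one record from the end of the partition,
    // but never return a negative offset, because KafkaConsumer.seek rejects those.
    static long safeSeekOffset(long endOffset) {
        return Math.max(endOffset - 1, 0L);
    }

    public static void main(String[] args) {
        // Illustrative end offsets for a 3-partition topic where only one
        // partition has received messages (values are made up).
        Map<String, Long> endOffsets = new LinkedHashMap<>();
        endOffsets.put("datahub-upgrade-history-v1-tst-0", 5L);
        endOffsets.put("datahub-upgrade-history-v1-tst-1", 0L);
        endOffsets.put("datahub-upgrade-history-v1-tst-2", 0L);

        // Print the offset that would be passed to seek for each partition.
        endOffsets.forEach((partition, end) ->
            System.out.println(partition + " -> seek to " + safeSeekOffset(end)));
    }
}
```

With a clamp like this, the empty partitions would be sought to offset 0 instead of triggering the IllegalArgumentException in the trace above.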
s
Good to know that it’s being fixed 👍 We need more details in the documentation on the Kafka backend setup: replication, partitions, and retention. Also, there’s an alter-config on the _schemas topic, which only an admin user has access to. The setup script in the docker image is the only available source, but it doesn’t explain the ‘why’.