# troubleshoot
a
I am using kubernetes and I am using the helm chart from acryl. I ran all the init jobs and the upgrade job. However, in datahub-gms I am getting the following error:
org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer$ListenerConsumerRebalanceListener failed on invocation of onPartitionsAssigned for partitions [DataHubUpgradeHistory_v1-0]
java.lang.IllegalArgumentException: seek offset must not be a negative number
	at org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1599)
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer$InitialOrIdleSeekCallback.seek(KafkaMessageListenerContainer.java:3075)
	at com.linkedin.metadata.kafka.boot.DataHubUpgradeKafkaListener.lambda$onPartitionsAssigned$1(DataHubUpgradeKafkaListener.java:70)
	at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
	at java.base/java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:177)
	at java.base/java.util.HashMap$EntrySpliterator.forEachRemaining(HashMap.java:1764)
	at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
	at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
	at java.base/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
	at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
	at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
	at java.base/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:497)
	at com.linkedin.metadata.kafka.boot.DataHubUpgradeKafkaListener.onPartitionsAssigned(DataHubUpgradeKafkaListener.java:69)
	at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.onPartitionsAssigned(MessagingMessageListenerAdapter.java:302)
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.seekPartitions(KafkaMessageListenerContainer.java:1127)
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.access$3800(KafkaMessageListenerContainer.java:518)
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer$ListenerConsumerRebalanceListener.onPartitionsAssigned(KafkaMessageListenerContainer.java:2968)
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokePartitionsAssigned(ConsumerCoordinator.java:278)
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:419)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:439)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:358)
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:490)
	at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1275)
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1241)
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216)
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doPoll(KafkaMessageListenerContainer.java:1414)
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1251)
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1163)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)
In my understanding, this means the application is trying to seek to a negative offset. Is that the cause of the error? Why is this happening? Any ideas?
a
Is the pod still working, or is it failing? What DataHub version / helm chart version are you on?
a
No, it's failing. I am using v0.10.0 and the latest Helm charts.
a
@brainy-tent-14503 may be able to help you out here!
a
There is a bug when the number of Kafka partitions for this topic is > 1. This was fixed, but the fix has caused some issues. If possible, the workaround I know of, without waiting to upgrade to the next release, is to use the Kafka CLI to delete and re-create the topic with a single partition. The topic named
DataHubUpgradeHistory_v1
specifically
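A sketch of that workaround using the standard Kafka CLI. The bootstrap server address and replication factor below are assumptions; adjust them for your cluster (and if you run Kafka via the Helm chart, execute these inside the broker pod, e.g. with kubectl exec):

```shell
# Delete the existing multi-partition topic
# (assumes the broker is reachable at localhost:9092 -- adjust for your cluster)
kafka-topics.sh --bootstrap-server localhost:9092 \
  --delete --topic DataHubUpgradeHistory_v1

# Re-create it with a single partition
# (replication-factor 1 is an assumption; match your cluster's policy)
kafka-topics.sh --bootstrap-server localhost:9092 \
  --create --topic DataHubUpgradeHistory_v1 \
  --partitions 1 --replication-factor 1

# Verify the topic now has exactly one partition
kafka-topics.sh --bootstrap-server localhost:9092 \
  --describe --topic DataHubUpgradeHistory_v1
```

After re-creating the topic you will likely need to re-run the upgrade/system-update job so a fresh upgrade-history message lands on the topic, then restart datahub-gms so its consumer rejoins cleanly.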