Daniel
05/15/2025, 8:21 AM

Ahmed
05/15/2025, 11:03 AM

Thomas MacKenzie
05/16/2025, 12:15 AM
I'm on v0.15.0, and I wanted to ask if there is a way to get messages from compacted topics other than by using the TableView?
I'm looking for a way to read messages from a stream (via a channel) rather than storing them all at once in a map[string]any like the TableView does. It's for at least half a million records after compaction.
So I tried using a Reader with the ReadCompacted option set to true, but I'm getting all the messages. Is there a way to actually read from the compacted topic with a Reader? (I'm using persistent topics.)
Thank you!

Mustaque
05/19/2025, 2:56 PM

hulusi
05/20/2025, 5:57 AM

Lari Hotari
05/20/2025, 10:14 PM

Mustaque
05/21/2025, 7:35 PM
Topic creation encountered an exception by initialization topic policies service. org.apache.pulsar.metadata.api.MetadataStoreException$NotFoundException cluster-a
Mustaque
05/22/2025, 5:43 AM
pulsar-perf consume --service-url pulsar://localhost:6650 --subscription-type Shared --rate 10000 --num-subscriptions 10 --num-consumers 10 --subscription-position Earliest
Update: If I run pulsar-perf consume --service-url pulsar://localhost:6650 persistent://public/default/my-topic, then it runs fine! Most likely, one of the parameters is causing the issue.

Lari Hotari
05/23/2025, 7:13 AM

Mustaque
05/23/2025, 12:13 PM
I used the pulsar-perf produce command to publish 1 million messages, but it is pushing more than that. I am not able to understand why this happens: sometimes it pushes 1.7 million, and sometimes more than 2 million messages. Any help is appreciated.
Thank you!

Lari Hotari
05/27/2025, 8:34 PM

sindhushree
05/28/2025, 2:12 PM

David K
05/28/2025, 2:36 PM

Vaibhav Swarnkar
05/29/2025, 10:12 AM
05/29/2025, 10:12 AMchange_streams_with_pre_image
config mentioned in the Debezium.
Anyone else faced it and found a fix on this?Wallace Peng
05/29/2025, 10:06 PMorg.apache.zookeeper.server.quorum.Learner - Revalidating client:0x3015fd17ce80066
and also on broker side some logs like
it first closed the topic
2025-05-29T21:22:26,341+0000 [bookkeeper-ml-scheduler-OrderedScheduler-0-0] INFO org.apache.pulsar.broker.service.persistent.PersistentTopic - [persistent://6614/Salesforce/New Contact] Topic closed
then it tries to recreate the topic almost immediately; it may fail once, then retry and succeed.
Any idea why it is doing this?
By the way, there is an error on the client side:
Error connecting to broker: org.apache.pulsar.client.api.PulsarClientException$BrokerPersistenceException: {"errorMsg":"org.apache.bookkeeper.mledger.ManagedLedgerException: org.apache.bookkeeper.mledger.ManagedLedgerException$BadVersionException: org.apache.pulsar.metadata.api.MetadataStoreException$BadVersionException: org.apache.zookeeper.KeeperException$BadVersionException: KeeperErrorCode = BadVersion for /managed-ledgers
Stanislaw Makarawec
05/30/2025, 9:44 AM

Fabri
05/30/2025, 8:15 PM

Wallace Peng
06/03/2025, 4:38 PM
failed to get Partitioned metadata : {"errorMsg":"org.apache.zookeeper.KeeperException$ConnectionLossException: KeeperErrorCode = ConnectionLoss for /managed-ledgers/15548/Orchestration/persistent/closed_periods","reqId":2511449849216011802, "remote":"pulsar-broker.pulsar.svc.cluster.local.analytics.svc.cluster.local/10.1.177.78:6650", "local":"/10.11.68.215:34530"} (logger: o.a.p.c.i.BinaryProtoLookupService, thread: pulsar-client-io-5-1, level: WARN)
Stack trace:
o.a.p.c.a.PulsarClientException$BrokerMetadataException: {"errorMsg":"org.apache.zookeeper.KeeperException$ConnectionLossException: KeeperErrorCode = ConnectionLoss for /managed-ledgers/15548/Orchestration/persistent/closed_periods","reqId":2511449849216011802, "remote":"pulsar-broker.pulsar.svc.cluster.local.analytics.svc.cluster.local/10.1.177.78:6650", "local":"/10.11.68.215:34530"}
	at o.a.p.c.i.ClientCnx.getPulsarClientException(ClientCnx.java:1352)
	at o.a.p.c.i.ClientCnx.handlePartitionResponse(ClientCnx.java:691)
	at o.a.p.c.p.PulsarDecoder.channelRead(PulsarDecoder.java:144)
	at o.a.p.s.i.n.c.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
	at o.a.p.s.i.n.c.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	at o.a.p.s.i.n.c.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
	at o.a.p.s.i.n.h.c.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:346)
	at o.a.p.s.i.n.h.c.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:318)
	at o.a.p.s.i.n.c.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
	at o.a.p.s.i.n.c.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	at o.a.p.s.i.n.c.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
	at o.a.p.s.i.n.h.f.FlushConsolidationHandler.channelRead(FlushConsolidationHandler.java:152)
	at o.a.p.s.i.n.c.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)
	at o.a.p.s.i.n.c.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	at o.a.p.s.i.n.c.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
	at o.a.p.s.i.n.c.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1357)
	at o.a.p.s.i.n.c.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)
	at o.a.p.s.i.n.c.Abst...
By the way, the subscription was created successfully after this, but it sometimes gave this warning about connection loss.
This only happened after 3.0.11. Did we change anything related? It is just annoying.

sindhushree
06/06/2025, 7:10 AM

Fabri
06/07/2025, 12:11 AM

Sahaj Kodia
06/09/2025, 4:58 PM

Alexander Preuß
06/10/2025, 4:56 PM

Rémi Collignon-Ducret
06/11/2025, 4:07 PM

Fabri
06/14/2025, 8:47 AM

Sahaj Kodia
06/17/2025, 9:59 AM

Shrey Kothari
06/19/2025, 9:15 AM

hulusi
06/20/2025, 11:44 AM

Eric Satterwhite
06/23/2025, 12:19 PM

Eric Satterwhite
06/23/2025, 12:28 PM

Daniel Kaminski
06/24/2025, 7:46 AM
java.lang.IllegalStateException: Field 'message' is not set
at org.apache.pulsar.common.api.proto.CommandLookupTopicResponse.getMessage(CommandLookupTopicResponse.java:220)
at org.apache.pulsar.client.impl.ClientCnx.handleLookupResponse(ClientCnx.java:629)
at org.apache.pulsar.common.protocol.PulsarDecoder.channelRead(PulsarDecoder.java:154)
The issue persisted until the function pods were manually restarted, by which time their own customers had already been impacted.
• ClientCnx.handleLookupResponse: ClientCnx represents the client's connection to a broker. It was processing the response to a CommandLookupTopic request. A topic lookup is the mechanism a client uses to ask the Pulsar cluster, "Which broker is currently serving this topic?"
• CommandLookupTopicResponse.getMessage: The client received a response from the broker. The code then attempted to get an error message from this response.
• IllegalStateException: Field 'message' is not set: This exception means the client code expected the message field to be populated in the response it received, but it wasn't. In the Pulsar protocol, this message field is optional. The broker sent a response indicating failure or redirection but didn't include the optional descriptive text.
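The optional-field failure described in the bullets above can be sketched in plain Go. The types below are illustrative stand-ins, not the actual generated Pulsar protocol classes (where the equivalent guard would be checking field presence before calling getMessage); the point is simply the difference between an accessor that fails when an optional field is absent and a defensive reader that falls back.

```go
package main

import "fmt"

// lookupResponse is an illustrative stand-in for a protocol response with an
// optional, human-readable error message (nil means "not set on the wire").
type lookupResponse struct {
	message *string
}

// getMessage mimics a generated accessor that fails when the optional field
// is absent, which is the IllegalStateException seen in the logs above.
func (r *lookupResponse) getMessage() string {
	if r.message == nil {
		panic("field 'message' is not set")
	}
	return *r.message
}

// errorText is the defensive version: check presence first and fall back to
// a generic description instead of failing.
func errorText(r *lookupResponse) string {
	if r.message == nil {
		return "lookup failed (no error message provided by broker)"
	}
	return *r.message
}

func main() {
	withMsg := "topic not found"
	fmt.Println(errorText(&lookupResponse{message: &withMsg}))
	fmt.Println(errorText(&lookupResponse{})) // optional field absent: no panic
}
```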
Our Hypothesis:
Our theory is that the function's internal client entered a "stuck" state during the broker rolling upgrade and failed to recover.
What happened during the update:
1. A broker pod is terminated
2. All client connections to that broker are severed.
3. The Pulsar Function's internal client automatically tries to reconnect.
4. As part of the reconnection, it performs a topic lookup.
5. The ownership of the topic may be in the process of being transferred from the old broker to a new one. If the client's lookup request hits a broker during this transient state, the broker might issue a Redirect or Failed lookup response.
6. In some edge cases, this response is sent without the optional message field, triggering the bug in the client's CommandLookupTopicResponse.getMessage() call.
We also discovered a version mismatch between the cluster and the client libraries:
• Client libraries: pulsar-client and pulsar-functions-api are both version 3.2.1.
• Cluster version: 3.0.11.
My questions now are:
Is it a requirement for the client library versions to strictly match the Pulsar cluster version? Could this version skew be a potential cause for this kind of failure during a rolling upgrade?
Are you aware of any existing GitHub issues related to this specific IllegalStateException in this context? We were unable to find a direct match in our own research.
Thanks in advance!