# ai-crosspost
  • r

    RunLLM

    05/29/2025, 10:06 PM
New RunLLM response generated: >>>*# Question* After upgrading to 3.0.11, I saw a lot of logs like
    org.apache.zookeeper.server.quorum.Learner - Revalidating client:0x3015fd17ce80066
and also, on the broker side, some logs showing it first closed the topic
    2025-05-29T21:22:26,341+0000 [bookkeeper-ml-scheduler-OrderedScheduler-0-0] INFO  org.apache.pulsar.broker.service.persistent.PersistentTopic - [<persistent://6614/Salesforce/New> Contact] Topic closed
then it tries to recreate the topic almost immediately, but it may fail once, retry, and then succeed. Any idea why it is doing this? # Answer The log
    Learner - Revalidating client
    is a normal ZooKeeper message indicating session revalidation, often seen during client reconnects or session renewals. The broker log showing a topic being closed and then immediately recreated, sometimes failing once before succeeding, is typically due to transient ZooKeeper or BookKeeper metadata consistency issues or short-lived network hiccups during topic ownership changes or broker restarts. This pattern is not uncommon after upgrades or in clusters with high metadata activity, and Pulsar is designed to retry such operations automatically until successful, as seen in your logs. If the topic creation fails once and then succeeds on retry, it usually means the broker was temporarily unable to access the required metadata (possibly due to ZooKeeper session revalidation or a brief lag in metadata propagation). Unless you see persistent failures or data loss, these logs are generally informational and not a cause for concern. For more, see related Pulsar broker and ZooKeeper session handling discussions and metadata consistency notes. Would you like more detail on how Pulsar handles topic metadata and retries after such events? Sources: - Pulsar broker stuck on zkSession recreate loop - Metadata consistency issues from user’s point of view Slack Link https://apache-pulsar.slack.com/archives/C5Z4T36F7/p1748556363.841139 πŸ‘ 0 button πŸ‘Ž 0 button πŸ’‘ Hint: Mention @runllm in the thread for followups.
  • r

    RunLLM

    05/30/2025, 9:45 AM
    New RunLLM response generated: >>>*# Question* Hello everyone, I need help understanding the behavior of shared subscriptions in a geo-replicated setup. We have several topics in our Pulsar deployment. For some topics (cache-related), we have geo-replication disabled. Others work as expected β€” the subscription cursor is replicated to the backup cluster. However, we are seeing inconsistent behavior with topics used for delayed messages and shared subscriptions. These topics have geo-replication enabled and use individual acknowledgments. According to the documentation, individual acknowledgments themselves are not replicated across clusters. However, the markDeletePosition should be replicated. In our tests, we noticed that the markDeletePosition in the backup cluster does not move predictably. In some cases, it remains unchanged for a long time. The only time it eventually advances is after the primary cluster stops receiving new messages to that topic β€” and then, after a delay, the markDeletePosition is finally updated in the backup cluster. Why does this happen, and is there a way to make markDeletePosition replication more predictable for topics with delayed messages and shared subscriptions? Pulsar version: 4.0.4 Thank you in advance! # Answer This behavior is expected: in Pulsar geo-replication, only the baseline cursor position (markDeletePosition) is replicated, not individual acknowledgments. For shared subscriptions and topics with delayed messages, markDeletePosition only advances when all messages up to a certain point are acknowledged (i.e., no "holes" from out-of-order or individual acks). If there are unacknowledged or individually-acked messages, markDeletePosition will not move, and thus the replicated state lags until the backlog is cleared or no new messages arrive, at which point the markDeletePosition can advance and be replicated. This is a documented limitationβ€”see Pulsar geo-replication docs and PIP-33 limitations. There is no built-in way to make markDeletePosition replication more granular or predictable for shared subscriptions with individual acks. The only workaround is to ensure consumers acknowledge messages in order, or to periodically drain the backlog so markDeletePosition can advance. This limitation is inherent to how Pulsar replicates only the baseline cursor, not the full set of individually-acked positions, which can result in delayed or unpredictable replication of the subscription state in these scenarios. Would you like more detail on the technical reasons or possible mitigations? Sources: - Pulsar geo-replication docs Slack Link https://apache-pulsar.slack.com/archives/C5Z4T36F7/p1748598297.819549 πŸ‘ 0 button πŸ‘Ž 0 button πŸ’‘ Hint: Mention @runllm in the thread for followups.
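For reference, here is a minimal sketch (assuming the Java admin client; the admin URL, topic, and subscription names are hypothetical) of how you could watch the replicated markDeletePosition on the backup cluster through the topic's internal stats, since that is the only cursor position that advances once there are no ack holes:
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;

public class CheckMarkDeletePosition {
    public static void main(String[] args) throws Exception {
        // Hypothetical admin endpoint of the backup cluster
        try (PulsarAdmin admin = PulsarAdmin.builder()
                .serviceHttpUrl("http://backup-broker:8080").build()) {
            String topic = "persistent://my-tenant/my-ns/delayed-topic"; // hypothetical topic
            PersistentTopicInternalStats stats = admin.topics().getInternalStats(topic);
            // markDeletePosition only advances when every message before it is acked,
            // which is why it can lag for shared subscriptions using individual acks.
            stats.cursors.forEach((subscription, cursor) ->
                    System.out.println(subscription + " -> markDeletePosition=" + cursor.markDeletePosition));
        }
    }
}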
  • r

    RunLLM

    05/30/2025, 8:16 PM
    New RunLLM response generated: >>>*# Question* Hello, question, my pulsar cluster added this configuration: allowAutoTopicCreation: "false" The idea is to prevent dynamic creations by the apps and manage all with pulsar-admin, but I'd like to know if it also affects the creation of the deadletter topics. # Answer Yes, setting
allowAutoTopicCreation: "false" prevents automatic creation of all topics, including dead letter topics (DLQ). If a consumer tries to use a dead letter policy and the DLQ topic does not exist, Pulsar will not auto-create it, and the operation will fail unless the topic is manually created first using pulsar-admin or another admin tool. This is confirmed in the Pulsar documentation: "If this parameter is set but the broker's allowAutoSubscriptionCreation is disabled, the DLQ producer will fail to be created." Would you like more detail on how to manually create DLQ topics or handle this scenario? Sources: - Pulsar documentation Slack Link https://apache-pulsar.slack.com/archives/C5Z4T36F7/p1748636138.806349 πŸ‘ 0 button πŸ‘Ž 0 button πŸ’‘ Hint: Mention @runllm in the thread for followups.
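As a rough illustration of pre-creating the DLQ topic (a sketch with hypothetical URLs, topic and subscription names; the Java client's default DLQ topic name is typically <topic>-<subscription>-DLQ), the topic can be created with the admin API before the consumer attaches its dead letter policy:
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.*;

public class PreCreateDlqTopic {
    public static void main(String[] args) throws Exception {
        String topic = "persistent://public/default/orders"; // hypothetical topic
        String subscription = "order-processor";              // hypothetical subscription
        String dlqTopic = topic + "-" + subscription + "-DLQ"; // matches the client's usual default naming

        // With allowAutoTopicCreation=false, create the DLQ topic explicitly first.
        try (PulsarAdmin admin = PulsarAdmin.builder()
                .serviceHttpUrl("http://localhost:8080").build()) { // hypothetical admin URL
            admin.topics().createNonPartitionedTopic(dlqTopic);
        }

        PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650").build();
        Consumer<byte[]> consumer = client.newConsumer()
                .topic(topic)
                .subscriptionName(subscription)
                .subscriptionType(SubscriptionType.Shared)
                .deadLetterPolicy(DeadLetterPolicy.builder()
                        .maxRedeliverCount(3)
                        .deadLetterTopic(dlqTopic) // point at the pre-created topic
                        .build())
                .subscribe();
    }
}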
  • r

    RunLLM

    06/03/2025, 4:39 PM
    New RunLLM response generated: >>>*# Question* anybody knows this warn message
    failed to get Partitioned metadata : {\"errorMsg\":\"org.apache.zookeeper.KeeperException$ConnectionLossException: KeeperErrorCode = ConnectionLoss for /managed-ledgers/15548/Orchestration/persistent/closed_periods\",\"reqId\":2511449849216011802, \"remote\":\"pulsar-broker.pulsar.svc.cluster.local.analytics.svc.cluster.local/10.1.177.78:6650\", \"local\":\"/10.11.68.215:34530\"}","logger":"o.a.p.c.i.BinaryProtoLookupService","thread":"pulsar-client-io-5-1","level":"WARN","stacktrace":"<#f1bccd48> o.a.p.c.a.PulsarClientException$BrokerMetadataException: {\"errorMsg\":\"org.apache.zookeeper.KeeperException$ConnectionLossException: KeeperErrorCode = ConnectionLoss for /managed-ledgers/15548/Orchestration/persistent/closed_periods\",\"reqId\":2511449849216011802, \"remote\":\"pulsar-broker.pulsar.svc.cluster.local.analytics.svc.cluster.local/10.1.177.78:6650\", \"local\":\"/10.11.68.215:34530\"}\n\tat o.a.p.c.i.ClientCnx.getPulsarClientException(ClientCnx.java:1352)\n\tat o.a.p.c.i.ClientCnx.handlePartitionResponse(ClientCnx.java:691)\n\tat o.a.p.c.p.PulsarDecoder.channelRead(PulsarDecoder.java:144)\n\tat o.a.p.s.i.n.c.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)\n\tat o.a.p.s.i.n.c.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)\n\tat o.a.p.s.i.n.c.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)\n\tat o.a.p.s.i.n.h.c.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:346)\n\tat o.a.p.s.i.n.h.c.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:318)\n\tat o.a.p.s.i.n.c.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)\n\tat o.a.p.s.i.n.c.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)\n\tat o.a.p.s.i.n.c.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)\n\tat o.a.p.s.i.n.h.f.FlushConsolidationHandler.channelRead(FlushConsolidationHandler.java:152)\n\tat o.a.p.s.i.n.c.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)\n\tat o.a.p.s.i.n.c.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)\n\tat o.a.p.s.i.n.c.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)\n\tat o.a.p.s.i.n.c.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1357)\n\tat o.a.p.s.i.n.c.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)\n\tat o.a.p.s.i.n.c.Abst...\n"}
    • 1
    • 1
  • r

    RunLLM

    06/06/2025, 7:10 AM
New RunLLM response generated: >>>*# Question* Is there any way we can track the subscription creation time? # Answer It seems likely that Pulsar does not expose a direct metric or field for subscription creation time in its standard stats or metrics APIs. The available subscription metrics focus on message backlog, rates, and timestamps related to message consumption and acknowledgment, but do not include a creation timestamp for subscriptions, as described in the official metrics documentation. Would you like more detail on possible workarounds or indirect methods to infer subscription creation time? Sources: - official metrics documentation Slack Link https://apache-pulsar.slack.com/archives/C5Z4T36F7/p1749193806.583939 πŸ‘ 0 button πŸ‘Ž 0 button πŸ’‘ Hint: Mention @runllm in the thread for followups.
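One indirect approach (a sketch only, with hypothetical names and URL): create subscriptions explicitly through the admin API and record the creation time in your own logs or metadata, since the stats APIs do not expose it:
import java.time.Instant;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.MessageId;

public class TrackSubscriptionCreation {
    public static void main(String[] args) throws Exception {
        try (PulsarAdmin admin = PulsarAdmin.builder()
                .serviceHttpUrl("http://localhost:8080").build()) { // hypothetical admin URL
            String topic = "persistent://public/default/events";    // hypothetical topic
            String subscription = "analytics";                       // hypothetical subscription
            // Create the subscription explicitly (positioned at the end of the topic)
            // and record the creation time in your own system of record.
            admin.topics().createSubscription(topic, subscription, MessageId.latest);
            System.out.println("Subscription " + subscription + " created at " + Instant.now());
        }
    }
}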
  • r

    RunLLM

    06/07/2025, 12:11 AM
    New RunLLM response generated: >>>*# Question* Hello! question, I can't find in the pulsar documentation 4.0 the page of Pulsar SQL, is this feature available? # Answer Pulsar SQL documentation is not present in the official Apache Pulsar 4.0 docs, and the feature is not directly referenced in the 4.0 release notes or download page. There are also recent user reports of 404 errors and missing documentation for certain features, including Pulsar SQL, in the latest docs (see here). Would you like more detail on alternatives for SQL-like querying in Pulsar or information about SQL support in managed Pulsar services like StreamNative Cloud? Sources: - see here Slack Link https://apache-pulsar.slack.com/archives/C5Z4T36F7/p1749255084.540879 πŸ‘ 0 button πŸ‘Ž 0 button πŸ’‘ Hint: Mention @runllm in the thread for followups.
  • r

    RunLLM

    06/09/2025, 4:59 PM
    New RunLLM response generated: >>>*# Question* Hi everyone*,* I’m working on a real-time pub-sub system where data durability is not a requirement β€” I’m okay with potential data loss, and no message retention is needed. While exploring Apache Pulsar, I came across non-persistent topics, which seem ideal since they skip writing to BookKeeper. However, even with non-persistent topics, it appears that BookKeeper is still a required component of the Pulsar stack. For a setup where durability isn't needed at all, this seems like unnecessary overhead. Is there any way to eliminate the BookKeeper layer entirely in such a use case β€” perhaps running only the Pulsar broker (or a lighter version). I’d love to hear thoughts on the ideal architecture for this type of real-time, fire-and-forget messaging requirement. Thanks in advance! # Answer BookKeeper is still a required component in standard Apache Pulsar deployments, even if you use only non-persistent topics. Pulsar brokers depend on BookKeeper for internal operations and metadata, and there is no officially supported way to run Pulsar brokers without BookKeeper, even when using only non-persistent topics. Non-persistent topics do skip writing messages to BookKeeper, but the service itself must still be present in the stack (Pulsar docs, Pulsar architecture). If you want a truly BookKeeper-free, fire-and-forget pub-sub system, Pulsar does not currently support a "broker-only" mode. You must deploy BookKeeper, even if it is not used for message storage. Would you like more detail on possible workarounds or alternative architectures? Sources: - Non-persistent messaging | Apache Pulsar - Pulsar architecture Slack Link https://apache-pulsar.slack.com/archives/C5Z4T36F7/p1749488327.754299 πŸ‘ 0 button πŸ‘Ž 0 button πŸ’‘ Hint: Mention @runllm in the thread for followups.
  • r

    RunLLM

    06/10/2025, 4:56 PM
    New RunLLM response generated: >>>*# Question* Whops, just wanted to take a look at the slack integrations, didn’t mean to add anything πŸ˜‚ # Answer I'm here to help with questions about StreamNative, Ursa, or Apache Pulsar! If you have any questions about Slack integrations with these platforms, or need information on how to set up notifications, connectors, or integrations, please let me know. If you meant something else, feel free to provide more details or rephrase your question! Slack Link https://apache-pulsar.slack.com/archives/C5Z4T36F7/p1749574565.969889 πŸ‘ 0 button πŸ‘Ž 0 button πŸ’‘ Hint: Mention @runllm in the thread for followups.
  • r

    RunLLM

    06/11/2025, 4:08 PM
New RunLLM response generated: >>>*# Question* Hi! Using pulsar-rs, we can see a negative 'availablePermits' (-127) on a subscription; is it expected to have something < 0? # Answer No, it is not expected for availablePermits to be negative in Pulsar or pulsar-rs. Negative availablePermits usually indicate a bug or a flow control issue, and can cause consumers to stop receiving messages. This has been observed in other Pulsar clients as well, such as Go and Java, and is not normal behavior. See discussion in pulsar-rs issue #114 and pulsar-client-go issue #839. Would you like more detail on possible causes or troubleshooting steps? Sources: - pulsar-client-go issue #839 - pulsar-rs issue #114 Slack Link https://apache-pulsar.slack.com/archives/C5Z4T36F7/p1749658052.996799 πŸ‘ 0 button πŸ‘Ž 0 button πŸ’‘ Hint: Mention @runllm in the thread for followups.
  • r

    RunLLM

    06/14/2025, 8:48 AM
New RunLLM response generated: >>>*# Question* Hello, I can see there is activity in this branch of the Flink Pulsar connector: https://github.com/streamnative/flink-connector-pulsar/tree/v4.3-sn Will those changes be merged into the official Flink Pulsar connector repository? The main repo looks abandoned because there have been no new releases since Dec 2023, and there are new PRs merged in main that still haven't been published 😞 # Answer It seems likely that the StreamNative-maintained branch (v4.3-sn) for the Flink Pulsar connector is being actively developed, while the official Apache Flink Pulsar connector repository has not published new releases since December 2023, despite some PRs being merged. There is no explicit statement in the available sources confirming if or when the StreamNative changes will be merged upstream, but the StreamNative fork is intended to track and extend the official connector, and historically, there has been collaboration and backporting between forks and the main repo. My understanding is that users needing the latest features or fixes may need to use the StreamNative branch for now, as the main repo's release cadence is slower and may lag behind in publishing new versions. Would you like more detail on the relationship between the StreamNative fork and the official repo, or on how to use the StreamNative version in your environment? Sources: - https://github.com/apache/flink-connector-pulsar/pull/94 - https://github.com/apache/flink-connector-pulsar/pull/44 Note: I wasn’t able to find highly relevant data sources, but above are a few potentially relevant links. Slack Link https://apache-pulsar.slack.com/archives/C5Z4T36F7/p1749890849.006829 πŸ‘ 0 button πŸ‘Ž 0 button πŸ’‘ Hint: Mention @runllm in the thread for followups.
  • r

    RunLLM

    06/17/2025, 10:00 AM
    New RunLLM response generated: >>>*# Question* Hi everyone, wanted to know does non persistent partitioning of a topic supports broadcasting to all consumers for that particular topic? # Answer Yes, non-persistent partitioned topics in Pulsar support all standard subscription types, including exclusive, shared, key_shared, and failover. For broadcasting (where all consumers receive all messages), you should use the exclusive or failover subscription types with a single consumer, or have each consumer use a unique subscription name. However, with the shared or key_shared subscription types, messages are distributed among consumers, not broadcasted to all. This behavior is the same for both persistent and non-persistent topics, including partitioned ones, as per Pulsar documentation and StreamNative docs. Would you like more detail on how to configure this or on the subscription types? Sources: - StreamNative docs - Pulsar documentation Slack Link https://apache-pulsar.slack.com/archives/C5Z4T36F7/p1750154364.949799 πŸ‘ 0 button πŸ‘Ž 0 button πŸ’‘ Hint: Mention @runllm in the thread for followups.
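To illustrate the unique-subscription approach (a sketch; the topic and service URL are hypothetical), each consumer instance subscribes with its own Exclusive subscription on the non-persistent topic, so every instance receives every message:
import java.util.UUID;
import org.apache.pulsar.client.api.*;

public class BroadcastConsumer {
    public static void main(String[] args) throws Exception {
        PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650").build();

        // Each consumer instance uses a unique subscription name, so all instances
        // get a full copy of the stream (broadcast/fan-out semantics).
        Consumer<byte[]> consumer = client.newConsumer()
                .topic("non-persistent://public/default/alerts")
                .subscriptionName("broadcast-" + UUID.randomUUID())
                .subscriptionType(SubscriptionType.Exclusive)
                .subscribe();

        Message<byte[]> msg = consumer.receive();
        consumer.acknowledge(msg);
    }
}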
  • r

    RunLLM

    06/19/2025, 9:16 AM
New RunLLM response generated: >>>*# Question* Hello Team, I have a query on the Node JS pulsar-client module. I am trying to serialize and deserialize MessageId for my business use case, but the issue is with error handling: if we pass invalid buffer data to the MessageId.deserialize method, it throws a C++ exception that is not handled by pulsar-client, and Node JS crashes. Did anyone else face this issue? I see a similar issue existed when creating a client, but that was fixed with this issue and PR https://github.com/apache/pulsar-client-node/pull/307/files, where it was wrapped in a try-catch block; deserialize is still not fixed. Any C++ experts who can help contribute a fix or a solution would be appreciated. # Answer You are correct: if you pass invalid buffer data to
    MessageId.deserialize
    in the Pulsar Node.js client, it can throw a C++ exception that is not caught, causing the Node.js process to crash. This is similar to the issue previously fixed for client creation, where C++ exceptions are now wrapped and handled in Node.js, as shown in PR #307. However, this exception handling has not yet been implemented for
    MessageId.deserialize
    , and the problem remains open for that method. No official fix for this specific issue in
    MessageId.deserialize
    is present in the data sources, and there is no PR addressing it yet. You or another contributor could propose a fix by wrapping the C++ code in a try-catch block and converting exceptions to JavaScript errors, similar to the approach in PR #307. Would you like more detail on how to implement this in the C++ binding? Sources: - PR #307 Slack Link https://apache-pulsar.slack.com/archives/C5Z4T36F7/p1750324556.646719 πŸ‘ 0 button πŸ‘Ž 0 button πŸ’‘ Hint: Mention @runllm in the thread for followups.
  • r

    RunLLM

    06/20/2025, 11:44 AM
New RunLLM response generated: >>>*# Question* Hello everyone, I need a tool for monitoring Apache Pulsar 4.0.0, but I have custom authentication in my architecture. Can someone help me? # Answer It seems likely that for monitoring Apache Pulsar 4.0.0, you can use Prometheus and Grafana, which are standard tools for Pulsar observability. Grafana dashboards can visualize metrics from brokers, bookies, ZooKeeper, and more, while Prometheus scrapes metrics from Pulsar's /metrics endpoints. These tools are compatible with recent Pulsar versions and are widely used in production environments according to Pulsar monitoring docs, StreamNative monitoring guide, and Grafana dashboard usage. Regarding custom authentication, my understanding is that Pulsar supports pluggable authentication mechanisms, but if you have a custom authentication plugin, you may need to ensure your monitoring tools (Prometheus, Grafana, or any custom exporter) can authenticate using your custom method. There is no explicit documentation for integrating custom authentication with monitoring tools, so you may need to adapt your exporter or proxy to support your authentication flow as described in Pulsar authentication concepts. Would you like more detail on configuring Prometheus/Grafana or handling custom authentication in this context? Sources: - Pulsar monitoring docs - StreamNative monitoring guide - Pulsar authentication concepts Slack Link https://apache-pulsar.slack.com/archives/C5Z4T36F7/p1750419859.918219 πŸ‘ 0 button πŸ‘Ž 0 button πŸ’‘ Hint: Mention @runllm in the thread for followups.
  • r

    RunLLM

    06/23/2025, 12:20 PM
New RunLLM response generated: >>>*# Question* I'm fairly new to Pulsar and really just evaluating it as a replacement for Kafka. I have some questions about data offloading to S3. How does this affect replication within Pulsar? Is it really necessary to do both? If we were to lower the offloading time / size to be fairly low, could we lower the replication factor? One of our biggest costs with Kafka right now is data transfer across availability zones in AWS, either from broker <> broker or broker <> clients in different zones. If our data lives on S3, the need for large disks is reduced and may reduce the need for replication rather significantly, if I'm understanding offloading correctly. We'd basically only need enough disk to account for data that hasn't been moved yet. Is this accurate? Is it an appropriate use of offloading? # Answer Offloading to S3 in Pulsar moves older, sealed data segments from BookKeeper to S3 for cost-effective, long-term storage, but it does not replace Pulsar's replication. Replication ensures data durability and availability within the cluster, while offloading is for reducing local storage costs and enabling long backlogs; both serve different purposes and are typically used together. Lowering the offload threshold reduces local disk usage, but you should not lower the replication factor solely because data is offloadedβ€”replication is still needed for fault tolerance and high availability before data is offloaded and for active data segments (Pulsar Tiered Storage Overview, S3 Offloader Docs). Your understanding is correct that offloading reduces the need for large disks, as only recent/unoffloaded data must be stored locally. However, offloading does not reduce the need for replication within the cluster, since S3 is not a substitute for Pulsar's internal redundancy and does not provide the same low-latency, high-availability guarantees for active data (Pulsar Tiered Storage Cookbook, S3 Offloader Docs). Would you like more detail on how offloading and replication interact, or on optimizing Pulsar for AWS network costs? Sources: - Pulsar Tiered Storage Overview - S3 Offloader Docs - Pulsar Tiered Storage Cookbook Slack Link https://apache-pulsar.slack.com/archives/C5Z4T36F7/p1750681180.742109 πŸ‘ 0 button πŸ‘Ž 0 button πŸ’‘ Hint: Mention @runllm in the thread for followups.
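To make the separation concrete, here is a sketch (hypothetical namespace, admin URL, and values) showing that BookKeeper replication is governed by the namespace persistence policies while offloading is governed by the offload threshold; the two are configured independently:
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.common.policies.data.PersistencePolicies;

public class OffloadAndReplicationConfig {
    public static void main(String[] args) throws Exception {
        try (PulsarAdmin admin = PulsarAdmin.builder()
                .serviceHttpUrl("http://localhost:8080").build()) { // hypothetical admin URL
            String ns = "my-tenant/my-namespace";                    // hypothetical namespace

            // In-cluster replication: ensemble=3, write quorum=3, ack quorum=2.
            // This still applies to data that has not been offloaded yet.
            admin.namespaces().setPersistence(ns, new PersistencePolicies(3, 3, 2, 0.0));

            // Offload ledgers to tiered storage once the namespace holds roughly 1 GiB locally.
            admin.namespaces().setOffloadThreshold(ns, 1024L * 1024 * 1024);
        }
    }
}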
  • r

    RunLLM

    06/23/2025, 12:29 PM
    New RunLLM response generated: >>>*# Question* If we set our offloading threshold to something like 10 minutes. Is that silly? # Answer Setting the offload threshold to 10 minutes is not inherently silly; it is supported and configurable in Pulsar. The offload threshold can be set by time (seconds, minutes, hours, etc.) or by size, and a value like 10 minutes means data will be offloaded from BookKeeper to long-term storage once it has been stored for 10 minutes. This is valid and can be useful for aggressive offloading if you want to minimize BookKeeper storage usage, but it may increase the frequency of offload operations and associated costs or latency, depending on your workload and storage backend. According to the documentation, you can set this with
--time 10m using the pulsar-admin namespaces set-offload-threshold command, and the system will trigger offloading when the time threshold is reached, as long as the current ledger segment is full and rolled over. Negative values disable offloading, and 0 triggers immediate offload as soon as possible (Pulsar docs, Pulsar admin reference). Would you like more detail on the trade-offs or operational considerations for such a short offload interval? Sources: - Pulsar docs - Pulsar admin reference Slack Link https://apache-pulsar.slack.com/archives/C5Z4T36F7/p1750681713.939359 πŸ‘ 0 button πŸ‘Ž 0 button πŸ’‘ Hint: Mention @runllm in the thread for followups.
  • r

    RunLLM

    06/24/2025, 7:26 AM
New RunLLM response generated: >>>*# Question* Hey, our clients recently faced a quite interesting issue which we are currently investigating and would like some feedback from you. To give you some context, we run and operate a Pulsar cluster in a high-availability setup. The current version is 3.0.11. We did a PROD upgrade where all of the components got replaced one by one, so no downtime was expected. In fact, one of our customers who is running multiple Pulsar Functions had the following issue on two functions.
    java.lang.IllegalStateException: Field 'message' is not set
    at org.apache.pulsar.common.api.proto.CommandLookupTopicResponse.getMessage(CommandLookupTopicResponse.java:220)
    at org.apache.pulsar.client.impl.ClientCnx.handleLookupResponse(ClientCnx.java:629)
    at org.apache.pulsar.common.protocol.PulsarDecoder.channelRead(PulsarDecoder.java:154)
After restarting the Pulsar Function itself it worked again, but by the time the client noticed it was not working, customers had already complained. From my understanding this is what happens: β€’ The function's client got into a "stuck" state because of the unhandled exception. It was not retrying properly. # Answer Your understanding is correct: the Pulsar Function's client encountered an unhandled exception (
    IllegalStateException: Field 'message' is not set
    ) during a lookup response, which caused the function to get "stuck" and stop retrying or reconnecting as expected. Restarting the function resolved the issue, but only after service impact was noticed. This is consistent with known issues where Pulsar client-side exceptions during protocol handling (like lookup failures) can leave the client in a bad state, especially if the error is not gracefully handled or retried by the function runtime. This is not unique to your environment and has been observed in other Pulsar deployments as well. For further mitigation, ensure you are running the latest supported Pulsar version, as some related bugs have been fixed in recent releases. Also, consider implementing external monitoring and automated restarts for functions that enter a failed state, since the Pulsar Functions framework may not always recover from such protocol-level exceptions automatically. For more details, see the discussion in Pulsar Functions error handling and the relevant ClientCnx.java code. Would you like a step-by-step breakdown of the error flow and possible improvements to function resiliency? Sources: - Pulsar Functions error handling - ClientCnx.java code Slack Link https://apache-pulsar.slack.com/archives/C5Z4T36F7/p1750749913.510049 πŸ‘ 0 button πŸ‘Ž 0 button πŸ’‘ Hint: Mention @runllm in the thread for followups.
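As a rough sketch of the external monitoring idea (hypothetical tenant, namespace, and function names; not an official pattern), a small watchdog can poll the function status through the admin API and restart it when instances are reported as not running:
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.common.policies.data.FunctionStatus;

public class FunctionWatchdog {
    public static void main(String[] args) throws Exception {
        try (PulsarAdmin admin = PulsarAdmin.builder()
                .serviceHttpUrl("http://localhost:8080").build()) { // hypothetical admin URL
            String tenant = "public", namespace = "default", function = "my-function"; // hypothetical names

            FunctionStatus status = admin.functions().getFunctionStatus(tenant, namespace, function);
            if (status.getNumRunning() < status.getNumInstances()) {
                // At least one instance reports as not running: restart the function instances.
                admin.functions().restartFunction(tenant, namespace, function);
            }
        }
    }
}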
  • r

    RunLLM

    06/24/2025, 7:47 AM
    New RunLLM response generated: >>>*# Question* Hey, We're hoping you can provide some insight into an issue one of our clients experienced following a recent rolling upgrade of our Pulsar cluster. Context: β€’ Cluster Version: Pulsar 3.0.11 (High-Availability Setup) β€’ Action: We performed a rolling upgrade of all components in production. β€’ Client: One of our customers runs multiple Pulsar Functions. The Issue: Immediately after the upgrade, two of the customer's functions became unresponsive and started logging the following exception:
    java.lang.IllegalStateException: Field 'message' is not set
    at org.apache.pulsar.common.api.proto.CommandLookupTopicResponse.getMessage(CommandLookupTopicResponse.java:220)
    at org.apache.pulsar.client.impl.ClientCnx.handleLookupResponse(ClientCnx.java:629)
    at org.apache.pulsar.common.protocol.PulsarDecoder.channelRead(PulsarDecoder.java:154)
    The issue persisted until the function pods were manually restarted, by which time their own customers had already been impacted. Our Hypothesis: Our theory is that the function's internal client entered a "stuck" state during the broker rolling upgrade and failed to recover. We also discovered a version mismatch between the cluster and the client libraries: β€’ *Client Libraries:*pulsar-client and pulsar-functions-api are both version 3.2.1. β€’ Cluster Version:3.0.11. My question are now: Is it a requirement for the client library versions to strictly match the Pulsar cluster version? Could this version skew be a potential cause for this kind of failure during a rolling upgrade? Are you aware of any existing GitHub issues related to this specific IllegalStateException in this context? We were unable to find a direct match in our own research. Thanks in advance! # Answer It seems likely that a version mismatch between Pulsar client libraries (3.2.1) and the broker (3.0.11) could contribute to issues like the IllegalStateException you observed, especially during rolling upgrades. While Pulsar is designed for client-broker compatibility across versions, new client features or protocol changes may not be fully supported by older brokers, potentially leading to unexpected errors or "stuck" states in clients or Pulsar Functions. This is supported by the official documentation, which notes that new features relying on protocol updates require both newer clients and brokers, and that strict version matching is not required but recommended for stability and feature parity (Pulsar client libraries).
    • 1
    • 1
  • r

    RunLLM

    06/28/2025, 5:21 AM
    New RunLLM response generated: >>>*# Question* https://www.linkedin.com/blog/engineering/infrastructure/introducing-northguard-and-xinfra # Answer I'm only able to answer questions related to StreamNative, Ursa, or Apache Pulsar. If you have a question about these technologies or need information about data streaming, please provide more details or rephrase your question. Let me know how I can assist you with StreamNative or related topics! Slack Link https://apache-pulsar.slack.com/archives/C5Z4T36F7/p1751088100.888119 πŸ‘ 0 button πŸ‘Ž 0 button πŸ’‘ Hint: Mention @runllm in the thread for followups.
  • r

    RunLLM

    06/30/2025, 9:36 AM
New RunLLM response generated: >>>*# Question* I am trying to set up a Pulsar cluster with only etcd as the metadata store. But now I am stuck using etcd as the metadata store for the bookies; it seems like it cannot recognize etcd. Can someone help me with the settings? # Answer To use etcd as the metadata store for both Pulsar and BookKeeper, you must set the following in your
    broker.conf
    and `bookkeeper.conf`: - For Pulsar:
    metadataStoreUrl=etcd:http://<etcd1>:2379,http://<etcd2>:2379,http://<etcd3>:2379
      configurationMetadataStoreUrl=etcd:http://<etcd1>:2379,http://<etcd2>:2379,http://<etcd3>:2379
    - For BookKeeper:
    metadataServiceUri=etcd://<etcd1>:2379,<etcd2>:2379,<etcd3>:2379
    Note: etcd support in BookKeeper is not fully completeβ€”features like under-replication management are missing, and there are known issues with stability and compatibility in some Pulsar/BookKeeper versions. Some users have reported connection and session issues when using etcd, especially in Kubernetes environments. If you encounter errors, check logs for etcd session or lease errors and consider falling back to ZooKeeper if critical features are needed. See Configure metadata store | Apache Pulsar and Incomplete implementation for etcd driver for metadata. Would you like more detail on troubleshooting specific errors or version compatibility? Sources: - Configure metadata store | Apache Pulsar - Incomplete implementation for etcd driver for metadata - ETCD doesn't work as a metadata store Slack Link https://apache-pulsar.slack.com/archives/C5Z4T36F7/p1751276182.014709 πŸ‘ 0 button πŸ‘Ž 0 button πŸ’‘ Hint: Mention @runllm in the thread for followups.
  • r

    RunLLM

    07/01/2025, 11:43 AM
    New RunLLM response generated: >>>*# Question* Hello, I run Pulsar in cluster mode (3.0.8), with 3 zookeepers / 3 bookkeepers / 5 brokers / 5 proxy. I had a server restart and it was hosting : 1 broker, 1 proxy and 1 zookeeper (The broker and the zookeeper instances were the leaders). Even when my server came back up, almost all the topics balanced themselves on only two of the brokers (even brokers that stayed up lost their topics) as you can see on the attached capture : Now, the existing (and newly created) topics are only managed by only 2 brokers (out of 5) and not balanced between all the others. Can you help me to find a way to prevent this behaviour please ? How can I achieve effective load balancing when a situation like this occurs. Is it possible to force the balancing of topics between brokers without restarting them ? For availability and Production reasons, I want to avoid downtime as most as possible. # Answer When a Pulsar broker or zookeeper leader restarts, topic bundles may concentrate on a subset of brokers, especially if the load balancer does not redistribute them automatically. This can result in most topics being managed by only a few brokers, as seen in your graph. To rebalance topics without restarting brokers, you can manually unload bundles using the Pulsar admin CLI, which will force the load balancer to redistribute them across available brokers. For example:
    bin/pulsar-admin namespaces unload <tenant>/<namespace>
    This command will unload all bundles in the specified namespace, prompting Pulsar to reassign them to brokers based on current load. For more granular control, you can unload specific bundles. This approach avoids downtime and helps restore balanced distribution. For more details, see the Pulsar load balancing quick start guide. Would you like more detail on configuring automatic load balancing or troubleshooting persistent imbalance? Sources: - Pulsar load balancing quick start guide Slack Link https://apache-pulsar.slack.com/archives/C5Z4T36F7/p1751370199.017679 πŸ‘ 0 button πŸ‘Ž 0 button πŸ’‘ Hint: Mention @runllm in the thread for followups.
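For the more granular, per-bundle option mentioned above, here is a sketch (hypothetical names and admin URL) that looks up which broker and bundle currently serve a hot topic and unloads just that bundle so the load manager reassigns it:
import org.apache.pulsar.client.admin.PulsarAdmin;

public class UnloadHotBundle {
    public static void main(String[] args) throws Exception {
        try (PulsarAdmin admin = PulsarAdmin.builder()
                .serviceHttpUrl("http://localhost:8080").build()) { // hypothetical admin URL
            String namespace = "my-tenant/my-namespace";             // hypothetical namespace
            String topic = "persistent://my-tenant/my-namespace/hot-topic"; // hypothetical topic

            // Which broker currently owns the topic, and which bundle contains it?
            String owner = admin.lookups().lookupTopic(topic);
            String bundle = admin.lookups().getBundleRange(topic);
            System.out.println(topic + " -> " + owner + " (bundle " + bundle + ")");

            // Unload only that bundle; the load manager will reassign it to another broker.
            admin.namespaces().unloadNamespaceBundle(namespace, bundle);
        }
    }
}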
  • r

    RunLLM

    07/02/2025, 6:03 AM
    New RunLLM response generated: >>>*# Question* https://s2.dev/blog/agent-sessions # Answer The provided link discusses the concept of "stream per agent session," but there is no detailed technical information or API reference about agent sessions in StreamNative, Ursa, or Apache Pulsar in the available data sources. If you are looking for how sessions are managed in Pulsar or related systems, please clarify your use case or what specific information you need. Would you like more detail or a comparison with Pulsar's session/event handling? Sources: - https://streamnative.slack.com/archives/C071KAATVLG/p1718692176.301869 Slack Link https://apache-pulsar.slack.com/archives/C5Z4T36F7/p1751436215.801089 πŸ‘ 0 button πŸ‘Ž 0 button πŸ’‘ Hint: Mention @runllm in the thread for followups.
  • r

    RunLLM

    07/02/2025, 6:48 PM
    New RunLLM response generated: >>>*# Question* Hello! I am doing some geo-replication testing and I have followed the documentation to connect and test my two pulsar clusters. I have gotten it to work, but I have to manually create my topics on the cluster for it to work, and the topic is not being replicated to the other cluster, so I have to manually do a topics create on both clusters. Per documentation I should be able to configure it: β€’ Once you create a geo-replication namespace, any topics that producers or consumers create within that namespace are replicated across clusters. Typically, each application uses the
    serviceUrl
    for the local cluster. I enabled geo-replication at the namespace level, not at the topic level. Does anyone know how to solve this issue? Using v4.0.1 # Answer If you have enabled geo-replication at the namespace level, topics created in that namespace should be automatically replicated across clusters, and you should not need to manually create the topic on each cluster. If this is not happening, it is likely due to a configuration issue or a mismatch in how the clusters are set up. Key points to check: 1. Namespace Replication Clusters: Ensure you have set the replication clusters for the namespace using:
    bin/pulsar-admin namespaces set-clusters <tenant>/<namespace> --clusters <cluster1>,<cluster2>
    2. Tenant Allowed Clusters: The tenant must have both clusters listed in its allowed clusters:
    bin/pulsar-admin tenants create <tenant> --allowed-clusters <cluster1>,<cluster2>
    3. Topic Creation: When you create a topic in the geo-replicated namespace, Pulsar should automatically create the topic in the remote cluster. If not, check that both clusters can communicate and that the configuration stores are set up correctly. 4. Auto Topic Creation: Ensure that
    allowAutoTopicCreation
    is enabled (default is true). If disabled, topics will not be auto-created on the remote cluster. For more details, see Pulsar geo-replication documentation and StreamNative geo-replication guide. Would you like a step-by-step troubleshooting checklist or more detail on a specific configuration? Sources: - Pulsar geo-replication documentation - StreamNative geo-replication guide Slack Link https://apache-pulsar.slack.com/archives/C5Z4T36F7/p1751482089.785439 πŸ‘ 0 button πŸ‘Ž 0 button πŸ’‘ Hint: Mention @runllm in the thread for followups.
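As a quick verification step (a sketch; the namespace, topic, and admin URL are hypothetical), you can check the namespace's replication clusters and the topic's per-cluster replication stats from either cluster:
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.common.policies.data.TopicStats;

public class CheckGeoReplication {
    public static void main(String[] args) throws Exception {
        try (PulsarAdmin admin = PulsarAdmin.builder()
                .serviceHttpUrl("http://localhost:8080").build()) { // hypothetical admin URL
            String namespace = "my-tenant/replicated-ns";            // hypothetical namespace
            String topic = "persistent://my-tenant/replicated-ns/orders"; // hypothetical topic

            // Should list both clusters if namespace-level replication is configured.
            System.out.println(admin.namespaces().getNamespaceReplicationClusters(namespace));

            // Per-remote-cluster replication backlog for the topic.
            TopicStats stats = admin.topics().getStats(topic);
            stats.getReplication().forEach((cluster, repl) ->
                    System.out.println(cluster + " backlog=" + repl.getReplicationBacklog()));
        }
    }
}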
  • r

    RunLLM

    07/03/2025, 7:40 AM
    New RunLLM response generated: >>>*# Question* Hey, I have a question about using Pulsar as a message queue for our use case. Our scenario: β€’ We use a topic as a queue that receives bursts of messages (small amounts) once or twice a day β€’ Processing time is very long (15-80 minutes per message) and resource-intensive β€’ We monitor the topic backlog and auto-scale consumers accordingly The problem: We can't get messages to distribute evenly across consumers. Ideally, we want 1 message per consumer, but what happens is the first consumer that gets up claims ALL messages from the backlog. When we scale up and add more consumers, they sit idle because all messages are already assigned to the first consumer. This creates a situation where 1 consumer processes for hours while 9 others do nothing, even though we have 10 messages and 10 consumers. I've opened a GitHub discussion with more details: https://github.com/apache/pulsar/discussions/24477 Question: Is there a broker-side configuration to limit how many messages each consumer can claim? Or is there a different pattern we should use for long-processing tasks with dynamic scaling? Any insights would be really appreciated! # Answer Pulsar's default behavior with Shared subscriptions is to dispatch all backlog messages to the first available consumer, so when you scale up, new consumers remain idle until the backlog is cleared or new messages arrive. There is no broker-side configuration to directly limit the number of messages assigned per consumer in this scenario. The closest controls are
    receiver_queue_size
    and
    max_total_receiver_queue_size_across_partitions
    , but these only limit how many messages are fetched into each consumer's queue, not how the backlog is distributed when new consumers join. This is a known challenge for bursty, long-running workloads with dynamic scaling, as described in your linked discussion and confirmed in the Pulsar documentation and community feedback (Pulsar as a message queue, GitHub discussion). A common workaround is to pre-scale your consumers before the burst arrives, so all are present when messages are published, or to implement application-level logic to coordinate message claiming and processing. Alternatively, you can experiment with setting a very low
    receiver_queue_size
    (e.g., 1) for each consumer, but this does not guarantee strict one-message-per-consumer distribution for existing backlog. There is no built-in broker-side "max in-flight per consumer" setting for Shared subscriptions. Would you like more detail on possible architectural patterns or code-level workarounds for this use case? Sources: - Pulsar as a message queue
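For the receiver-queue workaround mentioned above, here is a sketch (topic, subscription, and URL are hypothetical) of a long-running worker that prefetches only one message at a time, so the rest of the backlog stays available to other consumers; as noted, this reduces but does not fully eliminate skew:
import org.apache.pulsar.client.api.*;

public class LongTaskConsumer {
    public static void main(String[] args) throws Exception {
        PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650").build();

        Consumer<byte[]> consumer = client.newConsumer()
                .topic("persistent://public/default/long-jobs")
                .subscriptionName("workers")
                .subscriptionType(SubscriptionType.Shared)
                // Prefetch at most one message so a single consumer cannot buffer the whole backlog.
                .receiverQueueSize(1)
                .subscribe();

        while (true) {
            Message<byte[]> msg = consumer.receive();
            // ... 15-80 minutes of processing ...
            consumer.acknowledge(msg);
        }
    }
}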
    • 1
    • 1
  • r

    RunLLM

    07/04/2025, 8:34 AM
    New RunLLM response generated: >>>*# Question* Does anyone deployed Pulsar on Kubernetes using local PVC on production? Our current cloud provider has relatively slow Ceph-based attached volumes, and fast local NVMe SSD. We already to have replication on bookkeeper, so probably we don't need to add another layer like Longhorn. I found that there is such an option in Helm, but the question is - is it supported? # Answer Yes, deploying Pulsar on Kubernetes using local Persistent Volumes (PVs) is supported and documented. The Pulsar Helm chart and StreamNative Platform both support using local storage provisioners for BookKeeper and ZooKeeper, allowing you to leverage fast local NVMe SSDs. This is a common approach when cloud-attached storage is slow, and BookKeeper's replication provides data durability, so an additional abstraction like Longhorn is not required. However, local PVs are best suited for environments where node failure and pod rescheduling are carefully managed, as data is tied to the node's local disk. For production, ensure your operational model can tolerate node loss and that you have sufficient replication configured for BookKeeper to avoid data loss if a node fails. For configuration details, see the official documentation on configuring local PVs and storage classes and using the local_storage option in Helm. Would you like more detail on best practices or example Helm values? Sources: - Configuring local PVs and storage classes - Helm local_storage option Slack Link https://apache-pulsar.slack.com/archives/C5Z4T36F7/p1751618061.738539 πŸ‘ 0 button πŸ‘Ž 0 button πŸ’‘ Hint: Mention @runllm in the thread for followups.
  • r

    RunLLM

    07/07/2025, 1:43 PM
    New RunLLM response generated: >>>*# Question* Hi All, In one of our deployment we have 3 partitioned topics and we have java producer code producing data to that topics . When i check the producer stats its showing like below where for same number of messages the latency is high and because of which i think the pending messages are also increasing . 2025-07-04 124036.832 [pulsar-timer-6-1] INFO org.apache.pulsar.client.impl.ProducerStatsRecorderImpl - [persistent://public/default/-partition-1] [-175-224582] Pending messages: 15 --- Publish throughput: 66.55 msg/s --- 0.49 Mbit/s --- Latency: med: *3000.*000 ms - 95pct: 4741.000 ms - 99pct: 5997.000 ms - 99.9pct: 6000.000 ms - max: 6404.000 ms --- BatchSize: med: 2.000 - 95pct: 65.000 - 99pct: 68.000 - 99.9pct: 69.000 - max: 69.000 --- MsgSize: med: 1930.000 bytes - 95pct: 64713.000 bytes - 99pct: 65368.000 bytes - 99.9pct: 65465.000 bytes - max: 65465.000 bytes --- Ack received rate: 69.33 ack/s --- Failed messages: 0 --- Pending messages: 15 [persistent://public/default/-partition-0] [-175-224582] Pending messages: 88 --- Publish throughput: 103.66 msg/s --- 0.76 Mbit/s --- Latency: med: 2735.000 ms - 95pct: 5846.000 ms - 99pct: 5998.000 ms - 99.9pct: 6753.000 ms - max: 6753.000 ms --- BatchSize: med: 3.000 - 95pct: 59.000 - 99pct: 69.000 - 99.9pct: 69.000 - max: 69.000 --- MsgSize: med: 3066.000 bytes - 95pct: 57746.000 bytes - 99pct: 65327.000 bytes - 99.9pct: 65510.000 bytes - max: 65510.000 bytes --- Ack received rate: 110.80 ack/s --- Failed messages: 0 --- Pending messages: 88 2025-07-04 124036.833 [pulsar-timer-6-1] INFO org.apache.pulsar.client.impl.ProducerStatsRecorderImpl - [persistent://public/default/-partition-2] [-175-224582] Pending messages: 0 --- Publish throughput: 105.25 msg/s --- 0.78 Mbit/s --- Latency: med: 4.000 ms - 95pct: 252.000 ms - 99pct: 261.000 ms - 99.9pct: 430.000 ms - max: 961.000 ms --- BatchSize: med: 3.000 - 95pct: 63.000 - 99pct: 69.000 - 99.9pct: 69.000 - max: 69.000 --- MsgSize: med: 2878.000 bytes - 95pct: 59276.000 bytes - 99pct: 65241.000 bytes - 99.9pct: 65469.000 bytes - max: 65528.000 bytes --- Ack received rate: 105.25 ack/s --- Failed messages: 0 --- Pending messages: 0 have 4 broker running but still the topic is not distributed .partition1 and partition 0 are running on same broker . topic unload nor namespace unload is not redistributing topics across brokers .am having threshold shredder and running on 2.10.x pulsar . # Answer
    • 1
    • 1
  • r

    RunLLM

    07/09/2025, 12:30 AM
    New RunLLM response generated: >>>*# Question* Hey, curious if anybody who is familiar with Entry Filters has ideas on why I’m observing higher bookie CPU while the filter is active, despite the fact that the filter runs on the broker: https://github.com/apache/pulsar/discussions/24493 # Answer Entry Filters in Pulsar are executed on the broker, not on the bookie. However, enabling Entry Filters can increase broker-side CPU and memory usage, especially if the filter implementation deserializes message payloads or processes message data inefficiently. This can indirectly increase the load on BookKeeper (bookie) if, for example, the broker cache is less effective or more read requests are made to bookies due to filtered messages not being delivered to consumers, causing more frequent cache misses and additional disk reads by bookies. This is a known performance consideration, as discussed in Entry Filter performance overhead and related issues. If your Entry Filter implementation accesses or deserializes entry data, it can cause more data to be loaded from bookies into broker memory, increasing bookie CPU usage for read operations. It's recommended to avoid deserializing the entire entry in the filter, as noted in the EntryFilter interface documentation. Would you like more detail on how to optimize Entry Filter implementations to minimize this overhead? Sources: - Entry Filter performance overhead - EntryFilter interface documentation Slack Link https://apache-pulsar.slack.com/archives/C5Z4T36F7/p1752020996.938679 πŸ‘ 0 button πŸ‘Ž 0 button πŸ’‘ Hint: Mention @runllm in the thread for followups.
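As a loose sketch of what a metadata-only filter might look like (assuming the broker-side org.apache.pulsar.broker.service.plugin.EntryFilter plugin interface; the property name here is hypothetical), the filter decides using message properties from FilterContext and never touches the payload, so entries are not deserialized just to be filtered:
import org.apache.bookkeeper.mledger.Entry;
import org.apache.pulsar.broker.service.plugin.EntryFilter;
import org.apache.pulsar.broker.service.plugin.FilterContext;
import org.apache.pulsar.common.api.proto.KeyValue;

public class RegionEntryFilter implements EntryFilter {

    @Override
    public FilterResult filterEntry(Entry entry, FilterContext context) {
        if (context.getMsgMetadata() == null) {
            return FilterResult.ACCEPT;
        }
        // Decide using metadata properties only; the entry payload is never read.
        for (KeyValue prop : context.getMsgMetadata().getPropertiesList()) {
            if ("region".equals(prop.getKey()) && !"eu".equals(prop.getValue())) {
                return FilterResult.REJECT;
            }
        }
        return FilterResult.ACCEPT;
    }

    public void close() {
        // nothing to release
    }
}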
  • r

    RunLLM

    07/09/2025, 1:55 PM
    New RunLLM response generated: >>>*# Question* We want external partners be able to consume topics. These partners are outside our corporate networking boundaries and our policies currently prevent using the Pulsar or Kafka protocol over the public internet. I found Pulsar Beam [1] which seem to do the job (but looks abandoned). Who is using this? Or has a different approach? [1] https://github.com/kafkaesque-io/pulsar-beam Thanks, Jeroen # Answer Pulsar Beam is an HTTP-based streaming and queueing system built on Apache Pulsar, allowing message production via HTTP POST and consumption via webhooks or HTTP SSE, making it suitable for environments where direct Pulsar/Kafka protocol access is restricted. However, the project appears inactive, with limited recent activity and no clear indication of widespread production use or active maintenance, as seen in its GitHub repository. An alternative approach is to deploy a custom HTTP proxy or REST gateway in front of your Pulsar cluster, or use Pulsar's built-in WebSocket support, which is officially maintained and allows clients to interact with Pulsar topics over WebSocket (a protocol often allowed through firewalls). For more robust and supported solutions, consider using Pulsar's WebSocket API or building a lightweight REST proxy tailored to your security and integration needs. Would you like more detail on implementing a WebSocket or REST-based proxy for Pulsar? Sources: - Pulsar Beam GitHub Slack Link https://apache-pulsar.slack.com/archives/C5Z4T36F7/p1752069314.305239 πŸ‘ 0 button πŸ‘Ž 0 button πŸ’‘ Hint: Mention @runllm in the thread for followups.
  • r

    RunLLM

    07/13/2025, 6:28 AM
    New RunLLM response generated: >>>*# Question* Hello everyone. I create a jdbc sink for mysql with this configuration
    tenant: public
    namespace: default
    name: jdbc_sink_pulsar_to_mysql_temp
    archive: connectors/pulsar-io-jdbc-sqlite-4.0.0.nar
    inputs:
      - <persistent://public/default/temp_schema>
    configs:
      jdbcUrl: "jdbc:<mysql://mysql:3306/mqtt_db>"
      userName: "user1"
      password: "1234567890"
      tableName: "pulsar_to_db_temp"
      insertMode: INSERT
      key: "message_id"  
      nonKey: "temperature,timestamp,pulsar_timestamp"
    and i mount the connector under the connectors folder like so
    - ./pulsar-mysql/pulsar-io-jdbc-sqlite-4.0.0.nar:/pulsar/connectors/pulsar-io-jdbc-sqlite-4.0.0.nar
    but i get this error
    ERROR org.apache.pulsar.functions.instance.JavaInstanceRunnable - Sink open produced uncaught exception:
    java.sql.SQLException: No suitable driver found for jdbc:<mysql://mysql:3306/mqtt_db>
            at java.sql.DriverManager.getConnection(Unknown Source) ~[java.sql:?]
            at java.sql.DriverManager.getConnection(Unknown Source) ~[java.sql:?]
            at org.apache.pulsar.io.jdbc.JdbcAbstractSink.open(JdbcAbstractSink.java:97) ~[pulsar-io-jdbc-core-4.0.0.jar:?]
            at org.apache.pulsar.functions.instance.JavaInstanceRunnable.setupOutput(JavaInstanceRunnable.java:1080) ~[?:?]
            at org.apache.pulsar.functions.instance.JavaInstanceRunnable.setup(JavaInstanceRunnable.java:263) ~[?:?]
            at org.apache.pulsar.functions.instance.JavaInstanceRunnable.run(JavaInstanceRunnable.java:313) ~[?:?]
            at java.lang.Thread.run(Unknown Source) [?:?]
Am I using the wrong connector? Or am I missing a configuration? # Answer You are using the wrong connector archive and JDBC URL format. The archive pulsar-io-jdbc-sqlite-4.0.0.nar is for SQLite, not MySQL. For MySQL, you should use the MySQL-specific connector (e.g., pulsar-io-jdbc-mysql-4.0.0.nar), and your JDBC URL should be in the format jdbc:mysql://mysql:3306/mqtt_db (without the angle brackets and <mysql:...>). The error "No suitable driver found" is because the SQLite connector does not include the MySQL JDBC driver and the JDBC URL is malformed. For MySQL, your config should look like:
    archive: connectors/pulsar-io-jdbc-mysql-4.0.0.nar
    configs:
      jdbcUrl: "jdbc:mysql://mysql:3306/mqtt_db"
      userName: "user1"
      password: "1234567890"
      tableName: "pulsar_to_db_temp"
      insertMode: INSERT
      key: "message_id"
      nonKey: "temperature,timestamp,pulsar_timestamp"
    • 1
    • 1
  • r

    RunLLM

    07/16/2025, 6:33 PM
    New RunLLM response generated: >>>*# Question* Good afternoon! I have 4.0.4 of Pulsar deployed and am experiencing an issue where Bookkeeper is leaving some orphaned ledgers around. At first I had this happening while offloading to S3 - my general configuration was to offload every 15 minutes, with a deletionLag of 0. (I've tried tweaking the offload settings many times, but my bookie PVCs kept filling due to these orphaned ledgers). Thinking it might be an issue with offloading, I switched to disabling offloading, increasing ledger volumes, and just setting a shorter retention time. When I look in the
    /pulsar/data/bookkeeper/ledger0/current
    I see files .log files days older than my retention period. When researching this issue, I came across [PCK](https://docs.streamnative.io/private-cloud/v1/tools/pck/pck-overview) which seems to confirm this issue, as it aims to mitigate the problem. This does not look to be publicly available though - are there any alternative solutions? Thanks in advance! # Answer You are correct that PCK is a CLI tool designed to detect and clean up orphaned or missing ledgers in Pulsar/BookKeeper clusters, but it is not publicly available outside StreamNative's private cloud offering. As an alternative, you can use the
    sn-pulsar-tools
    suite, which provides commands to find and delete orphan ledgers from BookKeeper and tiered storage. For example, you can use
    sn-pulsar-tools pck bookie find-orphan-ledger
    to identify and optionally delete orphaned ledgers, and similar commands for tiered storage. These tools require access to your Pulsar cluster's ZooKeeper and BookKeeper service URLs, and can be run with options to target specific ledgers or perform deletions directly (see details and examples). If you do not have access to StreamNative's tools, there is no direct open-source equivalent to PCK, but you can manually inspect and clean up ledgers using BookKeeper's shell commands (e.g.,
    bookkeeper shell listledgers
    ,
    bookkeeper shell deleteledger
    ). However, this is more error-prone and lacks the metadata cross-checks that PCK provides. For persistent issues with orphaned ledgers and disk usage, ensure your retention and offload policies are correctly set, and consider monitoring with available BookKeeper/Pulsar admin tools (see more on PCK and alternatives). Would you like more detail on using BookKeeper shell commands or on configuring offload/retention to avoid this issue? Sources: - PCK Tutorial: Find Orphan Ledgers - PCK Overview Slack Link https://apache-pulsar.slack.com/archives/C5Z4T36F7/p1752690745.929759 πŸ‘ 0 button πŸ‘Ž 0 button πŸ’‘ Hint: Mention @runllm in the thread for followups.
  • r

    RunLLM

    07/17/2025, 10:59 AM
    New RunLLM response generated: >>>*# Question* context β€’ ordering required β€’ have partitioning for throughput β€’ using pulsar client (non-reactive) β€’ listener threads are getting shared across internal queue in a blocking fashion β€’ mostly io ques: in order to have higher throughput i can increase listener threads so that internal queues processing do not block each others. partitions and topics will increase and therefore there’s not end to it. How can pulsar reactive client help with this? i want high throughput with less number of thread. how does it overcome native pulsar client listeners blocking behaviour? pipelining i am aware of but it seems conflicting with partitions # Answer The Pulsar reactive client (pulsar-client-reactive) helps achieve high throughput with fewer threads by leveraging non-blocking, asynchronous processing and fine-grained concurrency controls. Unlike the native Pulsar client, where message listeners are executed on a shared thread pool and can block each other (see ConsumerBase.java#L1152), the reactive client allows you to process messages in parallel while preserving per-key or per-partition ordering, using a small, fixed number of threads or even a single thread per key/partition as needed. The reactive client achieves this by providing APIs like
ReactiveMessagePipeline and useKeyOrderedProcessing, which let you specify concurrency and ordering guarantees declaratively. This means you can scale throughput efficiently without increasing thread count, and avoid the head-of-line blocking seen in the native client’s listener model. For more, see ReactiveMessagePipelineE2ETests.java and the discussion in pulsar-client-reactive#220. Would you like a step-by-step example of how to configure this in the reactive client? Sources: - ReactiveMessagePipelineE2ETests.java - pulsar-client-reactive#220 Slack Link https://apache-pulsar.slack.com/archives/C5Z4T36F7/p1752749962.223939 πŸ‘ 0 button πŸ‘Ž 0 button πŸ’‘ Hint: Mention @runllm in the thread for followups.
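To make that concrete, here is a sketch based on the pulsar-client-reactive documentation (the topic, subscription, and process() handler are hypothetical); a key-ordered pipeline processes messages concurrently without dedicating a listener thread per partition:
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.reactive.client.adapter.AdaptedReactivePulsarClientFactory;
import org.apache.pulsar.reactive.client.api.*;
import reactor.core.publisher.Mono;

public class ReactivePipelineExample {
    public static void main(String[] args) throws Exception {
        PulsarClient pulsarClient = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650").build();
        ReactivePulsarClient reactiveClient = AdaptedReactivePulsarClientFactory.create(pulsarClient);

        ReactiveMessageConsumer<String> consumer = reactiveClient
                .messageConsumer(Schema.STRING)
                .topic("persistent://public/default/orders") // hypothetical topic
                .subscriptionName("order-pipeline")           // hypothetical subscription
                .build();

        ReactiveMessagePipeline pipeline = consumer.messagePipeline()
                // Non-blocking handler; for full benefit the I/O inside should be reactive too.
                .messageHandler(message -> Mono.fromRunnable(() -> process(message.getValue())))
                .concurrency(64)             // parallelism without 64 listener threads
                .useKeyOrderedProcessing()   // preserves ordering per message key
                .build();

        pipeline.start();
    }

    static void process(String value) {
        // business logic goes here
    }
}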