RunLLM
05/29/2025, 10:06 PM
org.apache.zookeeper.server.quorum.Learner - Revalidating client:0x3015fd17ce80066
And on the broker side there are also logs like these. First, it closes the topic:
2025-05-29T21:22:26,341+0000 [bookkeeper-ml-scheduler-OrderedScheduler-0-0] INFO org.apache.pulsar.broker.service.persistent.PersistentTopic - [persistent://6614/Salesforce/New Contact] Topic closed
Then it tries to create the topic again almost immediately. Sometimes that fails once, then it retries and succeeds.
Any idea why it is doing this?
# Answer
The log `Learner - Revalidating client` is a normal ZooKeeper message. It is logged by a ZooKeeper follower (a "learner") when a client such as a broker reconnects to it and the follower revalidates the client's existing session with the leader. The broker log showing a topic being closed and then immediately re-created, sometimes failing once before succeeding, usually points to a transient problem: a ZooKeeper or BookKeeper metadata operation that briefly fails or reads stale state, or a short network hiccup during a topic ownership change or broker restart. This pattern is not uncommon after upgrades or in clusters with heavy metadata activity, and Pulsar retries such operations automatically until they succeed, as your logs show.
If topic creation fails once and then succeeds on retry, the broker was most likely unable to access the required metadata for a moment (for example, during a ZooKeeper session revalidation or a brief lag in metadata propagation). Unless you see repeated failures or data loss, these logs are informational and not a cause for concern. For background, see the sources below on broker ZooKeeper session handling and metadata consistency.
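The retry-until-success behaviour described above can be pictured with a generic retry helper. This is a minimal sketch in plain Java, not Pulsar's internal retry code; `TransientMetadataException`, `RetrySketch` and the backoff values are hypothetical names and choices made for illustration.

```java
import java.util.concurrent.Callable;

/** Hypothetical stand-in for a short-lived failure, e.g. a brief metadata-store hiccup. */
class TransientMetadataException extends Exception {
    TransientMetadataException(String message) {
        super(message);
    }
}

/** Sketch of retry with exponential backoff; not Pulsar's internal retry logic. */
final class RetrySketch {
    private RetrySketch() {}

    /**
     * Calls the operation, retrying on TransientMetadataException and doubling the wait
     * between attempts. Other exceptions, or running out of attempts, reach the caller.
     */
    static <T> T withRetry(Callable<T> operation, int maxAttempts, long initialBackoffMs)
            throws Exception {
        long backoffMs = initialBackoffMs;
        for (int attempt = 1; ; attempt++) {
            try {
                return operation.call();
            } catch (TransientMetadataException e) {
                if (attempt >= maxAttempts) {
                    throw e; // still failing after maxAttempts: no longer treated as transient
                }
                Thread.sleep(backoffMs);
                backoffMs *= 2; // wait twice as long before the next attempt
            }
        }
    }
}
```

An operation that fails once and then succeeds, like the topic re-creation in the broker logs, returns normally on its second attempt; only a failure that persists past `maxAttempts` reaches the caller.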
Would you like more detail on how Pulsar handles topic metadata and retries after such events?
Sources:
- Pulsar broker stuck on zkSession recreate loop
- Metadata consistency issues from user's point of view
Slack Link
https://apache-pulsar.slack.com/archives/C5Z4T36F7/p1748556363.841139
RunLLM
05/30/2025, 9:45 AM
RunLLM
05/30/2025, 8:16 PM
`allowAutoTopicCreation: "false"` prevents automatic creation of all topics, including dead letter topics (DLQ). If a consumer uses a dead letter policy and the DLQ topic does not exist, Pulsar will not auto-create it, and creating the DLQ producer will fail unless the topic is first created manually with `pulsar-admin` or another admin tool. A related broker setting can cause the same symptom: the Pulsar documentation on the dead letter policy's initial subscription name states, "If this parameter is set but the broker's allowAutoSubscriptionCreation is disabled, the DLQ producer will fail to be created."
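If auto topic creation stays disabled, the DLQ topic has to exist before the consumer needs it. The sketch below derives the topic name and a `pulsar-admin topics create` command to pre-create it. It assumes the default `<topic>-<subscription>-DLQ` naming described in the Pulsar documentation; if the consumer's dead letter policy sets an explicit topic, use that name instead, and check the exact name your client version uses for partitioned topics. `DlqTopics` is a hypothetical helper.

```java
/**
 * Hypothetical helper: derives the default dead letter topic name and a pulsar-admin
 * command that pre-creates it, for clusters where allowAutoTopicCreation is false.
 * Assumes the documented default naming "<topic>-<subscription>-DLQ".
 */
final class DlqTopics {
    private DlqTopics() {}

    static String defaultDeadLetterTopic(String topic, String subscription) {
        return topic + "-" + subscription + "-DLQ";
    }

    static String createCommand(String deadLetterTopic) {
        // Creates a non-partitioned topic; use "topics create-partitioned-topic" if you need partitions.
        return "bin/pulsar-admin topics create " + deadLetterTopic;
    }
}
```

Running the generated command once per topic/subscription pair before deploying consumers avoids the DLQ producer failure described above.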
Would you like more detail on how to manually create DLQ topics or handle this scenario?
Sources:
- Pulsar documentation
Slack Link
https://apache-pulsar.slack.com/archives/C5Z4T36F7/p1748636138.806349
RunLLM
06/03/2025, 4:39 PM
failed to get Partitioned metadata : {"errorMsg":"org.apache.zookeeper.KeeperException$ConnectionLossException: KeeperErrorCode = ConnectionLoss for /managed-ledgers/15548/Orchestration/persistent/closed_periods","reqId":2511449849216011802, "remote":"pulsar-broker.pulsar.svc.cluster.local.analytics.svc.cluster.local/10.1.177.78:6650", "local":"/10.11.68.215:34530"}
logger: o.a.p.c.i.BinaryProtoLookupService, thread: pulsar-client-io-5-1, level: WARN
stacktrace: <#f1bccd48> o.a.p.c.a.PulsarClientException$BrokerMetadataException: {"errorMsg":"org.apache.zookeeper.KeeperException$ConnectionLossException: KeeperErrorCode = ConnectionLoss for /managed-ledgers/15548/Orchestration/persistent/closed_periods","reqId":2511449849216011802, "remote":"pulsar-broker.pulsar.svc.cluster.local.analytics.svc.cluster.local/10.1.177.78:6650", "local":"/10.11.68.215:34530"}
at o.a.p.c.i.ClientCnx.getPulsarClientException(ClientCnx.java:1352)
at o.a.p.c.i.ClientCnx.handlePartitionResponse(ClientCnx.java:691)
at o.a.p.c.p.PulsarDecoder.channelRead(PulsarDecoder.java:144)
at o.a.p.s.i.n.c.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
at o.a.p.s.i.n.c.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
at o.a.p.s.i.n.c.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
at o.a.p.s.i.n.h.c.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:346)
at o.a.p.s.i.n.h.c.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:318)
at o.a.p.s.i.n.c.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
at o.a.p.s.i.n.c.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
at o.a.p.s.i.n.c.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
at o.a.p.s.i.n.h.f.FlushConsolidationHandler.channelRead(FlushConsolidationHandler.java:152)
at o.a.p.s.i.n.c.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)
at o.a.p.s.i.n.c.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
at o.a.p.s.i.n.c.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
at o.a.p.s.i.n.c.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1357)
at o.a.p.s.i.n.c.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)
at o.a.p.s.i.n.c.Abst...
RunLLM
06/06/2025, 7:10 AM
RunLLM
06/07/2025, 12:11 AM
RunLLM
06/09/2025, 4:59 PM
RunLLM
06/10/2025, 4:56 PM
RunLLM
06/11/2025, 4:08 PM
RunLLM
06/14/2025, 8:48 AM
RunLLM
06/17/2025, 10:00 AM
RunLLM
06/19/2025, 9:16 AM
`MessageId.deserialize` in the Pulsar Node.js client can throw a C++ exception that is not caught, which crashes the Node.js process. This is similar to the issue previously fixed for client creation, where C++ exceptions are now wrapped and surfaced as Node.js errors (PR #307). That exception handling has not yet been implemented for `MessageId.deserialize`, so the problem remains open for that method.
The data sources show no official fix or PR for this issue in `MessageId.deserialize` yet. You or another contributor could propose one by wrapping the C++ call in a try-catch block and converting the exception into a JavaScript error, following the approach in PR #307. Would you like more detail on how to implement this in the C++ binding?
Sources:
- PR #307
Slack Link
https://apache-pulsar.slack.com/archives/C5Z4T36F7/p1750324556.646719
RunLLM
06/20/2025, 11:44 AM
Grafana dashboard usage
RunLLM
06/23/2025, 12:20 PM
RunLLM
06/23/2025, 12:29 PM
`--time 10m` with the `pulsar-admin namespaces set-offload-threshold` command, and the system will trigger offloading once the time threshold is reached, provided the current ledger is full and has been rolled over. Negative values disable offloading, and 0 triggers offloading as soon as possible (Pulsar docs, Pulsar admin reference).
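The rule above can be written as a small decision function. This is a simplified model, not Pulsar's offloader code: `OffloadThresholdModel` is hypothetical, its suffix parser only handles `s`, `m`, `h` and `d`, and "ledger age" stands in for however your Pulsar version measures elapsed time for a closed ledger.

```java
import java.time.Duration;

/** Simplified model of time-based offload eligibility; not Pulsar's offloader code. */
final class OffloadThresholdModel {
    private OffloadThresholdModel() {}

    /** Parses "30s", "10m", "2h", "1d" (or plain seconds such as "-1") into seconds. */
    static long parseSeconds(String value) {
        String number = value.substring(0, value.length() - 1);
        switch (value.charAt(value.length() - 1)) {
            case 's': return Long.parseLong(number);
            case 'm': return Long.parseLong(number) * 60;
            case 'h': return Long.parseLong(number) * 3_600;
            case 'd': return Long.parseLong(number) * 86_400;
            default:  return Long.parseLong(value); // no unit suffix: plain seconds
        }
    }

    /**
     * Negative threshold: offload disabled. Otherwise a ledger qualifies only once it has
     * been closed (rolled over) and its age has reached the threshold, so 0 means
     * "as soon as the ledger is closed".
     */
    static boolean eligibleForOffload(boolean ledgerClosed, Duration ledgerAge, long thresholdSeconds) {
        if (thresholdSeconds < 0) {
            return false;
        }
        return ledgerClosed && ledgerAge.getSeconds() >= thresholdSeconds;
    }
}
```

With `--time 10m` the threshold is 600 seconds, and a ledger that is still open never qualifies regardless of its age, which is why a short threshold alone does not offload data that has not rolled over yet.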
Would you like more detail on the trade-offs or operational considerations for such a short offload interval?
Sources:
- Pulsar docs
- Pulsar admin reference
Slack Link
https://apache-pulsar.slack.com/archives/C5Z4T36F7/p1750681713.939359
RunLLM
06/24/2025, 7:26 AM
java.lang.IllegalStateException: Field 'message' is not set
at org.apache.pulsar.common.api.proto.CommandLookupTopicResponse.getMessage(CommandLookupTopicResponse.java:220)
at org.apache.pulsar.client.impl.ClientCnx.handleLookupResponse(ClientCnx.java:629)
at org.apache.pulsar.common.protocol.PulsarDecoder.channelRead(PulsarDecoder.java:154)
After restarting the Pulsar function itself, it worked again, but by the time the client noticed it was not working, customers had already complained.
My understanding of what happened:
• The function's client got into a "stuck" state because of the unhandled exception and was not retrying properly.
# Answer
Your understanding is likely correct: the Pulsar Function's client hit an unhandled exception (`IllegalStateException: Field 'message' is not set`) while processing a lookup response, which left the function "stuck" instead of retrying or reconnecting as expected. The stack trace shows where it comes from: `ClientCnx.handleLookupResponse` calls `getMessage()` on a `CommandLookupTopicResponse` whose optional `message` field was not set, and the generated accessor throws instead of returning a default. Restarting the function resolved the issue, but only after service impact had been noticed. This is consistent with known issues where client-side exceptions during protocol handling (such as lookup failures) leave the client in a bad state when the error is not handled or retried gracefully by the function runtime, and it has been observed in other Pulsar deployments as well.
For further mitigation, make sure you are running the latest supported Pulsar version, since some related bugs have been fixed in recent releases. Also consider external monitoring and automated restarts for functions that enter a failed state, because the Pulsar Functions framework may not always recover from such protocol-level exceptions on its own. For more details, see the discussion in Pulsar Functions error handling and the relevant ClientCnx.java code.
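The external monitoring and automated restart suggested above can be reduced to a consecutive-failure watchdog. This sketch is hypothetical: `FunctionWatchdog` is not part of Pulsar, and the health check and restart callbacks are placeholders you would wire to your own tooling, for example polling `pulsar-admin functions status` and calling `pulsar-admin functions restart`.

```java
import java.util.function.BooleanSupplier;

/**
 * Hypothetical watchdog: runs a health check and triggers a restart after a number of
 * consecutive failures. Both callbacks are placeholders for your own tooling.
 */
final class FunctionWatchdog {
    private final BooleanSupplier isHealthy;
    private final Runnable restart;
    private final int failureThreshold;
    private int consecutiveFailures;

    FunctionWatchdog(BooleanSupplier isHealthy, Runnable restart, int failureThreshold) {
        this.isHealthy = isHealthy;
        this.restart = restart;
        this.failureThreshold = failureThreshold;
    }

    /** Performs one check; returns true if it triggered a restart. */
    boolean checkOnce() {
        if (isHealthy.getAsBoolean()) {
            consecutiveFailures = 0;
            return false;
        }
        consecutiveFailures++;
        if (consecutiveFailures < failureThreshold) {
            return false;
        }
        restart.run();
        consecutiveFailures = 0; // give the restarted instance a fresh failure budget
        return true;
    }
}
```

Calling `checkOnce()` on a fixed schedule (for instance from a `ScheduledExecutorService`) and requiring several consecutive failures avoids restarting on a single slow check.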
Would you like a step-by-step breakdown of the error flow and possible improvements to function resiliency?
Sources:
- Pulsar Functions error handling
- ClientCnx.java code
Slack Link
https://apache-pulsar.slack.com/archives/C5Z4T36F7/p1750749913.510049
RunLLM
06/24/2025, 7:47 AM
java.lang.IllegalStateException: Field 'message' is not set
at org.apache.pulsar.common.api.proto.CommandLookupTopicResponse.getMessage(CommandLookupTopicResponse.java:220)
at org.apache.pulsar.client.impl.ClientCnx.handleLookupResponse(ClientCnx.java:629)
at org.apache.pulsar.common.protocol.PulsarDecoder.channelRead(PulsarDecoder.java:154)
The issue persisted until the function pods were manually restarted, by which time their own customers had already been impacted.
Our Hypothesis:
Our theory is that the function's internal client entered a "stuck" state during the broker rolling upgrade and failed to recover.
We also discovered a version mismatch between the cluster and the client libraries:
• *Client libraries:* pulsar-client and pulsar-functions-api are both version 3.2.1.
• *Cluster version:* 3.0.11.
My questions are:
Is it a requirement for the client library versions to strictly match the Pulsar cluster version? Could this version skew be a potential cause for this kind of failure during a rolling upgrade?
Are you aware of any existing GitHub issues related to this specific IllegalStateException in this context? We were unable to find a direct match in our own research.
Thanks in advance!
# Answer
It seems likely that the version mismatch between your Pulsar client libraries (3.2.1) and the brokers (3.0.11) could contribute to issues like the IllegalStateException you observed, especially during rolling upgrades. Pulsar is designed for client-broker compatibility across versions, but newer client features or protocol changes may not be fully supported by older brokers, which can lead to unexpected errors or "stuck" clients and Pulsar Functions. This is supported by the official documentation, which notes that new features relying on protocol updates require both newer clients and newer brokers, and that strict version matching is not required but is recommended for stability and feature parity (Pulsar client libraries).
RunLLM
06/28/2025, 5:21 AM
RunLLM
06/30/2025, 9:36 AM
`broker.conf` and `bookkeeper.conf`:
- For Pulsar:
metadataStoreUrl=etcd:http://<etcd1>:2379,http://<etcd2>:2379,http://<etcd3>:2379
configurationMetadataStoreUrl=etcd:http://<etcd1>:2379,http://<etcd2>:2379,http://<etcd3>:2379
- For BookKeeper:
metadataServiceUri=etcd://<etcd1>:2379,<etcd2>:2379,<etcd3>:2379
Note: etcd support in BookKeeper is incomplete. Features such as under-replication management are missing, and there are known stability and compatibility issues in some Pulsar/BookKeeper versions. Some users have reported connection and session issues when using etcd, especially in Kubernetes environments. If you encounter errors, check the logs for etcd session or lease errors, and consider falling back to ZooKeeper if you need those features. See Configure metadata store | Apache Pulsar and Incomplete implementation for etcd driver for metadata.
Would you like more detail on troubleshooting specific errors or version compatibility?
Sources:
- Configure metadata store | Apache Pulsar
- Incomplete implementation for etcd driver for metadata
- ETCD doesn't work as a metadata store
Slack Link
https://apache-pulsar.slack.com/archives/C5Z4T36F7/p1751276182.014709
RunLLM
07/01/2025, 11:43 AM
bin/pulsar-admin namespaces unload <tenant>/<namespace>
This command unloads all bundles in the specified namespace, prompting Pulsar to reassign them to brokers based on current load. For finer-grained control, you can unload a single bundle with the `--bundle` option. This rebalances without restarting brokers, although clients of the affected topics briefly reconnect while their bundles move. For more details, see the Pulsar load balancing quick start guide.
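Before unloading, it can help to confirm which brokers are actually overloaded. This simplified sketch flags brokers whose usage exceeds the cluster average by more than a threshold in percentage points, similar in spirit to Pulsar's `ThresholdShedder`; `LoadImbalance` and the usage figures are hypothetical and would come from your broker load reports or metrics.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/**
 * Simplified sketch: flags brokers whose usage exceeds the cluster average by more than
 * thresholdPoints percentage points. Not Pulsar's load manager code.
 */
final class LoadImbalance {
    private LoadImbalance() {}

    static List<String> overloadedBrokers(Map<String, Double> usagePercent, double thresholdPoints) {
        double average = usagePercent.values().stream()
                .mapToDouble(Double::doubleValue)
                .average()
                .orElse(0.0);
        List<String> overloaded = new ArrayList<>();
        // TreeMap gives a stable, sorted output order.
        for (Map.Entry<String, Double> entry : new TreeMap<>(usagePercent).entrySet()) {
            if (entry.getValue() > average + thresholdPoints) {
                overloaded.add(entry.getKey());
            }
        }
        return overloaded;
    }
}
```

For usage of 90%, 40% and 50% the average is 60%, so with a 10-point threshold only the first broker (above 70%) is flagged as a candidate for unloading bundles.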
Would you like more detail on configuring automatic load balancing or troubleshooting persistent imbalance?
Sources:
- Pulsar load balancing quick start guide
Slack Link
https://apache-pulsar.slack.com/archives/C5Z4T36F7/p1751370199.017679
RunLLM
07/02/2025, 6:03 AM
RunLLM
07/02/2025, 6:48 PM
`serviceUrl` for the local cluster.
I enabled geo-replication at the namespace level, not at the topic level.
Does anyone know how to solve this issue? Using v4.0.1
# Answer
If you have enabled geo-replication at the namespace level, topics created in that namespace should be automatically replicated across clusters, and you should not need to manually create the topic on each cluster. If this is not happening, it is likely due to a configuration issue or a mismatch in how the clusters are set up.
Key points to check:
1. Namespace Replication Clusters: Ensure you have set the replication clusters for the namespace using:
bin/pulsar-admin namespaces set-clusters <tenant>/<namespace> --clusters <cluster1>,<cluster2>
2. Tenant Allowed Clusters: The tenant must have both clusters listed in its allowed clusters (for an existing tenant, use `tenants update` with the same option):
bin/pulsar-admin tenants create <tenant> --allowed-clusters <cluster1>,<cluster2>
3. Topic Creation: When you create a topic in the geo-replicated namespace, Pulsar should automatically create the topic in the remote cluster. If not, check that both clusters can communicate and that the configuration stores are set up correctly.
4. Auto Topic Creation: Ensure that `allowAutoTopicCreation` is enabled (the default is true). If it is disabled, topics will not be auto-created on the remote cluster.
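Checks 1 and 2 above amount to a set comparison: every cluster in the namespace's replication list must also be among the tenant's allowed clusters. The sketch assumes you have already collected both lists, for example from `pulsar-admin namespaces get-clusters` and `pulsar-admin tenants get`; `GeoReplicationCheck` is a hypothetical helper.

```java
import java.util.LinkedHashSet;
import java.util.Set;

/** Hypothetical helper for checks 1 and 2: namespace clusters vs tenant allowed clusters. */
final class GeoReplicationCheck {
    private GeoReplicationCheck() {}

    /** Clusters the namespace replicates to that the tenant is not allowed to use. */
    static Set<String> clustersMissingFromTenant(Set<String> namespaceClusters,
                                                 Set<String> tenantAllowedClusters) {
        Set<String> missing = new LinkedHashSet<>(namespaceClusters);
        missing.removeAll(tenantAllowedClusters);
        return missing;
    }

    /** Replication only happens if the namespace lists at least two clusters. */
    static boolean replicatesAcrossClusters(Set<String> namespaceClusters) {
        return namespaceClusters.size() >= 2;
    }
}
```

An empty result from `clustersMissingFromTenant` plus `replicatesAcrossClusters` returning true means the namespace/tenant side of the setup is consistent, and the remaining suspects are connectivity and the configuration stores (check 3).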
For more details, see Pulsar geo-replication documentation and StreamNative geo-replication guide.
Would you like a step-by-step troubleshooting checklist or more detail on a specific configuration?
Sources:
- Pulsar geo-replication documentation
- StreamNative geo-replication guide
Slack Link
https://apache-pulsar.slack.com/archives/C5Z4T36F7/p1751482089.785439
RunLLM
07/03/2025, 7:40 AM
`receiver_queue_size` and `max_total_receiver_queue_size_across_partitions`, but these only limit how many messages are fetched into each consumer's queue, not how the backlog is distributed when new consumers join. This is a known challenge for bursty, long-running workloads with dynamic scaling, as described in your linked discussion and confirmed in the Pulsar documentation and community feedback (Pulsar as a message queue, GitHub discussion).
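The effect of prefetching on late joiners can be shown with a toy allocation model. It is not the broker's dispatcher (real dispatch also depends on permit refills and processing speed); `PrefetchModel` is hypothetical and only models the moment a new consumer joins: consumers that were already connected have filled their receiver queues round-robin from the backlog, and the late joiner can only be given what they did not buffer.

```java
/**
 * Toy model, not the broker's dispatcher: at the moment a late consumer joins a Shared
 * subscription, consumers that were already connected have filled their receiver queues
 * round-robin from the backlog; the late consumer can only be given what is left.
 */
final class PrefetchModel {
    private PrefetchModel() {}

    /** Returns messages held per early consumer, with the late consumer in the last slot. */
    static int[] allocate(int backlog, int earlyConsumers, int receiverQueueSize) {
        int[] held = new int[earlyConsumers + 1];
        int remaining = backlog;
        boolean progress = true;
        // Round-robin dispatch to early consumers while any still has free queue space.
        while (remaining > 0 && progress) {
            progress = false;
            for (int c = 0; c < earlyConsumers && remaining > 0; c++) {
                if (held[c] < receiverQueueSize) {
                    held[c]++;
                    remaining--;
                    progress = true;
                }
            }
        }
        // The late joiner can prefetch at most its own queue size from what is left.
        held[earlyConsumers] = Math.min(remaining, receiverQueueSize);
        return held;
    }
}
```

With one early consumer and the default queue size of 1000, a 500-message backlog is entirely buffered by that consumer (`[500, 0]`); with a queue size of 1, each consumer holds a single message (`[1, 1]`) and the rest stays in the backlog for whichever consumer frees a permit first.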
A common workaround is to pre-scale your consumers before the burst arrives, so all of them are connected when messages are published, or to implement application-level logic to coordinate message claiming and processing. Alternatively, you can experiment with a very low `receiver_queue_size` (e.g., 1) for each consumer, but this does not guarantee strict one-message-per-consumer distribution of an existing backlog. There is no built-in broker-side "max in-flight per consumer" setting for Shared subscriptions. Would you like more detail on possible architectural patterns or code-level workarounds for this use case?
Sources:
- Pulsar as a message queue
RunLLM
07/04/2025, 8:34 AM
RunLLM
07/07/2025, 1:43 PM
RunLLM
07/09/2025, 12:30 AM
RunLLM
07/09/2025, 1:55 PM
RunLLM
07/13/2025, 6:28 AM
tenant: public
namespace: default
name: jdbc_sink_pulsar_to_mysql_temp
archive: connectors/pulsar-io-jdbc-sqlite-4.0.0.nar
inputs:
  - <persistent://public/default/temp_schema>
configs:
  jdbcUrl: "jdbc:<mysql://mysql:3306/mqtt_db>"
  userName: "user1"
  password: "1234567890"
  tableName: "pulsar_to_db_temp"
  insertMode: INSERT
  key: "message_id"
  nonKey: "temperature,timestamp,pulsar_timestamp"
and I mount the connector into the connectors folder like so: ./pulsar-mysql/pulsar-io-jdbc-sqlite-4.0.0.nar:/pulsar/connectors/pulsar-io-jdbc-sqlite-4.0.0.nar
but I get this error:
ERROR org.apache.pulsar.functions.instance.JavaInstanceRunnable - Sink open produced uncaught exception:
java.sql.SQLException: No suitable driver found for jdbc:<mysql://mysql:3306/mqtt_db>
at java.sql.DriverManager.getConnection(Unknown Source) ~[java.sql:?]
at java.sql.DriverManager.getConnection(Unknown Source) ~[java.sql:?]
at org.apache.pulsar.io.jdbc.JdbcAbstractSink.open(JdbcAbstractSink.java:97) ~[pulsar-io-jdbc-core-4.0.0.jar:?]
at org.apache.pulsar.functions.instance.JavaInstanceRunnable.setupOutput(JavaInstanceRunnable.java:1080) ~[?:?]
at org.apache.pulsar.functions.instance.JavaInstanceRunnable.setup(JavaInstanceRunnable.java:263) ~[?:?]
at org.apache.pulsar.functions.instance.JavaInstanceRunnable.run(JavaInstanceRunnable.java:313) ~[?:?]
at java.lang.Thread.run(Unknown Source) [?:?]
Am I using the wrong connector, or am I missing a configuration?
# Answer
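You are using the wrong connector archive. `pulsar-io-jdbc-sqlite-4.0.0.nar` is the SQLite sink and bundles the SQLite JDBC driver, not a MySQL one, so no registered driver accepts your MySQL URL and `DriverManager` fails with "No suitable driver found". The Apache Pulsar distribution does not include a MySQL-specific JDBC sink; its built-in JDBC sinks cover ClickHouse, MariaDB, OpenMLDB, PostgreSQL and SQLite. For a MySQL database, the MariaDB sink (`pulsar-io-jdbc-mariadb-4.0.0.nar`) is a common option, since the MariaDB JDBC driver can also connect to MySQL servers; use a `jdbc:mariadb://` URL with it and mount that NAR instead of the SQLite one. Also check the URL itself: it must be plain text such as `jdbc:mariadb://mysql:3306/mqtt_db`, without angle brackets. The `<...>` shown in your config and error may only be Slack's link formatting, but if it is really in the config, the URL is invalid.
For MySQL through the MariaDB sink, your config could look like:
archive: connectors/pulsar-io-jdbc-mariadb-4.0.0.nar
configs:
  jdbcUrl: "jdbc:mariadb://mysql:3306/mqtt_db"
  userName: "user1"
  password: "1234567890"
  tableName: "pulsar_to_db_temp"
  insertMode: INSERT
  key: "message_id"
  nonKey: "temperature,timestamp,pulsar_timestamp"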
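A quick pre-flight check for the "No suitable driver" error: `DriverManager.getDriver` throws `SQLException` when no registered driver accepts the URL, which is the same condition that makes `getConnection` fail in the sink's `open()`. `JdbcUrlCheck` is a hypothetical standalone helper, not part of Pulsar IO; its driver check is only meaningful when run with the driver jar you expect on the classpath, and it also rejects URLs that still contain chat-style angle brackets.

```java
import java.sql.DriverManager;
import java.sql.SQLException;

/** Hypothetical pre-flight checks for a sink's jdbcUrl; not part of Pulsar IO. */
final class JdbcUrlCheck {
    private JdbcUrlCheck() {}

    /** Rejects URLs that lack the jdbc: prefix or still carry <...> link markup. */
    static boolean looksWellFormed(String url) {
        return url.startsWith("jdbc:") && url.indexOf('<') < 0 && url.indexOf('>') < 0;
    }

    /** True if some JDBC driver on the current classpath accepts this URL. */
    static boolean hasDriverFor(String url) {
        try {
            DriverManager.getDriver(url);
            return true;
        } catch (SQLException e) {
            return false; // same condition as "No suitable driver found" from getConnection
        }
    }
}
```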
RunLLM
07/16/2025, 6:33 PM
/pulsar/data/bookkeeper/ledger0/current
I see .log files that are days older than my retention period.
When researching this issue, I came across [PCK](https://docs.streamnative.io/private-cloud/v1/tools/pck/pck-overview), which seems to confirm this issue, since it aims to mitigate the problem. It does not appear to be publicly available, though. Are there any alternative solutions?
Thanks in advance!
# Answer
You are correct that PCK is a CLI tool designed to detect and clean up orphaned or missing ledgers in Pulsar/BookKeeper clusters, and that it is not publicly available outside StreamNative's private cloud offering. PCK is run through StreamNative's `sn-pulsar-tools` suite, which provides commands to find and delete orphan ledgers in BookKeeper and tiered storage; for example, `sn-pulsar-tools pck bookie find-orphan-ledger` identifies orphaned ledgers and can optionally delete them, and similar commands exist for tiered storage. These tools require access to your Pulsar cluster's ZooKeeper and BookKeeper service URLs, and can target specific ledgers or perform deletions directly (see details and examples).
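Conceptually, an orphan-ledger cross-check is a set difference: ledgers present on bookies minus ledgers still referenced by Pulsar metadata (topics, cursors, schemas). This sketch is not PCK's implementation; `OrphanLedgers` is hypothetical, and the two input sets are assumed to come from your own inventory, such as `bookkeeper shell listledgers` on one side and the managed-ledger metadata on the other.

```java
import java.util.Set;
import java.util.TreeSet;

/** Conceptual orphan-ledger cross-check; not PCK's implementation. */
final class OrphanLedgers {
    private OrphanLedgers() {}

    /**
     * Ledgers stored on bookies that no Pulsar metadata references are orphan candidates.
     * Returned in ascending order for easier review.
     */
    static Set<Long> candidates(Set<Long> ledgersOnBookies, Set<Long> referencedByMetadata) {
        Set<Long> orphans = new TreeSet<>(ledgersOnBookies);
        orphans.removeAll(referencedByMetadata);
        return orphans;
    }
}
```

Treat the result as candidates to verify rather than a delete list: a ledger created just before the bookie listing may not be referenced yet, so take the referenced-ledger snapshot after the bookie listing.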
If you do not have access to StreamNative's tools, there is no direct open-source equivalent to PCK, but you can inspect and clean up ledgers manually with BookKeeper shell commands (e.g., `bookkeeper shell listledgers`, `bookkeeper shell deleteledger`). This is more error-prone, however, and lacks the metadata cross-checks that PCK performs. For persistent orphaned-ledger and disk-usage issues, make sure your retention and offload policies are set correctly, and monitor with the available BookKeeper/Pulsar admin tools (see more on PCK and alternatives). Would you like more detail on using BookKeeper shell commands or on configuring offload/retention to avoid this issue?
Sources:
- PCK Tutorial: Find Orphan Ledgers
- PCK Overview
Slack Link
https://apache-pulsar.slack.com/archives/C5Z4T36F7/p1752690745.929759
RunLLM
07/17/2025, 10:59 AM
`ReactiveMessagePipeline` and `useKeyOrderedProcessing`, which let you specify concurrency and ordering guarantees declaratively. This means you can scale throughput efficiently without increasing the thread count, and avoid the head-of-line blocking seen in the native client's listener model. For more, see ReactiveMessagePipelineE2ETests.java and the discussion in pulsar-client-reactive#220.
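Key-ordered processing can be illustrated without the reactive client: route every message with the same key to the same single-threaded lane, so per-key order is preserved while different lanes run in parallel. This is a conceptual sketch, not how `ReactiveMessagePipeline` is implemented; `KeyOrderedDispatcher` is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * Conceptual sketch of key-ordered concurrency. Each key is pinned to one single-threaded
 * lane, so messages with the same key run in order, while different lanes run in parallel.
 */
final class KeyOrderedDispatcher implements AutoCloseable {
    private final List<ExecutorService> lanes = new ArrayList<>();

    KeyOrderedDispatcher(int concurrency) {
        for (int i = 0; i < concurrency; i++) {
            lanes.add(Executors.newSingleThreadExecutor());
        }
    }

    /** Queues the handler on the lane chosen by the key's hash. */
    void dispatch(String key, Runnable handler) {
        // floorMod keeps the index non-negative even when hashCode() is negative.
        int lane = Math.floorMod(key.hashCode(), lanes.size());
        lanes.get(lane).execute(handler);
    }

    /** Stops accepting work and waits for queued handlers to finish. */
    @Override
    public void close() throws InterruptedException {
        for (ExecutorService lane : lanes) {
            lane.shutdown();
        }
        for (ExecutorService lane : lanes) {
            lane.awaitTermination(10, TimeUnit.SECONDS);
        }
    }
}
```

A slow message only delays the messages that hash to its lane instead of blocking a single listener thread. The sketch spends one thread per lane for simplicity; the point made above is that the reactive pipeline provides the same ordering guarantee without adding threads.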
Would you like a step-by-step example of how to configure this in the reactive client?
Sources:
- ReactiveMessagePipelineE2ETests.java
- pulsar-client-reactive#220
Slack Link
https://apache-pulsar.slack.com/archives/C5Z4T36F7/p1752749962.223939