Lari Hotari
05/21/2025, 8:44 AM
Lari Hotari
05/22/2025, 7:54 AM
Wallace Peng
05/23/2025, 4:45 AM
java.util.concurrent.CompletionException: org.apache.pulsar.client.api.PulsarClientException$TimeoutException: The producer pulsar-5312-2718615 can not send message to the topic <persistent://14748/change/log> within given timeout : createdAt 126.9 seconds ago, firstSentAt 126.9 seconds ago, lastSentAt 126.9 seconds ago, retryCount 1
at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:331)
at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:346)
at java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:632)
at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2094)
at org.apache.pulsar.client.impl.ProducerImpl$DefaultSendMessageCallback.onSendComplete(ProducerImpl.java:405)
at org.apache.pulsar.client.impl.ProducerImpl$DefaultSendMessageCallback.sendComplete(ProducerImpl.java:386)
at org.apache.pulsar.client.impl.ProducerImpl$OpSendMsg.sendComplete(ProducerImpl.java:1568)
at org.apache.pulsar.client.impl.ProducerImpl.lambda$failPendingMessages$18(ProducerImpl.java:2124)
at java.base/java.util.ArrayDeque.forEach(ArrayDeque.java:889)
at org.apache.pulsar.client.impl.ProducerImpl$OpSendMsgQueue.forEach(ProducerImpl.java:1659)
at org.apache.pulsar.client.impl.ProducerImpl.failPendingMessages(ProducerImpl.java:2114)
at org.apache.pulsar.client.impl.ProducerImpl.lambda$failPendingMessages$19(ProducerImpl.java:2146)
at org.apache.pulsar.shade.io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:173)
at org.apache.pulsar.shade.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:166)
at org.apache.pulsar.shade.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)
at org.apache.pulsar.shade.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:405)
at org.apache.pulsar.shade.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:998)
at org.apache.pulsar.shade.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at org.apache.pulsar.shade.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.pulsar.client.api.PulsarClientException$TimeoutException: The producer pulsar-5312-2718615 can not send message to the topic <persistent://14748/change/log> within given timeout : createdAt 126.9 seconds ago, firstSentAt 126.9 seconds ago, lastSentAt 126.9 seconds ago, retryCount 1
at org.apache.pulsar.client.impl.ProducerImpl$OpSendMsg.sendComplete(ProducerImpl.java:1565)
... 13 common frames omitted
Broker:
``````
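For context on the exception above: this is the producer send timeout firing, i.e. the pending message was not acknowledged by the broker within the configured window. A minimal Java sketch of where that timeout is set (service URL, topic, and the 30-second value are illustrative; 30s is the client default, not necessarily what this setup uses):

```java
import java.util.concurrent.TimeUnit;

import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;

public class SendTimeoutSketch {
    public static void main(String[] args) throws Exception {
        PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650") // illustrative service URL
                .build();

        Producer<String> producer = client.newProducer(Schema.STRING)
                .topic("persistent://public/default/change-log") // illustrative topic
                // If a message is not acknowledged by the broker within this window,
                // the pending send fails with PulsarClientException$TimeoutException.
                .sendTimeout(30, TimeUnit.SECONDS) // 30s is the client default
                .create();

        producer.sendAsync("hello").exceptionally(ex -> {
            ex.printStackTrace(); // a TimeoutException surfaces here
            return null;
        });

        producer.flush();
        producer.close();
        client.close();
    }
}
```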
Wallace Peng
05/23/2025, 4:47 AM
Lari Hotari
05/23/2025, 8:27 AM
Lari Hotari
05/23/2025, 2:01 PM
Lari Hotari
05/30/2025, 6:01 AM
SiNan Liu
06/01/2025, 1:28 PM
SiNan Liu
06/02/2025, 12:54 PM
`ackTimeout`, `negativeAck`, or `reconsumeLater` all cause data re-delivery, but these do not support long delays.
This PR aims to support sending delayed messages on the consumer side, without issues like write amplification or read amplification.
I hope everyone can participate in the discussion. If this plan is feasible, I will propose a PIP.
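To make the three redelivery paths mentioned above concrete, a minimal Java sketch (topic, subscription, and delay values are placeholders; note that `reconsumeLater` requires `enableRetry(true)` on the consumer):

```java
import java.util.concurrent.TimeUnit;

import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;

public class RedeliveryPathsSketch {
    public static void main(String[] args) throws Exception {
        PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650") // placeholder
                .build();

        Consumer<String> consumer = client.newConsumer(Schema.STRING)
                .topic("persistent://public/default/my-topic") // placeholder
                .subscriptionName("my-sub")                    // placeholder
                // 1. ackTimeout: anything left unacked this long is redelivered.
                .ackTimeout(30, TimeUnit.SECONDS)
                // Needed for reconsumeLater (routes redelivery through a retry topic).
                .enableRetry(true)
                .subscribe();

        Message<String> msg = consumer.receive();

        // 2. negativeAck: request redelivery after the negative-ack delay, or
        // 3. reconsumeLater: redeliver after an explicit delay via the retry topic.
        // In practice you would pick one path per message; here we use reconsumeLater.
        // consumer.negativeAcknowledge(msg);
        consumer.reconsumeLater(msg, 10, TimeUnit.MINUTES);

        consumer.close();
        client.close();
    }
}
```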
Lari Hotari
06/02/2025, 3:20 PM
Wallace Peng
06/04/2025, 1:06 AM
Lari Hotari
06/04/2025, 12:34 PM
The expectation that the subscription state (`markDeletePosition`) on the remote cluster (`c2`) loses at most `replicatedSubscriptionsSnapshotFrequencyMillis` worth of acknowledgements can be unmet due to several factors inherent in the design and operational conditions:
> 1. Snapshot Lifecycle Latency:
> ◦ Process: A snapshot isn't instantaneous. It involves `c1` sending a request marker, `c2` receiving it, `c2` sending a response marker (with its current end-of-topic position), and `c1` receiving this response to finalize the snapshot. This entire round-trip takes time, potentially affected by replication lag between `c1` and `c2`, and then back from `c2` to `c1`.
> ◦ Impact: Even if snapshots are initiated every `replicatedSubscriptionsSnapshotFrequencyMillis`, the actual completion and availability of a new snapshot for use can be later. If the `markDeletePosition` on `c1` advances, it can only use the latest completed snapshot. If many messages are acked on `c1` while a snapshot is in progress or delayed, the update to `c2` will be based on an older snapshot, making `c2`'s position appear more stale than just the snapshot frequency.
> 2. Snapshot Timeouts (`replicatedSubscriptionsSnapshotTimeoutSeconds`):
> ◦ Mechanism: If a remote cluster (e.g., `c2`) fails to send a `SNAPSHOT_RESPONSE` within `replicatedSubscriptionsSnapshotTimeoutSeconds` (default 30s), the snapshot attempt is considered timed out on `c1`.
> ◦ Impact: When snapshots time out, new valid mappings between `c1` and `c2` positions are not established. `ReplicatedSubscriptionsController` on `c1` will skip creating new snapshots if a pending one has never succeeded (to avoid flooding) or if recent attempts have timed out. It will rely on the `lastCompletedSnapshotId`. If this last successful snapshot is old, the `markDeletePosition` on `c2` will not reflect recent acknowledgements on `c1`, leading to a staleness far greater than `replicatedSubscriptionsSnapshotFrequencyMillis`. This is a critical point where the expectation can be significantly missed.
> 3. Disconnected Replicators or Cluster Unavailability:
> ◦ Mechanism: The `ReplicatedSubscriptionsController` explicitly checks if all remote replicators are connected before initiating a new snapshot. If any replicator is down, it skips the snapshot.
> ◦ Impact: Similar to timeouts, if `c2` is unreachable or its replicator link is broken, no new snapshots will be formed. `c1` will continue to use the last known good snapshot, causing `c2`'s subscription state to become increasingly stale as messages are acknowledged on `c1`.
> 4. No New Message Production on the Topic:
> ◦ Mechanism: `ReplicatedSubscriptionsController` has an optimization: if no new user messages have been published to the topic on `c1` since the last completed snapshot (`topic.getLastMaxReadPositionMovedForwardTimestamp() < lastCompletedSnapshotStartTime`), it skips creating a new snapshot.
> ◦ Impact: If the topic is idle in terms of new publishes, but consumers on `c1` are catching up and acknowledging older messages (thus advancing `markDeletePosition`), new snapshots correlating these newer `markDeletePosition` states on `c1` with `c2` positions might not be created frequently. The updates to `c2` will use older snapshot data.
> 5. Nature of `markDeletePosition` with Individual Acknowledges:
> ◦ PIP-33 Limitation: The system replicates the `markDeletePosition`. Individual acknowledgements only advance the `markDeletePosition` when a contiguous block of messages from the current `markDeletePosition` is acknowledged. If there are "holes" due to out-of-order individual acks, the `markDeletePosition` lags.
> ◦ Impact: Even if snapshots are frequent and successful, the state replicated to `c2` is the `markDeletePosition` from `c1`. If this position itself is lagging on `c1` due to ack patterns, `c2` will reflect that same lag. This isn't a loss of acknowledged state by the replication mechanism beyond `replicatedSubscriptionsSnapshotFrequencyMillis`, but rather a characteristic of what is being replicated.
> 6. Initial State and First Snapshot:
> ◦ When a replicated subscription is first enabled or a topic is new, it takes time for the first successful snapshot cycle to complete. Acknowledgements on `c1` before this first usable snapshot is available won't be immediately reflected on `c2`.
> In essence, `replicatedSubscriptionsSnapshotFrequencyMillis` is the attempt frequency, not a guaranteed bound on state divergence. The actual synchronization delay is subject to the successful and timely completion of the entire snapshotting and update marker propagation process, which can be affected by network conditions, cluster load, and remote cluster responsiveness. The `replicatedSubscriptionsSnapshotTimeoutSeconds` plays a more significant role in how long state can remain divergent if issues arise.
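For readers skimming the thread, a minimal Java sketch of the consumer-side switch this snapshot mechanism serves, with the broker settings discussed above noted in comments (topic, subscription, and URLs are placeholders; the 1000 ms frequency default is my assumption, the 30 s timeout default is as stated above):

```java
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;

public class ReplicatedSubscriptionSketch {
    public static void main(String[] args) throws Exception {
        // Broker-side settings discussed above (per cluster, broker.conf):
        //   replicatedSubscriptionsSnapshotFrequencyMillis=1000  (assumed default)
        //   replicatedSubscriptionsSnapshotTimeoutSeconds=30     (default, per the thread)
        PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://c1-broker:6650") // placeholder URL for cluster c1
                .build();

        Consumer<String> consumer = client.newConsumer(Schema.STRING)
                .topic("persistent://public/default/replicated-topic") // placeholder
                .subscriptionName("my-sub")                             // placeholder
                // PIP-33: replicate this subscription's state to remote clusters.
                // What gets replicated is the markDeletePosition, subject to the
                // snapshot caveats in points 1-6 above.
                .replicateSubscriptionState(true)
                .subscribe();

        // Point 5 above: individual acks only move the markDeletePosition forward
        // when they are contiguous from the current position; out-of-order acks
        // leave holes, and the position replicated to c2 lags accordingly.
        consumer.acknowledge(consumer.receive());

        consumer.close();
        client.close();
    }
}
```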
Chris Bono
06/12/2025, 2:30 PM
I would like to get a new release of `pulsar-client-reactive`
to be included in next week’s Spring Pulsar service release. I have a small PR that updates the version number and begins this process. However, the PR requires at least one reviewer, and Lari, who usually handles the reviews, is on PTO.
If anyone has the ability to review https://github.com/apache/pulsar-client-reactive/pull/228 it would be greatly appreciated.
Thank you!
Chris Bono
06/12/2025, 6:17 PM
SiNan Liu
06/13/2025, 12:16 PM
Fabian Wikstrom
06/18/2025, 3:49 PM
Lari Hotari
06/19/2025, 9:25 PM
孟祥迎
06/25/2025, 6:26 AM
Yabin m
06/26/2025, 4:18 PM
Thomas MacKenzie
07/12/2025, 4:56 AM
太上玄元道君
07/16/2025, 10:30 AM
Lari Hotari
07/17/2025, 4:37 PM
Lari Hotari
07/23/2025, 1:04 PM
Lari Hotari
07/25/2025, 1:05 PM
Lari Hotari
07/25/2025, 2:30 PM
Lari Hotari
07/26/2025, 8:24 PM
Hideaki Oguni
07/29/2025, 8:28 AM
I ran the test with `dispatcherRetryBackoffInitialTimeInMs=1` and `dispatcherRetryBackoffMaxTimeInMs=1000`.
As a result, I observed 27,196 ack holes, which is nearly the same as before the backoff mechanism was introduced.
2025-07-25T20:45:04,685+0900 [main] INFO playground.TestScenarioIssueKeyShared - Done receiving. Remaining: 0 duplicates: 0 unique: 1000000
max latency difference of subsequent messages: 58393 ms
max ack holes: 27196
2025-07-25T20:45:04,686+0900 [main] INFO playground.TestScenarioIssueKeyShared - Consumer consumer1 received 263558 unique messages 0 duplicates in 472 s, max latency difference of subsequent messages 58310 ms
2025-07-25T20:45:04,686+0900 [main] INFO playground.TestScenarioIssueKeyShared - Consumer consumer2 received 241250 unique messages 0 duplicates in 463 s, max latency difference of subsequent messages 58393 ms
2025-07-25T20:45:04,686+0900 [main] INFO playground.TestScenarioIssueKeyShared - Consumer consumer3 received 221278 unique messages 0 duplicates in 432 s, max latency difference of subsequent messages 52344 ms
2025-07-25T20:45:04,686+0900 [main] INFO playground.TestScenarioIssueKeyShared - Consumer consumer4 received 273914 unique messages 0 duplicates in 472 s, max latency difference of subsequent messages 58393 ms
Additionally, when I applied a modification to limit the scope of backoff only to replay messages, the number of ack holes reduced to 835.
2025-07-25T19:04:26,485+0900 [main] INFO playground.TestScenarioIssueKeyShared - Done receiving. Remaining: 0 duplicates: 0 unique: 1000000
max latency difference of subsequent messages: 3309 ms
max ack holes: 835
2025-07-25T19:04:26,485+0900 [main] INFO playground.TestScenarioIssueKeyShared - Consumer consumer1 received 263558 unique messages 0 duplicates in 472 s, max latency difference of subsequent messages 2473 ms
2025-07-25T19:04:26,485+0900 [main] INFO playground.TestScenarioIssueKeyShared - Consumer consumer2 received 241250 unique messages 0 duplicates in 471 s, max latency difference of subsequent messages 1230 ms
2025-07-25T19:04:26,485+0900 [main] INFO playground.TestScenarioIssueKeyShared - Consumer consumer3 received 221278 unique messages 0 duplicates in 471 s, max latency difference of subsequent messages 1230 ms
2025-07-25T19:04:26,485+0900 [main] INFO playground.TestScenarioIssueKeyShared - Consumer consumer4 received 273914 unique messages 0 duplicates in 471 s, max latency difference of subsequent messages 3309 ms
This suggests that it is the replay messages that are failing to be delivered, and that the backoff is being reset by the delivery of normal messages.
There is a possibility that the same behavior exists on the master branch.
This would mean that, with the default settings, the backoff mechanism has little effect.
However, I don't consider this a serious issue, since a look-ahead limit has been introduced (https://github.com/apache/pulsar/pull/23231).
The default values were changed in https://github.com/apache/pulsar/pull/23340 due to issues caused by backoff under low traffic.
However, even when I set `dispatcherRetryBackoffInitialTimeInMs=100` and `dispatcherRetryBackoffMaxTimeInMs=1000`, I was unable to reproduce an increase in consumption latency under low traffic.
I couldn't find any mention of this issue in the mailing list or Slack archives.
Could you share the specific conditions under which this issue was observed?
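As a side note on the ack-hole counts reported above: a hedged sketch of one way to observe them outside the test harness, via the admin API's internal topic stats (URL, topic, and subscription are placeholders, and I'm assuming `totalNonContiguousDeletedMessagesRange` is the counter that corresponds to those ack-hole numbers):

```java
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;

public class AckHoleCheck {
    public static void main(String[] args) throws Exception {
        PulsarAdmin admin = PulsarAdmin.builder()
                .serviceHttpUrl("http://localhost:8080") // placeholder admin URL
                .build();

        String topic = "persistent://public/default/key-shared-test"; // placeholder
        PersistentTopicInternalStats stats = admin.topics().getInternalStats(topic);

        // Each cursor is a subscription; the number of non-contiguous acknowledged
        // ranges beyond markDeletePosition is commonly read as the ack-hole count
        // (field name assumed, see note above).
        stats.cursors.forEach((subscription, cursor) ->
                System.out.println(subscription + " ack holes: "
                        + cursor.totalNonContiguousDeletedMessagesRange));

        admin.close();
    }
}
```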
Lari Hotari
07/30/2025, 12:23 PM
Lari Hotari
08/04/2025, 12:50 PM
SiNan Liu
08/04/2025, 1:08 PM