Soumya Ghosh
01/16/2025, 8:49 PM
{
"msgRateIn": 0,
"msgThroughputIn": 0,
"msgRateOut": 0,
"msgThroughputOut": 0,
"bytesInCounter": 270413,
"msgInCounter": 2697,
"systemTopicBytesInCounter": 0,
"bytesOutCounter": 358717,
"msgOutCounter": 3577,
"bytesOutInternalCounter": 0,
"averageMsgSize": 0,
"msgChunkPublished": false,
"storageSize": 270413,
"backlogSize": 270413,
"backlogQuotaLimitSize": -1,
"backlogQuotaLimitTime": -1,
"oldestBacklogMessageAgeSeconds": 29328,
"oldestBacklogMessageSubscriptionName": "java-sdk",
"publishRateLimitedTimes": 0,
"earliestMsgPublishTimeInBacklogs": 0,
"offloadedStorageSize": 268368,
"lastOffloadLedgerId": 0,
"lastOffloadSuccessTimeStamp": 0,
"lastOffloadFailureTimeStamp": 0,
"ongoingTxnCount": 0,
"abortedTxnCount": 0,
"committedTxnCount": 0,
"publishers": [],
"waitingPublishers": 0,
"subscriptions": {
"console_sub": {
"msgRateOut": 0,
"msgThroughputOut": 0,
"bytesOutCounter": 0,
"msgOutCounter": 0,
"msgRateRedeliver": 0,
"messageAckRate": 0,
"chunkedMessageRate": 0,
"msgBacklog": 2697,
"backlogSize": 270413,
"earliestMsgPublishTimeInBacklog": 0,
"msgBacklogNoDelayed": 2697,
"blockedSubscriptionOnUnackedMsgs": false,
"msgDelayed": 0,
"msgInReplay": 0,
"unackedMessages": 0,
"type": "Failover",
"msgRateExpired": 0,
"totalMsgExpired": 0,
"lastExpireTimestamp": 0,
"lastConsumedFlowTimestamp": 0,
"lastConsumedTimestamp": 0,
"lastAckedTimestamp": 0,
"lastMarkDeleteAdvancedTimestamp": 0,
"consumers": [
{
"msgRateOut": 0,
"msgThroughputOut": 0,
"bytesOutCounter": 0,
"msgOutCounter": 0,
"msgRateRedeliver": 0,
"messageAckRate": 0,
"chunkedMessageRate": 0,
"availablePermits": 1000,
"unackedMessages": 0,
"avgMessagesPerEntry": 0,
"blockedConsumerOnUnackedMsgs": false,
"drainingHashesCount": 0,
"drainingHashesClearedTotal": 0,
"drainingHashesUnackedMessages": 0,
"lastAckedTimestamp": 0,
"lastConsumedTimestamp": 0,
"lastConsumedFlowTimestamp": 0,
"lastAckedTime": "1970-01-01T00:00:00Z",
"lastConsumedTime": "1970-01-01T00:00:00Z"
}
],
"isDurable": true,
"isReplicated": false,
"allowOutOfOrderDelivery": false,
"consumersAfterMarkDeletePosition": {},
"drainingHashesCount": 0,
"drainingHashesClearedTotal": 0,
"drainingHashesUnackedMessages": 0,
"nonContiguousDeletedMessagesRanges": 0,
"nonContiguousDeletedMessagesRangesSerializedSize": 0,
"delayedMessageIndexSizeInBytes": 0,
"subscriptionProperties": {},
"filterProcessedMsgCount": 0,
"filterAcceptedMsgCount": 0,
"filterRejectedMsgCount": 0,
"filterRescheduledMsgCount": 0,
"durable": true,
"replicated": false
},
"java-sdk": {
"msgRateOut": 0,
"msgThroughputOut": 0,
"bytesOutCounter": 358717,
"msgOutCounter": 3577,
"msgRateRedeliver": 0,
"messageAckRate": 0,
"chunkedMessageRate": 0,
"msgBacklog": 30,
"backlogSize": 3069,
"earliestMsgPublishTimeInBacklog": 0,
"msgBacklogNoDelayed": 30,
"blockedSubscriptionOnUnackedMsgs": false,
"msgDelayed": 0,
"msgInReplay": 0,
"unackedMessages": 0,
"type": "Failover",
"msgRateExpired": 0,
"totalMsgExpired": 0,
"lastExpireTimestamp": 0,
"lastConsumedFlowTimestamp": 0,
"lastConsumedTimestamp": 0,
"lastAckedTimestamp": 0,
"lastMarkDeleteAdvancedTimestamp": 0,
"consumers": [
{
"msgRateOut": 0,
"msgThroughputOut": 0,
"bytesOutCounter": 0,
"msgOutCounter": 0,
"msgRateRedeliver": 0,
"messageAckRate": 0,
"chunkedMessageRate": 0,
"availablePermits": 1000,
"unackedMessages": 0,
"avgMessagesPerEntry": 0,
"blockedConsumerOnUnackedMsgs": false,
"drainingHashesCount": 0,
"drainingHashesClearedTotal": 0,
"drainingHashesUnackedMessages": 0,
"lastAckedTimestamp": 0,
"lastConsumedTimestamp": 0,
"lastConsumedFlowTimestamp": 0,
"lastAckedTime": "1970-01-01T00:00:00Z",
"lastConsumedTime": "1970-01-01T00:00:00Z"
}
],
"isDurable": true,
"isReplicated": false,
"allowOutOfOrderDelivery": false,
"consumersAfterMarkDeletePosition": {},
"drainingHashesCount": 0,
"drainingHashesClearedTotal": 0,
"drainingHashesUnackedMessages": 0,
"nonContiguousDeletedMessagesRanges": 0,
"nonContiguousDeletedMessagesRangesSerializedSize": 0,
"delayedMessageIndexSizeInBytes": 0,
"subscriptionProperties": {},
"filterProcessedMsgCount": 0,
"filterAcceptedMsgCount": 0,
"filterRejectedMsgCount": 0,
"filterRescheduledMsgCount": 0,
"durable": true,
"replicated": false
}
},
"replication": {},
"nonContiguousDeletedMessagesRanges": 0,
"nonContiguousDeletedMessagesRangesSerializedSize": 0,
"delayedMessageIndexSizeInBytes": 0,
"compaction": {
"lastCompactionRemovedEventCount": 0,
"lastCompactionSucceedTimestamp": 0,
"lastCompactionFailedTimestamp": 0,
"lastCompactionDurationTimeInMills": 0
},
"metadata": {
"partitions": 1,
"deleted": false
},
"partitions": {}
}
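One way to read the stats above is per subscription: console_sub accounts for the whole topic-level backlogSize (270,413 bytes over 2,697 messages), while java-sdk is behind by only 30 messages. The stdlib-only sketch below does that arithmetic and converts oldestBacklogMessageAgeSeconds into hours. The class BacklogSummary and its methods are hypothetical, the values are hand-copied from the JSON, and nothing here calls the Pulsar admin API.

```java
import java.util.Comparator;
import java.util.List;

// Hypothetical helper: values are copied by hand from the topic stats above;
// nothing here calls the Pulsar admin API.
class BacklogSummary {
    // One subscription's backlog, as reported by msgBacklog / backlogSize.
    record Sub(String name, long msgBacklog, long backlogSizeBytes) {
        // Average size of one backlogged message, in bytes.
        double avgBacklogMsgBytes() {
            return msgBacklog == 0 ? 0.0 : (double) backlogSizeBytes / msgBacklog;
        }
    }

    // Subscription holding the most backlogged messages.
    static Sub largestBacklog(List<Sub> subs) {
        return subs.stream()
                .max(Comparator.comparingLong(Sub::msgBacklog))
                .orElseThrow();
    }

    // Formats an age in seconds as "Hh Mm Ss".
    static String formatAge(long seconds) {
        return String.format("%dh %dm %ds", seconds / 3600, (seconds % 3600) / 60, seconds % 60);
    }

    public static void main(String[] args) {
        List<Sub> subs = List.of(
                new Sub("console_sub", 2697, 270413),
                new Sub("java-sdk", 30, 3069));
        for (Sub s : subs) {
            System.out.printf("%s: %d msgs, %.1f bytes/msg%n",
                    s.name(), s.msgBacklog(), s.avgBacklogMsgBytes());
        }
        System.out.println("largest backlog: " + largestBacklog(subs).name());
        // oldestBacklogMessageAgeSeconds, reported for subscription "java-sdk"
        System.out.println("oldest backlog message age: " + formatAge(29328));
    }
}
```

With these numbers, console_sub comes out as the largest backlog at roughly 100 bytes per message, and the oldest backlogged message (java-sdk) is 8h 8m 48s old.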
Screenshots attached. A backlog of 2.7k messages appears when the CLI consumer is started, and it recovers after 10 minutes. The dispatch rate and dispatch throughput metrics spike at 12:13, when messages are delivered to the CLI consumer and a Java application.
David K
01/16/2025, 9:06 PM
"lastAckedTime" : "1970-01-01T00:00:00Z",
"lastConsumedTime" : "1970-01-01T00:00:00Z"
David K
01/16/2025, 9:06 PM
Sandesh Shingare
01/17/2025, 1:18 PM
maxPendingMessages in the Pulsar Java producer client? The doc says 0, meaning the check is disabled by default. We have set blockIfQueueFull to true in our producer without explicitly setting the maxPendingMessages config. In this case, at what queue size will the producer's send calls block? I see 166 shown as maxPendingMessages in the producer stats when I run my application. Can someone please help me understand this?
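Assuming, as the 4.x Java client Javadoc describes, that maxPendingMessages(0) disables the per-producer message-count check and that the client-wide ClientBuilder#memoryLimit (64 MB by default) is what bounds outstanding sends instead, blockIfQueueFull(true) makes a send wait for memory to be released rather than fail. The class below is a hypothetical, stdlib-only model of that behaviour (a byte budget backed by a Semaphore); it is not Pulsar's implementation.

```java
import java.util.concurrent.Semaphore;

// Hypothetical, simplified model of a producer's pending-send byte budget.
// It is NOT Pulsar's code; it only shows how blockIfQueueFull changes what
// happens once the budget is exhausted.
class PendingBudgetModel {
    private final Semaphore bytes;
    private final boolean blockIfQueueFull;

    PendingBudgetModel(int limitBytes, boolean blockIfQueueFull) {
        this.bytes = new Semaphore(limitBytes);
        this.blockIfQueueFull = blockIfQueueFull;
    }

    // Reserves budget for one message before it is sent.
    // Non-blocking mode: returns false immediately ("queue full" error).
    // Blocking mode: waits until acknowledgements release enough budget.
    boolean reserve(int msgBytes) {
        if (!blockIfQueueFull) {
            return bytes.tryAcquire(msgBytes);
        }
        try {
            bytes.acquire(msgBytes);
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            return false;
        }
    }

    // Called when the broker acknowledges a message; frees its budget.
    void onAck(int msgBytes) {
        bytes.release(msgBytes);
    }

    int availableBytes() {
        return bytes.availablePermits();
    }
}
```

In this model the limit is expressed in bytes, so the number of messages that fit before a send blocks depends on message size rather than on a fixed count.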
Pulsar client version: 4.0.1
Jerome B
01/17/2025, 11:24 PM
bmtrivedi
01/19/2025, 3:22 AM
Lari Hotari
01/20/2025, 3:41 PM
Lari Hotari
01/20/2025, 3:42 PM
Lari Hotari
01/20/2025, 3:43 PM
ojoxdan
01/21/2025, 3:52 PM
-1; after applying the last configuration, I was expecting the rate to increase, but it was stuck at 150.
Do I need to clear or restart something in my k8s cluster?
@David K @Yu Wei Sung @Lari Hotari
Alexander Brown
01/22/2025, 5:26 PM
Preet Patel
01/23/2025, 9:25 AM
Olivier NOUGUIER
01/24/2025, 2:04 PM
Lari Hotari
01/27/2025, 8:30 AM
-e "_JAVA_OPTIONS=-XX:UseSVE=0" to pass the environment option on the docker command line:
docker run --rm -it -p 6650:6650 -p 8080:8080 -e "_JAVA_OPTIONS=-XX:UseSVE=0" apachepulsar/pulsar:4.0.2 bin/pulsar standalone -nfw -nss
The problem will be fixed in the Pulsar 4.0.3 Docker images, which will include the latest Corretto OpenJDK release containing the fix.
More details in https://github.com/apache/pulsar/issues/23891#issuecomment-2615087017 .
Shubh Vanvat
01/27/2025, 2:14 PM
Soumya Ghosh
01/29/2025, 1:50 PM
org.apache.pulsar.client.api.PulsarClientException$TimeoutException after 30 seconds.
Setup summary: Pulsar 4.0.1, 3 brokers, 5 bookies, 3 ZKs; producing and consuming through the Pulsar Java client (4.0.1).
We also upgraded the Pulsar cluster and client to 4.0.2 but are still observing the same issue.
// Pulsar client
this.pulsarClient =
PulsarClient.builder()
.serviceUrl(connectionUrl)
.memoryLimit(0, SizeUnit.MEGA_BYTES)
.build();
// Producer configurations
ProducerBuilder<T> builder =
pulsarClient
.newProducer(schema)
.producerName(producerName)
.topic(topicName)
.compressionType(CompressionType.LZ4)
.enableBatching(true)
.batchingMaxPublishDelay(30, TimeUnit.MILLISECONDS)
.blockIfQueueFull(true);
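The 30 seconds in the exception matches the Java producer's default send timeout (ProducerBuilder#sendTimeout, 30 s unless overridden): a send whose broker receipt does not arrive within that window is failed with a timeout. The stdlib-only sketch below models just that mechanism with CompletableFuture#orTimeout. SendTimeoutModel and its methods are hypothetical names, the sketch does not use the Pulsar client, and the timeout is shortened so it runs quickly.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical model of a send whose broker receipt may never arrive.
// Not the Pulsar client: it only mirrors the idea of a per-send timeout.
class SendTimeoutModel {
    // Fails "ack" with TimeoutException if it is not completed within
    // timeoutMillis (orTimeout mutates and returns the same future).
    static <T> CompletableFuture<T> withSendTimeout(CompletableFuture<T> ack, long timeoutMillis) {
        return ack.orTimeout(timeoutMillis, TimeUnit.MILLISECONDS);
    }

    // Waits for the future and reports whether it failed due to a timeout.
    static boolean timedOut(CompletableFuture<?> f) {
        try {
            f.join();
            return false;
        } catch (CompletionException e) {
            return e.getCause() instanceof TimeoutException;
        }
    }

    public static void main(String[] args) {
        CompletableFuture<String> neverAcked = new CompletableFuture<>();
        CompletableFuture<String> acked = CompletableFuture.completedFuture("msg-id");
        System.out.println(timedOut(withSendTimeout(neverAcked, 100))); // true
        System.out.println(timedOut(withSendTimeout(acked, 100)));      // false
    }
}
```

In the real client the equivalent knob is set on the producer builder, e.g. .sendTimeout(30, TimeUnit.SECONDS); a value of 0 disables the timeout.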
Pulsar brokers, bookies, and client instances run in the same AWS VPC, so the chance of a network outage is quite low. Brokers are configured with advertisedAddress; advertisedListeners is not configured.
In the client application, the Pulsar client and the Producer object are initialized once, and messages are then produced at sporadic intervals.
No other producer or consumer is running on the cluster, and more than sufficient resources are available to handle the produce requests.
Attached are screenshots of the client application and of the broker node that owns this topic.
Time frame: 10:30 UTC
We observed that after a message failed to be produced within 30 seconds, the Pulsar producer clients were closed due to keep-alive timeout expiry and then recreated. At the same time, we see "Connection reset by peer" in the broker logs.
Any thoughts on what could be going wrong here?
Daniel Kaminski
01/29/2025, 3:54 PM
[pulsar-io-6-4] WARN org.apache.pulsar.broker.service.AbstractReplicator - [<persistent://tenant/namespace/__change_events> | pulsar-prod-ha1-->pulsar-prod-ha2] Failed to create remote producer (org.apache.pulsar.client.api.PulsarClientException$IncompatibleSchemaException: {"errorMsg":"org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException: Producers cannot connect or send message without a schema to topics with a schema caused by org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException: Producers cannot connect or send message without a schema to topics with a schema","reqId":3006917652050389956, "remote":"our.pulsar.cluster/<ip-address>:6651", "local":"/<ip-address>:34314"}), retrying in 57.934 s
2025-01-29T14:41:26,689+0000 [pulsar-io-6-4] ERROR org.apache.pulsar.client.impl.ProducerImpl - [<persistent://tenant/namespace/__change_events>] [pulsar.repl.pulsar-prod-ha1-->pulsar-prod-ha2] Failed to create producer: {"errorMsg":"org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException: Producers cannot connect or send message without a schema to topics with a schema caused by org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException: Producers cannot connect or send message without a schema to topics with a schema","reqId":3006917652050389956, "remote":"our.pulsar.cluster/<ip-address>:6651", "local":"/<ip-address>"}
2025-01-29T14:41:26,689+0000 [pulsar-io-6-4] WARN org.apache.pulsar.client.impl.ClientCnx - [id: 0x755f1150, L:/100.70.32.147:34314 - R:our.pulsar.cluster/<ip-address>:6651] Received error from server: org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException: Producers cannot connect or send message without a schema to topics with a schema caused by org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException: Producers cannot connect or send message without a schema to topics with a schema
I turned schemaValidationEnforced off at the namespace level, and I can confirm that the error is gone: I no longer see it for any system topic in that namespace.
Can you confirm that this is intended? I assume schemaValidationEnforced: true is inherited by the topic, but I'm curious how this is handled for system topics.
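The documented rule behind this error can be written as a small predicate: with schemaValidationEnforced on, a producer that declares no schema is rejected from a topic that already has one; with it off, the same producer is allowed. The log above shows the replicator's producer for __change_events hitting exactly that case. SchemaValidationModel is a hypothetical, simplified model of the rule (it ignores schema compatibility checks), not the broker's code.

```java
// Hypothetical, simplified model of the schemaValidationEnforced rule.
// It is not the broker's implementation and ignores compatibility checks.
class SchemaValidationModel {
    // Returns true if a producer may connect under this model.
    static boolean producerAllowed(boolean topicHasSchema,
                                   boolean producerHasSchema,
                                   boolean schemaValidationEnforced) {
        // Only a schema-less producer on a topic that already has a schema
        // is rejected, and only while enforcement is on.
        return !(schemaValidationEnforced && topicHasSchema && !producerHasSchema);
    }

    public static void main(String[] args) {
        // Replicator producer as in the log: no schema, remote topic has one.
        System.out.println(producerAllowed(true, false, true));  // false (enforced)
        System.out.println(producerAllowed(true, false, false)); // true (not enforced)
    }
}
```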
We are currently running Pulsar LTS version 3.0.8.1.
Lari Hotari
01/30/2025, 8:45 AM
seek by timestamp. Please check the updated release notes for the workaround. Details are also in this comment: https://github.com/apache/pulsar/issues/23910#issuecomment-2623862571
Conor
01/31/2025, 10:42 AM
Amy Krishnamohan
01/31/2025, 9:04 PM
Soumya Ghosh
02/03/2025, 10:19 AM
hulusi
02/03/2025, 12:15 PM
Suvrajit Manna
02/03/2025, 2:31 PM
ZeroQueueConsumerImpl-like behaviour for a partitioned topic?
I was going through this issue:
https://github.com/apache/pulsar-client-python/issues/43
Are any of the workarounds mentioned there available for the Pulsar Java client?
Slackbot
02/04/2025, 3:11 PM
Girish Sharma
02/05/2025, 7:35 AM
Shaun
02/06/2025, 12:59 AM
persistent vs non-persistent topics when using the failover subscription type? I am seeing that the broker never sends active/inactive status-change notifications to consumers of non-persistent topics, as it does for persistent topics. Is this expected?
Conor
02/06/2025, 10:04 AM
If someone can spare a moment, could you take a look at https://github.com/apache/pulsar/issues/23890?
I haven't found a solution for this, so I am going to go ahead and manually delete the ledgers mentioned in the error messages from the bookies. If anyone knows a better solution, please let me know.
Thomas MacKenzie
02/06/2025, 5:39 PM
pulsar-admin client?
I've been looking through the configuration in the Helm chart and directly in the broker and BookKeeper instance configs, but did not find anything.
Thanks
https://pulsar.apache.org/docs/4.0.x/administration-isolation-bookie/
Sreenath Sasidharan
02/06/2025, 8:43 PM
serviceAccountName) that allows the Pulsar Functions worker in Kubernetes to access our S3 buckets. Based on the documentation (https://pulsar.apache.org/docs/4.0.x/functions-runtime-kubernetes/) and the source code, I don't see a way to do this for Python via customRuntimeOptions. Is that correct, or have I missed something?
Soumya Ghosh
02/11/2025, 9:33 AM
bin/pulsar initialize-cluster-metadata --cluster <cluster_name> --metadata-store zk:zk_1,zk_2,zk_3:2181 --configuration-metadata-store zk:zk_1,zk_2,zk_3:2181 --web-service-url http://<broker_r53>:8080 --broker-service-url pulsar://<broker_r53>:6650
Output:
Killed
Are there any flags to get a more verbose error?