sijieg
08/18/2025, 8:48 PM
Use code PULSAR50 for 50% off registration.
👉 Register: https://www.eventbrite.com/e/data-streaming-summit-san-francisco-2025-tickets-1432401484399?aff=oddtdtcreator
📅 Full schedule: https://datastreaming-summit.org/event/data-streaming-sf-2025/schedule
What to expect
• Sep 29 — Training & Workshop Day: Hands-on data streaming training + advanced Streaming Lakehouse workshop (with AWS).
• Sep 30 — Main Summit: Inspiring keynotes + 4 tracks: Deep Dives, Use Cases, AI + Stream Processing, Streaming Lakehouse.
• Talks from top companies and community sessions featuring Pulsar, Flink, Iceberg and other data streaming technologies.
Would love to see you at the Summit! 🎉

samriddhi
08/19/2025, 9:19 PM
• ✅ Input: AUTO_CONSUME works great for reading any schema
• ❌ Output: need an exact schema match, but AUTO_PRODUCE doesn't exist
The challenge: to avoid static schemas, I need to get schema info from Pulsar Admin at runtime, but Pulsar Functions don't have access to PulsarAdmin (I'm getting errors when trying).
Questions:
1. What's the recommended pattern for schema-agnostic functions?
2. How do I discover output topic schema at runtime without Admin access?
3. Any alternatives to runtime schema discovery for generic functions?
Goal: One function that works with multiple topic pairs having different schemas (Avro→Avro, JSON→JSON, Avro→JSON, etc.) without recompiling.
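For what it's worth, the Pulsar client API does expose Schema.AUTO_PRODUCE_BYTES(), even though it can't be declared as a Function's static output schema; a hedged sketch of using it from inside a function via the Context (the class name SchemaAgnosticRelay is illustrative, and this assumes the input payload's wire format already matches the output topic's registered schema, so it does not transcode Avro→JSON):

```java
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;

// Hedged sketch: a pass-through function that never declares a static schema.
// Input arrives as raw bytes; output is published with AUTO_PRODUCE_BYTES,
// so the broker validates the payload against the output topic's registered
// schema at publish time -- no PulsarAdmin lookup needed inside the function.
public class SchemaAgnosticRelay implements Function<byte[], Void> {
    @Override
    public Void process(byte[] payload, Context context) throws Exception {
        context.newOutputMessage(context.getOutputTopic(), Schema.AUTO_PRODUCE_BYTES())
               .value(payload)
               .sendAsync();
        return null; // null return: nothing for the framework's own output path
    }
}
```

The trade-off of this sketch is that schema checking moves entirely to publish time on the broker, so incompatible payloads fail per-message rather than at deploy time.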
Anyone solved this or know the best practices?

samriddhi
08/19/2025, 9:19 PM

Dan Rossi
08/20/2025, 7:44 PM

Samuel
08/21/2025, 11:22 AM

Thomas MacKenzie
08/26/2025, 10:26 PM
conf/broker.conf by any chance?
I've been applying various changes to the brokers' configuration with no issues, but I'm trying to set managedLedgerForceRecovery and it does not seem to work. It's not present or applied (when the container starts, the application logs each config field being applied).
https://github.com/apache/pulsar/blob/master/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java#L2355-L2360
Looking at the config with cat conf/broker.conf | grep managedLedgerForceRecovery, I get no results either. I understand it's a dynamic configuration field, but others are too and I can see them in that file set with the right value, so I'm wondering if I'm missing something?
Pulsar 4.0.6
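For what it's worth, managedLedgerForceRecovery not appearing in the shipped conf/broker.conf template doesn't necessarily mean it's unsupported: ServiceConfiguration fields can be added to the file by hand even when the default template omits them. A hedged sketch for setting it and checking what the broker actually loaded (assumes a 4.x pulsar-admin pointed at this cluster):

```shell
# The field is absent from the default template; appending it by hand is enough.
echo "managedLedgerForceRecovery=true" >> conf/broker.conf

# After restarting the broker, confirm the value it actually loaded:
bin/pulsar-admin brokers get-runtime-config | grep -i managedledgerforcerecovery
```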
Thanks for your help

Jack LaPlante
08/27/2025, 6:34 PM

Thomas MacKenzie
08/28/2025, 3:24 AM
server error: PersistenceError: org.apache.bookkeeper.mledger.ManagedLedgerException: Error while recovering ledger error code: -10
This error was preventing the applications from publishing messages and from creating producers. For about 2h I could see the ledger count was off at 0 (not sure what happened, but I believe the bookies were up during that time).
Some context: we believe the bookies were restarted (we use AWS spot instances in this env, so maybe an ungraceful shutdown).
I have 2 main questions:
• What would be the best course of action when this happens? (curious about manual intervention, although that's not very practical for a system running 24/7)
• I know there are 2 broker fields available: managedLedgerForceRecovery and autoSkipNonRecoverableData
◦ Could one of them help? (Do they serve the same purpose?) It seems like autoSkipNonRecoverableData should be avoided since it is part of the legacy codebase
◦ Are they both destructive (permanent data loss)? I opened a PR to add managedLedgerForceRecovery to the broker conf; thanks for the info about the risks it involves, Lari
◦ Is one better than the other?
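For reference, a hedged sketch of what enabling either field looks like in conf/broker.conf (both can permanently discard data that BookKeeper reports as unrecoverable, so they are last-resort switches, not defaults):

```properties
# Skip non-recoverable data during managed-ledger recovery instead of
# failing the ledger open (lossy: skipped entries are gone for good).
autoSkipNonRecoverableData=true

# Force managed-ledger recovery to proceed despite ledger recovery errors
# (also lossy; see the ServiceConfiguration source linked earlier).
managedLedgerForceRecovery=true
```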
Thank you for your help

Gaurav Ashok
09/04/2025, 11:12 AM

Cong Zhao
09/08/2025, 11:56 AM

Jiji K
09/09/2025, 6:41 AM

Artur Jablonski
09/16/2025, 1:43 PM

Amanda
09/16/2025, 3:12 PM

Abhilash Mandaliya
09/17/2025, 11:30 AM

Fabri
09/18/2025, 4:22 AM

Vaibhav Swarnkar
09/19/2025, 10:10 AM

Abhilash Mandaliya
09/19/2025, 11:33 AM
EFFECTIVELY_ONCE guarantee and the sink calls the record.fail()?

KP
09/19/2025, 6:53 PM

benjamin99
09/20/2025, 9:14 AM
{
"level": "warn",
"time": "2025-09-20T07:01:17.961738475Z",
"component": "public-rpc-server",
"error": {
"error": "oxia: failed to append to wal: 535046 can not immediately follow 459362: oxia: invalid next offset in wal",
"kind": "*errors.withStack",
"stack": null
},
"namespace": "bookkeeper",
"peer": "10.194.131.14:52392",
"shard": 6,
"message": "Write stream has been closed by error"
}
I searched for the related topic on the Oxia GitHub page but found no issues/discussions. Has anyone faced a similar issue before, or have any clue about how to resolve it?

Gergely Fábián
09/20/2025, 4:23 PM

bhasvij
09/23/2025, 4:37 PM

Lari Hotari
09/27/2025, 1:09 PM

Artur Jablonski
09/30/2025, 6:24 AM

ck_xnet
10/01/2025, 1:01 PM

Lari Hotari
10/02/2025, 4:05 AM

Fabri
10/03/2025, 8:38 PM

Praveen Gopalan
10/06/2025, 6:37 AM

zaryab
10/09/2025, 1:02 PM

Nicolas Belliard
10/15/2025, 2:18 PM
delayedDeliveryTrackerFactoryClassName.
We initially used InMemoryDelayedDeliveryTracker (because we were using Pulsar 2.7), which caused acknowledged delayed messages to be reprocessed after a broker restart, likely because its state is stored only in memory. Given our high message volume (millions), this behavior is problematic. A screenshot is available showing the lag escalation following a broker restart. We're generating delayed messages out of sequence, resulting in gaps within the acknowledged message stream. This causes non-contiguous ranges of messages to be marked as deleted or eligible for deletion. In our screenshot, the value of nonContiguousDeletedMessagesRanges is 16833.
To mitigate this, after upgrading Pulsar to version 4.0.4 we updated the broker config to use org.apache.pulsar.broker.delayed.BucketDelayedDeliveryTrackerFactory, which should persist delayed delivery metadata to disk via BookKeeper ledger buckets.
However, after switching to the bucket-based tracker, we're still seeing the same behavior post-restart. A few observations and questions:
• I checked the pulsar_delayed_message_index_loaded metric and noticed that messages are still being loaded into memory, while pulsar_delayed_message_index_bucket_total remains at zero. Is this expected? Shouldn't the bucket tracker be persisting and loading from disk?
• Are there additional broker settings required to fully enable bucket-based delayed delivery tracking? For example:
◦ Do we need to explicitly configure delayedDeliveryTrackerBucketSize or delayedDeliveryMaxNumBuckets?
◦ Is there any dependency on topic-level settings or namespace policies that could override the broker-level tracker configuration?
◦ Could other settings interfere with delayed message persistence?
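One hedged possibility for the zero bucket metric: if the delayed index never reaches the minimum per-bucket entry count, nothing is snapshotted to BookKeeper and the index legitimately stays memory-only. A sketch of the relevant broker.conf keys (key names from Pulsar 4.x; the values are illustrative, and the 50000 default should be verified against your own broker.conf):

```properties
delayedDeliveryTrackerFactoryClassName=org.apache.pulsar.broker.delayed.BucketDelayedDeliveryTrackerFactory

# A bucket snapshot is only cut once the in-memory delayed index holds at
# least this many entries; smaller backlogs stay memory-only, which keeps
# pulsar_delayed_message_index_bucket_total at zero.
delayedDeliveryMinIndexCountPerBucket=50000
```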
Any insights or guidance would be greatly appreciated. Thanks for your help!

benjamin99
10/17/2025, 3:01 AM