Nick Pocock
06/06/2023, 3:22 PM
Nick Pocock
06/06/2023, 3:22 PM
Andy Walker
06/06/2023, 5:11 PM
Slackbot
06/16/2023, 8:53 AM
Slackbot
07/20/2023, 7:43 PM
Slackbot
09/05/2023, 8:46 AM
Slackbot
12/27/2023, 10:16 AM
Slackbot
01/05/2024, 9:28 PM
Anirudh Jain
02/01/2024, 12:43 PM
github.com/linkedin/goavro/v2
Schema:
{
  "fields": [
    {
      "name": "person_name",
      "type": ["null", "string"]
    },
    {
      "name": "age",
      "type": ["null", "int"]
    },
    {
      "name": "is_alive",
      "type": ["null", "boolean"]
    }
  ],
  "name": "test_event",
  "type": "record"
}
Record 1: {"person_name": "anirudh", "age": 26, "is_alive": true}
Record 2: {"person_name":{"string":"anirudh"},"age":{"double":26},"is_alive":{"boolean":true}}
I want to produce the message as Record 1, but wherever a field's type is a union of data types, the encoder expects the message in the form of Record 2.
It works fine with Record 2 but gives the following error when Record 1 is passed (screenshot attached):
{"person_name": "anirudh", "age": 26, "is_alive": true}
ERRO[0004] convert native Go form to binary Avro data error:cannot decode textual record "test_event": cannot decode textual union: expected: '{'; actual: '2' for key: "age"
ERRO[0004] convert native Go form to binary Avro data error:cannot decode textual record "test_event": expected: '{'; actual: 'n'
ERRO[0004] Schema encode message failed error="cannot decode textual record \"test_event\": expected: '{'; actual: 'n'" producerID=2 producer_name=non_prod_pulsar-161-55 topic="persistent://d11/json/event1-partition-3"
Error producing message: cannot decode textual record "test_event": expected: '{'; actual: 'n': SchemaFailure
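The decoder errors above are consistent with goavro's `NativeFromTextual` expecting Avro's JSON encoding, in which every non-null union value is wrapped as `{"<branch>": value}`. One possible workaround is to wrap the union fields before handing the record to the encoder. The following is a minimal sketch using only the standard library; `wrapUnions` is a hypothetical helper (not part of goavro or the Pulsar client), and it assumes top-level fields whose unions are `["null", <one primitive type>]`, as in the schema above. Newer goavro v2 releases also appear to provide `NewCodecForStandardJSON` for plain-JSON input, which is worth checking against the goavro documentation before relying on it.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Compact copy of the schema and Record 1 from the message above.
const schema = `{"type":"record","name":"test_event","fields":[
 {"name":"person_name","type":["null","string"]},
 {"name":"age","type":["null","int"]},
 {"name":"is_alive","type":["null","boolean"]}]}`

const record1 = `{"person_name": "anirudh", "age": 26, "is_alive": true}`

// wrapUnions (hypothetical helper) rewrites a plain JSON record so that each
// top-level field declared as a union of primitive type names uses Avro's
// JSON encoding: null stays null, any other value becomes {"<branch>": value},
// where <branch> is the first non-null type listed in the union.
func wrapUnions(schemaJSON, recordJSON []byte) ([]byte, error) {
	var s struct {
		Fields []struct {
			Name string          `json:"name"`
			Type json.RawMessage `json:"type"`
		} `json:"fields"`
	}
	if err := json.Unmarshal(schemaJSON, &s); err != nil {
		return nil, fmt.Errorf("parse schema: %w", err)
	}
	var record map[string]json.RawMessage
	if err := json.Unmarshal(recordJSON, &record); err != nil {
		return nil, fmt.Errorf("parse record: %w", err)
	}
	for _, f := range s.Fields {
		var branches []string
		if err := json.Unmarshal(f.Type, &branches); err != nil {
			continue // not a union of primitive type names; leave untouched
		}
		value, ok := record[f.Name]
		if !ok || string(value) == "null" {
			continue // missing or null values need no wrapper
		}
		for _, b := range branches {
			if b == "null" {
				continue
			}
			wrapped, err := json.Marshal(map[string]json.RawMessage{b: value})
			if err != nil {
				return nil, err
			}
			record[f.Name] = wrapped
			break
		}
	}
	return json.Marshal(record)
}

func main() {
	out, err := wrapUnions([]byte(schema), []byte(record1))
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(string(out))
}
```

Because the wrapper follows the schema, `age` comes out as `{"int":26}`, whereas Record 2 as quoted uses `"double"`. The output keys are sorted alphabetically by `json.Marshal`, which does not matter for Avro decoding.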
Is there a way in Go to produce the message as Record 1?
Frederick Dark
02/06/2024, 9:20 AM
client, err := pulsar.NewClient(pulsar.ClientOptions{
	URL: "pulsar://localhost:6650",
})
if err != nil {
	fmt.Println("Error creating the client:", err)
	return
}
defer client.Close()
consumer, err := client.Subscribe(pulsar.ConsumerOptions{
	Topic:            "persistent://public/default/my-topic",
	SubscriptionName: "my-sub",
})
if err != nil {
	fmt.Println("Error creating the consumer:", err)
	return
}
defer consumer.Close()
for {
	// From this point on, no matter what happens with the client's connection,
	// my consumer doesn't receive an error about it and keeps waiting for new messages
	msg, err := consumer.Receive(context.Background())
	if err != nil {
		fmt.Println("Error receiving the message:", err)
		break
	}
	fmt.Printf("Message received: %s\n", string(msg.Payload()))
	consumer.Ack(msg)
}
Even if I bring the broker down, my consumer keeps waiting without notice.
I can make it detect a timeout, via context, but it's not what I want. I need to know that the connection has been lost.
Am I missing something?
Rami Youssef
02/11/2024, 7:34 PM
Bodhili
03/24/2024, 7:14 AM
Frank Kelly
05/23/2024, 10:07 PM
DLQ Policy. I would like to set it up simply to do retries and then discard the message (no Dead Letter Topic).
This is possible in Java as follows
try (final Consumer consumer = client.newConsumer()
        .topic(createTopicUri(commonTenant, topic))
        .subscriptionType(isLoadBalanced ? SubscriptionType.Shared : SubscriptionType.Exclusive)
        .deadLetterPolicy(isLoadBalanced ? DeadLetterPolicy.builder().maxRedeliverCount(maxRedeliveryCount).build() : null) // <====== here
        .subscriptionName(consumerName)
        .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
        .subscribe()) {
When I tried (I think) the equivalent in Golang as follows
consumerOpts := pulsar.ConsumerOptions{
	Topic:                       pr.config.PulsarConsumerTopic,
	SubscriptionName:            pr.identifier,
	Type:                        pulsar.Shared,
	SubscriptionInitialPosition: pulsar.SubscriptionPositionEarliest,
	BackoffPolicy:               &PulsarBackoff{},
	DLQ: &pulsar.DLQPolicy{
		MaxDeliveries: pr.config.PulsarMsgRedeliveryAttempts,
	},
}
I get
DLQPolicy.Topic needs to be set to a valid topic name: InvalidConfiguration
What am I missing?
Frank Kelly
05/24/2024, 1:25 PM
Frank Kelly
05/24/2024, 1:29 PM
g41797
05/27/2024, 11:42 AM
if options.ReceiverQueueSize <= 0 {
	options.ReceiverQueueSize = 1000
}
According to the recommendations:
"...set the consumers' receiver queue size very low (potentially even to 0 if necessary)..."
So according to the docs I need to set this value to 0, but the Go client replaces 0 with 1000.
Please clarify.
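To make the consequence of the quoted check concrete, here is a small sketch that reproduces it in a stand-in function. `effectiveQueueSize` and `defaultReceiverQueueSize` are hypothetical names used only for illustration; the only thing taken from the message above is the `<= 0` check and the value 1000. Under that check a requested size of 0 cannot be told apart from "unset", so 1 is the smallest value that survives. Whether the Go client offers a true zero-size queue is not stated in this thread.

```go
package main

import "fmt"

// defaultReceiverQueueSize mirrors the constant 1000 in the quoted check.
const defaultReceiverQueueSize = 1000

// effectiveQueueSize is a stand-in that applies the same rule as the quoted
// snippet: any value <= 0 is replaced by the default.
func effectiveQueueSize(requested int) int {
	if requested <= 0 {
		return defaultReceiverQueueSize
	}
	return requested
}

func main() {
	for _, n := range []int{-1, 0, 1, 10} {
		fmt.Printf("requested %d -> effective %d\n", n, effectiveQueueSize(n))
	}
}
```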
Kenny Chen
08/31/2024, 5:02 AM
messageID.Serialize(), the ackTracker information is not recorded. This causes an issue when the client uses DeserializeMessageID to restore the message ID. The Pulsar consumer will mistakenly acknowledge the message within the batch in a way that causes the other unacknowledged messages in the batch to be acknowledged as well. This results in message loss for the client.
So, I created this PR.
Please help me review it, thanks!
Frank Kelly
09/30/2024, 1:51 PM
0.14.0 - I am having a problem with the new Backoff Policy change, however
Frank Kelly
09/30/2024, 1:51 PM
struct to passing a function that returns a struct?
Frank Kelly
09/30/2024, 1:52 PM
Frank Kelly
09/30/2024, 1:52 PM
internal/pulsarhelpers/pulsar_helper.go:224:28: cannot use GetBackoffPolicy() (value of type backoff.Policy) as func() backoff.Policy value in struct literal
internal/pulsarhelpers/pulsar_helper.go:245:38: cannot use GetBackoffPolicy() (value of type backoff.Policy) as func() backoff.Policy value in struct literal
internal/pulsarhelpers/pulsar_helper.go:259:28: cannot use GetBackoffPolicy() (value of type backoff.Policy) as func() backoff.Policy value in struct literal
I'm still pretty new to Go, so can someone advise on the change?
Frank Kelly
09/30/2024, 1:53 PM
Frank Kelly
09/30/2024, 2:02 PM
rohith gujja
01/08/2025, 5:55 PM
rohith gujja
01/08/2025, 6:03 PM
rohith gujja
01/08/2025, 6:22 PM
rohith gujja
01/09/2025, 9:54 AM
Yunze Xu
03/06/2025, 2:30 PM
Thomas MacKenzie
04/04/2025, 6:02 PM
// ReplicationClusters override the replication clusters for this message.
ReplicationClusters []string
And I was curious if there is any plan to implement the cluster-level failover feature in the Go client (https://pulsar.apache.org/client-feature-matrix/#client)?
Thank you
Thomas MacKenzie
07/12/2025, 4:55 AM