Andy Walker
05/31/2023, 4:39 PM
func (gi *goInstance) setupClient() error {
	client, err := pulsar.NewClient(pulsar.ClientOptions{
		URL: gi.context.instanceConf.pulsarServiceURL,
	})
	if err != nil {
		log.Errorf("create client error:%v", err)
		gi.stats.incrTotalSysExceptions(err)
		return err
	}
	gi.client = client
	return nil
}
I also couldn't find any reference to Auth anywhere in the SDK outside of the proto, so... unless I miss my guess, does this mean that Pulsar functions for Go do not support any auth at all?
Nick Pocock
06/06/2023, 12:54 PM
fatal call and the panic?
func Start(funcName interface{}) {
	function := newFunction(funcName)
	goInstance := newGoInstance()
	err := goInstance.startFunction(function)
	if err != nil {
		log.Fatal(err)
		panic("start function failed, please check.")
	}
}
I think the fatal call stops you from recovering from the panic if you wanted to with something like
defer func() {
	if r := recover(); r != nil {
		log.Warnf("recovered from error %+v", r)
	}
}()
Andy Walker
06/06/2023, 12:57 PM
Andy Walker
06/06/2023, 1:00 PM
Start, which is blocking, similar to http.ListenAndServe
Nick Pocock
06/06/2023, 1:19 PM
log.Fatal before the panic calls os.Exit, so if there's an error within your Pulsar Start method during message processing, I don't think we can recover from it
Nick Pocock
06/06/2023, 1:19 PM
Andy Walker
06/06/2023, 1:20 PM
Andy Walker
06/06/2023, 1:21 PM
Andy Walker
06/06/2023, 1:21 PM
startFunction blocks, so you could conceivably capture panics by running a defer/recover inside your function.
Andy Walker
06/06/2023, 1:22 PM
nil from your function.
Andy Walker
06/06/2023, 1:24 PM
Start represents either an unhandled error on your part in the form of an explicit error return from the pulsar function closure, which means you want to stop processing and exit.
Andy Walker
06/06/2023, 1:30 PM
http.ListenAndServe. Yes, it returns an error, but most of the canonical examples block with a call to Fatal, under the assumption that if you cannot start the web server (or if it somehow fails), this is not something you would really choose to recover from.
Andy Walker
06/06/2023, 1:34 PM
Start() return an optional error, this would open up another path for you to block the function process, which would force Pulsar to detect the dead function and restart it. I'm not sure how robust the health checks for functions are, but conceivably if you failed to exit after a failed call to Start(), the goroutines running the metrics and health check servers would keep running and Pulsar would have no idea that your function needed to restart.
Andy Walker
06/06/2023, 1:35 PM
log.Fatal() is partly a failsafe and partly a desire to avoid the need to orchestrate the concurrency around the health and metrics servers.
Andy Walker
06/06/2023, 1:52 PM
nil, in which case the message will be `ack`ed. For the second, you just return an error, the function process dies, and the message will be redelivered if you're running with the appropriate guarantees. You can't ask for it to be redelivered at a specific time, of course, but it's still a relatively foolproof way of getting the message redelivered. My experience with queue-based distributed systems has taught me that the more handling steps you try to add before simply dying and letting the orchestration environment restart you, the more chances you have to enter a frozen state.
Nick Pocock
06/06/2023, 3:09 PM
Start method 👌 similar to how Lambdas work.
Nick Pocock
06/06/2023, 3:10 PM
Nick Pocock
06/06/2023, 3:10 PM
Nick Pocock
06/06/2023, 3:22 PM
Nick Pocock
06/06/2023, 3:22 PM
Andy Walker
06/06/2023, 5:11 PM
Nick Pocock
06/16/2023, 8:53 AM
Thomas O'Neill
07/20/2023, 7:43 PM
Deepak Sah
09/05/2023, 8:46 AM
Anirudh Jain
12/27/2023, 10:16 AM
SubscriptionInitPos is set to 0 (Latest).
But I’m receiving all the records present in the topic.
Can anyone let me know how to set the offset in ConsumerOptions?
Code snippet:
consumer, err := client.Subscribe(pulsar.ConsumerOptions{
	Topic:               fullTopic,
	SubscriptionName:    "user_cli_1",
	SubscriptionInitPos: 0,
	Type:                pulsar.Shared,
})
Tried using this as well:
• SubscriptionInitPos: pulsar.InitialPosition(0)
• SubscriptionInitPos: pulsar.Latest
Anirudh Jain
01/05/2024, 9:28 PM
// Create a Pulsar client
client, err := pulsar.NewClient(pulsar.ClientOptions{
	URL:            pulsarURL,
	Logger:         a,
	Authentication: pulsar.NewAuthenticationToken(jsonData["requesterPulsarToken"].(string)),
})
if err != nil {
	fmt.Println("Error creating Pulsar client:", err)
	return
}
defer client.Close()
// Create a producer for the specified topic
producer, err := client.CreateProducer(pulsar.ProducerOptions{
	Topic: fullTopic,
})
if err != nil {
	fmt.Println("Error creating Pulsar producer:", err)
	return
}
defer producer.Close()
Anirudh Jain
02/01/2024, 12:43 PM
github.com/linkedin/goavro/v2
Schema:
{
  "fields": [
    {
      "name": "person_name",
      "type": [
        "null",
        "string"
      ]
    },
    {
      "name": "age",
      "type": [
        "null",
        "int"
      ]
    },
    {
      "name": "is_alive",
      "type": [
        "null",
        "boolean"
      ]
    }
  ],
  "name": "test_event",
  "type": "record"
}
Record 1: {"person_name": "anirudh", "age": 26, "is_alive": true}
Record 2: {"person_name":{"string":"anirudh"},"age":{"double":26},"is_alive":{"boolean":true}}
I want to produce the message as Record 1, but wherever the type is a union of data types, it expects the message in the Record 2 form.
It works fine with Record 2 but gives the following error when Record 1 is passed (Screenshot Attached):
{"person_name": "anirudh", "age": 26, "is_alive": true}
ERRO[0004] convert native Go form to binary Avro data error:cannot decode textual record "test_event": cannot decode textual union: expected: '{'; actual: '2' for key: "age"
ERRO[0004] convert native Go form to binary Avro data error:cannot decode textual record "test_event": expected: '{'; actual: 'n'
ERRO[0004] Schema encode message failed error="cannot decode textual record \"test_event\": expected: '{'; actual: 'n'" producerID=2 producer_name=non_prod_pulsar-161-55 topic="persistent://d11/json/event1-partition-3"
Error producing message: cannot decode textual record "test_event": expected: '{'; actual: 'n': SchemaFailure
Is there a way in Golang where I can produce message as Record 1?
Frederick Dark
02/06/2024, 9:20 AM
client, err := pulsar.NewClient(pulsar.ClientOptions{
	URL: "pulsar://localhost:6650",
})
if err != nil {
	fmt.Println("Error creating the client:", err)
	return
}
defer client.Close()
consumer, err := client.Subscribe(pulsar.ConsumerOptions{
	Topic:            "persistent://public/default/my-topic",
	SubscriptionName: "my-sub",
})
if err != nil {
	fmt.Println("Error creating the consumer:", err)
	return
}
defer consumer.Close()
for {
	// From this point on, no matter what happens with the client's connection,
	// my consumer doesn't receive an error about it and keeps waiting for new messages
	msg, err := consumer.Receive(context.Background())
	if err != nil {
		fmt.Println("Error receiving the message:", err)
		break
	}
	fmt.Printf("Message received: %s\n", string(msg.Payload()))
	consumer.Ack(msg)
}
Even if I bring the broker down, my consumer keeps waiting without notice.
I can make it detect a timeout, via context, but it's not what I want. I need to know that the connection has been lost.
Am I missing something?
Rami Youssef
02/11/2024, 7:34 PM
Bodhili
03/24/2024, 7:14 AM