# dev-go
  • n

    Nick Pocock

    06/06/2023, 3:22 PM
    The consumer is created for you here - https://github.com/apache/pulsar/blob/f4386c868b3100487ee075c75f4cb78ff9c1d971/pulsar-function-go/pf/instance.go#L303 And I can't see a way of passing down custom retry config when creating the consumer
  • n

    Nick Pocock

    06/06/2023, 3:22 PM
    Is there a strategy for passing config down within the Pulsar Go SDK? If so I could create a PR 🙌
  • a

    Andy Walker

    06/06/2023, 5:11 PM
    I have one in progress. Also, big oof at this consumer init code. There is no need for four separate clauses here.
    ✅ 1
  • a

    Anirudh Jain

    02/01/2024, 12:43 PM
    Hi, I want to produce and consume Avro records in Go. Using library:
    github.com/linkedin/goavro/v2
    Schema:
    {
      "fields": [
        {
          "name": "person_name",
          "type": [
            "null",
            "string"
          ]
        },
        {
          "name": "age",
          "type": [
            "null",
            "int"
          ]
        },
        {
          "name": "is_alive",
          "type": [
            "null",
            "boolean"
          ]
        }
      ],
      "name": "test_event",
      "type": "record"
    }
    Record 1:
    {"person_name": "anirudh", "age": 26, "is_alive": true}
    Record 2:
    {"person_name":{"string":"anirudh"},"age":{"double":26},"is_alive":{"boolean":true}}
    I want to produce the message as Record 1, but wherever the type is a union of data types, the library expects the message in the form of Record 2. It works fine with Record 2 but gives the following error when Record 1 is passed (screenshot attached):
    {"person_name": "anirudh", "age": 26, "is_alive": true}
    ERRO[0004] convert native Go form to binary Avro data error:cannot decode textual record "test_event": cannot decode textual union: expected: '{'; actual: '2' for key: "age" 
    ERRO[0004] convert native Go form to binary Avro data error:cannot decode textual record "test_event": expected: '{'; actual: 'n' 
    ERRO[0004] Schema encode message failed                  error="cannot decode textual record \"test_event\": expected: '{'; actual: 'n'" producerID=2 producer_name=non_prod_pulsar-161-55 topic="persistent://d11/json/event1-partition-3"
    Error producing message: cannot decode textual record "test_event": expected: '{'; actual: 'n': SchemaFailure
    Is there a way in Golang where I can produce message as Record 1?
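The error above is consistent with the Avro JSON encoding, in which a non-null union value is written as a single-key object naming its branch. One possible workaround, sketched below with the standard library only, is to wrap plain JSON into that form before handing it to the producer. The `unionBranch` table and the `wrapUnions` helper are hypothetical names introduced for this example; they are not part of goavro or the Pulsar client, and the branch names are written by hand from the schema in the question.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// unionBranch maps each union-typed field of the test_event schema to its
// non-null branch name. Hypothetical table, written by hand for this example.
var unionBranch = map[string]string{
	"person_name": "string",
	"age":         "int",
	"is_alive":    "boolean",
}

// wrapUnions converts plain JSON (Record 1 style) into the union-wrapped
// form (Record 2 style): a non-null value becomes {"<branch>": value},
// while null stays null.
func wrapUnions(plain []byte) ([]byte, error) {
	var in map[string]interface{}
	if err := json.Unmarshal(plain, &in); err != nil {
		return nil, err
	}
	out := make(map[string]interface{}, len(in))
	for field, value := range in {
		branch, isUnion := unionBranch[field]
		if !isUnion || value == nil {
			out[field] = value
			continue
		}
		out[field] = map[string]interface{}{branch: value}
	}
	return json.Marshal(out)
}

func main() {
	wrapped, err := wrapUnions([]byte(`{"person_name": "anirudh", "age": 26, "is_alive": true}`))
	if err != nil {
		panic(err)
	}
	fmt.Println(string(wrapped))
}
```

This only handles flat records with two-branch unions like the ones in the schema above; nested records or unions with several non-null branches would need the branch to be chosen from the value's type.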
  • f

    Frederick Dark

    02/06/2024, 9:20 AM
    Hi, I'm developing a solution where subscribers need to detect when communication with the broker has been lost. However, I couldn't find a way to properly accomplish this, using the Golang API. Here is a simple example of the situation:
    package main

    import (
        "context"
        "fmt"

        "github.com/apache/pulsar-client-go/pulsar"
    )

    func main() {
        client, err := pulsar.NewClient(pulsar.ClientOptions{
            URL: "pulsar://localhost:6650",
        })
        if err != nil {
            fmt.Println("Error creating the client:", err)
            return
        }
        defer client.Close()

        consumer, err := client.Subscribe(pulsar.ConsumerOptions{
            Topic:            "persistent://public/default/my-topic",
            SubscriptionName: "my-sub",
        })
        if err != nil {
            fmt.Println("Error creating the consumer:", err)
            return
        }
        defer consumer.Close()

        for {
            // From this point on, no matter what happens with the client's connection,
            // my consumer doesn't receive an error about it and keeps waiting for new messages
            msg, err := consumer.Receive(context.Background())
            if err != nil {
                fmt.Println("Error receiving the message:", err)
                break
            }
            fmt.Printf("Message received: %s\n", string(msg.Payload()))
            consumer.Ack(msg)
        }
    }
    Even if I bring the broker down, my consumer keeps waiting without notice. I can make it detect a timeout, via context, but it's not what I want. I need to know that the connection has been lost. Am I missing something?
  • f

    Frank Kelly

    05/23/2024, 10:07 PM
    Hello - I see in the Golang consumer there is a DLQ policy. I would like to set it up simply to do retries (and then discard the message, with no dead letter topic). This is possible in Java as follows:
    try (final Consumer consumer = client.newConsumer()
                        .topic(createTopicUri(commonTenant, topic))
                        .subscriptionType(isLoadBalanced ? SubscriptionType.Shared : SubscriptionType.Exclusive)
                        .deadLetterPolicy(isLoadBalanced ? DeadLetterPolicy.builder().maxRedeliverCount(maxRedeliveryCount).build() : null) // <====== here
                        .subscriptionName(consumerName)
                        .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                        .subscribe()) {
    When I tried what I think is the equivalent in Golang:
    consumerOpts := pulsar.ConsumerOptions{
    		Topic:                       pr.config.PulsarConsumerTopic,
    		SubscriptionName:            pr.identifier,
    		Type:                        pulsar.Shared,
    		SubscriptionInitialPosition: pulsar.SubscriptionPositionEarliest,
    		BackoffPolicy:               &PulsarBackoff{},
    		DLQ: &pulsar.DLQPolicy{
    			MaxDeliveries: pr.config.PulsarMsgRedeliveryAttempts,
    		},
    	}
    I get
    DLQPolicy.Topic needs to be set to a valid topic name: InvalidConfiguration
    What am I missing?
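Since the error indicates that the Go client wants `DLQPolicy.Topic` to be set, one way to get "retry N times, then drop" without a dead letter topic is to make the decision in application code from the message's redelivery count. The sketch below is a minimal, broker-free illustration: `pulsar.Message` exposes `RedeliveryCount() uint32`, while the `redelivered` interface, the `action` type, `onFailure`, and `fakeMessage` are hypothetical names introduced here.

```go
package main

import "fmt"

// redelivered is a stand-in for the one method this sketch needs from a
// consumed message (pulsar.Message has RedeliveryCount() uint32).
type redelivered interface {
	RedeliveryCount() uint32
}

// action is a hypothetical result type for this example.
type action int

const (
	retry   action = iota // negatively acknowledge so the message is redelivered
	discard               // acknowledge to drop the message without a DLQ topic
)

// onFailure decides what to do with a message whose processing failed.
func onFailure(msg redelivered, maxRedeliveries uint32) action {
	if msg.RedeliveryCount() >= maxRedeliveries {
		return discard
	}
	return retry
}

// fakeMessage lets the sketch run without a broker.
type fakeMessage struct{ count uint32 }

func (m fakeMessage) RedeliveryCount() uint32 { return m.count }

func main() {
	for _, n := range []uint32{0, 2, 3} {
		fmt.Println(n, onFailure(fakeMessage{count: n}, 3) == discard)
	}
}
```

In a real consumer loop, `retry` would map to `consumer.Nack(msg)` and `discard` to `consumer.Ack(msg)`. This assumes the redelivery count is populated for the subscription type in use, which is worth verifying before relying on it.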
  • f

    Frank Kelly

    05/24/2024, 1:25 PM
    Also, does the Golang consumer have a default or overridable ack timeout? See https://github.com/apache/pulsar/blob/2db2e76d10c8b50e73ee11a0e081579c8738d64b/pul[…]g/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java for the Java setting. If not, what happens if a message is not acknowledged by a Golang consumer?
  • f

    Frank Kelly

    05/24/2024, 1:29 PM
    Actually I think I found my answer to the 2nd question https://github.com/apache/pulsar-client-go/issues/403 but the final question in there is a good one
  • g

    g41797

    05/27/2024, 11:42 AM
    [Question] Zero value of ReceiverQueueSize
    This was discussed for the PHP client in #26. According to the implementation of the Go client:
    if options.ReceiverQueueSize <= 0 {
    	options.ReceiverQueueSize = 1000
    }
    According to the recommendations: "...set the consumers' receiver queue size very low (potentially even to 0 if necessary)..." So according to the docs, I need to set this value to 0, but the Go client sets it to 1000. Please clarify.
  • k

    Kenny Chen

    08/31/2024, 5:02 AM
    Hello, I found an issue: in the current implementation of messageID.Serialize(), the ackTracker information is not recorded. This causes a problem when the client uses DeserializeMessageID to restore the message ID. The Pulsar consumer will mistakenly acknowledge the message within the batch, causing other unacknowledged messages in the batch to be acknowledged as well. This results in message loss for the client. So, I created this PR. Please help me review it, thanks!
    👀 1
  • f

    Frank Kelly

    09/30/2024, 1:51 PM
    Congrats on the recent release of 0.14.0 - I am having a problem with the new Backoff Policy change, however
  • f

    Frank Kelly

    09/30/2024, 1:51 PM
    Here's the basics of my change - moving from passing a struct to passing a function that returns a struct?
  • f

    Frank Kelly

    09/30/2024, 1:52 PM
    image.png
  • f

    Frank Kelly

    09/30/2024, 1:52 PM
    But my build is indicating
    internal/pulsarhelpers/pulsar_helper.go:224:28: cannot use GetBackoffPolicy() (value of type backoff.Policy) as func() backoff.Policy value in struct literal
    internal/pulsarhelpers/pulsar_helper.go:245:38: cannot use GetBackoffPolicy() (value of type backoff.Policy) as func() backoff.Policy value in struct literal
    internal/pulsarhelpers/pulsar_helper.go:259:28: cannot use GetBackoffPolicy() (value of type backoff.Policy) as func() backoff.Policy value in struct literal
    I'm still pretty new to Golang so can someone advise on the change?
  • f

    Frank Kelly

    09/30/2024, 1:53 PM
    (I tried following along with the PR here https://github.com/apache/pulsar-client-go/pull/1197/files but not fully getting it yet)
  • f

    Frank Kelly

    09/30/2024, 2:02 PM
    nvm - I think I solved it - this was the key change
  • r

    rohith gujja

    01/08/2025, 5:55 PM
    Hi Team, I am trying to create a Pulsar function in Go. I would like to use manual delivery semantics (ProcessingGuarantees_MANUAL) because I want control over ack() and nack() in my function. As mentioned in the docs, to do an ack() I need to call context.getCurrentRecord().ack(). But in the Go SDK, fctx.GetCurrentRecord() returns a pulsar.Message, not a pulsar.ConsumerMessage, and I don't have access to the input consumer anywhere from fctx. So I cannot ack() manually inside my Go function, which is my requirement. Please help!
  • r

    rohith gujja

    01/08/2025, 6:03 PM
    @Baodi Shi
  • r

    rohith gujja

    01/08/2025, 6:22 PM
    thread from #C078TGY9R29: https://apache-pulsar.slack.com/archives/C078TGY9R29/p1736359927947829
  • r

    rohith gujja

    01/09/2025, 9:54 AM
    Guys, any idea about the above query? I can raise a bug if required.
  • y

    Yunze Xu

    03/06/2025, 2:30 PM
    https://lists.apache.org/thread/knxsq637pyhltthn77859mpxb5wcxglj Hello everyone. A breaking change may be coming to the Go client. It changes the original behavior of the Go client, but afterwards the behavior will be the same as the Java client's. Feel free to leave a comment on whether this is acceptable to the community.
  • t

    Thomas MacKenzie

    04/04/2025, 6:02 PM
    Hi, I see there is the option for replication cluster info on the client side.
    // ReplicationClusters override the replication clusters for this message.
    ReplicationClusters []string
    And I was curious if there is any plan for implementing the cluster-level failover feature in the Go client (https://pulsar.apache.org/client-feature-matrix/#client)? Thank you
  • t

    Thomas MacKenzie

    07/12/2025, 4:55 AM
    Hi 👋 I have one PR up, https://github.com/apache/pulsar-client-go/pull/1390, adding properties management at the namespace level. We have a use case for it in our platform. It's my first time contributing, so feel free to give me feedback here if you need to. Thank you
    👀 1