Slackbot
03/14/2023, 3:47 PM高晟京
03/15/2023, 2:45 AM高晟京
03/15/2023, 2:57 AM高晟京
03/15/2023, 3:01 AMSlackbot
03/15/2023, 6:04 AM高晟京
03/15/2023, 6:21 AMSlackbot
03/15/2023, 6:21 AMSlackbot
03/15/2023, 6:27 AM高晟京
03/15/2023, 11:33 AMSlackbot
03/15/2023, 11:35 AMSlackbot
03/16/2023, 1:30 AMSlackbot
03/16/2023, 12:43 PMSlackbot
03/16/2023, 1:54 PM高晟京
03/17/2023, 2:35 AMSlackbot
03/17/2023, 2:38 AM高晟京
03/17/2023, 2:50 AM高晟京
03/17/2023, 3:12 AMSlackbot
03/20/2023, 7:42 AMSlackbot
03/20/2023, 12:05 PMSlackbot
03/20/2023, 12:28 PMSlackbot
03/20/2023, 3:51 PMjihong
03/21/2023, 2:00 AMjihong
03/21/2023, 3:02 AMjihong
03/21/2023, 3:02 AMjihong
03/21/2023, 3:08 AMYang Ou
03/21/2023, 4:52 AMYang Ou
03/21/2023, 4:52 AM
const lookupResultMaxRedirect = 20
// Lookup resolves the broker that serves topic.
//
// It sends a LOOKUP command to any broker and then follows Redirect
// responses, inspecting at most lookupResultMaxRedirect responses, until a
// broker answers with Connect or Failed. On Connect it returns the logical
// and physical broker addresses produced by getBrokerAddress.
//
// An error is returned when a request fails, a broker address cannot be
// resolved, the broker reports Failed, the response carries no lookup result
// or an unrecognised response type, or the redirect limit is exhausted.
func (ls *lookupService) Lookup(topic string) (*LookupResult, error) {
	ls.metrics.LookupRequestsCount.Inc()
	id := ls.rpcClient.NewRequestID()
	res, err := ls.rpcClient.RequestToAnyBroker(id, pb.BaseCommand_LOOKUP, &pb.CommandLookupTopic{
		RequestId:              &id,
		Topic:                  &topic,
		Authoritative:          proto.Bool(false),
		AdvertisedListenerName: proto.String(ls.listenerName),
	})
	if err != nil {
		return nil, err
	}
	ls.log.Debugf("Got topic{%s} lookup response: %+v", topic, res)

	for i := 0; i < lookupResultMaxRedirect; i++ {
		lr := res.Response.LookupTopicResponse
		// Guard the dereference below: a response without a lookup result
		// would otherwise panic instead of surfacing an error to the caller.
		if lr == nil || lr.Response == nil {
			return nil, errors.New("lookup response is missing the lookup result")
		}

		switch *lr.Response {
		case pb.CommandLookupTopicResponse_Redirect:
			logicalAddress, physicalAddr, err := ls.getBrokerAddress(lr)
			if err != nil {
				return nil, err
			}
			// Use the getters so the log shows the values, not the
			// addresses of the optional (pointer) fields.
			ls.log.Debugf("Follow topic{%s} redirect to broker. %v / %v - Use proxy: %v",
				topic, lr.GetBrokerServiceUrl(), lr.GetBrokerServiceUrlTls(), lr.GetProxyThroughServiceUrl())

			redirectID := ls.rpcClient.NewRequestID()
			// Plain assignment: res must be the outer variable so the next
			// iteration inspects the redirected broker's response.
			res, err = ls.rpcClient.Request(logicalAddress, physicalAddr, redirectID, pb.BaseCommand_LOOKUP,
				&pb.CommandLookupTopic{
					RequestId:              &redirectID,
					Topic:                  &topic,
					Authoritative:          lr.Authoritative,
					AdvertisedListenerName: proto.String(ls.listenerName),
				})
			if err != nil {
				return nil, err
			}
			// Process the response at the top of the loop
			continue

		case pb.CommandLookupTopicResponse_Connect:
			ls.log.Debugf("Successfully looked up topic{%s} on broker. %s / %s - Use proxy: %t",
				topic, lr.GetBrokerServiceUrl(), lr.GetBrokerServiceUrlTls(), lr.GetProxyThroughServiceUrl())
			logicalAddress, physicalAddress, err := ls.getBrokerAddress(lr)
			if err != nil {
				return nil, err
			}
			return &LookupResult{
				LogicalAddr:  logicalAddress,
				PhysicalAddr: physicalAddress,
			}, nil

		case pb.CommandLookupTopicResponse_Failed:
			ls.log.WithFields(log.Fields{
				"topic":   topic,
				"error":   lr.GetError(),
				"message": lr.GetMessage(),
			}).Warn("Failed to lookup topic")
			return nil, errors.New(lr.GetError().String())

		default:
			// Without this branch an unrecognised type would re-examine the
			// same response until the loop ends and then be misreported as
			// too many redirects.
			return nil, errors.New("unexpected lookup response type: " + lr.Response.String())
		}
	}
	return nil, errors.New("exceeded max number of redirection during topic lookup")
}
So it seems it’s a known issue.
Yang Ou
03/21/2023, 4:53 AMYang Ou
03/21/2023, 4:53 AM
apachepulsar/pulsar:latest
Yang Ou
03/21/2023, 5:10 AM