回顾
上一讲中,我们知道了broker是做什么的,以及go-micro默认的实现方案,这个http broker作为默认的实现方案,供我们学习使用,一般情况下在生产环境中,大家会根据业务诉求,以及消息是否允许丢失,都会自定义一个实现方案,根据不同的业务需求,选择比较稳定的开源方案,比如Kafka,Nsq,RabbitMq等,甚至Redis中的发布订阅管道也可以作为一个实现方案来选择,就看大家的需要了。
上一讲的最后,我们也提到了,go-micro可插拔的实现一个broker,原理就是定义了一组接口方法,只要你的实现方案中,实现了这组方法,就可以在go-micro中去使用你自定义的broker.
Nsq
我再另外一篇文章中讲解了Nsq,想要了解的可以去翻翻看。
我们简单回顾一下,这个工具是一个分布式的实时消息平台。具有默认情况下,消息不持久,至少被传递一次,消息是无序的等特点。
如何部署一个分布式的集群也很简单,大家可以使用docker来部署一个Nsq集群,然后我们就可以使用Nsq在go-micro的架构下实现我们的异步消息系统了
Broker接口
我们再来回顾一下,Broker接口的组成
// Broker is an interface used for asynchronous messaging.
type Broker interface {
Init(...Option) error
Options() Options
Address() string
Connect() error
Disconnect() error
Publish(topic string, m *Message, opts ...PublishOption) error
Subscribe(topic string, h Handler, opts ...SubscribeOption) (Subscriber, error)
String() string
}
复制代码
主要方法就是初始化Init,连接Connect以及Disconnect方法,和发布Publish以及订阅Subscribe方法。
实现方案
定义结构体
type nsqBroker struct {
lookupdAddrs []string
addrs []string
opts broker.Options
config *nsq.Config
sync.Mutex
running bool
p []*nsq.Producer
c []*subscriber
}
type publication struct {
topic string
m *broker.Message
nm *nsq.Message
opts broker.PublishOptions
err error
}
type subscriber struct {
topic string
opts broker.SubscribeOptions
c *nsq.Consumer
// handler so we can resubcribe
h nsq.HandlerFunc
// concurrency
n int
}
复制代码
如何实例化我们的NsqBroker实例的呢
func NewBroker(opts ...broker.Option) broker.Broker {
options := broker.Options{
// Default codec
Codec: json.Marshaler{},
// Default context
Context: context.Background(),
}
for _, o := range opts {
o(&options)
}
var addrs []string
for _, addr := range options.Addrs {
if len(addr) > 0 {
addrs = append(addrs, addr)
}
}
if len(addrs) == 0 {
addrs = []string{"127.0.0.1:4150"}
}
n := &nsqBroker{
addrs: addrs,
opts: options,
config: nsq.NewConfig(),
}
n.configure(n.opts.Context)
return n
}
复制代码
可以看到在nsqBroker实例中的config是我们Nsq的连接配置
然后就是我们Broker接口中的Init方法,做初始化处理
func (n *nsqBroker) Init(opts ...broker.Option) error {
for _, o := range opts {
o(&n.opts)
}
var addrs []string
for _, addr := range n.opts.Addrs {
if len(addr) > 0 {
addrs = append(addrs, addr)
}
}
if len(addrs) == 0 {
addrs = []string{"127.0.0.1:4150"}
}
n.addrs = addrs
n.configure(n.opts.Context)
return nil
}
复制代码
初始化结束后,再来看看Connect方法
func (n *nsqBroker) Connect() error {
n.Lock()
defer n.Unlock()
if n.running {
return nil
}
producers := make([]*nsq.Producer, 0, len(n.addrs))
// create producers
for _, addr := range n.addrs {
p, err := nsq.NewProducer(addr, n.config)
if err != nil {
return err
}
if err = p.Ping(); err != nil {
return err
}
producers = append(producers, p)
}
// create consumers
for _, c := range n.c {
channel := c.opts.Queue
if len(channel) == 0 {
channel = uuid.New().String() + "#ephemeral"
}
cm, err := nsq.NewConsumer(c.topic, channel, n.config)
if err != nil {
return err
}
cm.AddConcurrentHandlers(c.h, c.n)
c.c = cm
if len(n.lookupdAddrs) > 0 {
c.c.ConnectToNSQLookupds(n.lookupdAddrs)
} else {
err = c.c.ConnectToNSQDs(n.addrs)
if err != nil {
return err
}
}
}
n.p = producers
n.running = true
return nil
}
复制代码
根据我们nsqBroker结构体中的addrs,来创建发布者列表,根据nsqBroker中的消费者列表,创建nsq的Consumer.然后根据是否配置了nsq lookupaddr来连接到相关组件,如果没配就直接连到指定地址的nsqd.
重点来了,看看如何实现消息的发布和订阅
func (n *nsqBroker) Publish(topic string, message *broker.Message, opts ...broker.PublishOption) error {
p := n.p[rand.Intn(len(n.p))]
options := broker.PublishOptions{}
for _, o := range opts {
o(&options)
}
var (
doneChan chan *nsq.ProducerTransaction
delay time.Duration
)
if options.Context != nil {
if v, ok := options.Context.Value(asyncPublishKey{}).(chan *nsq.ProducerTransaction); ok {
doneChan = v
}
if v, ok := options.Context.Value(deferredPublishKey{}).(time.Duration); ok {
delay = v
}
}
b, err := n.opts.Codec.Marshal(message)
if err != nil {
return err
}
if doneChan != nil {
if delay > 0 {
return p.DeferredPublishAsync(topic, delay, b, doneChan)
}
return p.PublishAsync(topic, b, doneChan)
} else {
if delay > 0 {
return p.DeferredPublish(topic, delay, b)
}
return p.Publish(topic, b)
}
}
复制代码
从发布在列表中取出一个发布者,然后将编码后的消息发布到指定topic中。
最后就是订阅者方法
func (n *nsqBroker) Subscribe(topic string, handler broker.Handler, opts ...broker.SubscribeOption) (broker.Subscriber, error) {
options := broker.SubscribeOptions{
AutoAck: true,
}
for _, o := range opts {
o(&options)
}
concurrency, maxInFlight := DefaultConcurrentHandlers, DefaultConcurrentHandlers
if options.Context != nil {
if v, ok := options.Context.Value(concurrentHandlerKey{}).(int); ok {
maxInFlight, concurrency = v, v
}
if v, ok := options.Context.Value(maxInFlightKey{}).(int); ok {
maxInFlight = v
}
}
channel := options.Queue
if len(channel) == 0 {
channel = uuid.New().String() + "#ephemeral"
}
config := *n.config
config.MaxInFlight = maxInFlight
c, err := nsq.NewConsumer(topic, channel, &config)
if err != nil {
return nil, err
}
h := nsq.HandlerFunc(func(nm *nsq.Message) error {
if !options.AutoAck {
nm.DisableAutoResponse()
}
var m broker.Message
if err := n.opts.Codec.Unmarshal(nm.Body, &m); err != nil {
return err
}
p := &publication{topic: topic, m: &m}
p.err = handler(p)
return p.err
})
c.AddConcurrentHandlers(h, concurrency)
if len(n.lookupdAddrs) > 0 {
err = c.ConnectToNSQLookupds(n.lookupdAddrs)
} else {
err = c.ConnectToNSQDs(n.addrs)
}
if err != nil {
return nil, err
}
sub := &subscriber{
c: c,
opts: options,
topic: topic,
h: h,
n: concurrency,
}
n.c = append(n.c, sub)
return sub, nil
}
复制代码
在这个方法中,我们通过Nsq config来创建nsq消费者。然后定义一个nsq HandlerFunc来处理消费到的消息。然后就可以连接到nsq服务上进行监听消息了。最后创建的订阅者实例放到nsqBroker实例中的订阅者列表中。想要了解nsq Consumer 是如何连接Nsqd服务进程的,可以去go-nsq代码看具体的流程这个已经不属于我们今天的主题了,大家只要知道,Nsq建议我们使用lookupd来进行topic发布者的服务发现。
// ConnectToNSQDs takes multiple nsqd addresses to connect directly to.
//
// It is recommended to use ConnectToNSQLookupd so that topics are discovered
// automatically. This method is useful when you want to connect to local instance.
func (r *Consumer) ConnectToNSQDs(addresses []string) error {
for _, addr := range addresses {
err := r.ConnectToNSQD(addr)
if err != nil {
return err
}
}
return nil
}
复制代码
至此一个自定义的broker实现方案就出来了,你可以通过kafka来实现自己的broker