如何在go-micro中实现一个自定义的异步消息系统

回顾

上一讲中,我们知道了broker是做什么的,以及go-micro默认的实现方案,这个http broker作为默认的实现方案,供我们学习使用,一般情况下在生产环境中,大家会根据业务诉求,以及消息是否允许丢失,都会自定义一个实现方案,根据不同的业务需求,选择比较稳定的开源方案,比如Kafka,Nsq,RabbitMq等,甚至Redis中的发布订阅管道也可以作为一个实现方案来选择,就看大家的需要了。

上一讲的最后,我们也提到了,go-micro可插拔的实现一个broker,原理就是定义了一组接口方法,只要你的实现方案中,实现了这组方法,就可以在go-micro中去使用你自定义的broker.

Nsq

我再另外一篇文章中讲解了Nsq,想要了解的可以去翻翻看。

我们简单回顾一下,这个工具是一个分布式的实时消息平台。具有默认情况下,消息不持久,至少被传递一次,消息是无序的等特点。

如何部署一个分布式的集群也很简单,大家可以使用docker来部署一个Nsq集群,然后我们就可以使用Nsq在go-micro的架构下实现我们的异步消息系统了

Broker接口

我们再来回顾一下,Broker接口的组成

// Broker is an interface used for asynchronous messaging.
type Broker interface {
	Init(...Option) error
	Options() Options
	Address() string
	Connect() error
	Disconnect() error
	Publish(topic string, m *Message, opts ...PublishOption) error
	Subscribe(topic string, h Handler, opts ...SubscribeOption) (Subscriber, error)
	String() string
}
复制代码

主要方法就是初始化Init,连接Connect以及Disconnect方法,和发布Publish以及订阅Subscribe方法。

实现方案

定义结构体


type nsqBroker struct {
	lookupdAddrs []string
	addrs        []string
	opts         broker.Options
	config       *nsq.Config

	sync.Mutex
	running bool
	p       []*nsq.Producer
	c       []*subscriber
}

type publication struct {
	topic string
	m     *broker.Message
	nm    *nsq.Message
	opts  broker.PublishOptions
	err   error
}

type subscriber struct {
	topic string
	opts  broker.SubscribeOptions

	c *nsq.Consumer

	// handler so we can resubcribe
	h nsq.HandlerFunc
	// concurrency
	n int
}

复制代码

如何实例化我们的NsqBroker实例的呢


func NewBroker(opts ...broker.Option) broker.Broker {
	options := broker.Options{
		// Default codec
		Codec: json.Marshaler{},
		// Default context
		Context: context.Background(),
	}

	for _, o := range opts {
		o(&options)
	}

	var addrs []string

	for _, addr := range options.Addrs {
		if len(addr) > 0 {
			addrs = append(addrs, addr)
		}
	}

	if len(addrs) == 0 {
		addrs = []string{"127.0.0.1:4150"}
	}

	n := &nsqBroker{
		addrs:  addrs,
		opts:   options,
		config: nsq.NewConfig(),
	}
	n.configure(n.opts.Context)

	return n
}

复制代码

可以看到在nsqBroker实例中的config是我们Nsq的连接配置

然后就是我们Broker接口中的Init方法,做初始化处理


func (n *nsqBroker) Init(opts ...broker.Option) error {
	for _, o := range opts {
		o(&n.opts)
	}

	var addrs []string

	for _, addr := range n.opts.Addrs {
		if len(addr) > 0 {
			addrs = append(addrs, addr)
		}
	}

	if len(addrs) == 0 {
		addrs = []string{"127.0.0.1:4150"}
	}

	n.addrs = addrs
	n.configure(n.opts.Context)
	return nil
}
复制代码

初始化结束后,再来看看Connect方法


func (n *nsqBroker) Connect() error {
	n.Lock()
	defer n.Unlock()

	if n.running {
		return nil
	}

	producers := make([]*nsq.Producer, 0, len(n.addrs))

	// create producers
	for _, addr := range n.addrs {
		p, err := nsq.NewProducer(addr, n.config)
		if err != nil {
			return err
		}
		if err = p.Ping(); err != nil {
			return err
		}
		producers = append(producers, p)
	}

	// create consumers
	for _, c := range n.c {
		channel := c.opts.Queue
		if len(channel) == 0 {
			channel = uuid.New().String() + "#ephemeral"
		}

		cm, err := nsq.NewConsumer(c.topic, channel, n.config)
		if err != nil {
			return err
		}

		cm.AddConcurrentHandlers(c.h, c.n)

		c.c = cm

		if len(n.lookupdAddrs) > 0 {
			c.c.ConnectToNSQLookupds(n.lookupdAddrs)
		} else {
			err = c.c.ConnectToNSQDs(n.addrs)
			if err != nil {
				return err
			}
		}
	}

	n.p = producers
	n.running = true
	return nil
}
复制代码

根据我们nsqBroker结构体中的addrs,来创建发布者列表,根据nsqBroker中的消费者列表,创建nsq的Consumer.然后根据是否配置了nsq lookupaddr来连接到相关组件,如果没配就直接连到指定地址的nsqd.

重点来了,看看如何实现消息的发布和订阅


func (n *nsqBroker) Publish(topic string, message *broker.Message, opts ...broker.PublishOption) error {
	p := n.p[rand.Intn(len(n.p))]

	options := broker.PublishOptions{}
	for _, o := range opts {
		o(&options)
	}

	var (
		doneChan chan *nsq.ProducerTransaction
		delay    time.Duration
	)
	if options.Context != nil {
		if v, ok := options.Context.Value(asyncPublishKey{}).(chan *nsq.ProducerTransaction); ok {
			doneChan = v
		}
		if v, ok := options.Context.Value(deferredPublishKey{}).(time.Duration); ok {
			delay = v
		}
	}

	b, err := n.opts.Codec.Marshal(message)
	if err != nil {
		return err
	}

	if doneChan != nil {
		if delay > 0 {
			return p.DeferredPublishAsync(topic, delay, b, doneChan)
		}
		return p.PublishAsync(topic, b, doneChan)
	} else {
		if delay > 0 {
			return p.DeferredPublish(topic, delay, b)
		}
		return p.Publish(topic, b)
	}
}

复制代码

从发布在列表中取出一个发布者,然后将编码后的消息发布到指定topic中。

最后就是订阅者方法


func (n *nsqBroker) Subscribe(topic string, handler broker.Handler, opts ...broker.SubscribeOption) (broker.Subscriber, error) {
	options := broker.SubscribeOptions{
		AutoAck: true,
	}

	for _, o := range opts {
		o(&options)
	}

	concurrency, maxInFlight := DefaultConcurrentHandlers, DefaultConcurrentHandlers
	if options.Context != nil {
		if v, ok := options.Context.Value(concurrentHandlerKey{}).(int); ok {
			maxInFlight, concurrency = v, v
		}
		if v, ok := options.Context.Value(maxInFlightKey{}).(int); ok {
			maxInFlight = v
		}
	}
	channel := options.Queue
	if len(channel) == 0 {
		channel = uuid.New().String() + "#ephemeral"
	}
	config := *n.config
	config.MaxInFlight = maxInFlight

	c, err := nsq.NewConsumer(topic, channel, &config)
	if err != nil {
		return nil, err
	}

	h := nsq.HandlerFunc(func(nm *nsq.Message) error {
		if !options.AutoAck {
			nm.DisableAutoResponse()
		}

		var m broker.Message

		if err := n.opts.Codec.Unmarshal(nm.Body, &m); err != nil {
			return err
		}

		p := &publication{topic: topic, m: &m}
		p.err = handler(p)
		return p.err
	})

	c.AddConcurrentHandlers(h, concurrency)

	if len(n.lookupdAddrs) > 0 {
		err = c.ConnectToNSQLookupds(n.lookupdAddrs)
	} else {
		err = c.ConnectToNSQDs(n.addrs)
	}
	if err != nil {
		return nil, err
	}

	sub := &subscriber{
		c:     c,
		opts:  options,
		topic: topic,
		h:     h,
		n:     concurrency,
	}

	n.c = append(n.c, sub)

	return sub, nil
}

复制代码

在这个方法中,我们通过Nsq config来创建nsq消费者。然后定义一个nsq HandlerFunc来处理消费到的消息。然后就可以连接到nsq服务上进行监听消息了。最后创建的订阅者实例放到nsqBroker实例中的订阅者列表中。想要了解nsq Consumer 是如何连接Nsqd服务进程的,可以去go-nsq代码看具体的流程这个已经不属于我们今天的主题了,大家只要知道,Nsq建议我们使用lookupd来进行topic发布者的服务发现。

// ConnectToNSQDs takes multiple nsqd addresses to connect directly to.
//
// It is recommended to use ConnectToNSQLookupd so that topics are discovered
// automatically.  This method is useful when you want to connect to local instance.
func (r *Consumer) ConnectToNSQDs(addresses []string) error {
	for _, addr := range addresses {
		err := r.ConnectToNSQD(addr)
		if err != nil {
			return err
		}
	}
	return nil
}
复制代码

至此一个自定义的broker实现方案就出来了,你可以通过kafka来实现自己的broker

© 版权声明
THE END
喜欢就支持一下吧
点赞0 分享