[go-nsq] NSQ 是怎么投送消息的

基于 nsqd v1.2.0 go-nsq v1.0.8 这是一个系列文章(后续会慢慢补全

概览(图

nsq-producer.png

核心循环

了解了这个处理循环,就了解了大半的消息发布流程,所以这里主要刨析这个循环里面的各种机制 (位于go-nsq producer.go

func (w *Producer) router() {
	for {
		select {
		case t := <-w.transactionChan:
			w.transactions = append(w.transactions, t)
			err := w.conn.WriteCommand(t.cmd)
			if err != nil {
				w.close()
			}
		case data := <-w.responseChan:
			w.popTransaction(FrameTypeResponse, data)
		case data := <-w.errorChan:
			w.popTransaction(FrameTypeError, data)
		case <-w.closeChan:
			goto exit
		case <-w.exitChan:
			goto exit
		}
	}

exit:
	w.transactionCleanup()
	w.wg.Done()
}
复制代码
  • router

    • 当构建完毕后,producer 会创建一个专门用于监听和处理 各个 channel 的 goroutine (消息发布的主循环
  • transactionChan (消息管道,用于汇集用户发布的消息

    • 所有用户的发布消息都会写入到这个 channel (可能是 N 个goroutine,最终在这一个 go router 中,排列处理。
    • 将消息写入到 TCP 流中
    • 这里另外还维护了一个 transactions 结构,因为 TCP 是顺序的,因此在这里用 FIFO 队列维护过程中的消息体
  • responseChan & errorChan (消息管道从 TCP 接收端 收取到正常 & 错误的回复

    • 这两个 channel 接收到消息后,都会对回复进行处理,从 transactions 队列中取出头消息,通过消息发送出 done 信号 (主要是返回成功错误
  • closeChan & exitChan 用于接收关闭&退出信号

  • exit (退出点

    • 首先退出逻辑处理循环;
    • 当接收到 关闭|退出 信号时,在这里处理清理流程。
    • 首先处理 发送中 的消息 transactions(发布 done 信号, 主要是提示错误 失去连接
    • 等待其他消息发布过程全部完结 concurrentProducers
    • 处理还在 内部处理过程中 消息 transactionChan (还是发布错误信号 失去连接
    • 当全部处理完成后, wg.Done() 退出阻塞,完成退出逻辑。

我们可以看到在这个循环体中 go-nsq 是如何使用 select & channel 设计出简洁且高效的多线程处理模型

  • 基于 select 中的 transactionChan,closeChan,exitChan 管道,进行消息的发布和接收
  • 利用类似 FIFO 的队列,管理发布出去的消息
  • 利用 closeChan,exitChan 接收外部的中断指令
  • concurrentProducers 用于统计同时处理的异步指令
  • wg 用于阻塞等待清理完毕

Producer 总结

  • 发布是采用的 TCP 协议(通过command进行消息的组装
  • 发布端需要在构建的时候指定对应的 nsqd,Producer 和 Nsqd 是 1:1 的关系
  • 发布端可以同时向多个Nsqd,写入消息(多个Producer同时写,目的是实现HA
  • 发布空的消息(message ,会导致panic
  • 关键文件介绍
    • producer.go 实现了消息的发布逻辑
    • conn.go 实现了网络相关的逻辑
    • command.go 实现了tcp的封包逻辑

最后推荐个自己的微服务框架

barid-go 一个基于模块组合,简单易用的微服务框架 ;)

© 版权声明
THE END
喜欢就支持一下吧
点赞0 分享