- [go-nsq] NSQ 是怎么投送消息的
- [go-nsq] NSQ 是怎么消费消息的
- [nsqd] 消息的内部处理流程
- [nsqd] 状态同步 & RDY 命令
基于
nsqd v1.2.0
go-nsq v1.0.8
这是一个系列文章(后续会慢慢补全
概览(图
核心循环
了解了这个处理循环,就了解了大半的消息发布流程,所以这里主要刨析这个循环里面的各种机制 (位于go-nsq producer.go
func (w *Producer) router() {
for {
select {
case t := <-w.transactionChan:
w.transactions = append(w.transactions, t)
err := w.conn.WriteCommand(t.cmd)
if err != nil {
w.close()
}
case data := <-w.responseChan:
w.popTransaction(FrameTypeResponse, data)
case data := <-w.errorChan:
w.popTransaction(FrameTypeError, data)
case <-w.closeChan:
goto exit
case <-w.exitChan:
goto exit
}
}
exit:
w.transactionCleanup()
w.wg.Done()
}
复制代码
-
router
- 当构建完毕后,producer 会创建一个专门用于监听和处理 各个 channel 的 goroutine (消息发布的主循环
-
transactionChan (消息管道,用于汇集用户发布的消息
- 所有用户的发布消息都会写入到这个 channel (可能是 N 个goroutine,最终在这一个 go router 中,排列处理。
- 将消息写入到 TCP 流中
- 这里另外还维护了一个 transactions 结构,因为 TCP 是顺序的,因此在这里用 FIFO 队列维护过程中的消息体
-
responseChan & errorChan (消息管道从 TCP 接收端 收取到正常 & 错误的回复
- 这两个 channel 接收到消息后,都会对回复进行处理,从 transactions 队列中取出头消息,通过消息发送出
done
信号 (主要是返回成功错误
- 这两个 channel 接收到消息后,都会对回复进行处理,从 transactions 队列中取出头消息,通过消息发送出
-
closeChan & exitChan 用于接收关闭&退出
信号
-
exit (退出点
- 首先退出逻辑处理循环;
- 当接收到 关闭|退出 信号时,在这里处理清理流程。
- 首先处理
发送中
的消息 transactions(发布 done 信号, 主要是提示错误 失去连接 - 等待其他消息发布过程全部完结 concurrentProducers
- 处理还在
内部处理过程中
消息 transactionChan (还是发布错误信号 失去连接 - 当全部处理完成后, wg.Done() 退出阻塞,完成退出逻辑。
我们可以看到在这个循环体中 go-nsq 是如何使用
select
&channel
设计出简洁且高效的多线程处理模型
- 基于 select 中的 transactionChan,closeChan,exitChan 管道,进行消息的发布和接收
- 利用类似 FIFO 的队列,管理发布出去的消息
- 利用 closeChan,exitChan 接收外部的中断指令
- concurrentProducers 用于统计同时处理的异步指令
- wg 用于阻塞等待清理完毕
Producer 总结
- 发布是采用的 TCP 协议(通过command进行消息的组装
- 发布端需要在构建的时候指定对应的 nsqd,Producer 和 Nsqd 是 1:1 的关系
- 发布端可以同时向多个Nsqd,写入消息(多个Producer同时写,目的是实现HA
- 发布空的消息(message ,会导致panic
- 关键文件介绍
- producer.go 实现了消息的发布逻辑
- conn.go 实现了网络相关的逻辑
- command.go 实现了tcp的封包逻辑
最后推荐个自己的微服务框架
barid-go 一个基于模块组合,简单易用的微服务框架 ;)
© 版权声明
文章版权归作者所有,未经允许请勿转载。
THE END