1.创建消费者
通过c:=nsq.NewConsumer(…)方式创建消费者
2.消费者注册消息监听
(1)给消费者c增加异步处理器handler:
c.AddHandler(nsq.HandlerFunc(func(msg *nsq.Message)))
(2)AddHandler方法中调用AddConcurrentHandlers()
(3)AddConcurrentHandlers()中开启一个goroutine,调用handlerLoop(handler),
handlerLoop中开启无限循环,通过无缓冲通道incomingMessages,阻塞监听最新消息,
获取到消息,传递给回调handler.HandleMessage(message)处理
func (r *Consumer) AddHandler(handler Handler) {
r.AddConcurrentHandlers(handler, 1)
}
func (r *Consumer) AddConcurrentHandlers(handler Handler, concurrency int) {
for i := 0; i < concurrency; i++ {
go r.handlerLoop(handler)
}
}
func (r *Consumer) handlerLoop(handler Handler) {
for {
message, ok := <-r.incomingMessages
if !ok {
goto exit
}
if r.shouldFailMessage(message, handler) {
message.Finish()
continue
}
err := handler.HandleMessage(message)
...
}
复制代码
3.连接NSQLookupd
调用c.ConnectToNSQLookupd(v),该方法中会调用queryLookupd()->ConnectToNSQD()
func (r *Consumer) ConnectToNSQLookupd(addr string) error {
...
if numLookupd == 1 {
r.queryLookupd()
r.wg.Add(1)
go r.lookupdLoop()
}
return nil
}
func (r *Consumer) queryLookupd() {
..
for _, addr := range nsqdAddrs {
err = r.ConnectToNSQD(addr)
...
}
}
复制代码
在ConnectToNSQD()中我们可以看到,会通过conn.Connect()建立跟nsqd的TCP连接
func (r *Consumer) ConnectToNSQD(addr string) error {
...
resp, err := conn.Connect()
...
}
复制代码
在Connect会开启一个goroutine,在readLoop方法中无限循环的监听消息的到来
func (c *Conn) Connect() (*IdentifyResponse, error) {
dialer := &net.Dialer{
LocalAddr: c.config.LocalAddr,
Timeout: c.config.DialTimeout,
}
conn, err := dialer.Dial("tcp", c.addr)
if err != nil {
return nil, err
}
c.conn = conn.(*net.TCPConn)
...
go c.readLoop()
go c.writeLoop()
return resp, nil
}
复制代码
当收到消息后,交给c.delegate.OnMessage()方法处理,在该方法中,会把消息发送给无缓冲消息通道incomingMessages,这样就完成了整个接收消息的逻辑.
func (c *Conn) readLoop() {
delegate := &connMessageDelegate{c}
for {
...
frameType, data, err := ReadUnpackedResponse(c)
switch frameType {
...
case FrameTypeMessage:
msg, err := DecodeMessage(data)
...
c.delegate.OnMessage(c, msg)
...
}
...
}
func (r *Consumer) onConnMessage(c *Conn, msg *Message) {
...
r.incomingMessages <- msg
...
}
复制代码
© 版权声明
文章版权归作者所有,未经允许请勿转载。
THE END