在第三篇文章中,我们将在前一篇的基础上,加入消息确认机制。
问题提出
目前,当一条 message 发送给消费者之后,mini-nsq 就不再负责了,但是,消费者也有可能会消费失败,此时就需要 mini-nsq 有重新投递此条 message 的能力。
代码设计
在 nsq 的设计中,当一条 message 发送给消费者之后,并不会马上删除,而是暂时转移到一个专门的容器中。如果消费者消费成功的话,就会给 nsq 发送一条 “FIN” 消息,消息中会包含此条 message 的 id,那么 nsq 就会从容器中找到这条消息并删除;如果消费者消费失败 的话,就会给 nsq 发送一条 “REQ” 消息,消息中同样包含此条 message 的 id,那么 nsq 就会重新发送这条 message。同时有专门的协程每隔一段时间会扫描一次容器,如果发现存在超过了指定时间而仍未被处理的 message 的话,也会重新发送此条 message。
代码变化分析
通过上面的分析,我们知道对于这个存储已发送但还未被确认的 message 的“容器”,我们有两种查找需求:1.通过 id 找到 message 2.找到所有 message 中最先过期的(指的就是按时间排序,最早需要被重发的)。如果只通过单一的数据结构的话,那么必然有一种查找需要遍历所有才能找到我们需要的那条 message。为了降低查找的时间复杂度,我们必须同时维护两个数据结构。
对于第一种查找需求,肯定是 Map 最合适。对于第二种查找需求的话,如果所有 message 的超时时间都是一致的话(在我们的实现中是这样的),那么我们只需要维护个切片就可以,先放进去的肯定最先过期。但是在实际的 nsq 中,每个客户端是可以单独设置发送给该客户端的所有 message 的超时时间,这就意味着先放进去的不一定比后放进去的先过期。为了在此种条件下,仍然能通过 O(1) 的时间复杂度找到最先过期的 message ,nsq 使用了一种叫做优先级队列的数据结构。该数据结构插入和删除的时间复杂度是 O(logN),查找的时间复杂度是 O(1) ,如果不熟悉的话读者可自行查找相关资料。
为了和 nsq 保持一致性,同时因为在 go 中实现优先级队列是很简单的,因为 go 工具包本身就提供了 heap 这个结构帮助我们实现,此处我们就选择弃用切片而使用优先级队列。再次强调,此处优先级队列并不是必须的,因为在我们目前的设计中所有的 message 的超时时间都是一致的,如果读者感兴趣的话,此处可自行尝试用切片进行替换。
message.go
type Message struct {
// for in-flight handling
index int // 表示在inFlightPQ中的下标位置
pri int64 //过时时间
}
复制代码
在 Message 中,我们增加了两个字段,pri 字段就是优先级队列中用于排序的依据,记录index 则是为了方便我们从优先级队列中删除指定 message。
pqueue.go
该文件实现了优先级队列,我们具体看看。
type inFlightPqueue []*Message
func newInFlightPqueue(capacity int) *inFlightPqueue {
pq:=make(inFlightPqueue, 0, capacity)
return &pq
}
//以下 5个方法是 heap 规定的要实现优先级队列必须实现的 5个方法
func (pq inFlightPqueue) Len() int { return len(pq) }
func (pq inFlightPqueue) Less(i, j int) bool {
return pq[i].pri < pq[j].pri
}
func (pq inFlightPqueue) Swap(i, j int) {
pq[i], pq[j] = pq[j], pq[i]
pq[i].index = i
pq[j].index = j
}
func (pq *inFlightPqueue) Push(x interface{}) {
msg:=x.(*Message)
n := len(*pq)
msg.index = n
*pq = append(*pq, msg)
}
func (pq *inFlightPqueue) Pop() interface{} {
n := len(*pq)
msg := (*pq)[n-1]
*pq = (*pq)[0 : n-1]
msg.index=-1
return msg
}
//以下 “W” 开头的方法是我们提供给外部使用的
//向队列中加入1条message
func (pq *inFlightPqueue) WPush(msg *Message) {
heap.Push(pq, msg)
}
//从队列中弹出最早过期的那条 message
func (pq *inFlightPqueue) WPop() *Message{
return heap.Pop(pq).(*Message)
}
//从队列中删除指定的 message
func (pq *inFlightPqueue) WRemove(msg *Message) {
heap.Remove(pq,msg.index)
}
//查找队列中最早过期的那条 message,但并不弹出
func (pq *inFlightPqueue) WPeek(max int64) *Message {
if len(*pq) == 0 {
return nil
}
msg:=(*pq)[0]
if msg.pri > max {
return nil
}
return msg
}
复制代码
inFlightPqueue 中存放的都是 message 指针,代码中标注的5个方法是 heap 规定的要实现优先级队列必须实现的 5个方法,具体大家可参考相关资料,当然此处不了解也不影响后续代码的理解。注意在 Less 方法中,我们是根据 message 的字段 pri 来进行比较的。在 Swap 方法中,保证了 message 的 index 字段和 message 在队列中的索引是一致的。后面 “W” 开头的方法就是我们提供给外部使用的,方法作用见注释。注意在 WRemove 方法中,heap.Remove 的第二个参数表示message 所在的索引,而 message 的 index 存储的就是这个值。
pqueue_test.go
为了更好的理解这个优先级队列,我们写了一个测试程序。
func TestInFlightPqueue(t *testing.T) {
pq := newInFlightPqueue(5)
//var id MessageID
msg1 := &Message{
pri: 1,
Body: []byte("first"),
}
pq.WPush(msg1)
msg2 := &Message{
pri: 3,
Body: []byte("three"),
}
pq.WPush(msg2)
msg3 := &Message{
pri: 2,
Body: []byte("two"),
}
pq.WPush(msg3)
pq.WRemove(msg1)
for len(*pq) > 0 {
fmt.Printf("%s\n", pq.WPop().Body)
}
}
复制代码
如上,我们先向队列中 push 了3条 message,它们的 pri 分别是1,3,2,然后删除第一条 message,再依次 pop 出余下的 message。最终输出如下。读者可自行调整进行测试。
two
three
复制代码
topic.go
此前,message ID 因为并没有被用到过,所以所有都使用了初始值,所有的 id 都是一样的。但从这章开始,message ID 就需要用来标识 message了。从前边我们知道一个客户端只能订阅一个 channel,并且在后面我们会看到每个 channel 都会维护一个存储已发送但还未被确认的 message 的“容器”。所以理论上这个 message ID 只要在 channel 层面上不重复即可。而 channel 中的 message 是由 topic 传递过去的,并且我们从生产者中接收到消息而新建 message 时也只知道该 message 所属的 topic 是什么。为了方便代码的编写,我们保证 ID 在topic层面是不重复的,这样同一个 channel 中的 ID 肯定也是不重复的,至于不同 topic 重复了也没有关系。
生成 ID 的结构体是 guidFactory,每个 topic 都会单独维护一个。这个结构体参考了雪花算法以及一些其他一些生成分布式id的算法,它在生成 id 时会将主机名考虑在内,这样不同机器的同一个 topic 生成的 ID 也不可能重复。代码细节就不分析了,读者可以把它当成一个黑盒子使用。
接下啦我们看看 Topic 和 ID 生成的相关代码
type Topic struct {
idFactory *guidFactory
}
func NewTopic(topicName string) *Topic {
t := &Topic{
idFactory: NewGUIDFactory(),
}
}
func (t *Topic) GenerateID() MessageID {
var i int64 = 0
for {
id, err := t.idFactory.NewGUID()
if err == nil {
return id.Hex()
}
if i%10000 == 0 {
log.Printf("TOPIC(%s): failed to create guid - %s", t.name, err)
}
time.Sleep(time.Millisecond)
i++
}
}
复制代码
可以看到 topic 会维护一个 guidFactory,并使用它来生成 ID,GenerateID方法比较复杂,当成黑盒子使用即可。
channel.go
type Channel struct {
inFlightMessages map[MessageID]*Message //存放已发送但客户端还未接收的数据
inFlightPQ *inFlightPqueue //存放已发送但客户端还未接收的数据
inFlightMutex sync.Mutex
}
func NewChannel(topicName string, channelName string) *Channel {
c.inFlightMutex.Lock()
c.inFlightMessages = make(map[MessageID]*Message)
c.inFlightPQ = newInFlightPqueue(10000)
c.inFlightMutex.Unlock()
}
复制代码
在每个channel中我们都会维护一个map和一个优先级队列作为“容器”用来存放已发送但客户端还未接收的数据,因为会涉及到并发操作,所以我们使用了锁来保护
//每次在向客户端实际发送消息前都会调用
func (c *Channel) StartInFlightTimeout(msg *Message) error {
now := time.Now()
msg.pri = now.Add(10 * time.Second).UnixNano()
return c.pushInFlightMessage(msg)
}
// 如果接收到了客户端的 “FIN” 消息,就会被调用
func (c *Channel) FinishMessage(id MessageID) error {
_, err := c.popInFlightMessage(id)
return err
}
// 如果接收到了客户端的 “REQ” 消息,就会被调用
func (c *Channel) RequeueMessage(id MessageID) error {
// remove from inflight first
msg, err := c.popInFlightMessage(id)
if err != nil {
return err
}
return c.PutMessage(msg)
}
//每隔固定时间调用一次
func (c *Channel) processInFlightQueue(t int64) bool {
dirty := false
for {
c.inFlightMutex.Lock()
msg:= c.inFlightPQ.WPeek(t)
c.inFlightMutex.Unlock()
if msg == nil {
goto exit
}
log.Printf("message:%s is dirty",msg.Body)
dirty = true
_, err := c.popInFlightMessage(msg.ID)
if err != nil {
goto exit
}
c.PutMessage(msg)
}
exit:
return dirty
}
复制代码
上述方法都是提供给外部调用的。StartInFlightTimeout 是在向客户端实际发送 message 前调用,它会把该条 message 的超时时间设置为10s之后,然后把它放到“容器”里。FinishMessage 是在收到了客户端的 “FIN” 消息后被调用,它会删除容器中的该条 message。RequeueMessage 是在收到了客户端的 “REQ” 消息后被调用,它首先会删除容器中的该条 message,然后重新将该条 message 投递到 channel 中。processInFlightQueue 则是每隔固定时间调用一次,他首先会检查过期时间最早的那条 message 是否已经过期,如果没有就退出,如果有的话就执行和 “REQ” 一致的操作,注意它会返回一个 bool 变量表明此次调用是否找到了过期 message
接下来我们具体看看插入、删除方法
func (c *Channel) pushInFlightMessage(msg *Message) error {
c.inFlightMutex.Lock()
defer c.inFlightMutex.Unlock()
//1.加入map
_, ok := c.inFlightMessages[msg.ID]
if ok {
return errors.New("ID already in flight")
}
c.inFlightMessages[msg.ID] = msg
//2.加入inFlightPQ
c.inFlightPQ.WPush(msg)
return nil
}
func (c *Channel) popInFlightMessage(id MessageID) (*Message, error) {
c.inFlightMutex.Lock()
defer c.inFlightMutex.Unlock()
//1.从map中删除
msg, ok := c.inFlightMessages[id]
if !ok {
return nil, errors.New("ID not in flight")
}
delete(c.inFlightMessages, id)
//2.从inFlightPQ中删除
c.inFlightPQ.WRemove(msg)
return msg, nil
}
复制代码
如上,都是先检查map 查看此次操作是否合法,然后再对 map 和优先级队列依次执行操作。
protocal.go
func (p *protocol) messagePump(client *client) {
for {
select {
case subChannel = <-subEventChan: //表示有订阅事件发生,这里的subChannel就是消费者实际绑定的channel
...
case msg = <-memoryMsgChan: //如果channel对应的内存通道有消息的话
case buf := <-backendMsgChan:
...
}
subChannel.StartInFlightTimeout(msg)
err = p.SendMessage(client, msg)
if err != nil {
log.Printf("PROTOCOL(V2): [%s] messagePump error - %s", client.RemoteAddr(), err)
goto exit
}
}
}
复制代码
注意在 message 发送之前会先调用 StartInFlightTimeout 将 message 暂存到“容器”中。后面如果发送失败的话,我们即使不执行任何操作 message 也不会丢失。
func (p *protocol) Exec(client *client, params [][]byte) error {
switch {
case bytes.Equal(params[0], []byte("FIN")): //Finish a message (indicate successful processing)
return p.FIN(client, params)
case bytes.Equal(params[0], []byte("REQ")): //Re-queue a message (indicate failure to process)
return p.REQ(client, params)
}
return errors.New(fmt.Sprintf("invalid command %s", params[0]))
}
func (p *protocol) FIN(client *client, params [][]byte) error {
msgID:=decodeMessageID(params[1])
log.Printf("ready to finish message -- msgID: %v", msgID)
err := client.Channel.FinishMessage(msgID)
if err != nil {
return err
}
return nil
}
//REQ <message_id>\n
func (p *protocol) REQ(client *client, params [][]byte) error{
msgID:=decodeMessageID(params[1])
log.Printf("ready to requeue message -- msgID: %v", msgID)
err := client.Channel.RequeueMessage(msgID)
if err != nil {
return err
}
return nil
}
复制代码
在 Exec 中我们加入了对 “FIN” 和 “REQ” 两种类型消息的支持,他们最终调用的就是我们上面在channel中分析的方法。
nsqd.go
如果每个 channel 都单独启动一个协程每隔固定时间检查一次它维护的“容器”里有没有过期的 message,那么同一时间就可能有很多协程在同时运行,事实上这是没有必要的,因为大部分 message 都会成功被 “FIN” 掉。为了提高效率,我们只在程序一启动时单独开启一个协程来检查所有的channel。
func Start() (*NSQD, error) {
go n.queueScanLoop()
}
func (n *NSQD) queueScanLoop() {
workTicker := time.NewTicker(100 * time.Millisecond)
defer workTicker.Stop()
var channels []*Channel
for {
select {
case <-workTicker.C:
channels = n.channels()
if len(channels) == 0 {
continue
}
}
num := 20
if num > len(channels) {
num = len(channels)
}
loop:
numDirty := 0
for _, i := range UniqRands(num, len(channels)) {
c:=channels[i]
now := time.Now().UnixNano()
if c.processInFlightQueue(now) {
numDirty++
}
}
// If 25% of the selected channels were dirty,
// the loop continues without sleep.
if float64(numDirty)/float64(num) > 0.25 {
goto loop
}
}
}
复制代码
在for 循环中,我们每隔100ms调用一次,但我们每次也不是检查所有的channel,而只是挑选其中的一部分进行检查,如果其中有过期 message 的channel比较多的话,就不再 sleep 而是紧接着再马上检查一次, 这里参考了redis中相关的算法,目的也是为了提高效率。
这里使用到了UniqRands方法,如下,它会从(0,maxval)中随机取出 quantity 个数组成一个切片返回。
// examplae: input (3,10) output: [0 8 7]
func UniqRands(quantity int, maxval int) []int {
if maxval < quantity {
quantity = maxval
}
intSlice := make([]int, maxval)
for i := 0; i < maxval; i++ {
intSlice[i] = i
}
for i := 0; i < quantity; i++ {
j := rand.Int()%maxval + i
// swap
intSlice[i], intSlice[j] = intSlice[j], intSlice[i]
maxval--
}
return intSlice[0:quantity]
}
复制代码
测试
client.go
func main() {
log.SetFlags(log.Lshortfile | log.Ltime)
nsqdAddr := "127.0.0.1:4150"
conn, err := net.Dial("tcp", nsqdAddr)
go readFully(conn)
if err != nil {
log.Fatal(err)
}
pub(conn, "mytopic", []byte("one one "))
pub(conn, "mytopic", []byte("two two"))
pub(conn, "mytopic", []byte("three three"))
cmd := Subscribe("mytopic", "mychannel")
cmd.WriteTo(conn)
select {}
}
func readFully(conn net.Conn) {
len := make([]byte, 4)
retry := 0
for {
_, err := conn.Read(len)
if err != nil {
fmt.Printf("error during read: %s", err)
}
size := binary.BigEndian.Uint32(len)
data := make([]byte, size)
var n int
n, err = conn.Read(data)
if err != nil {
fmt.Printf("error during read: %s", err)
}
msg, _ := nsqd.DecodeMessage(data)
log.Printf("local:%s, receive: id:<%v> body:<%s>,size:%d\n", conn.LocalAddr(), msg.ID, msg.Body, n)
if reflect.DeepEqual(msg.Body, []byte("two two")) && retry < 3 {
retry++
requeue(conn, msg.ID)
log.Printf("requeue message success -- msgID: %s", msg.Body)
time.Sleep(time.Second)
}
if reflect.DeepEqual(msg.Body, []byte("three three")) {
finish(conn, msg.ID)
log.Printf("finish message success -- msgID: %s", msg.Body)
}
}
}
复制代码
测试程序中,我们先发送了3条 message,然后订阅,在接收消息的协程中,我们会 “FIN” 内容为”three three”的这条消息,并且“Req”3次内容为”two two”的消息,而对内容为”one one “的这条消息不采取任何措施。
测试结果如下,通过客户端的输出我们可以看到”two two”这条消息在“Req”后很快被再次接收了,在往后,”one one “和”two two”这两条消息因为一直没有被“FIN”,所以每隔10s都会被再次接收一次。
代码地址
git clone https://github.com/xianxueniao150/mini-nsq.git
git checkout day03
复制代码