最近在看公司的 redis queue 时,发现底层使用的是 go-zero 的 queue 。本篇文章来看看 queue 的设计,也希望可以从里面了解到 mq 的最小型设计实践。
使用
结合其他 mq 的使用经历,基本的使用流程:
- 创建
producer或consumer - 启动
mq - 生产消息/消费消息
对应到 queue 中,大致也是这个:
创建 queue
// 生产者创建工厂
producer := newMockedProducer()
// 消费者创建工厂
consumer := newMockedConsumer()
// 将生产者以及消费者的创建工厂函数传递给 NewQueue()
q := queue.NewQueue(func() (Producer, error) {
return producer, nil
}, func() (Consumer, error) {
return consumer, nil
})
复制代码
我们看看 NewQueue 需要什么构建条件:
producer constructorconsumer constructor
将双方的工厂函数传递给 queue ,由它去执行以及重试。
这两个需要的目的是将生产者/消费者的构建和消息生产/消费都封装在 mq 中,而且将生产者/消费者的整套逻辑交给开发者处理:
type (
// 开发者需要实现此接口
Producer interface {
AddListener(listener ProduceListener)
Produce() (string, bool)
}
...
// ProducerFactory定义了生成Producer的方法
ProducerFactory func() (Producer, error)
)
复制代码
- 其实也就是将生产者的逻辑交个开发者自己完成,
mq只负责生产者/消费者的消息传递和之间的调度。 - 工厂方法的设计,是将生产者本身和生产消息,这两个任务都交给
queue自己来做调度或者重试。
生产msg
生产消息当然要回到生产者本身:
type mockedProducer struct {
total int32
count int32
// 使用waitgroup来模拟任务的完成
wait sync.WaitGroup
}
// 实现 Producer interface 的方法:Produce()
func (p *mockedProducer) Produce() (string, bool) {
if atomic.AddInt32(&p.count, 1) <= p.total {
p.wait.Done()
return "item", true
}
time.Sleep(time.Second)
return "", false
}
复制代码
queue 中的生产者编写都必须实现:
Produce():由开发者编写生产消息的逻辑AddListener():生产者
消费msg
和生产者类似:
type mockedConsumer struct {
count int32
}
func (c *mockedConsumer) Consume(string) error {
atomic.AddInt32(&c.count, 1)
return nil
}
复制代码
启动 queue
启动,然后验证我们上述的生产者和消费者之间的数据是否传输成功:
func TestQueue(t *testing.T) {
producer := newMockedProducer(rounds)
consumer := newMockedConsumer()
// 创建 queue
q := NewQueue(func() (Producer, error) {
return producer, nil
}, func() (Consumer, error) {
return consumer, nil
})
// 当生产者生产完毕,执行 Stop() 关闭生产端生产
go func() {
producer.wait.Wait()
// mq生产端停止生产,不是mq本身 Stop 运行
q.Stop()
}()
// 启动
q.Start()
// 验证生产消费端是否消息消费完成
assert.Equal(t, int32(rounds), atomic.LoadInt32(&consumer.count))
}
复制代码
以上就是 queue 最简易的入门使用代码。开发者可以根据自己的业务实际情况:自由定义生产者/消费者已经生产/消费逻辑。
整体设计

整体流程如上图:
- 全体的通信都由
channel进行 - 通过加入监听器
listener,以及事件触发event,相当于将触发器逻辑分离出来 - 生产者有
produceone,这个是生产消息的逻辑,但是其中的Produce()是由开发者编写【上面的interface中正是这个函数】 - 同理消费者,
Consume()
基本的消息流动就入上图以及上述描写的,具体的代码分析我们就留到下一篇,我们?分析里面,尤其是如何控制 channel 是整个设计的核心。
总结
本篇文章从使用以及整个架构分析上简略介绍了 queue 的设计。下篇我们将深入源码,分析内部消息流转以及 channel 控制。
关于 go-zero 更多的设计和实现文章,可以持续关注我们。欢迎大家去关注和使用。
项目地址
欢迎使用 go-zero 并 star 支持我们!
微信交流群
关注『微服务实践』公众号并回复 进群 获取社区群二维码。
© 版权声明
文章版权归作者所有,未经允许请勿转载。
THE END






















![[桜井宁宁]COS和泉纱雾超可爱写真福利集-一一网](https://www.proyy.com/skycj/data/images/2020-12-13/4d3cf227a85d7e79f5d6b4efb6bde3e8.jpg)

![[桜井宁宁] 爆乳奶牛少女cos写真-一一网](https://www.proyy.com/skycj/data/images/2020-12-13/d40483e126fcf567894e89c65eaca655.jpg)