RocketMQ学习-消息协议(一)

这是我参与8月更文挑战的第5天,活动详情查看:8月更文挑战

PS:消息协议参考《分布式消息中间件实战-倪炜》

上一章消息队列的简单设计与实现中,在BorkerServer特地定义了一个 “CONSUME” 字段,表示该消息是消费消息。只要没有 “CONSUME” 字段,就表示不是消费消息。这种简单的约定我们也可以称之为一种协议。当然我们这种 “协议” 是上不得台面的,因为没有一个统一的标准。那天另一个人来了,人家不想用 “CONSUME” 字段,换一个,客户端想消费消息就需要改代码了。这种设计显然 “不靠谱”。

学过《计算机网络》的筒子们都应该知道有一个七层、理想化的网络七层协议。大家网络通信都要遵从这一协议。在计算机领域中,只要涉及不同计算机要共同完成一件事情的时候,就肯定会有协议的存在,只有大家都遵循统一的协议,才能消除差别。

目前常见的开放协议有:MQTTAMQPSTOMPXMPP等。而有些特殊的框架(RedisKafkaZeroMQ)根据自身需要未严格遵循MQ规范,而是基于TCP/IP自行封装了一套协议,通过网络Socket接口进行传输,实现了MQ功能。

PS:本章就是简单概括一下这些开放协议,细致的可能要读者自己去 “细细品味” ,书名上面提到过。感觉了解了就行。

AMQP

AMQP将协议的内容分为三部分:

1.基本概念;

2.功能命令;

3.传输层协议;

基本概念

基本概念是指 AMQP内部定义的各组件及组件的功能说明。

主要概念

1.Message(消息):消息服务器所处理数据的原子单元。包括内容头、属性、内容体。服务器对于消息处理时要保证:消息存储、消息优先级(当消息必须丢弃来确保服务质量,那么服务会丢弃低优先级的消息)、消息可更改或者删除;

2.Publisher(消息生产者):向交换器发布消息的客户端应用程序;

3.Exchange(交换器):接收生产者发送的消息,并将这些消息路由给服务器中的队列;

4.Binding(绑定):用于消息队列和交换器之间的关联,即消息队列中的消息通过交换器内部规则转发到特定的消费者那里;

5.Virtual Host(虚拟主机):每个虚拟主机本质上都是一个mini版的消息服务器,拥有自己的队列、交换器、绑定和权限机制;

6.Broker(消息代理):表示消息队列服务实体(PS:个人理解这里觉得方法更贴切),接收客户端连接,实现 AMQP 消息队列和路由功能的过程;

7.Routing Key(路由规则):虚拟机可用它如何确定如何路由一个特定消息;

8.Queue(消息队列):用来保存消息直到发送给消费者;

9.Connection(连接):可以理解成客户端和消息队列服务之间的一个TCP连接;

10.Channel(信道):信道是一条独立的双向数据流通道,它是建立在真实的TCP连接内的虚拟连接,一个连接可以包含多个信道(PS:使用信道是因为TCP连接是非常昂贵的,而且每个服务的操作系统也无法承受每秒如此多的TCP连接);

11.Consumer(消费者):表示从消息队列中取得消息的客户端应用程序;

核心组件的生命周期

1.消息的生命周期:Publisher生产数据–》Broker中的Exchange(规则表:Routing Key 和Queue的映射关系——Binding)根据规则推送消息;如图:

image.png

生产者在发布消息时可以给消息指定各种消息属性(message meta-data),有些属性可能会被消息代理使用,有些则是完全不可见,只能被消费者所使用。至于消息的各种情况,已经介绍过,不在赘述;

2.交换器的生命周期:每台 AMQP 都会预先创建许多交换器实例,它们在服务启动时就存在,并且不能被销毁。如果有特殊要求,可以自己创建交换器,然后在完成工作后进行销毁;

3.队列的生命周期:主要分为两种消息队列的生命周期——持久化消息队列和临时消息队列;

持久化:持久化消息队列可被多个消费者共享,不管是否有消费者接收,它们都可以独立存在;

临时:临时消息队列对某个消费者是私有的,只能绑定到此消费者,当消费者断开连接时,该消息队列被删除;

功能命令

功能命令是指该协议所定义的一系列命令,应用程序可以基于这些命令来实现相应的功能。

AMQP 协议文本时分层描述的:0-9版本分两层:

Functional Layer(功能层):定义了一系列命令,这些命令按照不同的功能组合成不同的类,客户端可以利用它们来实现自己的业务功能;

Transport Layer(传输层):将功能层所接收的消息传递给服务器,经过相应处理后再返回,处理的事情包括:信道复用、帧同步、内容编码、心跳检测、数据表示和容错处理等。

如图:

image.png

而0-10版本则分为三层:

Model Layer(模型层):模型层定义了一套命令,客户端使用这些命令实现业务功能;

Session layer(会话层):负责将命令从客户端传递给服务器,再将服务的响应返回给客户端应用,会话层为这个传递过程提供了可靠性、同步机制和错误处理;

传输层:负责提供帧处理、信道复用、错误检测和数据表示;

如图:

image.png

小结:这种分层的意义在于不改变协议对外提供的功能的前提下,实现各层的可替换,却又不影响该层与其他层的交互。

传输层

传输层是一个网络级协议,它定义了数据的传输格式,消息队列的客户端可以基于这个协议与消息代理和 AMQP 的相关模型进行交互通信,该协议的内容包括数据帧处理、信道复用、内容编码、心跳检测、数据表示和错误处理等。

消息数据格式

所有数据必须有特定的格式来支持,这部分就是在传输层中定义的。AQMP 是二进制协议,不同版本,数据协议也有所不同。下面以0-9-1版本为例:

image.png

1.帧头(header):7个字节–》type帧类型、channel通道、size数据大小;

type帧类型分为以下几种:type=1–》“METHOD” 方法帧、type=2–》“HEADER” 内容头帧、type=3–》“BODY” 内容体帧、type=4–》“HEARTBEAT” 心跳帧。

2.数据内容(payload 负载):任意大小;

3.检测错误的结束帧(frame_end):

MQTT

MQTT(Message Queuing Telemetry Transport,消息队列遥测传输)是 IBM 开发的一个即时通信协议。

MQTT发展史大致如下:

image.png

MQTT基本组件

在MQTT协议中,使用者有三种身份:发布者(Publisher)、代理(Broker)、订阅者(Subscriber)。

其中消息的发布者和订阅者都是客户端,消息代理是服务器,而消息发布者可以同时是订阅者。

其中消息的流转如下:

image.png

发布者发布消息到代理服务器,这个消息会包含Topic(主题),订阅者只需要订阅相关的主题,那么当发布者发布了包含Topic的消息,订阅者就会收到代理服务推送的消息。

1.网络连接(Network Connection):网络连接是指客户端连接服务器传通信时使用的底层传输协议,由该连接提供有序、可靠的、基于字节流的双向传输;

2.应用消息(Application Message):指通过网络传输的应用数据,该数据一般包括主题和负载两部分;

3.主题(Topic):相当于应用类型、消息订阅者订阅后,就会收到该主题的消息内容;

4.负载(Playload):指消息的具体内容;

5.客户端(Client):指连接MQTT的程序或者设备;

6.服务器(Server):服务器也是程序或者设备,作为发送消息的客户端和请求订阅客户端之间的中介;

7.会话(Session):客户端与服务器建立连接之后就是一个会话,通过这个session进行交互;

8.订阅(Subscription):订阅一般与一个会话关联。会话可以包含多余一个的订阅。订阅包含一个主题过滤器和一个服务质量(QoS)等级;

9.主题名(Topic Name):主题名称是附加在消息上的一个标签,服务器会根据标签将消息发送给与订阅所匹配的每个客户端;

10.主题过滤器(Topic Filter):主题过滤器是订阅中包含的一个表达式,匹配Topic Name;

11.MQTT控制报文(MQTT Control Packet): MQTT控制报文实际上就是通过网络连接发送的信息数据包;

MQTT消息数据格式

MQTT协议是通过交换预定义的MQTT控制报文来通信的:

固定报头(Fixed header)

固定报头存在所有控制报文,包含:报文类型、标识位和剩余长度;
如图:

image.png

固定报头高4位表示控制报文类型:如图所示类型(MQTT3.1.1):

image.png

低4位表示标识位:如图所示标识位意义(MQTT3.1.1):

image.png

可以看到除了PUBLISH类型会使用标识位,其他类型都是固定值。

1.DUP:重发标识,保证消息可靠传输如果设置为0,表示客户端或者服务端第一次请求发送这个PUBLISH报文,如果为1,则表示这可能是一个早前报文请求的重发;

2.QoS:服务质量等级标识。用于保证消息传递次数:

2.1. 00:表示最多一次,即<=1;01表示至少一次,即>=1;10表示一次,即=1,11保留后用;

3.RETAIN.保留标识:表示服务器要保留这次推送的消息。如果有新的订阅者出现,则把这条消息推送给他;没有就推送至当前订阅者后释放;

4.剩余长度:固定报头从第2个字节开始表示剩余长度,指当前报文剩余部分的字数,包括可变报头和消息体数据的总大小。

可变报头(Variable header)

可变报头存在于部分控制报文,由固定报头中的控制报文类型决定是否需要可变报头,以及内容;

可变报头的内容根据控制报文类型的不同而不同。可变报头的报文标识符(Packet Identifier)字段存在于多种类型的报文里。最常用的是作为报的标识符。

image.png

可变报头里包含2字节的报文标识符字段,这些类型包括:PUBLISH(QoS>0)、PUBACK、PUBREC、PUBREL、PUBCOMP、SUBSCRIBE、SUBACK、UNSUBSCRIBE、UNSUBACK;

其中SUBSCRIBE、UNSUBSCRIBE、PUBLISH的控制报文必须包含一个非零的16位报文标识符;

PUBACK、PUBREC、PUBREL报文必须包含与最初发送的 PUBLISH 报文相同的报文标识符;

SUBACK、UNSUBACK必须包含对应的 SUBSCRIBE、UNSUBSCRIBE报文中使用的报文标识符;

其中与QoS 1 的 PUBLISH 对应的是 PUBACK,与QoS 2 的 PUBLISH 对应的是 PUBCOMP,与 SUBSCRIBE 对应的是 SUBACK,与 UNSUBSCRIBE 对应的是 UNSUBACK;如图:

image.png

报文标识符在客户端和服务端的直接分配是相互独立的。所以客户端和服务端可以组合使用相同的报文标识符以实现并发的消息交换。

消息体(Payload)

存在于部分控制报文中,表示客户端接收的具体内容;

CONNECT、SUBSCRIBE、SUBACK和UNSUBSCRIBE需要有消息体,PUBLISH类型的消息体是可选的。

1.CONNECT,消息体主要内容是客户端的ClinetId、订阅的主题、Message、用户名、密码;

2.SUBSCRIBE,消息体内容至少是一对主题过滤器和QoS等级字段的组合;

3.SUBACK,消息体内容是对于SUBSCRIBE所申请的主题及QoS等级确认和回复;

4.UNSUBSCRIBE,消息体内容主要是订阅的主题;

MQTT消息通信

MQTT协议中的客户端与服务之间一般是通过请求应答模式来通信的。如下图:

image.png

通信过程分为三步:

1.建立连接:客户端与服务器建立连接后,发送的第一个报文必须是CONNECT,然后服务器发送CONNECTACK响应客户端;

2.发布消息:客户端向服务端传送消息使用PUBLISH报文,按照消息的QoS等级会有不同应答类型报文;

2.1 QoS 0:最多一次:如图:

image.png

0是最低等级,具有很高的传输性能,接收者不需要应答消息,发送者也不需要重发和保存消息;

2.2 QoS 1:至少一次:如图:

image.png

使用等级1,保证消息至少被送达接收者一次,但也可能因为网络原因,在规定时间内没有收到应答报文,消息会被重复发送,造成消息被多次送达;服务端在收到消息后会立即发送PUBACK给客户端,表示已收到消息;

2.3 QoS 2:只有一次:2是最高等级,保证每条消息只被接收一次。是最安全的,但是性能不足。其过程如图:

image.png

1.发送者发送一条QoS 2 的PUBLISH报文;

2.接收者收到报文处理消息并返回一条PUBREC报文进行应答,并存储包的标识符等状态数据作为参考;

3.当发送者收到PUBREC报文就可以丢弃之前发布的消息(PS:因为此时已经知道该消息已达接收者),并发保存PUBREC报文同时应答一条PUBREL报文;

4.接收者在收到PUBREL保文后,它会丢弃所有已经保存到状态,并应答一条PUBCOMP报文;

5.发送者收到PUBCOMP报文,清空之前所有的保存状态;

当所有流程走完,发送者就可以认为消息被送达。

3.主题订阅:客户端向服务端发送SUBSCRIBE报文用于订阅一个或多个主题。服务端发送SUBACK用于确认消息已经收到并正在处理SUBSCRIBE报文。SUBACK包含了返回码清单,指定了SUBSCRIBE请求的每个订阅被授予的最大QoS等级。

4.心跳检测:客户端发送PINGREQ报文给服务器,用于:

4.1.在没有报文从客户端发送给服务器时,告知服务器客户端还活着;

4.2.请求服务器响应发送确认信息确认客户端还活着;

4.3.通过网络确认网络连接没有断开,然后服务器发送PINGRESP报文响应客户端的PINGREQ报文,表示服务期还活着;

  1. 断开连接:DISCONNECT时客户端发送给服务器的最后一条控制报文,表示客户端正常断开连接;

如果还想要具体了解MQTT协议,请参见:MQTT协议文档中文

© 版权声明
THE END
喜欢就支持一下吧
点赞0 分享