这是我参与更文挑战的第 20 天,活动详情查看: 更文挑战
日积月累,水滴石穿 ?
前言
上篇讲了讲 RabbitMQ
的概念和 RabbitMQ
的基本使用。那本篇来介绍介绍 RabbitMQ
的一些常用方法。
交换机
exchangeDeclare
声明交换机,exchangeDeclare
有多个重载方法,但是这些重载方法最终实现会由下面这个方法执行。位于 com.rabbitmq.client.Channel
public Exchange.DeclareOk exchangeDeclare(String exchange, String type,
boolean durable, boolean autoDelete, boolean internal,
Map<String, Object> arguments);
复制代码
这个方法的返回值是 Exchange.DeclareOk
用来标识成功声明了一个交换器。
各个参数详细说明如下所述:
-
exchange:交换器的名称。
-
type:交换器的类型。常见的如 fanout、direct、topic
-
durable: 设置是否持久化。 durable 设置为 true 表示持久化, 反之是非持久化。持久化可以将交换器存盘,在服务器重启的时候不会丢失相关信息。默认为 false。
-
autoDelete: 设置是否自动删除。 autoDelete 设置为 true 表示自动删除。自动删除的前提是至少有一个队列或者交换器与这个交换器绑定之后。所有与这个交换器绑定的队列或者交换器都与之解绑才会自动删除。默认为 false。
-
internal: 设置是否是内置的。如果设置为 true ,则表示是内置的交换器,客户端程序无法直接发送消息到这个交换器中,只能通过交换器路由到交换器这种方式。默认为 false。
-
arguments: 其他一些结构化参数,如 x-message-ttl、 x-dead-letter-exchange、x-dead-letter-routing-key等
实例:
public static void main(String[] args) throws IOException {
Connection conn = RabbitMQUtil.createConn();
Channel channel = conn.createChannel();
String exchangName = "test1-gongjie";
HashMap<String, Object> map = new HashMap<>();
// 发送到队列的消息在丢弃之前可以存活多长时间(毫秒)
map.put("x-message-ttl",10000);
channel.exchangeDeclare(exchangName,
BuiltinExchangeType.TOPIC,true,false,map);
RabbitMQUtil.closeConn(channel,conn);
}
复制代码
exchangeDeclarePassive
用来检测相应的交换器是否存在。如果存在则正常返回;如果不存在则抛出异常 404 channel exception ,同时 Channel 也会被关闭。
Exchange.DeclareOk exchangeDeclarePassive(String name);
复制代码
- name:交换机名称
实例:
public static void main(String[] args) throws IOException {
Connection conn = RabbitMQUtil.createConn();
Channel channel = conn.createChannel();
String exchangName = "test1-gongjie1";
AMQP.Exchange.DeclareOk declareOk = channel.exchangeDeclarePassive(exchangName);
}
复制代码
exchangeDelete
删除交换机
Exchange.DeleteOk exchangeDelete(String exchange);
Exchange.DeleteOk exchangeDelete(String exchange ,
boolean ifUnused);
复制代码
其中 exchange
表示交换器的名称,ifUnused
用来设置是否在交换器没有被使用的情况下删除。如果 ifUnused
设置为 true
,则只有在此交换器没有被使用的情况下才会被删除。如果设置 false,也就是说这个交换器不论在什么情况下都要被删除。即使交换机不存在也不会报错。
实例:
public static void main(String[] args) throws IOException {
Connection conn = RabbitMQUtil.createConn();
Channel channel = conn.createChannel();
String exchangName = "test1-gongjie";
AMQP.Exchange.DeleteOk deleteOk = channel.exchangeDelete(exchangName, false);
System.out.println(deleteOk.protocolClassId() + "====" + deleteOk.protocolMethodName());
}
结果:40====exchange.delete-ok
复制代码
队列
queueDeclare
声明队列
public Queue.DeclareOk queueDeclare();
public Queue.DeclareOk queueDeclare(String queue,
boolean durable, boolean exclusive, boolean autoDelete,
Map<String, Object> arguments);
复制代码
不带任何参数的 queueDeclare 方法默认创建一个由 RabbitMQ 命名的(类似这种amq.gen-S9h… 的名称,这种队列也称之为匿名队列)、排他的、自动删除的、非持久化的队列。
参数说明:
-
queue:队列的名称。
-
durable:设置是否持久化。为 true 则设置队列为持久化。持久化的队列会存盘,在服务器重启的时候可以保证不丢失相关信息。
-
exclusive:设置是否排他。为 true 则设置队列为排他的。如果一个队列被声明为排他队列,该队列仅对首次声明它的连接可见,并在连接断开时自动删除。这里需要注意三点:排他队列是基于连接( Connection )可见的,同一个连接的不同信道(Channel)是可以同时访问同一连接创建的排他队列。“首次”是指如果一个连接己经声明了排他队列,其他连接是不允许建立同名的排他队列的,这一个与普通队列不同,即使该队列是持久化的,一旦连接关闭或者客户端退出,该排他队列都会被自动删除。
-
autoDelete:设置是否自动删除。为 true 则设置队列为自动删除。自动删除的前提是:至少有一个消费者连接到这个队列,之后所有与这个队列连接的消费者都断开时,才会自动删除。
-
arguments:设置队列的其他一些参数,如 x-message-ttl、 x-dead-letter-exchange、x-dead-letter-routing-key等。
实例:
public static void main(String[] args) throws IOException {
Connection conn = RabbitMQUtil.createConn();
Channel channel = conn.createChannel();
AMQP.Queue.DeclareOk declareOk = channel.queueDeclare();
System.out.println(declareOk.getQueue() + "====" + declareOk.protocolMethodName());
}
结果:amq.gen-lCUnxtHqTII99KMlAdk-yw====queue.declare-ok
复制代码
queueDeclarePassive
用来检测相应的队列是否存在。 如果存在正常返回 ,如果不存在则抛出异常: 404 channel exception ,同时Channel 也会被关闭。
public Queue.DeclareOk queueDeclarePassive(String queue);
复制代码
queueDelete
Queue.DeleteOk queueDelete(String queue);
Queue.DeleteOk queueDelete(String queue , boolean ifUnused,
boolean ifEmpty) ;
复制代码
其中 queue
表示队列的名称 ,如果 ifUnused
设置为 true ,则只有在此队列在没有被使用的情况下才会被删除, ifEmpty
设置为 true,表示在队列为空(队列里面没有任何消息堆积)的情况下才能够删除。
queuePurge
清除给定队列的消息。
Queue.PurgeOk queuePurge(String queue);
复制代码
绑定与解绑
queueBind
将队列和交换器绑定
public Queue.BindOk queueBind(String queue, String exchange,
String routingKey);
public Queue.BindOk queueBind(String queue, String exchange,
String routingKey, Map<String, Object> arguments);
复制代码
参数详解。
- queue:队列名称。
- exchange:交换器的名称。
- routingKey: 用来绑定队列和交换器的路由键。
- arguments:定义绑定的一些参数。
实例:
public static void main(String[] args) throws IOException {
Connection conn = RabbitMQUtil.createConn();
Channel channel = conn.createChannel();
HashMap<String, Object> map = new HashMap<>();
map.put("x-queue-mode",true);
AMQP.Queue.BindOk bindOk = channel.queueBind("queue-gongjie-1", "test1-gongjie", "info");
}
复制代码
queueUnbind
将队列和交换器解除绑定。
Queue.UnbindOk queueUnbind(String queue, String exchange, String routingKey) throws IOException;
Queue.UnbindOk queueUnbind(String queue, String exchange, String routingKey, Map<String, Object> arguments) throws IOException;
复制代码
具体的参数解释可以参考queueBind
,这里不再赘述。
exchangeBind
将交换器与交换器绑定
Exchange.BindOk exchangeBind(String destination,
String source, String routingKey);
Exchange.BindOk exchangeBind(String destination,
String source, String routingKey, Map<String, Object> arguments);
复制代码
- destination:目标交换机名称
- source:源交换机名称
- routingKey:路由key
- arguments:额外参数
实例:
public static void main(String[] args) throws IOException {
Connection conn = RabbitMQUtil.createConn();
Channel channel = conn.createChannel();
//声明source交换机
channel.exchangeDeclare ("source","direct", false , true , null) ;
//声明destination交换机
channel.exchangeDeclare ("destination","fanout", false , true , null);
//交换器与交换器绑定
channel.exchangeBind ("destination" , "source","exKey");
//声明队列
channel.queueDeclare ("queue", false, false , true , null);
//将队列和交换器绑定
channel.queueBind ("queue","destination","");
//发布消息
channel.basicPublish ("source","exKey", null,"我是发送的消息".getBytes () ) ;
}
复制代码
生产者发送消息至交换器 source 中,交换器 source 根据路由键找到与其匹配的另一个交换机 destination,并把消息转发到 destination 中,进而存储在 destination 绑定的队列 queue中。
exchangeUnbind
将交换器与交换器解除绑定
Exchange.UnbindOk exchangeUnbind(String destination, String source, String routingKey);
Exchange.UnbindOk exchangeUnbind(String destination, String source, String routingKey, Map<String, Object> arguments);
复制代码
具体的参数解释可以参考exchangeBind
,这里不再赘述。
发送消息
basicPublish
void basicPublish(String exchange, String routingKey,
BasicProperties props, byte[] body);
void basicPublish(String exchange, String routingKey,
boolean mandatory, BasicProperties props, byte[] body);
void basicPublish(String exchange, String routingKey,
boolean mandatory, boolean immediate, BasicProperties props, byte[] body);
复制代码
参数解释:
-
exchange:交换机的名称,指明消息需要发送到哪个交换机中,如果设置为空字符串,则消息会被发送到 RabbitMQ 默认的交换器中。
-
routingKey:路由键,交换机根据路由键将消息存储到相应的队列之中
-
props: 消息的基本属性集,其包含 14 个属性成员
public static class BasicProperties extends com.rabbitmq.client.impl.AMQBasicProperties {
private String contentType; //消息类型如(text/plain)
private String contentEncoding; //字符编码
private Map<String,Object> headers; //请求头信息
private Integer deliveryMode; //消息的投递模式
private Integer priority; //优先级
private String correlationId; //用来关联RPC的请求和响应。
private String replyTo; //一般用来命名一个回调queue。
private String expiration; //过期时间
private String messageId;//消息表示符 用于标示消息
private Date timestamp; //消息发送的时间戳
private String type; //消息类型
private String userId; //连接到mq的用户名
private String appId; //消息的应用程序的表示符 比如你的计算机名称
private String clusterId;
复制代码
-
byte[] body: 消息体( payload ),真正需要发送的消息
-
mandatory:当 mandatory 参数设为 true 时,交换器无法根据自身的类型和路由键找到一个符合条件的队列时,那么 RabbitMQ 会调用 Basic.Return 命令将消息返回给生产者。当 mandatory 参数设置为 false 时,出现上述情形,则消息直接被丢弃。
-
immediate:当 immediate 参数设为 true 时,如果交换器在将消息路由到队列时发现队列上并不存在任何消费者,那么这条消息将不会存入队列中。当与路由键匹配的所有队列都没有消费者时,该消息会通过 Basic.Return 返回至生产者。
概括来说, mandatory 参数告诉服务器至少将该消息路由到一个队列中,否则将消息返回给生产者。immediate 参数告诉服务器,如果该消息关联的队列上有消费者,则立刻投递;如果所有匹配的队列上都没有消费者,则直接将消息返还给生产者,不用将消息存入队列而等待消费者了。
RabbitMQ 3.0 版本开始去掉了对 immediate 参数的支持,对此 RabbitMQ 官方解释是:
immediate 参数会影响镜像队列的性能,增加了代码复杂性,建议采用 TTL、DLX 的方法替代。
实例:
- 1、投递模式(delivery mode)设置为2 ,即消息会被持久化(存入磁盘)在服务器中。同时这条消息的优先级(priority)设置为 1, contentType
为 text/plain,userId为当前登录用户名。
channel.basicPublish("exchangeName" , "routingKey",
new AMQP.BasicProperties.Builder()
.contentType ("text/plain")
.deliveryMode(2)
.priority(1)
.userId("test")
.build(),
"发送消息".getBytes());
复制代码
- 2、发送一条带有 headers 的消息
Map<String, Object> headers = new HashMap<String, Object>() ;
headers.put("token","782");
channel.basicPublish("exchangeName", "routingKey",
new AMQP.BasicProperties.Builder()
.headers(headers)
.build(),
"发送消息".getBytes()) ;
复制代码
- 3、发送一条带有过期时间(expiration)的消息
channel.basicPublish("exchangeName", "routingKey",
new AMQP.BasicProperties.Builder()
. expiration ("6000")
.build(),
"发送消息".getBytes());
复制代码
消费消息
RabbitMQ 的消费消息模式分两种:推( Push )模式和拉( Pull )模式。推模式采用 Basic.Consume
进行消费,而拉模式则是调用 Basic.Get 进行消费。
推模式
Channel 类中 basicConsume 方法太多,这里介绍几个常用的
String basicConsume(String queue , Consumer callback);
String basicConsume(String queue , boolean autoAck, Consumer callback);
String basicConsume(String queue , boolean autoAck , Map<String , Object>
arguments, Consumer callback);
String basicConsume(String queue , boolean autoAck , String consumerTag,
Consumer callback);
String basicConsume(String queue , boolean autoAck , String consumerTag,
boolean noLocal , boolean exclusive , Map<Str ng Object> arguments,
Consumer callback);
复制代码
对应的参数说明:
- queue: 队列的名称
- autoAck: 设置是否自动确认,如果设置 autoAck 为 false ,那么需要显示调用 channel.basicAck 来确认消息己被成功接收。
- consumerTag:消费者标签,用来区分多个消费者。
- noLocal:设置为 true, 则表示不能将同一个 Connection 中生产者发送的消息传送给这个 Connection 中的消费者。
- exclusive:设置是否排他
- arguments:设置消费者的其他参数
- callback:设置消费者的回调函数。用来处理 RabbitMQ 推送过来的消息。比如DefaultConsumer,使用时需要客户端重写其中的方法。
channel.basicConsume("work",false,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费消息:" + new String(body));
/**
* 参数1:确认队列中那个消息被消费了
* 参数2:是否开启多个消息同时确认 true 开启
*/
//手动确认消息
channel.basicAck(envelope.getDeliveryTag(),false);
}
});
复制代码
拉模式
通过 channel.basicGet
方法可以单条地获取消息,其返回值是 GetResponse
。Channel
类的 basicGet
方法没有其他重载方法。
GetResponse basicGet(String queue, boolean autoAck);
复制代码
其中 queue 代表队列的名称,如果设置 autoAck 为 false ,那么需要显示调用
channel.basicAck
来确认消息己被成功接收。
消费端的确认与拒绝
basicReject
void basicReject(long deliveryTag, boolean requeue);
复制代码
- deliveryTag: 消息的编号
- requeue: 参数设置为 true,则
RabbitMQ
会重新将这条消息存入
队列,以便可以发送给下一个订阅的消费者;如果 requeue
参数设置为 false,则 RabbitMQ
会立即把消息从队列中移除。
注:Basic.Reject命令一次只能拒绝一条消息,如果想要批量拒绝消息,则可以使用 Basic.Nack 这个命令。
basicNack
void basicNack(long deliveryTag, boolean multiple, boolean requeue);
复制代码
- deliveryTag:消息的编号
- multiple:设置为 false 时,则表示拒绝编号为
deliveryTag
的这一条消息,这时候basicNack
方法和basicReject
方法一样, multiple 参数设置为 true 则表示拒绝小于deliveryTag
编号之前所有未被当前消费者确认的消息。 - requeue:参数设置为 true,则
RabbitMQ
会重新将这条消息存入
队列,以便可以发送给下一个订阅的消费者;如果 requeue 参数设置为 false,则 RabbitMQ 会立即把消息从队列中移除。
basicRecover
是否恢复消息到队列。
Basic.RecoverOk basicRecover() throws IOException;
Basic.RecoverOk basicRecover(boolean requeue) throws IOException;
复制代码
这个 channel.basicRecover
方法用来请求 RabbitMQ
重新发送还未被确认的消息,requeue
参数设置为 true ,则未被确认的消息会被重新加入到队列中,并且尽可能的将消息投递给其他消费者进行消费,而不是自己再次消费。如果,requeue
参数设置为 false ,消息会被重新投递给自己。默认情况下,如果不设置 requeue
这个参数默认为 true。
basicAck
void basicAck(long deliveryTag, boolean multiple) throws IOException;
复制代码
- deliveryTag :消息的编号
- multiple:设置为 false 时,则表示确认消费编号为
deliveryTag
的这一条消息,该参数为 true 时,则可以一次性确认消费小于等于deliveryTag
值的所有消息。
参考文献:
1、朱忠华老师
《RabbitMQ实战指南》一书
- 如你对本文有疑问或本文有错误之处,欢迎评论留言指出。如觉得本文对你有所帮助,欢迎点赞和关注。