这是我参与更文挑战的第6,活动详情查看: 更文挑战
我们之前的rabbitMQ的模式都是不存在交换机的,直接发送到队列,将下来讲的是订阅模型,一次像多个消费者发消息
一个生产者发送消息到交换机,交换机发给绑定在自己上边的队列,消费者在从队列拿到消息消费,
X(Exchange):交换机接受生产者发送的消息,另一方面知道如何处理消息,,发给某个队列,还是发给所有的队列,或者是直接舍弃,取决于交换机是如何配置的,交换机只负责发送,而不去存储消息。
交换机分为几类
Publish/Subscribe:广播,将消息交给所有绑定到交换机的队列
Routing:定向,把消息交给符合指定routing key 的队列
Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列
复制代码
订阅模型–Publish/Subscribe
在广播模式下,消息发送流程是这样的:
- 1) 可以有多个消费者
- 2) 每个消费者有自己的queue(队列)
- 3) 每个队列都要绑定到Exchange(交换机)
- 4) 生产者发送的消息,只能发送到交换机,交换机来决定要发给哪个队列,生产者无法决定。
- 5) 交换机把消息发送给绑定过的所有队列
- 6) 队列的消费者都能拿到消息。实现一条消息被多个消费者消费
生产者
生产者声明交换机,不在声明队列,消息发送到交换机,比在发送到队列
public class p1 {
public static void main(String[] args) throws IOException, TimeoutException {
//1.创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
//2.设置参数
connectionFactory.setHost("192.168.145.3");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/zhaojin");
connectionFactory.setUsername("zhaojin");
connectionFactory.setPassword("zhaojin");
//3.创建连接
Connection connection = connectionFactory.newConnection();
// 获取通道
Channel channel = connection.createChannel();
// 声明exchange,指定类型为fanout
channel.exchangeDeclare("Subscribe_exchange", "fanout");
// 消息内容
String message = "Hello_Subscribe";
// 发布消息到Exchange
channel.basicPublish("Subscribe_exchange", "", null, message.getBytes());
System.out.println("生产者发送消息=:'" + message + "'");
channel.close();
connection.close();
}
}
复制代码
消费者1
public class c1 {
public static void main(String[] args) throws IOException, TimeoutException {
//1.创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
//2.设置参数
connectionFactory.setHost("192.168.145.3");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/zhaojin");
connectionFactory.setUsername("zhaojin");
connectionFactory.setPassword("zhaojin");
//3.创建连接
Connection connection = connectionFactory.newConnection();
// 获取通道
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare("Subscribe_queue_1", false, false, false, null);
// 绑定队列到交换机
channel.queueBind("Subscribe_queue_1", "Subscribe_exchange", "");
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("c1消费消息: "+new String(body));
//手动ack
channel.basicAck(envelope.getDeliveryTag(),false);
}
};
// 监听队列,自动返回完成
channel.basicConsume("Subscribe_queue_1", false, consumer);
}
}
复制代码
消费者2
public class c2 {
public static void main(String[] args) throws IOException, TimeoutException {
//1.创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
//2.设置参数
connectionFactory.setHost("192.168.145.3");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/zhaojin");
connectionFactory.setUsername("zhaojin");
connectionFactory.setPassword("zhaojin");
//3.创建连接
Connection connection = connectionFactory.newConnection();
// 获取通道
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare("Subscribe_queue_2", false, false, false, null);
// 绑定队列到交换机
channel.queueBind("Subscribe_queue_2", "Subscribe_exchange", "");
// 定义队列的消费者
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("c2消费消息: "+new String(body));
//手动ack
channel.basicAck(envelope.getDeliveryTag(),false);
}
};
// 监听队列,自动返回完成
channel.basicConsume("Subscribe_queue_2", false, consumer);
}
}
复制代码
启东消费者,生产者发送一条消息,看输出
Routing–有选择的发送消息
订阅模式,在这个中我们可以做到不同的队列接受不同的消息,队列与交换机绑定必须指定,消息发送时也必须指定发送消息的routingKey
如上图所示生产者生产消息发送到交换机,交换机通过与rontingkley的匹配的队列发送消息。
生产者–分别发送三次不同的消息,匹配不同的routingkey
public class p {
public static void main(String[] args) throws IOException, TimeoutException {
//1.创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
//2.设置参数
connectionFactory.setHost("192.168.145.3");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/zhaojin");
connectionFactory.setUsername("zhaojin");
connectionFactory.setPassword("zhaojin");
//3.创建连接
Connection connection = connectionFactory.newConnection();
// 获取通道
Channel channel = connection.createChannel();
// 声明exchange,指定类型为fanout
channel.exchangeDeclare("routing_exchange", "direct");
// 消息内容
//String message = "新增";
//String message = "删除";
String message = "更新";
// 发布消息到Exchange
//channel.basicPublish("routing_exchange", "insert", null, message.getBytes());
//channel.basicPublish("routing_exchange", "delect", null, message.getBytes());
channel.basicPublish("routing_exchange", "update", null, message.getBytes());
System.out.println("生产者发送消息=:开始'" + message + "'");
channel.close();
connection.close();
}
}
复制代码
消费者insert
public class insert {
public static void main(String[] args) throws IOException, TimeoutException {
//1.创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
//2.设置参数
connectionFactory.setHost("192.168.145.3");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/zhaojin");
connectionFactory.setUsername("zhaojin");
connectionFactory.setPassword("zhaojin");
//3.创建连接
Connection connection = connectionFactory.newConnection();
// 获取通道
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare("routing_queue_insert", false, false, false, null);
// 绑定队列到交换机
channel.queueBind("routing_queue_insert", "routing_exchange", "insert");
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("insert 接收消息 : "+new String(body));
//手动ack
channel.basicAck(envelope.getDeliveryTag(),false);
}
};
// 监听队列,自动返回完成
channel.basicConsume("routing_queue_insert", false, consumer);
}
}
复制代码
消费者delect
public class delect {
public static void main(String[] args) throws IOException, TimeoutException {
//1.创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
//2.设置参数
connectionFactory.setHost("192.168.145.3");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/zhaojin");
connectionFactory.setUsername("zhaojin");
connectionFactory.setPassword("zhaojin");
//3.创建连接
Connection connection = connectionFactory.newConnection();
// 获取通道
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare("routing_queue_delect", false, false, false, null);
// 绑定队列到交换机
channel.queueBind("routing_queue_delect", "routing_exchange", "delect");
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("delect 接收消息 : "+new String(body));
//手动ack
channel.basicAck(envelope.getDeliveryTag(),false);
}
};
// 监听队列,自动返回完成
channel.basicConsume("routing_queue_delect", false, consumer);
}
}
复制代码
消费者update
public class update {
public static void main(String[] args) throws IOException, TimeoutException {
//1.创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
//2.设置参数
connectionFactory.setHost("192.168.145.3");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/zhaojin");
connectionFactory.setUsername("zhaojin");
connectionFactory.setPassword("zhaojin");
//3.创建连接
Connection connection = connectionFactory.newConnection();
// 获取通道
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare("routing_queue_update", false, false, false, null);
// 绑定队列到交换机
channel.queueBind("routing_queue_update", "routing_exchange", "update");
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("delect 接收消息 : "+new String(body));
//手动ack
channel.basicAck(envelope.getDeliveryTag(),false);
}
};
// 监听队列,自动返回完成
channel.basicConsume("routing_queue_update", false, consumer);
}
}
复制代码
看控制台输出,可以看到,绑定了不同routingkey的收到不同的消息,每个队列可以有很多个routingkey
topic–
不同于Direct
的交换机,topic匹配可以通过通配符
Routingkey
一般都是有一个或多个单词组成,多个单词之间以”.”分割
*(星号)可以正好代替一个词。
# (hash) 可以代替零个或多个单词。
复制代码
生产者
public class p {
public static void main(String[] args) throws IOException, TimeoutException {
//1.创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
//2.设置参数
connectionFactory.setHost("192.168.145.3");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/zhaojin");
connectionFactory.setUsername("zhaojin");
connectionFactory.setPassword("zhaojin");
//3.创建连接
Connection connection = connectionFactory.newConnection();
// 获取通道
Channel channel = connection.createChannel();
// 声明exchange,指定类型为fanout
channel.exchangeDeclare("topic_exchange", "topic");
// 消息内容
String message = "新增";
//String message = "删除";
//String message = "更新";
// 发布消息到Exchange
channel.basicPublish("topic_exchange", "goods.insert", null, message.getBytes());
//channel.basicPublish("topic_exchange", "goods.delect", null, message.getBytes());
// channel.basicPublish("topic_exchange", "goods.update", null, message.getBytes());
System.out.println("生产者发送消息=:开始'" + message + "'");
channel.close();
connection.close();
}
}
复制代码
消费者1 只接收insert和delect
public class c1 {
public static void main(String[] args) throws IOException, TimeoutException {
//1.创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
//2.设置参数
connectionFactory.setHost("192.168.145.3");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/zhaojin");
connectionFactory.setUsername("zhaojin");
connectionFactory.setPassword("zhaojin");
//3.创建连接
Connection connection = connectionFactory.newConnection();
// 获取通道
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare("topic_queue_1", false, false, false, null);
// 绑定队列到交换机
channel.queueBind("topic_queue_1", "topic_exchange", "goods.insert");
channel.queueBind("topic_queue_1", "topic_exchange", "goods.delect");
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("新增删除 接收消息 : "+new String(body));
//手动ack
channel.basicAck(envelope.getDeliveryTag(),false);
}
};
// 监听队列,自动返回完成
channel.basicConsume("topic_queue_1", false, consumer);
}
}
复制代码
消费者2 只要匹配到 goods.全拿下
public class c2 {
public static void main(String[] args) throws IOException, TimeoutException {
//1.创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
//2.设置参数
connectionFactory.setHost("192.168.145.3");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/zhaojin");
connectionFactory.setUsername("zhaojin");
connectionFactory.setPassword("zhaojin");
//3.创建连接
Connection connection = connectionFactory.newConnection();
// 获取通道
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare("topic_queue_2", false, false, false, null);
// 绑定队列到交换机
channel.queueBind("topic_queue_2", "topic_exchange", "goods.*");
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("商品 接收消息 : "+new String(body));
//手动ack
channel.basicAck(envelope.getDeliveryTag(),false);
}
};
// 监听队列,自动返回完成
channel.basicConsume("topic_queue_2", false, consumer);
}
}
复制代码
运行发送推广三条消息,看控制台输出
完美、
© 版权声明
文章版权归作者所有,未经允许请勿转载。
THE END