这是我参与8月更文挑战的第16天,活动详情查看:8月更文挑战
先导知识
RabbitTemplate
在之前我们的入门程序中,我们与RabbiMQ取得连接以及通过RabbitMQ进行消息的发送和接收都是通过amqp-client
使用RabbitMQ原生API的方式来与RabbitMQ取得连接、声明交换机、发送消息等,那么如果我想在Spring中向 RabbitMQ Server 发送消息,该怎么办呢?这时我们就需要使用RabbitTemplate
。
✨RabbitTemplate 是 Spring-AMQP依赖为我们提供的一种 RabbitMQ 消息模板,它与 RabbitMQ Server 建立了一种映射关系,我们只需要对RabbitTemplate进行配置,就可以将我们应用程序中的数据发送到 RabbitMQ Server 中。
RabbitTemplate 提供了编辑消息、发送消息、发送消息前的监听、发送消息后的监听回调接口ConfirmCallback、返回确认接口 ReturnCallback 等等功能,我们只需要配置一下RabbitTemplate然后将其实例化后存放在Spring容器中,我们就可以使用它来完成各种操作。
SpringtBoot整合RabbitMQ
- 创建一个SpringBoot项目,引入 rabbitmq 依赖包
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
复制代码
- 配置
application.yml
配置文件
spring:
#RabbitMq配置
rabbitmq:
host: 192.168.180.130
port: 5672
username: admin
password: admin
virtual-host: /
#消息确认回调
publisher-confirm-type: correlated
#消息失败回调
publisher-returns: true
#开启手动确认
listener:
simple:
acknowledge-mode: manual
复制代码
- 自定义消息确认回调和失败回调
- 消息确认回调,即消息到达Broker后,会调用该回调函数
@Component
public class MyConfirmCallback implements RabbitTemplate.ConfirmCallback {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String s) {
if(ack){
System.out.println("成功投递");
}else{
System.out.println("投递失败");
}
}
}
复制代码
- 消息失败回调,如当交换机无法将消息路由到指定队列时会调用该回调函数
@Component
public class MyReturnCallback implements RabbitTemplate.ReturnCallback {
@Override
public void returnedMessage(Message message, int i, String s, String s1, String s2) {
System.out.println("消息投递失败回调");
}
}
复制代码
- 新建一个配置类RabbitMQConfig,定义交换机Exchange和队列Queue,并将队列绑定在交换机上,配置RabbitTemplate消息模板。
@Configuration
public class RabbitMQConfig {
private static final Logger LOGGER = LoggerFactory.getLogger(RabbitMQConfig.class);
@Resource
private MyConfirmCallback confirmCallback;
@Resource
private MyReturnCallback returnCallback;
//连接工厂
@Autowired
private CachingConnectionFactory cachingConnectionFactory;
//配置RabbitTemplate消息模板
@Bean
public RabbitTemplate rabbitTemplate() {
RabbitTemplate rabbitTemplate = new RabbitTemplate(cachingConnectionFactory);
//消息投递成功回调
rabbitTemplate.setConfirmCallback(confirmCallback);
//消息投递失败回调
rabbitTemplate.setReturnCallback(returnCallback);
return rabbitTemplate;
}
/**
* 声明队列
* @return
*/
@Bean
public Queue queue() {
return new Queue("queue4");
}
//声明交换机
@Bean
public DirectExchange directExchange() {
return new DirectExchange("Direct");
}
//绑定交换机的和队列
@Bean
public Binding bingding() {
return BindingBuilder.
bind(queue()).
to(directExchange()).
with("springboot");
}
}
复制代码
- 生产者发送消息
@RunWith(SpringRunner.class)
@SpringBootTest
class RabbitmqApplicationTests {
@Resource
private RabbitTemplate rabbitTemplate;
@Test
void contextLoads() {
//发送消息
rabbitTemplate.convertAndSend("Direct","springboot","不喝奶茶的Programmer");
}
}
复制代码
- 消费者监听队列,消费消息
@Component
public class Comsummer {
@RabbitListener(queues = "queue4")
public void receiverMessage(Object msg, Channel channel,Message message) throws IOException {
System.out.println("收到消息:"+msg);
//手动ACK
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}
}
复制代码
扩展: 采用非配置类的方式来进行交换机、队列声明以及交换机和队列的绑定
(注解的方式)( 在实际的生产环境中,在生产者端进行绑定或者在消费者端进行绑定都可以,但是在消费者端进行绑定会更好,因为我们消费者是最先启动起来的服务,如果当队列没有被声明,消费者端会出现启动错误,所以当消费者启动时就进行交换机和队列的绑定更ok )
如,通过以下注解的方式一样可以完成交换机、队列的声明以及交换机和队列的绑定:
Component
@RabbitListener(bindings = @QueueBinding(
value=@Queue(value = "queue4",durable = "false",autoDelete = "false"),
exchange=@Exchange(value = "Direct",type = ExchangeTypes.DIRECT),
key = "springboot"
))
public class Comsummer {
@RabbitHandler
public void receiverMessage(Object msg, Channel channel,Message message) throws IOException {
System.out.println("收到消息:"+msg);
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}
}
复制代码
?以上就是对Spring Boot整合RabbitMQ过程的简单介绍,如果有错误的地方,还请留言指正,如果觉得本文对你有帮助那就点个赞?吧???
k3u1fbpfcp-watermark.image)