SpringtBoot整合RabbitMQ

这是我参与8月更文挑战的第16天,活动详情查看:8月更文挑战

先导知识

RabbitTemplate

在之前我们的入门程序中,我们与RabbiMQ取得连接以及通过RabbitMQ进行消息的发送和接收都是通过amqp-client使用RabbitMQ原生API的方式来与RabbitMQ取得连接、声明交换机、发送消息等,那么如果我想在Spring中向 RabbitMQ Server 发送消息,该怎么办呢?这时我们就需要使用RabbitTemplate

✨RabbitTemplate 是 Spring-AMQP依赖为我们提供的一种 RabbitMQ 消息模板,它与 RabbitMQ Server 建立了一种映射关系,我们只需要对RabbitTemplate进行配置,就可以将我们应用程序中的数据发送到 RabbitMQ Server 中。

RabbitTemplate 提供了编辑消息、发送消息、发送消息前的监听、发送消息后的监听回调接口ConfirmCallback、返回确认接口 ReturnCallback 等等功能,我们只需要配置一下RabbitTemplate然后将其实例化后存放在Spring容器中,我们就可以使用它来完成各种操作。

SpringtBoot整合RabbitMQ

  1. 创建一个SpringBoot项目,引入 rabbitmq 依赖包
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
复制代码
  1. 配置application.yml配置文件
spring:
  #RabbitMq配置
  rabbitmq:
        host: 192.168.180.130
        port: 5672
        username: admin
        password: admin
        virtual-host: /
        #消息确认回调
        publisher-confirm-type: correlated
        #消息失败回调
        publisher-returns: true
        #开启手动确认
        listener:
             simple:
                     acknowledge-mode: manual
复制代码
  1. 自定义消息确认回调和失败回调
  • 消息确认回调,即消息到达Broker后,会调用该回调函数
@Component
public class MyConfirmCallback implements RabbitTemplate.ConfirmCallback {
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String s) {
        if(ack){
            System.out.println("成功投递");
        }else{
            System.out.println("投递失败");
        }
    }
}
复制代码
  • 消息失败回调,如当交换机无法将消息路由到指定队列时会调用该回调函数
@Component
public class MyReturnCallback implements RabbitTemplate.ReturnCallback {

    @Override
    public void returnedMessage(Message message, int i, String s, String s1, String s2) {
        System.out.println("消息投递失败回调");
    }
}
复制代码
  1. 新建一个配置类RabbitMQConfig,定义交换机Exchange和队列Queue,并将队列绑定在交换机上,配置RabbitTemplate消息模板。
@Configuration
public class RabbitMQConfig {

    private static final Logger LOGGER = LoggerFactory.getLogger(RabbitMQConfig.class);

    @Resource
    private MyConfirmCallback confirmCallback;

    @Resource
    private MyReturnCallback returnCallback;


    //连接工厂
    @Autowired
    private CachingConnectionFactory cachingConnectionFactory;

    
    //配置RabbitTemplate消息模板
    @Bean
    public RabbitTemplate rabbitTemplate() {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(cachingConnectionFactory);
        //消息投递成功回调
        rabbitTemplate.setConfirmCallback(confirmCallback);
        //消息投递失败回调
        rabbitTemplate.setReturnCallback(returnCallback);
        return rabbitTemplate;
    }

    /**
     * 声明队列
     * @return
     */
    @Bean
    public Queue queue() {
        return new Queue("queue4");
    }

    //声明交换机
    @Bean
    public DirectExchange directExchange() {
        return new DirectExchange("Direct");
    }

    //绑定交换机的和队列
    @Bean
    public Binding bingding() {
        return BindingBuilder.
                bind(queue()).
                to(directExchange()).
                with("springboot");
    }

}
复制代码
  1. 生产者发送消息
@RunWith(SpringRunner.class)
@SpringBootTest
class RabbitmqApplicationTests {

    @Resource
    private RabbitTemplate rabbitTemplate;

    @Test
    void contextLoads() {
        //发送消息
        rabbitTemplate.convertAndSend("Direct","springboot","不喝奶茶的Programmer");
    }


}
复制代码
  1. 消费者监听队列,消费消息
@Component
public class Comsummer {

    @RabbitListener(queues = "queue4")
    public void receiverMessage(Object msg, Channel channel,Message message) throws IOException {
        System.out.println("收到消息:"+msg);
        //手动ACK
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);

    }
}
复制代码

扩展: 采用非配置类的方式来进行交换机、队列声明以及交换机和队列的绑定注解的方式)( 在实际的生产环境中,在生产者端进行绑定或者在消费者端进行绑定都可以,但是在消费者端进行绑定会更好,因为我们消费者是最先启动起来的服务,如果当队列没有被声明,消费者端会出现启动错误,所以当消费者启动时就进行交换机和队列的绑定更ok )

如,通过以下注解的方式一样可以完成交换机、队列的声明以及交换机和队列的绑定:

Component
@RabbitListener(bindings = @QueueBinding(
        value=@Queue(value = "queue4",durable = "false",autoDelete = "false"),
        exchange=@Exchange(value = "Direct",type = ExchangeTypes.DIRECT),
        key = "springboot"
))
public class Comsummer {

    @RabbitHandler
    public void receiverMessage(Object msg, Channel channel,Message message) throws IOException {
        System.out.println("收到消息:"+msg);
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);

    }
}
复制代码

?以上就是对Spring Boot整合RabbitMQ过程的简单介绍,如果有错误的地方,还请留言指正,如果觉得本文对你有帮助那就点个赞?吧???
默认标题_动态分割线_2021-07-15-0.gif
k3u1fbpfcp-watermark.image)

© 版权声明
THE END
喜欢就支持一下吧
点赞0 分享