初入RabbitMQ(一)

初入RabbitMQ(一)

前言

rabbitMq作为目前主流MQ(Message Queue)比较火的一个,很多初学者都不会使用MQ或者在MQ上做一些具体业务。这篇文章足够让小白明白MQ怎么使用的。

安装

环境

  • SpringBoot 2.3.1.RELEASE
  • Rabbit MQ

安装开始

这里笔者直接用Docker安装比较方便

docker pull rabbitmq
docker run -d --name rabbit -e RABBITMQ_DEFAULT_USER=root -e RABBITMQ_DEFAULT_PASS=123456 -p 15672:15672 -p 5672:5672 rabbitmq
复制代码

访问后还需要进容器一次,才能进入rabbitmq后台

docker exec -it rabbitmq_id /bin/bash
rabbitmq-plugins enable rabbitmq_management
exit
复制代码
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
</dependency>
或(本文采用第二种,因为懒嘛不想写@Value)
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
复制代码
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=root
spring.rabbitmq.password=123456
复制代码
/**
 * @author Kakki
 * @version 1.0
 * @create 2021-06-18 16:46
 */
@Configuration
@AutoConfigureAfter
public class RabbitMqConfig {

    @Autowired
    private RabbitProperties rabbitProperties;

    @Bean
    public CachingConnectionFactory cachingConnectionFactory() {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(rabbitProperties.getHost());
        connectionFactory.setPort(rabbitProperties.getPort());
        connectionFactory.setUsername(rabbitProperties.getUsername());
        connectionFactory.setPassword(rabbitProperties.getPassword());
        connectionFactory.setAutomaticRecoveryEnabled(true);
        connectionFactory.setTopologyRecoveryEnabled(true);
        return new CachingConnectionFactory(connectionFactory);
    }
}
@AutoConfigureAfter(RabbitMqConfig.class)
@Configuration
public class QueueAutoConfig implements InitializingBean {

    @Autowired
    private AmqpAdmin amqpAdmin;

    @Override
    public void afterPropertiesSet() throws Exception {
        Arrays.stream(QueueInfo.values()).forEach(v->{
            Queue queue = new Queue(v.getQueue());
            amqpAdmin.declareQueue(queue);
            CustomExchange exchange = new CustomExchange(v.getExchange(), v.getExchangeType());
            amqpAdmin.declareExchange(exchange);
            Binding binding = BindingBuilder.bind(queue).to(exchange).with(v.getRoutingKey()).noargs();
            amqpAdmin.declareBinding(binding);
        });
    }
}
@AllArgsConstructor
@Getter
public enum QueueInfo {
    FIRST(
            "FIRST_QUEUE",
            "FIRST_EXCHANGE",
            "FIRST_ROUTING_KEY",
            "fanout"
    );

    private final String queue;
    private final String exchange;
    private final String routingKey;
    private final String exchangeType;

}
复制代码

如何发送消息和接受消息呢

发送消息

/**
  * 单元测试
  */
@Test
public void sendMq() {
    String s = "Hello Kakki";
    rabbitTemplate.convertAndSend(QueueInfo.FIRST.getQueue(), s);
    while(true);
}
复制代码

接受消息

/**
 * @author Kakki
 * @version 1.0
 * @create 2021-06-18 17:13
 */
@Component
@Slf4j
public class MqCustomer {
    @RabbitListener(queues = {"FIRST_QUEUE"})
    public void handler(Message message) { // 这里Message也可以直接使用String去接受
        log.info("[消息来了]:{}", new String(message.getBody(), StandardCharsets.UTF_8));
    }
}
复制代码

使用时看参数

  • 上面使用的是convertAndSend()这个方法,这个方法有很多参数,可以发现的是,我们的参数是RoutingKey的时候可以直接发到监听的队列,为什么呢?上面的参数明明是routingKey啊!这个我们后面再说。

使用不同的交换机类型发送信息

/**
 * 单元测试
 */
@Test
public void sendMq() {
    rabbitTemplate.convertAndSend(QueueInfo.FANOUT.getExchange(), StringUtils.EMPTY, "fanout 无routingKey");
    rabbitTemplate.convertAndSend(QueueInfo.FANOUT.getExchange(), QueueInfo.FANOUT.getRoutingKey(), "fanout 无routingKey");
    rabbitTemplate.convertAndSend(QueueInfo.DIRECT.getExchange(), QueueInfo.DIRECT.getRoutingKey(), "DIRECT 有routingKey");
    rabbitTemplate.convertAndSend(QueueInfo.TOPIC1.getExchange(), QueueInfo.TOPIC1.getRoutingKey(), String.format("TOPIC1 有routingKey%s", QueueInfo.TOPIC1.getRoutingKey()));
    rabbitTemplate.convertAndSend(QueueInfo.TOPIC2.getExchange(), QueueInfo.TOPIC2.getRoutingKey(), String.format("TOPIC2 有routingKey%s", QueueInfo.TOPIC2.getRoutingKey()));
    rabbitTemplate.convertAndSend(QueueInfo.TOPIC3.getExchange(), QueueInfo.TOPIC3.getRoutingKey(), String.format("TOPIC3 有routingKey%s", QueueInfo.TOPIC3.getRoutingKey()));
    rabbitTemplate.convertAndSend(QueueInfo.TOPIC4.getExchange(), QueueInfo.TOPIC4.getRoutingKey(), String.format("TOPIC4 有routingKey%s", QueueInfo.TOPIC4.getRoutingKey()));
    rabbitTemplate.convertAndSend(QueueInfo.HEADERS.getExchange(), QueueInfo.HEADERS.getRoutingKey(), String.format("HEADERS 有routingKey%s", QueueInfo.HEADERS.getRoutingKey()));

    while (true);
}
/**
 * @author Kakki
 * @version 1.0
 * @create 2021-06-18 17:13
 */
@Component
@Slf4j
public class MqCustomer {

    @RabbitListener(queues = {"FANOUT_QUEUE"})
    public void handlerF(Message message) {
        log.info("[消息来了]:{}", new String(message.getBody(), StandardCharsets.UTF_8));
    }

    @RabbitListener(queues = {"DIRECT_QUEUE"})
    public void handlerD(Message message) {
        log.info("[消息来了]:{}", new String(message.getBody(), StandardCharsets.UTF_8));
    }

    @RabbitListener(queues = {"TOPIC_QUEUE"})
    public void handlerT(Message message) {
        log.info("[消息来了]:{}", new String(message.getBody(), StandardCharsets.UTF_8));
    }

    @RabbitListener(queues = {"HEADERS_QUEUE"})
    public void handlerH(Message message) {
        log.info("[消息来了]:{}", new String(message.getBody(), StandardCharsets.UTF_8));
    }
}
复制代码
2021-06-30 16:48:14.638  INFO 16460 --- [ntContainer#0-1] com.example.demo.config.MqCustomer       : [消息来了]:DIRECT 有routingKey
2021-06-30 16:48:14.638  INFO 16460 --- [ntContainer#1-1] com.example.demo.config.MqCustomer       : [消息来了]:fanout 无routingKey
2021-06-30 16:48:14.638  INFO 16460 --- [ntContainer#3-1] com.example.demo.config.MqCustomer       : [消息来了]:HEADERS 有routingKeyHEADERS_ROUTING_KEY
2021-06-30 16:48:14.638  INFO 16460 --- [ntContainer#2-1] com.example.demo.config.MqCustomer       : [消息来了]:TOPIC1 有routingKeyTOPIC_ROUTING_KEY.#
2021-06-30 16:48:14.638  INFO 16460 --- [ntContainer#1-1] com.example.demo.config.MqCustomer       : [消息来了]:fanout 无routingKey
2021-06-30 16:48:14.638  INFO 16460 --- [ntContainer#2-1] com.example.demo.config.MqCustomer       : [消息来了]:TOPIC2 有routingKeyTOPIC_ROUTING_KEY.AAAB
2021-06-30 16:48:14.639  INFO 16460 --- [ntContainer#2-1] com.example.demo.config.MqCustomer       : [消息来了]:TOPIC3 有routingKeyTOPIC_ROUTING_KEY.AAAC.AAAK
2021-06-30 16:48:14.639  INFO 16460 --- [ntContainer#2-1] com.example.demo.config.MqCustomer       : [消息来了]:TOPIC4 有routingKeyNO_KEY
复制代码
  • 由此可见发送的消息,发到不同类型的交换机上面,可能有不同的效果,具体到底应该怎么配置这些交换机类型,或者说交换机应该怎么选择,都是靠业务场景去选择。
  • 笔者这里有个错误,就是发fanout交换机的时候,其实并没有使用fanout类型的交换机去发送,那为什么还会到达呢?这个留给大家思考思考。我们下一章再细说。

小结

使用MQ的情况当然是为了业务解耦,在高并发场景下也能削峰,熟练用MQ会有很多意想不到的用途,当然这篇文章是给一开始学习的小白用的,RabbitMQ就没内容了吗?当然不是。本来想把所有我知道的写下来,但是怕篇幅太长,被人不喜欢(其实就是懒)所以在下一个关于MQ内容上说!

© 版权声明
THE END
喜欢就支持一下吧
点赞0 分享