初入RabbitMQ(一)
前言
rabbitMq作为目前主流MQ(Message Queue)比较火的一个,很多初学者都不会使用MQ或者在MQ上做一些具体业务。这篇文章足够让小白明白MQ怎么使用的。
安装
环境
- SpringBoot 2.3.1.RELEASE
- Rabbit MQ
安装开始
这里笔者直接用Docker安装比较方便
docker pull rabbitmq
docker run -d --name rabbit -e RABBITMQ_DEFAULT_USER=root -e RABBITMQ_DEFAULT_PASS=123456 -p 15672:15672 -p 5672:5672 rabbitmq
复制代码
访问后还需要进容器一次,才能进入rabbitmq后台
docker exec -it rabbitmq_id /bin/bash
rabbitmq-plugins enable rabbitmq_management
exit
复制代码
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
</dependency>
或(本文采用第二种,因为懒嘛不想写@Value)
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
复制代码
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=root
spring.rabbitmq.password=123456
复制代码
/**
* @author Kakki
* @version 1.0
* @create 2021-06-18 16:46
*/
@Configuration
@AutoConfigureAfter
public class RabbitMqConfig {
@Autowired
private RabbitProperties rabbitProperties;
@Bean
public CachingConnectionFactory cachingConnectionFactory() {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(rabbitProperties.getHost());
connectionFactory.setPort(rabbitProperties.getPort());
connectionFactory.setUsername(rabbitProperties.getUsername());
connectionFactory.setPassword(rabbitProperties.getPassword());
connectionFactory.setAutomaticRecoveryEnabled(true);
connectionFactory.setTopologyRecoveryEnabled(true);
return new CachingConnectionFactory(connectionFactory);
}
}
@AutoConfigureAfter(RabbitMqConfig.class)
@Configuration
public class QueueAutoConfig implements InitializingBean {
@Autowired
private AmqpAdmin amqpAdmin;
@Override
public void afterPropertiesSet() throws Exception {
Arrays.stream(QueueInfo.values()).forEach(v->{
Queue queue = new Queue(v.getQueue());
amqpAdmin.declareQueue(queue);
CustomExchange exchange = new CustomExchange(v.getExchange(), v.getExchangeType());
amqpAdmin.declareExchange(exchange);
Binding binding = BindingBuilder.bind(queue).to(exchange).with(v.getRoutingKey()).noargs();
amqpAdmin.declareBinding(binding);
});
}
}
@AllArgsConstructor
@Getter
public enum QueueInfo {
FIRST(
"FIRST_QUEUE",
"FIRST_EXCHANGE",
"FIRST_ROUTING_KEY",
"fanout"
);
private final String queue;
private final String exchange;
private final String routingKey;
private final String exchangeType;
}
复制代码
如何发送消息和接受消息呢
发送消息
/**
* 单元测试
*/
@Test
public void sendMq() {
String s = "Hello Kakki";
rabbitTemplate.convertAndSend(QueueInfo.FIRST.getQueue(), s);
while(true);
}
复制代码
接受消息
/**
* @author Kakki
* @version 1.0
* @create 2021-06-18 17:13
*/
@Component
@Slf4j
public class MqCustomer {
@RabbitListener(queues = {"FIRST_QUEUE"})
public void handler(Message message) { // 这里Message也可以直接使用String去接受
log.info("[消息来了]:{}", new String(message.getBody(), StandardCharsets.UTF_8));
}
}
复制代码
使用时看参数
- 上面使用的是convertAndSend()这个方法,这个方法有很多参数,可以发现的是,我们的参数是RoutingKey的时候可以直接发到监听的队列,为什么呢?上面的参数明明是routingKey啊!这个我们后面再说。
使用不同的交换机类型发送信息
/**
* 单元测试
*/
@Test
public void sendMq() {
rabbitTemplate.convertAndSend(QueueInfo.FANOUT.getExchange(), StringUtils.EMPTY, "fanout 无routingKey");
rabbitTemplate.convertAndSend(QueueInfo.FANOUT.getExchange(), QueueInfo.FANOUT.getRoutingKey(), "fanout 无routingKey");
rabbitTemplate.convertAndSend(QueueInfo.DIRECT.getExchange(), QueueInfo.DIRECT.getRoutingKey(), "DIRECT 有routingKey");
rabbitTemplate.convertAndSend(QueueInfo.TOPIC1.getExchange(), QueueInfo.TOPIC1.getRoutingKey(), String.format("TOPIC1 有routingKey%s", QueueInfo.TOPIC1.getRoutingKey()));
rabbitTemplate.convertAndSend(QueueInfo.TOPIC2.getExchange(), QueueInfo.TOPIC2.getRoutingKey(), String.format("TOPIC2 有routingKey%s", QueueInfo.TOPIC2.getRoutingKey()));
rabbitTemplate.convertAndSend(QueueInfo.TOPIC3.getExchange(), QueueInfo.TOPIC3.getRoutingKey(), String.format("TOPIC3 有routingKey%s", QueueInfo.TOPIC3.getRoutingKey()));
rabbitTemplate.convertAndSend(QueueInfo.TOPIC4.getExchange(), QueueInfo.TOPIC4.getRoutingKey(), String.format("TOPIC4 有routingKey%s", QueueInfo.TOPIC4.getRoutingKey()));
rabbitTemplate.convertAndSend(QueueInfo.HEADERS.getExchange(), QueueInfo.HEADERS.getRoutingKey(), String.format("HEADERS 有routingKey%s", QueueInfo.HEADERS.getRoutingKey()));
while (true);
}
/**
* @author Kakki
* @version 1.0
* @create 2021-06-18 17:13
*/
@Component
@Slf4j
public class MqCustomer {
@RabbitListener(queues = {"FANOUT_QUEUE"})
public void handlerF(Message message) {
log.info("[消息来了]:{}", new String(message.getBody(), StandardCharsets.UTF_8));
}
@RabbitListener(queues = {"DIRECT_QUEUE"})
public void handlerD(Message message) {
log.info("[消息来了]:{}", new String(message.getBody(), StandardCharsets.UTF_8));
}
@RabbitListener(queues = {"TOPIC_QUEUE"})
public void handlerT(Message message) {
log.info("[消息来了]:{}", new String(message.getBody(), StandardCharsets.UTF_8));
}
@RabbitListener(queues = {"HEADERS_QUEUE"})
public void handlerH(Message message) {
log.info("[消息来了]:{}", new String(message.getBody(), StandardCharsets.UTF_8));
}
}
复制代码
2021-06-30 16:48:14.638 INFO 16460 --- [ntContainer#0-1] com.example.demo.config.MqCustomer : [消息来了]:DIRECT 有routingKey
2021-06-30 16:48:14.638 INFO 16460 --- [ntContainer#1-1] com.example.demo.config.MqCustomer : [消息来了]:fanout 无routingKey
2021-06-30 16:48:14.638 INFO 16460 --- [ntContainer#3-1] com.example.demo.config.MqCustomer : [消息来了]:HEADERS 有routingKeyHEADERS_ROUTING_KEY
2021-06-30 16:48:14.638 INFO 16460 --- [ntContainer#2-1] com.example.demo.config.MqCustomer : [消息来了]:TOPIC1 有routingKeyTOPIC_ROUTING_KEY.#
2021-06-30 16:48:14.638 INFO 16460 --- [ntContainer#1-1] com.example.demo.config.MqCustomer : [消息来了]:fanout 无routingKey
2021-06-30 16:48:14.638 INFO 16460 --- [ntContainer#2-1] com.example.demo.config.MqCustomer : [消息来了]:TOPIC2 有routingKeyTOPIC_ROUTING_KEY.AAAB
2021-06-30 16:48:14.639 INFO 16460 --- [ntContainer#2-1] com.example.demo.config.MqCustomer : [消息来了]:TOPIC3 有routingKeyTOPIC_ROUTING_KEY.AAAC.AAAK
2021-06-30 16:48:14.639 INFO 16460 --- [ntContainer#2-1] com.example.demo.config.MqCustomer : [消息来了]:TOPIC4 有routingKeyNO_KEY
复制代码
- 由此可见发送的消息,发到不同类型的交换机上面,可能有不同的效果,具体到底应该怎么配置这些交换机类型,或者说交换机应该怎么选择,都是靠业务场景去选择。
- 笔者这里有个错误,就是发fanout交换机的时候,其实并没有使用fanout类型的交换机去发送,那为什么还会到达呢?这个留给大家思考思考。我们下一章再细说。
小结
使用MQ的情况当然是为了业务解耦,在高并发场景下也能削峰,熟练用MQ会有很多意想不到的用途,当然这篇文章是给一开始学习的小白用的,RabbitMQ就没内容了吗?当然不是。本来想把所有我知道的写下来,但是怕篇幅太长,被人不喜欢(其实就是懒)所以在下一个关于MQ内容上说!
© 版权声明
文章版权归作者所有,未经允许请勿转载。
THE END