消息队列
- 解耦
- 异步
- 削峰
Docker部署RabbitMQ
#下载rabbitmq镜像
docker pull rabbitmq:3.7.7-management
#创建rabbitmq容器,可视化界面为15672端口,默认用户名密码为guest
docker run -d --name myRabbitMQ -p 5672:5672 -p 15672:15672 rabbitmq:3.7.7-management
复制代码
RabbitMQ组成
-
生产者
-
消费者
-
交换机
-
虚拟主机
-
队列
搭建简单模式
//开启关闭连接工具类
public class ConnectionUtil {
public static Connection openConnection() {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);
factory.setUsername("zhangsan");
factory.setPassword("333");
factory.setVirtualHost("/");
Connection connection = factory.newConnection();
return connection;
}
public static void closeConnection(Connection connection, Channel channel) {
connection.close();
channel.close();
}
}
//新建队列和交换机
{
Connection connection = ConnectionUtil.openConnection();
Channel channel = connection.createChannel();
String queueName = "myQueue";
String exchangeType = "direct";
//创建队列,队列名称,是否持久化,是否为该channel独有,是否自动删除队列,携带附属参数
channel.queueDeclare(queueName, false, false, false, null);
//创建交换机,交换机是否持久化
channel.exchangeDeclare(exchangeName, exchangeType, true);
//绑定队列和交换机
channel.queueBind(queueName, exchangeName, routingKey);
//关闭连接
ConnectionUtil.closeConnection(connection, channel);
}
//生产者搭建
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtil.openConnection();
Channel channel = connection.createChannel();
String exchangeName = "myExchange";
String routingKey = "myRoutingKey";
String msg = "Hello World";
//生产
channel.basicPublish(exchangeName, routingKey, null, msg.getBytes());
//关闭连接
ConnectionUtil.closeConnection(connection, channel);
}
}
//消费者搭建
public class Consumer {
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtil.openConnection();
Channel channel = connection.createChannel();
String queueName = "myQueue";
//消费
channel.basicConsume(queueName, true, new DeliverCallback() {
public void handle(String s, Delivery delivery) throws IOException {
System.out.println("收到消息" + new String(delivery.getBody(), "UTF-8"));
}
}, new CancelCallback() {
public void handle(String s) throws IOException {
System.out.println("接收消息失败");
}
});
System.out.println("接收中。。。");
System.in.read();
//关闭连接
ConnectionUtil.closeConnection(connection, channel);
}
}
复制代码
消费模式
平均方式
//将autoAck设置为true
channel.basicConsumer("queue", true, deliverCallback);
复制代码
公平方式
//设置一次只能消费一个消息
channel.basicQos(1);
//将autoAck设置为false
channel.basicConsumer("queue", false, deliverCallback);
//消息消费完在回调函数中确认消费
channel.basicAck(delivery.getEnvelop().getDeliveryTag(), false);
复制代码
生产模式
Fanout
//交换机将消息发送到与之绑定的所有队列
channel.basicPublish(exchangeName, "", null, msg.getBytes());
复制代码
Direct
//交换机将消息发送到与routingkey绑定的队列
channel.basicPublish(exchangeName, routingKey, null, msg.getBytes());
复制代码
Topic
//交换机绑定时设置模糊routingkey,#表示n个词,*表示1个词
channel.queueBind(queueName, exchangeName, routingKey);
//交换机将消息发送到模糊匹配的队列中
channel.basicPublish(exchangeName, routingKey, null, msg.getBytes());
复制代码
整合SpringBoot
#配置application.yaml
server:
port:
8080
spring:
rabbitmq:
username: zhangsan
password: 333
virtual-host: /
host: 127.0.0.1
port: 5672
复制代码
@Configuration
public class RabbitMqConfiguration {
//创建交换机
@Bean
public FanoutExchange fanoutExchange() {
return new FanoutExchange("myExchange", true, false);
}
//创建队列
@Bean
public Queue myQueue() {
return new Queue("myQueue", true);
}
//完成绑定关系
public Binding myBind() {
return BindingBuilder.bind(myQueue()).to(fanoutExchange());
}
}
复制代码
//生产者发送消息
public class Producer {
@Autowired
private RabbitTemplate rabbitTemplate;
public void product() {
String exchangeName = "myExchange";
String routingKey = "";
String message = "message";
rabbitTemplate.convertAndSend(exchangeName, routingKey, message);
}
}
复制代码
//消费者接收消息
@Component
@RabbitListener(queues = {"myQueue"})
public class Consumer {
@RabbitHandler
public void consume(String message) {
System.out.println(message);
}
}
复制代码
过期时间TTL
设置队列TTL
@Configuration
public class TTLRabbitMQConfiguration {
@Bean
public DirectExchange ttlDirectExchange() {
return new DirectExchange("ttlDirectExchange", true, false);
}
@Bean
public Queue ttlDirectQueue() {
Map<String, Object> args = new HashMap<>();
args.put("x-message-ttl", 5000);
return new Queue("ttlDirectionQueue", true, false, false, args);
}
@Bean
public Binding ttlBinding() {
return BindingBuilder.bin(ttlDirectQueue()).to(ttlDirectExchange()).with("routingKey");
}
}
复制代码
设置消息TTL
public class Producer {
@Autowired
private RabbitTemplate rabbitTemplate;
public void product() {
String exchangeName = "ttlDirectExchange";
String routingKey = "ttl";
MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
public Message postProcessMessage(Message message) {
message.getMessageProperties().setExpiration("5000");
message.getMessageProperties().setContentEncoding("UTF-8");
return message;
}
};
rabbitTemplate.convertAndSend(exchangeName, routingKey, message, messagePostProcessor);
}
}
复制代码
死信队列
消息过期,消息被拒绝,队列达到最大长度
@Configuration
public class RabbitConfiguration {
@Bean
public DirectExchanges deadDirect() {
return new DirectExchage("deadExchange", true, false);
}
@Bean
public Queue deadQueue() {
return new Queue("deadQueue", true);
}
@Bean
public Binding deadBinding() {
return BindingBuilder.bind(deadDirect()).to(deadQueue()).with("dead");
}
@Bean
public Queue ttlDirectQueue() {
Map<String, Object> args = new HashMap<>();
args.put("x-message-ttl", 5000);
args.put("x-dead-letter-exchange", "dead_direct_exchange");
args.put("x-dead-letter-routing-key", "dead");
return new Queue("ttlDirectionQueue", true, false, false, args).with("routingKey")
}
}
复制代码
集群
主备集群
#配置各个主机的host
echo "ip hostName" >> /etc/hosts
#同步各节点的 /var/lib/rabbitmq/.erlang.cookie
#启动各个节点
rabbitmq-server -detached
#操作节点加入集群
rabbitmqctl stop_app
rabbitmqctl join_cluster rabbit@mainHostName
rabbitmqctl start_app
复制代码
高可用集群
#在节点上添加策略,所有队列都变成镜像队列
rabbitmqctl set_policy ha-all "^" '{"ha-mode":"all"}'
复制代码
© 版权声明
文章版权归作者所有,未经允许请勿转载。
THE END