RabbitMQ

消息队列

  • 解耦
  • 异步
  • 削峰

Docker部署RabbitMQ

#下载rabbitmq镜像
docker pull rabbitmq:3.7.7-management

#创建rabbitmq容器,可视化界面为15672端口,默认用户名密码为guest
docker run -d --name myRabbitMQ -p 5672:5672 -p 15672:15672 rabbitmq:3.7.7-management
复制代码

RabbitMQ组成

  • 生产者

  • 消费者

  • 交换机

  • 虚拟主机

  • 队列

搭建简单模式

//开启关闭连接工具类
public class ConnectionUtil {
    public static Connection openConnection() {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setUsername("zhangsan");
        factory.setPassword("333");
        factory.setVirtualHost("/");
        Connection connection = factory.newConnection();
        return connection;
    }
    public static void closeConnection(Connection connection, Channel channel) {
        connection.close();
        channel.close();
    }
}

//新建队列和交换机
{
    Connection connection = ConnectionUtil.openConnection();
    Channel channel = connection.createChannel();
    String queueName = "myQueue";
    String exchangeType = "direct";
    //创建队列,队列名称,是否持久化,是否为该channel独有,是否自动删除队列,携带附属参数
    channel.queueDeclare(queueName, false, false, false, null);
    //创建交换机,交换机是否持久化
    channel.exchangeDeclare(exchangeName,  exchangeType, true);
    //绑定队列和交换机
    channel.queueBind(queueName, exchangeName, routingKey);
    //关闭连接
    ConnectionUtil.closeConnection(connection, channel);
}

//生产者搭建
public class Producer {
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtil.openConnection();
        Channel channel = connection.createChannel();
        String exchangeName = "myExchange";
        String routingKey = "myRoutingKey";
        String msg = "Hello World";
        //生产
        channel.basicPublish(exchangeName, routingKey, null, msg.getBytes());
        //关闭连接
        ConnectionUtil.closeConnection(connection, channel);
    }
}

//消费者搭建
public class Consumer {
    public static void main(String[] args) throws IOException, TimeoutException {
		Connection connection = ConnectionUtil.openConnection();
        Channel channel = connection.createChannel();
        String queueName = "myQueue";
        //消费
        channel.basicConsume(queueName, true, new DeliverCallback() {
            public void handle(String s, Delivery delivery) throws IOException {
                System.out.println("收到消息" + new String(delivery.getBody(), "UTF-8"));
            }
        }, new CancelCallback() {
            public void handle(String s) throws IOException {
                System.out.println("接收消息失败");
            }
        });
        System.out.println("接收中。。。");
        System.in.read();
        //关闭连接
        ConnectionUtil.closeConnection(connection, channel);
    }
}
复制代码

消费模式

平均方式

//将autoAck设置为true
channel.basicConsumer("queue", true, deliverCallback);
复制代码

公平方式

//设置一次只能消费一个消息
channel.basicQos(1);

//将autoAck设置为false
channel.basicConsumer("queue", false, deliverCallback);

//消息消费完在回调函数中确认消费
channel.basicAck(delivery.getEnvelop().getDeliveryTag(), false);
复制代码

生产模式

Fanout

//交换机将消息发送到与之绑定的所有队列
channel.basicPublish(exchangeName, "", null, msg.getBytes());
复制代码

Direct

//交换机将消息发送到与routingkey绑定的队列
channel.basicPublish(exchangeName, routingKey, null, msg.getBytes());
复制代码

Topic

//交换机绑定时设置模糊routingkey,#表示n个词,*表示1个词
channel.queueBind(queueName, exchangeName, routingKey);

//交换机将消息发送到模糊匹配的队列中
channel.basicPublish(exchangeName, routingKey, null, msg.getBytes());
复制代码

整合SpringBoot

#配置application.yaml
server:
    port:
        8080
spring:
    rabbitmq:
        username: zhangsan
        password: 333
        virtual-host: /
        host: 127.0.0.1
        port: 5672
复制代码
@Configuration
public class RabbitMqConfiguration {
    //创建交换机
    @Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange("myExchange", true, false);
    }
    //创建队列
    @Bean
    public Queue myQueue() {
        return new Queue("myQueue", true);
    }
    //完成绑定关系
    public Binding myBind() {
        return BindingBuilder.bind(myQueue()).to(fanoutExchange());
    }
}
复制代码
//生产者发送消息
public class Producer {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    public void product() {
        String exchangeName = "myExchange";
        String routingKey = "";
        String message = "message";
        rabbitTemplate.convertAndSend(exchangeName, routingKey, message);
    }
}
复制代码
//消费者接收消息
@Component
@RabbitListener(queues = {"myQueue"})
public class Consumer {
    @RabbitHandler
    public void consume(String message) {
        System.out.println(message);
    }
}
复制代码

过期时间TTL

设置队列TTL

@Configuration
public class TTLRabbitMQConfiguration {
    @Bean
    public DirectExchange ttlDirectExchange() {
        return new DirectExchange("ttlDirectExchange", true, false);
    }
    @Bean
    public Queue ttlDirectQueue() {
        Map<String, Object> args = new HashMap<>();
        args.put("x-message-ttl", 5000);
        return new Queue("ttlDirectionQueue", true, false, false, args);
    }
    @Bean
    public Binding ttlBinding() {
        return BindingBuilder.bin(ttlDirectQueue()).to(ttlDirectExchange()).with("routingKey");
    }
}
复制代码

设置消息TTL

public class Producer {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    public void product() {
        String exchangeName = "ttlDirectExchange";
        String routingKey = "ttl";
        MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
            public Message postProcessMessage(Message message) {
                message.getMessageProperties().setExpiration("5000");
                message.getMessageProperties().setContentEncoding("UTF-8");
                return message;
            }
        };
        rabbitTemplate.convertAndSend(exchangeName, routingKey, message, messagePostProcessor);
    }
}
复制代码

死信队列

消息过期,消息被拒绝,队列达到最大长度

@Configuration
public class RabbitConfiguration {
    @Bean
    public DirectExchanges deadDirect() {
        return new DirectExchage("deadExchange", true, false);
    }
    @Bean
    public Queue deadQueue() {
        return new Queue("deadQueue", true);
    }
    @Bean
    public Binding deadBinding() {
        return BindingBuilder.bind(deadDirect()).to(deadQueue()).with("dead");
    }
    @Bean
    public Queue ttlDirectQueue() {
        Map<String, Object> args = new HashMap<>();
        args.put("x-message-ttl", 5000);
        args.put("x-dead-letter-exchange", "dead_direct_exchange");
        args.put("x-dead-letter-routing-key", "dead");
        return new Queue("ttlDirectionQueue", true, false, false, args).with("routingKey")
    }
}
复制代码

集群

主备集群

#配置各个主机的host
echo "ip hostName" >> /etc/hosts

#同步各节点的 /var/lib/rabbitmq/.erlang.cookie

#启动各个节点
rabbitmq-server -detached

#操作节点加入集群
rabbitmqctl stop_app
rabbitmqctl join_cluster rabbit@mainHostName
rabbitmqctl start_app
复制代码

高可用集群

#在节点上添加策略,所有队列都变成镜像队列
rabbitmqctl set_policy ha-all "^" '{"ha-mode":"all"}'
复制代码
© 版权声明
THE END
喜欢就支持一下吧
点赞0 分享