Work queues —— RabbitMQ七种工作模式之二

官网地址:www.rabbitmq.com/tutorials/t…

work queues.png

前要

Work queues 与 “Hello World!” 工作模式相比,消费者增加,多个消费端共同消费同一个队列中的消息。对于任务过重或任务较多情况使用工作队列可以提高任务处理的速度。

以下示例代码中不再有冗长的注释说明,因为相同代码的说明已经在 《“Hello World!” RabbitMQ七种工作模式之一》中注释。相关依赖和工具类RabbitUtils.java RabbitConstant.java也可以在 《“Hello World!” RabbitMQ七种工作模式之一》找到。

生产消息

package com.baiqi.rabbitmq.workqueue;

import com.baiqi.rabbitmq.utils.RabbitConstant;
import com.baiqi.rabbitmq.utils.RabbitUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class OrderSystem {

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = RabbitUtils.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(RabbitConstant.QUEUE_SMS, false, false, false, null);

        for(int i = 1 ; i <= 100 ; i++) {
            String sms = "乘客" + i + "您的车票已预订成功";
            channel.basicPublish("" , RabbitConstant.QUEUE_SMS , null , sms.getBytes());
        }
        System.out.println("发送数据成功");
        channel.close();
        connection.close();
    }
}
复制代码

消费者

package com.baiqi.rabbitmq.workqueue;

import com.baiqi.rabbitmq.utils.RabbitConstant;
import com.baiqi.rabbitmq.utils.RabbitUtils;
import com.rabbitmq.client.*;

import java.io.IOException;

public class SMSSender1 {

    public static void main(String[] args) throws IOException {

        Connection connection = RabbitUtils.getConnection();
        final Channel channel = connection.createChannel();

        channel.queueDeclare(RabbitConstant.QUEUE_SMS, false, false, false, null);

        //如果不写basicQos(1),则自动MQ会将所有请求平均发送给所有消费者
        //basicQos,MQ不再对消费者一次发送多个请求,而是消费者处理完一个消息后(确认后),在从队列中获取一个新的
        channel.basicQos(1);//处理完一个取一个

        channel.basicConsume(RabbitConstant.QUEUE_SMS , false , new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String sms = new String(body);
                System.out.println("SMSSender1-短信发送成功:" + sms);
                try {
                    Thread.sleep(10);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                channel.basicAck(envelope.getDeliveryTag() , false);
            }
        });
    }
}
复制代码

消费者这段代码可以复制三份启动三个进程,即三个消费者。

对比总结

代码角度来说,工作队列模式(Work queues)和简单模式(Hello World!)模式并无区别,多的 channel.basicQos(1); 也只是定义了消费者获取消息的方式,其它都没改变。所以工作队列模式的本质是消费者的集群化,以快速消费消息。

说明:以上示例代码来自图灵学院,经本人测试整理,然后搬运分享。

© 版权声明
THE END
喜欢就支持一下吧
点赞0 分享