“Hello World!” RabbitMQ七种工作模式之一

官网地址:www.rabbitmq.com/tutorials/t…

helloworld.png

1. 引入依赖

    <dependency>
        <groupId>com.rabbitmq</groupId>
        <artifactId>amqp-client</artifactId>
        <version>5.3.0</version>
    </dependency>
复制代码

2. 生产消息

package com.baiqi.rabbitmq.helloworld;

import com.baiqi.rabbitmq.utils.RabbitConstant;
import com.baiqi.rabbitmq.utils.RabbitUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author 白起老师
 */
public class Producer {

    public static void main(String[] args) throws IOException, TimeoutException {

        //获取TCP长连接
        Connection conn = RabbitUtils.getConnection();
        //创建通信“通道”,相当于TCP中的虚拟连接
        Channel channel = conn.createChannel();

        //创建队列,声明并创建一个队列,如果队列已存在,则使用这个队列
        //第一个参数:队列名称ID,自定义的字符串
        //第二个参数:是否持久化,false对应不持久化数据,MQ停掉数据就会丢失
        //第三个参数:是否队列私有化,false则代表所有消费者都可以访问,true代表只有第一次拥有它的消费者才能一直使用,其他消费者不让访问
        //第四个:是否自动删除,false代表连接停掉后不自动删除掉这个队列
        //其他额外的参数, null
        channel.queueDeclare(RabbitConstant.QUEUE_HELLOWORLD,false, false, false, null);

        String message = "hello zheng";
        //四个参数
        //exchange 交换机,简单模式用不到,在后面进行发布订阅时才会用到
        //队列名称
        //额外的设置属性
        //最后一个参数是要传递的消息字节数组
        channel.basicPublish("", RabbitConstant.QUEUE_HELLOWORLD, null,message.getBytes());
        channel.close();
        conn.close();
        System.out.println("===发送成功===");

    }
}
复制代码

3. 消费消息

获取消息

package com.baiqi.rabbitmq.helloworld;

import com.baiqi.rabbitmq.utils.RabbitConstant;
import com.baiqi.rabbitmq.utils.RabbitUtils;
import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author 白起老师
 */
public class Consumer {

    public static void main(String[] args) throws IOException, TimeoutException {


        //获取TCP长连接
        Connection conn = RabbitUtils.getConnection();
        //创建通信“通道”,相当于TCP中的虚拟连接
        Channel channel = conn.createChannel();
        //创建队列,声明并创建一个队列,如果队列已存在,则使用这个队列
        channel.queueDeclare(RabbitConstant.QUEUE_HELLOWORLD, false, false, false, null);

        //从MQ服务器中获取数据

        //创建一个消息消费者
        //第一个参数:队列名
        //第二个参数代表是否自动确认收到消息,false代表手动编程来确认消息,这是MQ的推荐做法
        //第三个参数要传入DefaultConsumer的实现类
        channel.basicConsume(RabbitConstant.QUEUE_HELLOWORLD, false, new Reciver(channel));
        channel.close();
        conn.close();

    }
}
复制代码

处理消息

package com.baiqi.rabbitmq.helloworld;

import com.baiqi.rabbitmq.utils.RabbitConstant;
import com.baiqi.rabbitmq.utils.RabbitUtils;
import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

class Reciver extends DefaultConsumer {

    private Channel channel;

    //重写构造函数,Channel通道对象需要从外层传入,在handleDelivery中要用到
    public Reciver(Channel channel) {
        super(channel);
        this.channel = channel;
    }

    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

        String message = new String(body);
        System.out.println("消费者接收到的消息:" + message);

        System.out.println("消息的TagId:" + envelope.getDeliveryTag());
        //false只确认签收当前的消息,设置为true的时候则代表签收该消费者所有未签收的消息
        channel.basicAck(envelope.getDeliveryTag(), false);
    }
}
复制代码

涉及工具类等其它

package com.baiqi.rabbitmq.utils;

import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class RabbitUtils {
    private static ConnectionFactory connectionFactory = new ConnectionFactory();

    static {
        connectionFactory.setHost("192.168.168.10");
        connectionFactory.setPort(5672);//5672是RabbitMQ的默认端口号
        connectionFactory.setUsername("zheng123");
        connectionFactory.setPassword("zheng123");
        connectionFactory.setVirtualHost("/zheng");
    }

    public static Connection getConnection() {
        Connection conn = null;
        try {
            conn = connectionFactory.newConnection();
            return conn;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}
复制代码
package com.baiqi.rabbitmq.utils;

public class RabbitConstant {
    public static final String QUEUE_HELLOWORLD = "helloworld";
}
复制代码

说明:本篇文章内容非完全原创,多来自图灵学院,经本人测试成功然后搬运分享,欢迎交流。

© 版权声明
THE END
喜欢就支持一下吧
点赞0 分享