RabbitMQ
整合Spring AMQP
AMQP
核心组件:
RabbitAdmin
SpringAMQP
声明RabbitTemplate
SimpleMessageListenerContainer
MessageListenerAdapter
MessageConverter
RabbitAdmin
autoStartup
设置为 true,否则Spring
容器不会加载RabbitAdmin
类。
RabbitAdmin
底层实现是从Spring
容器中获取Exhcange
、Binding
、RoutingKey
和Queue
的@Bean
声明。
然后调用RabbitTemplate
的execute
方法执行对应的声明、改、删除等基础功能操作。
在Bean
加载后进行初始化设置
autoStartup
需要为true
代码演示:
@Configuration
@ComponentScan({"com.orcas.spring"})
public class RabbitMQConfig {
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setAddresses("192.168.58.129:5672");
connectionFactory.setUsername("orcas");
connectionFactory.setPassword("1224");
connectionFactory.setVirtualHost("/");
return connectionFactory;
}
@Bean
public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
rabbitAdmin.setAutoStartup(true);
return rabbitAdmin;
}
}
复制代码
@RunWith(SpringRunner.class)
@SpringBootTest
public class ApplicationTests {
@Autowired
private RabbitAdmin rabbitAdmin;
@Test
public void testAdmin() {
// 1
rabbitAdmin.declareExchange(new DirectExchange("test.direct", false, false));
rabbitAdmin.declareQueue(new Queue("test.topic.queue", false));
rabbitAdmin.declareBinding(new Binding("test.direct.queue", Binding.DestinationType.QUEUE,
"test.direct", "direct", new HashMap<>()));
// 2
rabbitAdmin.declareBinding(
BindingBuilder.bind(new Queue("test.topic.queue", false))
.to(new TopicExchange("test.topic", false, false))
.with("topic.#"));
// 清空队列
rabbitAdmin.purgeQueue("test.topic.queue", false);
}
}
复制代码
SpringAMQP 声明
@Bean
方式声明
@Bean
public TopicExchange exchange01() {
return new TopicExchange("topic_exchange01", true, false);
}
@Bean
public Queue queue01() {
return new Queue("queue01", true);
}
@Bean
public Binding binding01() {
return BindingBuilder.bind(queue01()).to(exchange01()).with("topic.*");
}
复制代码
RabbitTemplate
消息模板,提供了发送消息的方法,包括可靠性投递消息方法、回调监听消息接口 ConfirmCallback
、返回值确认接口 ReturnCallback
等。需注入到 Spring
容器中。
与 Spring
整合时需要实例化,与 SpringBoot
整合时,需在配置文件中添加配置。
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
return rabbitTemplate;
}
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testSendMessage() {
// 创建消息
MessageProperties messageProperties = new MessageProperties();
messageProperties.setContentType("UTF-8");
messageProperties.getHeaders().put("desc", "描述");
Message message = new Message("Hi".getBytes(), messageProperties);
// 发送消息
rabbitTemplate.convertAndSend("topic_exchange01", "topic.amqp", message,
new MessagePostProcessor() {
// 额外的消息设置
@Override
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().getHeaders().put("desc", "修改描述");
return message;
}
});
复制代码
SimpleMessageListenerContainer
简单消息监听容器:
- 监听(多个)队列、自动启动、自动声明
- 设置事务特性、事务管理器、事务属性、事务容量(并发)、是否开始事务、回滚消息等。
- 设置消费者数量、最小最大数量、批量消费
- 设置消息签收模式、是否重回队列、异常捕获handler函数
- 设置消费者标签生成策略、是否独占模式、消费者属性等
- 设置具体的监听器、消息转换器等。
- 可动态设置配置项
@Bean
public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
container.setQueues(queue01(), queue02());
container.setConcurrentConsumers(1); // 当前消费者数量
container.setMaxConcurrentConsumers(5);
container.setDefaultRequeueRejected(false); // 重回队列
container.setAcknowledgeMode(AcknowledgeMode.AUTO); // 签收模式
container.setConsumerTagStrategy(new ConsumerTagStrategy() { // 消费端标签策略
@Override
public String createConsumerTag(String queue) {
return queue + "_" + UUID.randomUUID().toString();
}
});
container.setMessageListener(new ChannelAwareMessageListener() {
@Override
public void onMessage(Message message, Channel channel) throws Exception {
String msg = new String(message.getBody());
System.out.println("====== 消费者 ======" + msg);
}
});
return container;
}
复制代码
MessageListenerAdapter
消息监听适配器
defaultListenerMethod
默认监听方法名称:用于设置监听方法名称Delegate
委托对象:实际真实的委托对象,用于处理消息queueOrTagToMethodName
队列标识与方法名称组成的集合,可以将队列和方法名绑定,指定队列中的消息会被所绑定的方法处理
- 可以使用自定义的消息监听器,不再是上文中
new ChannelAwareMessageListener()
并重写其onMessage()
方法。
...
...
MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
container.setMessageListener(adapter);
...
...
复制代码
默认方法名是hanleMessage
public class MessageDelegate {
public void handleMessage(byte[] messageBody) {
System.out.println("== 消息内容 ==" + new String(messageBody));
}
}
复制代码
- 可通过
adapter.setDefaultListenerMethod("consumeMessage");
设置方法名
public class MessageDelegate {
public void consumeMessage(String messageBody) {
System.out.println("== 消息内容 ==" + new String(messageBody));
}
}
复制代码
- 而如果想要改变方法的入参,可以设置一个自定义的消息转换器,通过自定义一个
Converter
实现MessageConverter
接口。(1.6 再补充)
// 该转换器将字节数组转换成String
adapter.setMessageConverter(new TextMessageConverter());
public class TextMessageConverter implements MessageConverter {
@Override
public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
return new Message(object.toString().getBytes(), messageProperties);
}
@Override
public Object fromMessage(Message message) throws MessageConversionException {
String contentType = message.getMessageProperties().getContentType();
// 定义的转换规则
if (null != contentType && contentType.contains("text")) {
return new String(message.getBody());
}
return message.getBody();
}
}
复制代码
- 将队列名称和路由的方法进行绑定
MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
Map<String, String> map = new HashMap<>();
map.put("queue01", "method01");
map.put("queue02", "method02");
// 将队列名称和路由的方法进行绑定
adapter.setQueueOrTagToMethodName(map);
container.setMessageListener(adapter);
复制代码
MessageConverter
消息转换器
正常情况下,消息体为二进制的数据方式传输,可以通过实现MessageConverter
接口自定义转换器。
需要重写两个方法:
toMessage
:java
对象转换为Message
fromMessage
Message
对象转换为java
对象
几种类型的转换器:
Json
转换器:Jackson2JsonMessageConverter
进行Java
对象的转换功能DefaultJackson2JavaTypeMapper
映射器:Java
对象的映射关系- 自定义二进制转换器:如图片类型、
PDF
、PPT
、流媒体
Json
转换器:Jackson2JsonMessageConverter
(入参Map)
MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
container.setMessageListener(jackson2JsonMessageConverter);
复制代码
- 支持
Java
对象的转换:DefaultJackson2JavaTypeMapper&Jackson2JsonMessageConverter
(入参Java对象)
MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
adapter.setDefaultListenerMethod("consumeMessage");
Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
DefaultJackson2JavaTypeMapper defaultJackson2JavaTypeMapper = new DefaultJackson2JavaTypeMapper();
jackson2JsonMessageConverter.setJavaTypeMapper(defaultJackson2JavaTypeMapper);
adapter.setMessageConverter(jackson2JsonMessageConverter);
复制代码
- 支持
java
对象多映射转换:DefaultJackson2JavaTypeMapper & Jackson2JsonMessageConverter
MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
adapter.setDefaultListenerMethod("consumeMessage");
Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
DefaultJackson2JavaTypeMapper javaTypeMapper = new DefaultJackson2JavaTypeMapper();
Map<String, Class<?>> idClassMapping = new HashMap<String, Class<?>>();
// (标签, 类的全路径)将标签和类进行绑定
idClassMapping.put("order", com.orcas.spring.entity.Order.class);
idClassMapping.put("packaged", com.orcas.spring.entity.Packaged.class);
javaTypeMapper.setIdClassMapping(idClassMapping);
jackson2JsonMessageConverter.setJavaTypeMapper(javaTypeMapper);
adapter.setMessageConverter(jackson2JsonMessageConverter);
复制代码
- 全局的转换器:
ContentTypeDelegatingMessageConverter
MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
adapter.setDefaultListenerMethod("consumeMessage");
//全局的转换器:
ContentTypeDelegatingMessageConverter convert = new ContentTypeDelegatingMessageConverter();
TextMessageConverter textConvert = new TextMessageConverter();
convert.addDelegate("text", textConvert);
convert.addDelegate("html/text", textConvert);
convert.addDelegate("xml/text", textConvert);
convert.addDelegate("text/plain", textConvert);
Jackson2JsonMessageConverter jsonConvert = new Jackson2JsonMessageConverter();
convert.addDelegate("json", jsonConvert);
convert.addDelegate("application/json", jsonConvert);
ImageMessageConverter imageConverter = new ImageMessageConverter();
convert.addDelegate("image/png", imageConverter);
convert.addDelegate("image", imageConverter);
PDFMessageConverter pdfConverter = new PDFMessageConverter();
convert.addDelegate("application/pdf", pdfConverter);
adapter.setMessageConverter(convert);
container.setMessageListener(adapter);
复制代码
- 图片转换器:
public class ImageMessageConverter implements MessageConverter {
@Override
public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
throw new MessageConversionException(" convert error ! ");
}
@Override
public Object fromMessage(Message message) throws MessageConversionException {
System.err.println("-----------Image MessageConverter----------");
// 获取消息扩展属性中的"extName"
Object _extName = message.getMessageProperties().getHeaders().get("extName");
// 若为空默认为png, 否则就获取该扩展名
String extName = _extName == null ? "png" : _extName.toString();
byte[] body = message.getBody();
String fileName = UUID.randomUUID().toString();
// 该图的路径+图片名
String path = "d:/010_test/" + fileName + "." + extName;
File f = new File(path);
try {
Files.copy(new ByteArrayInputStream(body), f.toPath());
} catch (IOException e) {
e.printStackTrace();
}
return f;
}
}
复制代码
PDF
转换器:
public class PDFMessageConverter implements MessageConverter {
@Override
public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
throw new MessageConversionException(" convert error ! ");
}
@Override
public Object fromMessage(Message message) throws MessageConversionException {
System.err.println("-----------PDF MessageConverter----------");
byte[] body = message.getBody();
String fileName = UUID.randomUUID().toString();
String path = "d:/010_test/" + fileName + ".pdf";
File f = new File(path);
try {
Files.copy(new ByteArrayInputStream(body), f.toPath());
} catch (IOException e) {
e.printStackTrace();
}
return f;
}
}
复制代码
测试代码
@Test
public void testSendExtConverterMessage() throws Exception {
// byte[] body = Files.readAllBytes(Paths.get("d:/002_books", "picture.png"));
// MessageProperties messageProperties = new MessageProperties();
// messageProperties.setContentType("image/png");
// messageProperties.getHeaders().put("extName", "png");
// Message message = new Message(body, messageProperties);
// rabbitTemplate.send("", "image_queue", message);
byte[] body = Files.readAllBytes(Paths.get("d:/002_books", "mysql.pdf"));
MessageProperties messageProperties = new MessageProperties();
messageProperties.setContentType("application/pdf");
Message message = new Message(body, messageProperties);
rabbitTemplate.send("", "pdf_queue", message);
}
复制代码
RabbitMQ
整合SpringBoot
基本配置
依赖:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
复制代码
消息的生产端
配置文件:
spring:
rabbitmq:
host: 192.168.58.129
port: 5672
username: orcas
password: 1224
virtual-host: /
connection-timeout: 15000
publisher-confirms: true
publisher-returns: true
template:
mandatory: true
复制代码
publisher-confirms
,实现一个监听器用于监听Broker
端返回的确认请求。publisher-returns
,保证消息对Broker
端是可达的,如果出现路由键不可达情况,则使用监听器对不可达消息进行后续处理,保证消息的路由成功。template.mandatory=true
,true,则监听器会接收到路由不可达的消息,然后进行处理;false,Broker
会自动删除该消息。默认是false。
RabbitMQConfig
配置文件(可选)
可以参考与Spring
整合中的配置文件,就是以@Bean
的方式声明交换机、队列与绑定关系。
发送消息
当消息路由失败,会调用 ReturnCallback
@Component
public class RabbitSender {
//自动注入RabbitTemplate模板类
@Autowired
private RabbitTemplate rabbitTemplate;
//回调函数: confirm确认
final ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
System.err.println("correlationData: " + correlationData);
System.err.println("ack: " + ack);
if(!ack){
System.err.println("异常处理....");
}
}
};
//回调函数: return返回
final ReturnCallback returnCallback = new RabbitTemplate.ReturnCallback() {
@Override
public void returnedMessage(Message message, int replyCode, String replyText,
String exchange, String routingKey) {
System.err.println("return exchange: " + exchange + ", routingKey: "
+ routingKey + ", replyCode: " + replyCode + ", replyText: " + replyText);
}
};
//发送消息方法调用: 构建Message消息
public void send(Object message, Map<String, Object> properties) throws Exception {
MessageHeaders mhs = new MessageHeaders(properties);
Message msg = MessageBuilder.createMessage(message, mhs);
rabbitTemplate.setConfirmCallback(confirmCallback);
rabbitTemplate.setReturnCallback(returnCallback);
// 设置消息的唯一id
CorrelationData correlationData = new CorrelationData("1234567890"); //id + 时间戳 全局唯一
rabbitTemplate.convertAndSend("exchange-1", "springboot.abc", msg, correlationData);
}
//发送消息方法调用: 构建自定义对象消息
public void sendOrder(Order order) throws Exception {
rabbitTemplate.setConfirmCallback(confirmCallback);
rabbitTemplate.setReturnCallback(returnCallback);
//id + 时间戳 全局唯一
CorrelationData correlationData = new CorrelationData("0987654321");
rabbitTemplate.convertAndSend("exchange-2", "springboot.def", order, correlationData);
}
}
复制代码
消息的消费端
配置文件:
spring:
rabbitmq:
host: 192.168.58.129
port: 5672
username: orcas
password: 1224
virtual-host: /
connection-timeout: 15000
publisher-confirms: true
publisher-returns: true
template:
mandatory: true
listener:
simple:
acknowledge-mode: manual # 手动ack
concurrency: 5 # 监听消息的个数
max-concurrency: 10
# 自定义mq配置 用于声明交换机、队列、绑定路由的参数
order:
queue:
name: queue-2
durable: true
exchange:
name: exchange-2
durable: true
type: topic
ignoreDeclarationExceptions: true
key: springboot.*
复制代码
@RabbitListener
消费端监听
@Exchange、@Queue、@QueueBinding
组合注解用来声明交换机、队列和绑定路由。
@Component
public class RabbitReceiver {
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "${spring.rabbitmq.listener.order.queue.name}",
durable="${spring.rabbitmq.listener.order.queue.durable}"),
exchange = @Exchange(value = "${spring.rabbitmq.listener.order.exchange.name}",
durable="${spring.rabbitmq.listener.order.exchange.durable}",
type= "${spring.rabbitmq.listener.order.exchange.type}",
ignoreDeclarationExceptions = "${spring.rabbitmq.listener.order.exchange.ignoreDeclarationExceptions}"),
key = "${spring.rabbitmq.listener.order.routingKey}"
)
)
@RabbitHandler
public void onOrderMessage(@Payload Order order, Channel channel, @Headers Map<String, Object> headers) throws Exception {
System.err.println("--------------------------------------");
System.err.println("消费端order: " + order.getId());
Long deliveryTag = (Long)headers.get(AmqpHeaders.DELIVERY_TAG);
//手工ACK
channel.basicAck(deliveryTag, false);
}
}
复制代码
RabbitMQ
整合 Spring Cloud Stream
SpringCloudStream
不能保证消息的100%可靠性,用于和Kafka
兼顾,目的是高性能的消息通信。
Spring Cloud Stream
整体架构核心概念图
多了Input
和Output
的通道
Barista
接口
Barista
接口:定义作为后面类的参数,定义通道类型(决定通道是用于发送还是接收消息)和通道名称(作为配置用)。
@Output
:输出注解,用于定义发送消息接口。
@Input
:输入接口,用于定义消息的消费者接口。
@StreamListener
:用于定义监听方法的注解。
整合
依赖
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
复制代码
生产端
Barista
接口:定义输出通道,添加绑定关系
public interface Barista {
String OUTPUT_CHANNEL = "output_channel";
// 注解@Output声明了它是一个输出类型的通道,名字是output_channel。
// 这一名字与app1中通道名一致,表明注入了一个名字为output_channel的通道
// 类型是output,发布的主题名为mydest。
@Output(Barista.OUTPUT_CHANNEL)
MessageChannel logoutput();
}
复制代码
配置
spring:
cloud:
stream:
bindings:
output_channel:
destination: exchange-3
group: queue-3
binder: rabbit_cluster
binders:
rabbit_cluster:
type: rabbit
enviroment:
spring:
rabbitmq:
address: 192.168.58.129:5672
username: orcas
password: 1224
virtual-host: /
复制代码
@EnableBinding(Barista.class)
@Service
public class RabbitmqSender {
@Autowired
private Barista barista;
// 发送消息
public String sendMessage(Object message, Map<String, Object> properties) throws Exception {
try{
MessageHeaders mhs = new MessageHeaders(properties);
Message msg = MessageBuilder.createMessage(message, mhs);
boolean sendStatus = barista.logoutput().send(msg);
System.err.println("--------------sending -------------------");
System.out.println("发送数据:" + message + ",sendStatus: " + sendStatus);
}catch (Exception e){
System.err.println("-------------error-------------");
e.printStackTrace();
throw new RuntimeException(e.getMessage());
}
return null;
}
}
复制代码
消费端
Barista
public interface Barista {
String INPUT_CHANNEL = "input_channel";
// 注解@Input声明了它是一个输入类型的通道,名字是Barista.INPUT_CHANNEL,也就是position3的input_channel。
// 这一名字与上述配置app2的配置文件中position1应该一致,表明注入了一个名字叫做input_channel的通道,
// 它的类型是input,订阅的主题是position2处声明的mydest这个主题
@Input(Barista.INPUT_CHANNEL)
SubscribableChannel loginput();
}
复制代码
配置
spring:
cloud:
stream:
bindings:
input_channel:
destination: exchange-3
group: queue-3
binder: rabbit_cluster
consumer:
concurrency: 1
rabbit:
bindings:
input_channle:
consumer:
requeue-rejected: false
acknowledge-mode: MANUAL
recovery-interval: 3000
durable-subscription: true
max-concurrency: 5
binders:
rabbit_cluster:
type: rabbit
enviroment:
spring:
rabbitmq:
address: 192.168.58.129:5672
username: orcas
password: 1224
virtual-host: /
复制代码
接收
@EnableBinding(Barista.class)
@Service
public class RabbitmqReceiver {
@StreamListener(Barista.INPUT_CHANNEL)
public void receiver(Message message) throws Exception {
Channel channel = (com.rabbitmq.client.Channel) message.getHeaders().get(AmqpHeaders.CHANNEL);
Long deliveryTag = (Long) message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);
System.out.println("Input Stream 1 接受数据:" + message);
System.out.println("消费完毕------------");
channel.basicAck(deliveryTag, false);
}
}
复制代码