生产级rocketMQ延时消息+redis去重+高效率序列化|Java 开发实战

1.话不多说,先提问题(某互联网公司实际需求~~~~)

#####一生成订单后如果一个小时没有打款,就自动撤单,并做出惩罚措施。

  • 本文所涉及技术RocketMQ版本:4.3.1 ,JDK1.8,protostuff版本1.1.3

2.pom文件(部分)

        <dependency>
			<groupId>org.apache.rocketmq</groupId>
			<artifactId>rocketmq-client</artifactId>
			<version>4.3.1</version>
		</dependency>
        <dependency>
			<groupId>com.dyuproject.protostuff</groupId>
			<artifactId>protostuff-core</artifactId>
			<version>1.1.3</version>
		</dependency>
		<dependency>
			<groupId>com.dyuproject.protostuff</groupId>
			<artifactId>protostuff-runtime</artifactId>
			<version>1.1.3</version>
		</dependency>
复制代码

3.直接上代码!生产者

    @Autowired
    private NoticeService noticeService; // 封装的一个mq service类
    private RuntimeSchema<String> timeSchema = RuntimeSchema.createFrom(String.class);//序列化需要使用
    public void test() {
      String messages = buildMQMessage(merchantOrder.getOrderNo(), p.getInvoke(), p.getMethod(), Datas.BORROW);
      this.noticeService.delayNotice(messages, this.timeSchema, "OtcTimer", "timer", p.getTimeLevel());
    }
///
  //构建JSON消息体,有orderNo,需要定时结束执行的反射方法,当前方法(方便日志),类型(区分业务)
   protected String buildMQMessage(String orderNo, String invoke, String method, String type) {
		JSONObject jsonObject = new JSONObject();
		jsonObject.put("orderNo", orderNo);
		jsonObject.put("invoke", invoke);
		jsonObject.put("method", method);
		jsonObject.put("type", type);
		return jsonObject.toJSONString();
	}
//重点讲讲这个方法
/**
*messages:消息体
schema:加一个缓冲区,加快序列化速度
topic:根据topic找消费者
tags:标签
timeLevel:延迟等级messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h (取小标为获取对应的延迟时间,也可自定义)
*/
    public <T> void delayNotice(T messages, RuntimeSchema<T> schema, String topic, String tags, Integer timeLevel) {
		String key = OtcUtil.createUUId();
		try {
			byte[] bytes = ProtostuffIOUtil.toByteArray(messages, schema,
					LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE));//加一个缓冲区,加快序列化速度
			Message message = new Message(topic, tags, key, bytes);//组建消息体
			message.setDelayTimeLevel(timeLevel);//设置等级(下标)
			SendResult sendResult = this.defaultMQProducer.send(message);//(发送)
			if (sendResult.getSendStatus() == SendStatus.SEND_OK) {

			}
		} catch (Exception e) {
			// TODO: handle exception
			logger.error("sendMq onException , key : " + key, e);
		}
	  }
       
                 

复制代码

4.直接上代码!消费者

@Component
public class OtcTimerConsumer {

	private final static Logger logger = LoggerFactory.getLogger(OtcTimerConsumer.class);

	private RuntimeSchema<String> schema = RuntimeSchema.createFrom(String.class);

	@Autowired
	private RedisService redisService;//redis去重,防止重复消费

	@Qualifier("borrowProcessTimerService")
	@Autowired
	private ProcessTimerService borrowProcessTimerService;

	@EventListener(condition = "#event.topic == 'OtcTimer'")
	public void rocketmqMsgListen(DefaultMQCustomerEvent event) throws Exception {
		try {
			// 判断key是否存在,去重
			String key = event.getMsg().getKeys();
			Set<Object> set = this.redisService.getRepeat(key);
			if (set.size() > 0) {
				return;
			}
			// 参数解析,反序列化解析参数
			String paramter = schema.newMessage();
			ProtostuffIOUtil.mergeFrom(event.getMsg().getBody(), paramter, schema);
			if (StringUtils.isEmpty(paramter)) {
				throw new BusinessException(Codes.CODE_500, Messages.OTC_MQ_MESSAGE_ISNULL);
			}
			JSONObject jsonObject = JSONObject.parseObject(paramter);
			String orderNo = jsonObject.getString("orderNo");
			String method = jsonObject.getString("method");
			String invoke = jsonObject.getString("invoke");//需要反射的方法
			String type = jsonObject.getString("type");
			// 重点讲解
            //getMethod(需要执行的反射方法,方法里的参数类型)
            //invoke(需要执行的反射类,方法里的参数)
		   if (type.equals(Datas.BORROW)) {
				this.borrowProcessTimerService.getClass().getMethod(invoke, String.class, String.class, String.class)
						.invoke(this.borrowProcessTimerService, orderNo, method, invoke);
			}
			// 将消费完的key放入缓存,去重
			this.redisService.setRepeat("Otc:Timer:" + orderNo + ":" + invoke, paramter,
					Double.valueOf(System.nanoTime()));
		} catch (Exception e) {
			logger.error(e.getMessage(), e);
			throw new BusinessException(Codes.CODE_500, e.getMessage());
		}
	}

}
复制代码

5.直接上代码!反射方法,处理撤单,惩罚相关逻辑

@Service
public class BorrowProcessTimerService extends BorrowSuperService implements ProcessTimerService {
	@Transactional(rollbackFor = Exception.class)
	public void orderTimer(String orderNo, String method, String invoke) {
            // 具体逻辑
            // 我这里用到了事务,分布式锁==保证安全
            // 处理异常日志
        }
}
复制代码

总结:
(1)无需再轮询全部订单,效率高
(2)一个订单,任务只执行一次
(3)时效性好

真心感谢帅逼靓女们能看到这里,如果这个文章写得还不错,觉得有点东西的话

求点赞? 求关注❤️ 求分享? 对8块腹肌的我来说真的 非常有用!!!

如果本篇博客有任何错误,请批评指教,不胜感激 !❤️❤️❤️❤️

© 版权声明
THE END
喜欢就支持一下吧
点赞1 分享