设计一个简单的点赞功能

新增功能:点赞

现在几乎所有的媒体内容,无论是商品评价、话题讨论还是朋友圈都支持点赞,点赞功能成为了互联网项目的标配,那么我们也尝试在评价系统中加入点赞功能,实现为每一个评价点赞。

豆瓣短评中的点赞:

image-20210407234242890

要实现的点赞需求细节:

image-20210414215247131

从放弃出发

完整得实现点赞系统功能是很困难的。要支持亿级的用户数量,又要做到数据归档入库,要支持高峰期百万的秒并发写入,又要实现多客户端实时同步,要记录并维护用户的点赞关系,又要展示用户的点赞列表,这样全方位的需求会产生设计上的矛盾,就像CAP矛盾一样。

典型的比如并发量和同步性的矛盾。高并发的本质是速度,网络传输速度和程序运行速度决定了系统所能承载的容量,每个请求处理速度快才能在单位时间内处理更多的请求,只是一味得增大连接数而忽略请求响应时间,并发问题得不到根本性的解决。在我看来,应用程序内部运行速度的瓶颈在于三处,优先级由高到低是网络请求、对象创建、冗余计算,网络请求对响应速度具有决定性的影响力。但是,同步性又要求我们进行网络请求,比如同步数据到mysql或redis之中。鱼与熊掌不可兼得,并发量和同步性具有不可调和的矛盾。

还有存储容量与访问速度的矛盾。要记录用户的点赞列表,就意味着要长期维护用户的点赞关系,日积月累,用户的点赞关系在单台存储系统中装不下,需要写入分布式存储系统中,这带来了额外的复杂度与调度时延,并且需要很好地设计区分维度,不同分区之间数据不耦合。而一旦一次查询跨越了多个存储节点,就会产生级联调用,具有较大的网络时延。

要实现,先舍弃。看到一个新的需求时,我习惯于反向思考,观察这个需求不涉及到哪些功能,哪些功能可以放弃,从这个角度出发,很容易找到取巧而又简单,却能满足当前需求的设计方案。

重新列一个需求清单,上面写了不需要实现哪些功能,这样做设计决策时,就豁然开朗了。

image-20210414225218732

产品经理只会给你提供表格1,他们很少会显示说明什么不需要做。在决定放弃时,还是需要商量一下,因为这些需求往往是软性的,需求文档中没有包含不一定是不需要,也有可能是没考虑到。

如何记录用户的点赞关系

点赞关系是典型的K-V类型或是集合类型,用Redis实现是比较合适的,那么用Redis中的哪种数据类型呢?

下表列出了能想到的数据类型与它们各自的优劣。

image-20210414231656907

比较关键的特性是批量查询和内存占用,批量查询特性使得可以在一次请求中查询全部的点赞关系,内存占用使得可以用尽可能少的redis节点,甚至一台redis解决存储问题。

我选择字符串类型,因为哈希类型真的很难实现点赞数据的淘汰,除非记录点赞时间并且定期全局扫描,或者记录双份哈希键,做新旧替换,代价太高,不合适。而淘汰机制本身就是解决内存占用问题,所以字符串类型不会占用异常多的内存。

image-20210415101020806

点赞操作的原子性

点赞操作需要改写两个值,一个是用户对内容的点赞关系,另一个是内容的点赞总数,这两个能不能放在一个key中表示呢?显然是不行的。所以需要先设置用户的点赞关系,再增加点赞总数,如果点赞关系已经存在,就不能增加点赞总数。

设置点赞关系可以用setnx命令实现,仅当不存在key时才设置,并返回一个是否设置的标志,根据这个标志决定是否增加点赞总数。比如:

if setnx(key1) == 1
then 
	incr(key2)
复制代码

看似每个操作都是原子性的,但是这样的逻辑如果在客户端执行,整体上仍不满足原子性,仍有可能在两个操作之间发生中断,导致点赞成功但是没有增加计数的情况发生。虽然这对于点赞系统来说不是什么大问题,极少出现的概率可以接受,但是我们完全可以做的更好。

redis的事务或脚本特性可以解决上述的问题。脚本的实现更加灵活自由,而且能减少网络请求,我们选择脚本的方式:

--点赞操作,写入并自增,如果写入失败则不自增,【原子性、幂等性】
if redis.call('SETNX',KEYS[1],1) == 1
then
    redis.call('EXPIRE',KEYS[1],864000)
    redis.call('INCR',KEYS[2])
end
return redis.call('GET',KEYS[2])
复制代码
--取消点赞操作,删除并递减,如果删除失败则不递减,【原子性、幂等性】
if redis.call('DEL',KEYS[1]) == 1
then
    redis.call('DECR',KEYS[2])
end
return redis.call('GET',KEYS[2])
复制代码

稳定性的基本要求之一就是数据不能无限膨胀,否则迟早出问题,任何存储方案都必须设计与之对应的销毁方案,才能保证系统的稳定长久运行。所以设置KEY1的有效期非常重要,而KEY2可能需要一直保持,由其他机制来删除它,比如销毁陈旧评价或折叠评价时,需要删除对应的KEY2.

脚本返回了点赞后的总数,这对后续数据归档是有帮助的。

封装脚本操作

既然已经决定了redis存储方式,那么就先来实现它。一步一个脚印,扎扎实实地把点赞功能完成。

首先使用Spring配置Lua脚本,它自动预加载脚本,不用麻烦在redis服务器上用script load预编译。

/**
 * Lua脚本
 */
@Configuration
public class LuaConfiguration {
    /**
     * [点赞]脚本  lua_set_and_incr
     */
    @Bean
    public DefaultRedisScript<Integer> voteScript() {
        DefaultRedisScript<Integer> redisScript = new DefaultRedisScript<>();
        redisScript.setScriptSource(new ResourceScriptSource(new ClassPathResource("script/lua_set_and_incr.lua")));
        redisScript.setResultType(Integer.class);
        return redisScript;
    }

    /**
     * [取消点赞]脚本  lua_del_and_decr
     */
    @Bean
    public DefaultRedisScript<Integer> noVoteScript() {
        DefaultRedisScript<Integer> redisScript = new DefaultRedisScript<>();
        redisScript.setScriptSource(new ResourceScriptSource(new ClassPathResource("script/lua_del_and_decr.lua")));
        redisScript.setResultType(Integer.class);
        return redisScript;
    }
}
复制代码
/**
 * 点赞箱
 */
@Repository
public class VoteBox {
    private final RedisTemplate<String, Object> redisTemplate;
    private final DefaultRedisScript<Integer> voteScript;
    private final DefaultRedisScript<Integer> noVoteScript;

    public VoteBox(RedisTemplate<String, Object> redisTemplate, DefaultRedisScript<Integer> voteScript, DefaultRedisScript<Integer> noVoteScript) {
        this.redisTemplate = redisTemplate;
        this.voteScript = voteScript;
        this.noVoteScript = noVoteScript;
    }

    /**
     * 给评价投票(点赞),用户增加评价点赞记录,评价点赞次数+1.该操作是原子性、幂等性的。
     * @param voterId 投票人
     * @param contentId 投票目标内容id
     * @return 返回当前最新点赞数
     */
    public Integer vote(long voterId, long contentId){
        //使用lua脚本
        List<String> list = new ArrayList<>();
        list.add(MessageFormat.format(RedisKeyConstants.VOTE_USER_PATTERN, voterId, contentId));
        list.add(MessageFormat.format(RedisKeyConstants.VOTE_SUM_PATTERN, contentId));
        return redisTemplate.execute(voteScript, list);
    }

    /**
     * 取消给评价投票(点赞),用户删除评价点赞记录,评价点赞次数-1.该操作是原子性、幂等性的。
     * @param voterId 投票人
     * @param contentId 投票目标内容id
     * @return 返回当前最新点赞数
     */
    public Integer noVote(long voterId, long contentId){
        //使用lua脚本
        List<String> list = new ArrayList<>();
        list.add(MessageFormat.format(RedisKeyConstants.VOTE_USER_PATTERN, voterId, contentId));
        list.add(MessageFormat.format(RedisKeyConstants.VOTE_SUM_PATTERN, contentId));
        return redisTemplate.execute(noVoteScript, list);
    }
}
复制代码

点赞的流程

点赞的流程可以用如下时序图表示:

image-20210415151828448

  1. 服务端接收用户的点赞请求
  2. 执行redis脚本,并返回点赞总数信息,redis保存点赞功能的暂时数据
  3. 发送普通消息到消息队列
  4. 以上两步执行成功后响应点赞完成,否则加入重试队列
  5. 重试队列异步重试请求redis或消息队列,直到成功或重试次数用尽
  6. 消息队列消费者接收消息,并将消息写入mysql

为什么加入消息队列这个角色?因为消息队列使得同步和异步可以优雅的分离。redis命令需要在当前请求中完成,用户想看到请求的执行结果,希望在其他客户端上立刻看到自己的点赞状态,这个举例可能不太恰当,点赞也可能是单向请求,用户没有那么在乎同步性,这里只是为了演示案例。而数据入库或者是其他操作不需要在当前请求生命周期内完成。

如果同步可以称之为“在线服务”,那么异步可以称之为“半在线半离线服务”,虽然不在请求的生命周期内,但是运行于在线服务器之上,占用cpu和内存,占用网络带宽,势必给线上业务造成影响。当异步模式调整时,需要连同在线业务一起发布,造成逻辑上的耦合。而消息队列让“离线服务”成为可能,消费者可以与在线服务器独立开来,独立开发独立部署,无论是物理上还是逻辑上都完全解耦。当然前提是消息对象的序列化格式一致,所以我喜欢使用字符串作为消息对象的内容,而不是对象序列化。

实现mysql的点赞入库

设计好redis的存储方案后,接下来设计mysql的存储方案。

首先是表结构:

#点赞/投票归档表
CREATE TABLE IF NOT EXISTS vote_document
(
   id INT primary key auto_increment COMMENT 'ID',
   gmt_create datetime not null default CURRENT_TIMESTAMP COMMENT '创建时间',
   voter_id INT not null COMMENT '投票人id',
   contentr_id INT not null COMMENT '投票内容id',
   voting TINYINT not null COMMENT '投票状态(0:取消投票 1:投票)',
   votes INT not null COMMENT '投下/放弃这一票后,内容在此刻的投票总数',
   create_date INT not null COMMENT '创建日期 如:20210414 用于分区分表'
);

insert into vote_document(voter_id,content_id,voting,votes,create_date)
values(1,1,1,1,'20210414');
复制代码

显然,这是一个以Insert代替Update的日志表,无论是点赞、取消点赞还是重新点赞,都是追加新的记录,而不是修改原有记录。这样做有两个原因,一是Insert不用锁表,执行效率远高于Update,二是蕴含的信息更丰富,可以看到用户的完整行为,对于大数据分析是有帮助的。

Insert代替Update之后,一大难点就是数据聚合,解决方案就是每一次插入,都冗余地记录聚合状态,就像votes字段一样,分析时只需要拿相关评价的最后一条记录即可知道点赞总数,而不需全表扫描。

入库代码:

@Repository
public class VoteRepository {
    @Autowired
    private JdbcTemplate db;

    /**
     * 添加点赞
     * @param vote 点赞对象
     * @return 如果插入成功,返回true,否则返回false
     */
    public boolean addVote(/*valid*/ Vote vote) {
        String sql = "insert into vote_document(voter_id,content_id,voting,votes,create_date) values(?,?,?,?,?)";
        return db.update(sql, vote.getVoterId(), vote.getContentId(), vote.getVoting(), vote.getVotes(), Sunday.getDate()) > 0;
    }
}
复制代码

RocketMQ

Apache RocketMQ是一种低延迟、高并发、高可用、高可靠的分布式消息中间件。消息队列RocketMQ既可为分布式应用系统提供异步解耦和削峰填谷的能力,同时也具备互联网应用所需的海量消息堆积、高吞吐、可靠重试等特性。

消息队列核心概念:

  • Topic:消息主题,一级消息类型,生产者向其发送消息。
  • Broker:中间人/经纪人,消息队列集群的节点,负责保存和收发消息。
  • 生产者:也称为消息发布者,负责生产并发送消息至Topic。
  • 消费者:也称为消息订阅者,负责从Topic接收并消费消息。
  • Tag:消息标签,二级消息类型,表示Topic主题下的具体消息分类。
  • 消息:生产者向Topic发送并最终传送给消费者的数据和(可选)属性的组合。
  • 消息属性:生产者可以为消息定义的属性,包含Message Key和Tag。
  • Group:一类生产者或消费者,这类生产者或消费者通常生产或消费同一类消息,且消息发布或订阅的逻辑一致。

生产者发送消息到消息队列,最终发送到消费者的示意图如下:

image-20210112223820896

消息类型可以划分为

  • 普通消息。也称并发消息,没有顺序,生产消费都是并行的,拥有极高的吞吐性能
  • 事务消息。提供了保证消息一定送达到broker的机制。
  • 分区顺序消息。Topic分为多个分区,在一个分区内遵循先入先出原则。
  • 全局顺序消息。把Topic分区数设置为1,所有消息都遵循先入先出原则。
  • 定时消息。将消息发送到MQ服务端,在消息发送时间(当前时间)之后的指定时间点进行投递
  • 延迟消息。将消息发送到MQ服务端,在消息发送时间(当前时间)之后的指定延迟时间点进行投递

消费方式可以划分为:

  • 集群消费。任意一条消息只需要被集群内的任意一个消费者处理即可。
  • 广播消费。将每条消息推送给集群内所有注册过的消费者,保证消息至少被每个消费者消费一次。

消费者获取消息模式可以划分为:

  • Push。开启单独的线程轮询broker获取消息,回调消费者的接收方法,仿佛是broker在推消息给消费者。
  • Pull。消费者主动从消息队列拉取消息。

使用RocketMQ

我们使用某云产品的RocketMq消息队列,按照官方文档,先在云控制中心创建Group和Topic,然后引入maven依赖,创建好MqConfig连接配置对象。最后:

配置生产者(在项目A):

@Configuration
public class ProducerConfig {
    @Autowired
    private MqConfig mqConfig;

    @Bean(initMethod = "start", destroyMethod = "shutdown")
    public Producer buildProducer() {
        return ONSFactory.createProducer(mqConfig.getMqPropertie());
    }
}
复制代码

配置消费者(在项目B):

@Configurationpublic class ConsumerClient {    @Autowired    private MqConfig mqConfig;    @Autowired    private VoteMessageReceiver receiver;    @Bean(initMethod = "start", destroyMethod = "shutdown")    public ConsumerBean buildConsumer() {        ConsumerBean consumerBean = new ConsumerBean();        Properties properties = mqConfig.getMqPropertie();        properties.setProperty(PropertyKeyConst.GROUP_ID, mqConfig.GROUP_CONSUMER_ID);        properties.setProperty(PropertyKeyConst.ConsumeThreadNums, "10");        consumerBean.setProperties(properties);        Map<Subscription, MessageListener> subscriptionTable = new HashMap<>();        Subscription subscription = new Subscription();        subscription.setTopic(mqConfig.TOPIC_ISSUE);        subscription.setExpression(mqConfig.TAG_ISSUE);        subscriptionTable.put(subscription, receiver);        consumerBean.setSubscriptionTable(subscriptionTable);        return consumerBean;    }}
复制代码

创建消息接收、监听器:

/** * 投票消息接收器 */@Componentpublic class VoteMessageReceiver implements MessageListener {    private final VoteRepository voteRepository;    public VoteMessageReceiver(VoteRepository voteRepository) {        this.voteRepository = voteRepository;    }    @Override    public Action consume(Message message, ConsumeContext context) {        try {            JSONObject object = JSONObject.parseObject(new String(message.getBody()));            Vote vote = new Vote();            vote.setVoterId(object.getLongValue("voterId"));            vote.setContentId(object.getLongValue("contentId"));            vote.setVoting(object.getIntValue("voting"));            vote.setVotes(object.getLongValue("votes"));            try {                vote.validate();                voteRepository.addVote(vote);            } catch (IllegalArgumentException ignored) {            }            return Action.CommitMessage;        }catch (Exception e) {            e.printStackTrace();            return Action.ReconsumeLater;        }    }}
复制代码

发送消息的生产者,再稍稍封装一下:

/** * 消息生产者,消息投递仓库 */@Repositorypublic class MessagePoster {    private final Producer producer;    public MessagePoster(Producer producer) {        this.producer = producer;    }    public void sendMessage(String topic, String tag, String content){        Message message = new Message();        message.setTopic(topic);        message.setTag(tag);        message.setBody(content.getBytes(StandardCharsets.UTF_8));        producer.send(message);    }    public void sendMessage(String topic, String content){        sendMessage(topic, "", content);    }}
复制代码

发布消费者,在云控制中心测试(确保流程走通,步步为营):

image-20210415171026192

能达成一致吗

执行redis命令与发送消息这两步,能做到一致性吗,也就是常说的同时完成与同时失败?如果是同构的系统,可以利用系统本身的特性实现事务,比如同是redis操作可以使用redis事务或脚本,前面已经这么做了,如果同是数据库操作,可以使用数据库事务,其他存储系统应该也有类似的支持。

但它们是异构的系统,只能通过在客户端实现事务逻辑或者由第三方协调。常见的客户端实现方法是回滚:

try{	redis.call();     mq.call();}catch(MqException e){	//只有mq出错时才需要回滚    //使用反向操作回滚    redis.rollback();}   
复制代码

但是如果回滚失败呢?如果消息发到MQ但却接收失败呢?如果依赖的服务不支持回滚呢?在苛刻的条件下实现苛刻的一致性是不可能的。

还是应该反向思考,有选择性地舍弃某些不重要的部分,才能实现我们的需求。在目前这个需求中,没有必要为了redis和MQ的同步引入第三方的事务协调,但也不能对明显的事务问题视而不见。

我总结的分布式事务解决思路导图:

image-20210415202027996

我们选择使用重试队列来解决这个问题。

设计重试队列

不局限于当前的分布式事务问题,我们设计一个较为通用的重试队列。

先设计重试队列中的基本概念:任务。一个任务由多个单元组成,可计算单元表示有返回值的方法对象,执行单元表示没有返回值的方法对象,但是会接收上一步可计算单元的返回值作为入参。任务中保持了单元的单向链表,只有当一个单元执行成功后,才会指向下一个单元继续执行,但当执行失败时,会在当前单元不断重试直到成功,已执行通过的单元不会重试。这样就保证了各个单元的稳定、有序运行,每个环节的执行具有容错性。

image-20210415210047077

基础接口,让使用者可以自己实现任务执行失败的日志记录,比如持久化磁盘或是发送到远程服务器,避免任务丢失,是保持事务一致性的兜底方案之一,设置成缺省方法使得使用者有选择地实现,不强制一定要有失败处理方案。

/** * 失败记录器 */interface IFailRecorder {    /**     * 记录每次重试的失败情况     * @param attemptTimes 重试次数,第一次重试=0     * @param e 导致失败的异常     */    default void recordFail(int attemptTimes, Exception e){}    /**     * 记录每次重试的失败情况     * @param attemptTimes 重试次数,第一次重试=0     */    default void recordFail(int attemptTimes){}    /**     * 记录重试之后的最终失败     * @param e 导致失败的异常,如果没有异常,返回null     */    default void recordEnd(Exception e){}}
复制代码

定义执行的基本单元,代表需要执行一个redis操作或是发送MQ操作,接口方法可能会由调度器重复地执行,所以要求接口实现者自身保证幂等性。

/** * 可重复执行的任务 */public interface Repeatable<V> extends IFailRecorder{    /**     * Computes a result, or throws an exception if unable to do so.     *     * @param repeatTimes repeat times, first repeatTimes is 0     * @return computed result     * @throws Exception if unable to compute a result     */    V compute(int repeatTimes) throws Exception;    /**     * Execute with no result, and throws an exception if unable to do so.     *     * @param repeatTimes repeat times, first repeatTimes is 0     * @param receiveValue last step computed result     * @throws Exception if unable to compute a result     */    default void execute(int repeatTimes, V receiveValue) throws Exception{}    /**     * Execute with no result, and throws an exception if unable to do so.     *     * @param repeatTimes repeat times, first repeatTimes is 0     * @throws Exception if unable to compute a result     */    default void execute(int repeatTimes) throws Exception{}}
复制代码

对应的派生抽象类,主要是为了引导用户实现接口。

/** * 可计算任务 * @param <V> 计算结果类型 */public abstract class Computable<V> implements Repeatable<V>{    @Override    public void execute(int repeatTimes) throws Exception {        throw new IllegalAccessException("不支持的方法");    }    @Override    public void execute(int repeatTimes, V receiveValue) throws Exception {        throw new IllegalAccessException("不支持的方法");    }}/** * 可执行任务 */public abstract class Executable<V> implements Repeatable<V>{    @Override    public V compute(int repeatTimes) throws Exception {        throw new IllegalAccessException("不支持的方法");    }}
复制代码

重试的意义

好的重试机制可以起到削峰填谷的作用,而不好的重试机制可能火上浇油。

这不是危言耸听,仔细思考一下,程序什么情况下会失败,大致可以总结为三种情况:

  1. 参数错误导致的逻辑异常
  2. 负载过大导致的超时或熔断
  3. 不稳定的网络与人工意外事故

其中对于情况1进行重试是完全没有意义的,参数错误的问题应该通过改变参数来解决,逻辑异常应该修复逻辑bug,无脑重试只能让错误重复发生,只会浪费cpu。对于情况2的重试得小心,因为遇到流量波峰而失败,短时间内重试很可能再次遭遇失败,并且这次重试还会带来更大的流量压力,像滚雪球一样把自己搞垮,也就是火上浇油。

对于情况3的重试就非常有价值,尤其是对于具有SLA协议的第三方服务。第三方服务可能因为种种意外(比如停服更新),导致服务短暂不可用,但是却不违反SLA协议。将这种失败情况加入重试队列,确保只要第三方服务在较长的一段时间内有响应,任务就可以成功,如果第三方服务一直没有响应而导致任务最终失败,那么他往往也就破坏了SLA协议,可以申请赔偿了。

所以,设计重试策略时首先需要判断什么情况下需要重试,可以设定当出现特定的比如参数错误的异常时,就没必要重试了,直接失败即可。可以设定只要当返回参数不为空时才算成功。可以设置固定的重试间隔,让两个重试之间拉开比较长的时间。

更聪明的做法是,使用断路器模式,借助当前连接对目标服务器的请求结果,如果不符预期(异常比率大),就暂时阻塞重试队列中等待的任务,隔一段时间再试探一下。

重试队列与普通限流降级或熔断的区别:

image-20210415234437188

重试的策略

重试策略决定任务何时发起重试,重试策略接口:

/** * 重试策略,决定任务何时可以重试 */public interface IRetryStrategy {    /**     * 现在是否应该执行重试     * @param attemptTimes 第几次重试     * @param lastTimestamp 上一次重试的时间戳     * @param itemId 当前的执行项目id     * @return 允许重试,返回true,否则,返回false     */    boolean shouldTryAtNow(int attemptTimes, long lastTimestamp, int itemId);    /**     * 通知一次失败     * @param itemId 当前的执行项目id     */    void noticeFail(int itemId);    /**     * 通知一次成功     * @param itemId 当前的执行项目id     */    void noticeSuccess(int itemId);}
复制代码

基本实现类:

/** * 指定间隔时间的重试策略 */public class DefinedRetryStrategy implements IRetryStrategy {    private final int[] intervals;    public DefinedRetryStrategy(int... intervals) {        if (intervals.length == 0) {            this.intervals = new int[]{0};        } else {            this.intervals = intervals;        }    }    private DefinedRetryStrategy() {        this.intervals = new int[]{0};    }    /**     * 现在是否应该执行重试     *     * @param attemptTimes  第几次重试     * @param lastTimestamp 上一次重试的时间戳     * @param itemId        当前的执行项目id     * @return 允许重试,返回true,否则,返回false     */    @Override    public boolean shouldTryAtNow(int attemptTimes, long lastTimestamp, int itemId) {        return System.currentTimeMillis() > lastTimestamp + getWaitSecond(attemptTimes) * 1000L;    }    @Override    public void noticeFail(int itemId) {    }    @Override    public void noticeSuccess(int itemId) {    }    /**     * 根据当前重试次数,获取下一次重试等待间隔(单位:秒)     */    private int getWaitSecond(int attemptTimes) {        if (attemptTimes < 0) {            attemptTimes = 0;        }        if (attemptTimes >= intervals.length) {            attemptTimes = intervals.length - 1;        }        return intervals[attemptTimes];    }}
复制代码

使用断路器实现重试策略,断路器内部实现省略:

/** * 断路器模式实现的智能的重试策略 */public class SmartRetryStrategy extends DefinedRetryStrategy {    //断路器集合    private final Map<Integer, CircuitBreaker> circuitBreakers = new ConcurrentHashMap<>();    private final Object LOCK = new Object();    private static CircuitBreaker newCircuitBreaker() {        return new ExceptionCircuitBreaker();    }    public SmartRetryStrategy(int[] intervals) {        super(intervals);    }    private CircuitBreaker getCircuitBreaker(Integer itemId) {        if (!circuitBreakers.containsKey(itemId)) {            synchronized (LOCK) {                if (!circuitBreakers.containsKey(itemId)) {                    circuitBreakers.put(itemId, newCircuitBreaker());                }            }        }        return circuitBreakers.get(itemId);    }    /**     * 现在是否应该执行重试     *     * @param attemptTimes  第几次重试     * @param lastTimestamp 上一次重试的时间戳     * @param itemId        当前的执行项目id     * @return 允许重试,返回true,否则,返回false     */    @Override    public boolean shouldTryAtNow(int attemptTimes, long lastTimestamp, int itemId) {        //如果基本条件不满足,则不能重试        if (!super.shouldTryAtNow(attemptTimes, lastTimestamp, itemId)) {            return false;        }        //断路器是否允许请求通过        return canPass(itemId);    }    /**     * 通知一次失败     *     * @param itemId 当前的执行项目id     */    @Override    public void noticeFail(int itemId) {        getCircuitBreaker(itemId).onFail();    }    /**     * 通知一次成功     *     * @param itemId 当前的执行项目id     */    @Override    public void noticeSuccess(int itemId) {        getCircuitBreaker(itemId).onSuccess();    }    /**     * 是否允许通过     */    public boolean canPass(int itemId){        return getCircuitBreaker(itemId).canPass();    }}
复制代码

可重试任务

根据上面的结构图,定义可重试任务接口:

/** * 重试任务 */public interface IRetryTask<V> {    /**     * 执行一次重试     * @return 如果执行成功,返回true,否则返回false     */    boolean tryOnce();    /**     * 是否应该关闭任务     * @return 如果达到最大重试次数,返回true,表示可以关闭     */    boolean shouldClose();    /**     * 现在是否应该执行重试     * @return 当等待时间超过重试间隔时间后,允许重试,返回true,否则,返回false     */    boolean shouldTryAtNow();    /**     * 获取执行结果     */    V getResult();}
复制代码

然后设计抽象类:

/** * 重试任务. * 非线程安全 */public abstract class AbstractRetryTask<V> implements IRetryTask<V> {    //重试等待间隔    protected final IRetryStrategy retryStrategy;    //当前重试次数    protected int curAttemptTimes = -1;    //最大重试次数    private final int maxAttemptTimes;    //上一次重试的时间戳    protected long lastTimestamp = 0;    public AbstractRetryTask(IRetryStrategy retryStrategy, int maxAttemptTimes) {        this.retryStrategy = retryStrategy;        this.maxAttemptTimes = maxAttemptTimes;    }    /**     * 执行一次重试     *     * @return 如果执行成功,返回true,否则返回false     */    @Override    public boolean tryOnce() {        if (isFinished()) {            return true;        }        setNextCycle();        //执行重试        doTry();        //重试任务执行异常或者返回null,将视为执行失败        return isFinished();    }    /**     * 是否结束     */    protected abstract boolean isFinished();    /**     * 执行回调     */    protected abstract void doTry();    /**     * 是否应该关闭任务     *     * @return 如果达到最大重试次数,返回true,表示可以关闭     */    @Override    public boolean shouldClose() {        return curAttemptTimes >= maxAttemptTimes;    }    //设置下一执行周期    private void setNextCycle() {        curAttemptTimes++;        lastTimestamp = System.currentTimeMillis();    }}
复制代码

以及实现类:

/** * 多段重试任务. 任务链路执行失败时,下一次重试从失败的点继续执行。 */@Slf4jpublic class SegmentRetryTask<V> extends AbstractRetryTask<V> {    //分段执行方法    private final List<Repeatable<V>> segments;    //当前执行片段,上一次执行中断的片段    private int currentSegment = 0;    //上一次的执行结果值    private V result;    public SegmentRetryTask(IRetryStrategy retryStrategy, int maxAttemptTimes, List<Repeatable<V>> segments) {        super(retryStrategy == null ? new DefinedRetryStrategy(0) : retryStrategy, maxAttemptTimes);        this.segments = segments;    }    /**     * 执行回调     */    @Override    protected void doTry() {        try {            for (; currentSegment < segments.size(); currentSegment++) {                //如果当前断路器打开,不尝试执行                if (retryStrategy instanceof SmartRetryStrategy){                    if (!((SmartRetryStrategy)retryStrategy).canPass(currentSegment)) {                        segments.get(currentSegment).recordFail(curAttemptTimes, new CircuitBreakingException());                        return;                    }                }                //如果抛异常,分段计数器不增加,下次从这个地方执行                Repeatable<V> repeatable = segments.get(currentSegment);                if (!execute(repeatable)) return;            }        } catch (Exception e) {            retryStrategy.noticeFail(currentSegment);            if (currentSegment < segments.size()) {                if (shouldClose()) {                    segments.get(currentSegment).recordEnd(e);                } else {                    segments.get(currentSegment).recordFail(curAttemptTimes, e);                }            }        }    }    private boolean execute(Repeatable<V> repeatable) throws Exception {        if (repeatable instanceof Computable) {            result = repeatable.compute(curAttemptTimes);            if (result == null) {                repeatable.recordFail(curAttemptTimes);                retryStrategy.noticeFail(currentSegment);                return false;            }            retryStrategy.noticeSuccess(currentSegment);        }        if (repeatable instanceof Executable) {            if (result == null) {                repeatable.execute(curAttemptTimes);            } else {                repeatable.execute(curAttemptTimes, result);            }            retryStrategy.noticeSuccess(currentSegment);        }        return true;    }    @Override    protected boolean isFinished() {        return currentSegment >= segments.size();    }    /**     * 现在是否应该执行重试     *     * @return 当等待时间超过重试间隔时间后,允许重试,返回true,否则,返回false     */    @Override    public boolean shouldTryAtNow() {        return retryStrategy.shouldTryAtNow(curAttemptTimes, lastTimestamp, currentSegment);    }    /**     * 获取执行结果     */    @Override    public V getResult() {        return result;    }}
复制代码

一个单元测试,当然单元测试有很多,不能全贴出来,这里只展示有代表性的:

class SegmentRetryTaskTest {    private final List<String> messages = new ArrayList<>();    @Test    void doTry() {        List<Repeatable<String>> list = new ArrayList<>();        list.add(new Computable<>(){            @Override            public String compute(int repeatTimes) throws Exception {                if (repeatTimes < 2)                    throw new Exception();                if (repeatTimes < 4)                    return null;                messages.add("result:good");                return "good";            }            @Override            public void recordFail(int attemptTimes, Exception e) {                messages.add("fail:" + attemptTimes);            }            @Override            public void recordFail(int attemptTimes) {                messages.add("fail:" + attemptTimes);            }            @Override            public void recordEnd(Exception e) {                messages.add("end");            }        });        list.add(new Executable<>() {            @Override            public void execute(int repeatTimes, String receiveValue) throws Exception {                messages.add("receive:" + receiveValue);                throw new Exception("exc");            }            @Override            public void recordEnd(Exception e) {                messages.add("end:" + e.getMessage());            }        });        IRetryTask retryTask = new SegmentRetryTask<>(new DefinedRetryWaitStrategy(0), 5, list);        //重试未开始        assertFalse(retryTask.shouldClose());        //重试直到成功        assertFalse(retryTask.tryOnce());        assertFalse(retryTask.shouldClose());        assertFalse(retryTask.tryOnce());        assertFalse(retryTask.tryOnce());        assertFalse(retryTask.tryOnce());        assertFalse(retryTask.tryOnce());        assertFalse(retryTask.tryOnce());        assertTrue(retryTask.shouldClose());        assertTrue(messages.contains("result:good"));        assertTrue(messages.contains("fail:1"));        assertTrue(messages.contains("fail:2"));        assertTrue(messages.contains("fail:3"));        assertFalse(messages.contains("end"));        assertTrue(messages.contains("receive:good"));        assertTrue(messages.contains("end:exc"));    }}
复制代码

重试队列的运作

image-20210416101646494

线程安全的重试队列。* (Spring-retry 和 guava-retrying都不完全适合这个场景,决定自己开发一个简单的重试机制)* 重试队列会尽最大努力让任务多次执行并成功,使用时需要考虑以下几点。* 1.重试队列存储在内存之中,暂未同步到磁盘,要求使用者可以承受丢失的风险。* 2.重试不保证一定会成功,它将在重试一定的次数后结束,如果最终失败,将记录失败结果。* 3.为了不让频繁的重试让系统的负载过大,建议设置恰当的重试间隔,以起到削峰填谷的作用。* 4.当超过重试队列允许容纳的数量时,将抛出异常。* 5.重试任务将在独立的线程中执行,不会阻塞当前线程* 6.重试任务执行异常或者返回null,将视为执行失败。暂不支持拦截自定义异常。* 7.由于网络问题,远程过程执行成功未必代表会返回成功,重试任务需要实现幂等性。* 8."队列"仅指按先进先出的顺序扫描任务,任务移除队列操作取决于其何时完成或结束*
复制代码

实现重试队列

/** * 线程安全的重试队列。 * @author sunday * @version 0.0.1 */public final class RetryQueue {    //重试任务队列(全局唯一)    private final static Deque<IRetryTask> retryTaskList = new ConcurrentLinkedDeque<>();    //重试任务工厂    private final IRetryTaskFactory retryTaskFactory;    public RetryQueue(IRetryTaskFactory retryTaskFactory) {        this.retryTaskFactory = retryTaskFactory;    }    static {        Thread daemon = new Thread(RetryQueue::scan);        daemon.setDaemon(true);        daemon.setName(RetryConstants.RETRY_THREAD_NAME);        daemon.start();    }    //扫描重试队列,执行重试并移除任务(如果成功),周期性执行    private static void scan() {        while (true) {            //先执行,再删除            retryTaskList.removeIf(task -> retry(task) || task.shouldClose());            // wait some times            try {                TimeUnit.MILLISECONDS.sleep(RetryConstants.SCAN_INTERVAL);            } catch (Throwable ignored) {            }        }    }    //执行重试    private static boolean retry(/*not null*/IRetryTask task) {        if (task.shouldTryAtNow()) {            return task.tryOnce();        }        return false;    }    /**     * 提交任务。在当前线程立刻执行,如果失败,则使用设置的重试任务工厂创建包装对象,把这个对象写入重试队列等待异步重试。     *     * @param segments 分段执行任务     * @param <V>      结果返回类型     * @return 如果当前线程一次就执行成功,同步返回结果值,否则加入重试队列,异步通知结果值。     * @throws RetryRefuseException 当超过重试队列允许容纳的数量时,将抛出异常     */    public final <V> V submit(List<Repeatable<V>> segments) throws RetryRefuseException {        if (segments == null || segments.size() == 0) {            return null;        }        IRetryTask<V> task = retryTaskFactory.createRetryTask(segments);        //在当前线程执行        if(!task.tryOnce()){            //失败后加入队列            ensureCapacity();            retryTaskList.push(task);        }        //只要当前已经有执行结果,就返回,即便是加入了重试队列        return task.getResult();    }    /**     * 提交任务。在当前线程立刻执行,如果失败,则使用设置的重试任务工厂创建包装对象,把这个对象写入重试队列等待异步重试。     *     * @param repeatable 执行任务     * @param <V>        结果返回类型     * @return 如果当前线程一次就执行成功,同步返回结果值,否则加入重试队列,异步通知结果值。     * @throws RetryRefuseException 当超过重试队列允许容纳的数量时,将抛出异常     */    public final <V> V submit(Repeatable<V> repeatable) throws RetryRefuseException {        return submit(List.of(repeatable));    }    //确保容量    private void ensureCapacity() throws RetryRefuseException {        //非线程安全,高并发下可能短暂冲破最大容量,不过问题不大        if (retryTaskList.size() >= RetryConstants.MAX_QUEUE_SIZE) {            throw RetryRefuseException.getInstance();        }    }    /**     * 队列是否为空     *     * @return 如果当前无正在执行的任务,返回true     */    public boolean isEmpty() {        return retryTaskList.isEmpty();    }}
复制代码

单元测试:

class RetryQueueTest {    private final static int NUM = 100000;    private List<String> messages1 = Collections.synchronizedList(new ArrayList<>());    private List<String> messages2 = Collections.synchronizedList(new ArrayList<>());    IRetryTaskFactory taskFactory = new IRetryTaskFactory() {        @Override        public <V> IRetryTask createRetryTask(List<Repeatable<V>> segments) {            return new SegmentRetryTask<>(new DefinedRetryWaitStrategy(0), 10, segments);        }    };    RetryQueue retryQueue = new RetryQueue(taskFactory);    @Test    void submit() {        List<Repeatable<String>> list = new ArrayList<>();        list.add(new Executable<>() {            @Override            public void execute(int repeatTimes) throws Exception {                if (repeatTimes < 4)                    throw new Exception();                messages1.add("good");            }        });        //模拟高并发提交        ExecutorService executorService = Executors.newFixedThreadPool(100);        Semaphore semaphore = new Semaphore(0);        for (int i = 0; i < NUM; i++) {            executorService.submit(() -> {                try {                    retryQueue.submit(list);                } catch (RetryRefuseException e) {                    fail();                }                semaphore.release();            });        }        executorService.shutdown();        //等待执行完成        try {            semaphore.acquire(NUM);        } catch (InterruptedException e) {            e.printStackTrace();        }        //等待执行完成        while (!retryQueue.isEmpty()) Thread.yield();        assertEquals(NUM, messages1.size());        for (String s : messages1) {            assertEquals(s, "good");        }    }}
复制代码

久等的点赞实现代码

好了,轮子已经造完了,可以开始写点赞服务的代码了:

/** * 投票服务 */@Service@Slf4jpublic class VoteService {    private final VoteBox voteBox;    private final MessagePoster mq;    private final RetryQueue retryQueue = new RetryQueue(new SegmentRetryTaskFactory());    public VoteService(VoteBox voteBox, MessagePoster mq) {        this.voteBox = voteBox;        this.mq = mq;    }    /**     * 给评价投票(点赞)     *     * @param voterId   投票人     * @param contentId 投票目标内容id     * @param voting    是否进行点赞(true:点赞  false:取消点赞)     * @return 当前内容点赞后的总数,如果点赞失败,抛出异常     * @throws VoteException 投票异常     */    public int vote(long voterId, long contentId, boolean voting) throws VoteException {        /*         * 第零种情况:用户请求没有发送到服务器,用户可以适时重试。         * 第一种情况:执行1失败,最终点赞失败,记录日志,加入重试队列池,用户也可以适时重试。         * 第二种情况:执行1成功,但返回时网络异常,最终点赞失败,记录日志,加入重试队列池,用户也可能适时重试,该方法是幂等的。         * 第三种情况:执行1成功,但并未增加点赞总数,因为这次是重复提交。仍然执行之后的逻辑,该方法是幂等的。         * 第四种情况:执行1成功,但执行2失败,记录日志,把发送mq加入重试队列池,返回成功。         * 第五种情况:执行方法成功,但返回过程网络异常,用户未收到响应,用户稍后可以查询出点赞结果,用户也可以适时重试         */        List<Repeatable<Integer>> list = new ArrayList<>();        //1.先在redis中投票        list.add(new Computable<>() {            @Override            public Integer compute(int repeatTimes) {                return voting ? voteBox.vote(voterId, contentId) : voteBox.noVote(voterId, contentId);            }            @Override            public void recordFail(int attemptTimes, Exception e) {                //只记录第一次错误                if (attemptTimes == 0)                    log.warn("function VoteService.vote.redis make exception:{} by:{},{},{}", e.getMessage(), voterId, contentId, voting);            }            @Override            public void recordEnd(Exception e) {                //放弃重试.当然,日志会记录下来,或者通过其他机制将失败记录到中央存储库中,最终还是可以恢复。                log.warn("function VoteService.vote.redis quit:{} by:{},{},{}", e.getMessage(), voterId, contentId, voting);            }        });        //2.再通知mq        list.add(new Executable<>() {            @Override            public void execute(int repeatTimes, Integer receiveValue) {                JSONObject object = new JSONObject();                object.put("voterId", voterId);                object.put("contentId", contentId);                object.put("voting", voting ? 1 : 0);                object.put("votes", receiveValue);                mq.sendMessage(SystemConstants.VOTE_TOPIC, object.toString());            }            @Override            public void recordFail(int attemptTimes, Exception e) {                if (attemptTimes == 0)                    log.warn("function VoteService.vote.mq make exception:{} by:{},{},{}", e.getMessage(), voterId, contentId, voting);            }            @Override            public void recordEnd(Exception e) {                log.trace("function VoteService.vote.mq quit:{} by:{},{},{}", e.getMessage(), voterId, contentId, voting);            }        });        Integer value = null;        try {            //系统可能因为mq或者redis自身的过载等问题导致点赞失败,我们想珍惜用户的一次点赞,所以选择为他重试。            value = retryQueue.submit(list);        } catch (RetryRefuseException e) {            log.error("function VoteService.vote.refuse make exception:{} by:{},{},{}", e.getMessage(), voterId, contentId, voting);        }        if (value == null){            //当前无法获得投票总数,意味着点赞操作失败,虽然我们会稍后重试,但仍将这个信息告知用户,他们可以进行更理智的选择。            throw new VoteException("投票失败,请稍后再试");        }        return value;    }    private static class SegmentRetryTaskFactory implements IRetryTaskFactory {        private final static IRetryStrategy waitStrategy = new SmartRetryStrategy(new int[]{10,100,100,1000,10000});        @Override        public <V> IRetryTask<V> createRetryTask(List<Repeatable<V>> segments) {            return new SegmentRetryTask<>(waitStrategy, 5, segments);        }    }}
复制代码

补充说明:

  1. 封装工厂对象的目的是为了简化构造方法参数,并且复用不变对象,如重试策略。
  2. 只要重试队列执行有返回结果,哪怕只是部分成功,仍可以算作接口响应成功,剩余部分加入重试队列。
  3. 如果重试队列执行全部失败,没有返回结果,则抛出异常,毕竟此刻确实失败了,用户有权知道。
  4. 只有熔断器闭合时,才会执行任务,否则将会一直等待,可以设置恰当的中止策略来完善这个机制。
  5. 重试队列这个轮子在其他很多场景也都有用武之地,依照我的理解,它大致算是“仓库层”。

但就点赞实现来说,没有必要使用重试,实际上,mq是多节点高可用的,一般不会出现问题,并且,mq自带了重试功能。mq的重试机制是,在一次请求中,如果失败了,立刻向另外的broker发起请求,是一种负载均衡融合高可用的设计。在不要求刚性事务的情景下,可以认为mq是可靠的。

给评价添加点赞

评价列表的数据是相对静态的,不含用户个性化信息,可以很容易地缓存供所有人访问,但是一旦加上用户对每个评价的点赞关系,或是实时变化的点赞数量信息,就变得难以缓存了。我们选择动静分离,静态的数据按照原先的缓存策略不变,动态的数据专门从redis服务中获取,然后再追加到静态数据上。

服务层、控制层,就是数据的聚合层、任务的委派层。

而至于数据聚合,有三种模式:

image-20210416110925640

我们选择第三种方式,这次设计点赞功能,只是作为评价系统的一部分。

在RemarkService中添加如下代码:

/** * 给评价列表添加点赞信息,在现有列表数据上修改 * @param remarks 评价列表 * @param consumerId 用户id * @return 修改后的评价列表 */public JSONArray appendVoteInfo(JSONArray remarks, Integer consumerId){    if (remarks == null || remarks.size() == 0) {        return remarks;    }    //获取评价id列表    List<Object> idList = new ArrayList<>();    for (int i = 0; i < remarks.size(); i++) {        idList.add(remarks.getJSONObject(i).getString("id"));    }    //获取并添加点赞总数    List<String> voteKeys = new ArrayList<>();    for (Object s : idList) {        voteKeys.add(MessageFormat.format(RedisKeyConstants.VOTE_SUM_PATTERN, s));    }    List<Object> voteValues = redisRepository.readAll(voteKeys);    for (int i = 0; i < remarks.size(); i++) {        remarks.getJSONObject(i).put("votes", voteValues.get(i) == null ? 0 : voteValues.get(i));    }    //未传用户id,查询时不附带个人点赞数据    if (consumerId == null) {        return remarks;    }    //获取并添加个人点赞状态    List<String> votesKeys = new ArrayList<>();    for (Object s : idList) {        votesKeys.add(MessageFormat.format(RedisKeyConstants.VOTE_USER_PATTERN, consumerId, s));    }    List<Object> votingValues = redisRepository.readAll(votesKeys);    for (int i = 0; i < remarks.size(); i++) {        remarks.getJSONObject(i).put("voting", votingValues.get(i) == null ? 0 : 1);    }    return remarks;}//更新商品的评价缓存private void updateRemarkCache(String itemId){    //吞掉异常,让更新评价方法不影响原操作的执行结果    try {        redisRepository.refreshKeys(RedisKeyConstants.REMARK_PREFIX + itemId);    } catch (Exception e) {        log.warn("function RemarkService.updateRemarkCache make exception:{} by:{}", e.getMessage(), itemId);    }}
复制代码

修改查询评价列表接口,聚合内容:

/** * 查询商品关联的评价,一次查询固定的条目 * @param itemId 商品id * @param curIndex 当前查询坐标 */@GetMapping("/remark")public APIBody listRemarks(String itemId, int curIndex, Integer consumerId){    Assert.isTrue(!StringUtils.isEmpty(itemId), "商品id不能为空");    Assert.isTrue(curIndex > 0, "查询坐标异常");    JSONArray list = remarkService.listRemarks(itemId, curIndex, SystemConstants.REMARK_MAX_LIST_LENGTH);    //原列表是从redis或db中读取的静态数据,而点赞数据每时每刻都在变化,分开获取这两个部分。    return APIBody.buildSuccess(remarkService.appendVoteInfo(list, consumerId));}
复制代码

优化点:评价的点赞总数信息是固定的,是用户无关的,可以与评价内容结合在一起缓存在内存中,而用户的点赞信息只能每次请求都去redis查询。

推荐优质评价

完整的评价系统应该能够输出一个优质评价内容的推荐列表,作为用户查看商品评价时的默认展示。

何为”优质内容“呢?我的理解是具有话题性、高热度、内容丰富的评价内容,其中”点赞总数“是衡量高热度的重要指标之一。当前,我们就以点赞数量为唯一指标,算出优质内容并提供查询接口。未来引入其他指标时,也可能会继续沿用这种设计思路。

评价表中有votes字段,可以据此排序生成前n条数据:

select id,consumer_id,order_id,score,header,content,images,user_name,user_face,gmt_create from remark where item_id = ? and status = '1' order by votes desc limit ?
复制代码

需要注意的是,votes字段并不随着用户点赞而更新它,因为频繁的更新是低效的。可以通过定期汇总的方式来更新votes字段,点赞表保存着评价的最新点赞总数,所以可以每隔1天或1小时,筛选这期间内对应内容的最近一条点赞,就可以更新votes了。

不管基础数据是在何种数据库何种表中,不管是通过什么方式,我都将这一步骤称为”回源“,回源是缓存未命中时的一种行为概念。

在加载推荐评价时,回源算法为

public List<Remark> listRecommendRemarks(/*not null*/ String itemId, int expectCount){    if (expectCount <= 0)        return new ArrayList<>();    Assert.isTrue(expectCount <= MAX_LIST_SIZE, "不允许一次性查询过多内容");    String sql = "select id,consumer_id,order_id,score,header,content,images,user_name,user_face,gmt_create from remark where item_id = ? and status = '1' order by votes desc limit ?";    return db.query(sql, (resultSet, i) -> {        Remark remark = new Remark();        remark.setId(resultSet.getLong(1));        remark.setConsumerId(resultSet.getLong(2));        remark.setOrderId(resultSet.getString(3));        remark.setItemId(itemId);        remark.setScore(resultSet.getShort(4));        remark.setHeader(resultSet.getString(5));        remark.setContent(resultSet.getString(6));        remark.setImages(resultSet.getString(7));        remark.setUsername(resultSet.getString(8));        remark.setUserface(resultSet.getString(9));        remark.setCreateTime(resultSet.getString(10));        return remark;    }, itemId, expectCount);}
复制代码

接下来所要做的,只要将这部分内容保存到缓存,然后输出就可以了。

原子性地替换列表

推荐评价是一个列表,我选择使用Redis的LIST数据类型,可以方便地进行范围查询,参考上篇文章的评价列表。

但是Redis并未直接提供替换列表的操作,只有DEL、LRPUSH、RENAME等命令组合在一起可以才能实现,但客户端的组合操作是非原子性的,不用多说,又要使用脚本了:

--删除并创建列表--params    1           2--KEYS      列表键名      代理键--ARGV      列表redis.call('DEL', KEYS[1])for i= 1, #ARGV do    redis.call('RPUSH', KEYS[1], ARGV[i])end--延长代理锁的过期时间redis.call('SET', KEYS[2], 1)redis.call('EXPIRE',KEYS[2], 3600)
复制代码

查询推荐评价的主要代码如下:

@Cacheable(value = "recommend")public JSONArray listRecommendRemarks(/*not null*/ String itemId, int start, int stop) {    try {        if (remarkRedis.shouldUpdateRecommend(itemId)) {            //加锁成功,需要加载数据库中的评价内容到redis            remarkQueue.push(itemId, () -> reloadRecommendRemarks(itemId));        }        return appendVoteInfo(remarkRedis.readRecommendRange(itemId, start, stop));    } catch (Exception e) {        log.error("function RemarkService.listRecommendRemarks make exception:{} by:{},{},{}", e.getMessage(), itemId, start, stop);        return SystemConstants.EMPTY_ARRAY;    }}
复制代码

其中,仍使用代理键的模式,使Redis存储主要业务数据的列表永不过期,避免缓存击穿以及频繁的分布式阻塞加锁。

一些重要的redis操作代码:

//保存推荐内容并重置过期时间public void saveRecommendData(String itemId, /*not null*/ List<Remark> list) {    String[] argv = new String[list.size()];    for (int i = 0; i < list.size(); i++) {        argv[i] = JSONObject.fromObject(list.get(i)).toString();    }    redisTemplate.execute(resetListScript,            List.of(RedisKeyConstants.REMARK_RECOMMEND_PREFIX + itemId,                    RedisKeyConstants.REMARK_RECOMMEND_PROXY_PREFIX + itemId), argv);}//读取推荐内容public JSONArray readRecommendRange(String itemId, int start, int stop) {    String key = RedisKeyConstants.REMARK_RECOMMEND_PREFIX + itemId;    return range(start, stop, key);}//是否应该更新推荐public boolean shouldUpdateRecommend(String itemId) {    Boolean flag = redisTemplate.opsForValue().setIfAbsent(RedisKeyConstants.REMARK_RECOMMEND_PROXY_PREFIX + itemId);    return flag == null || !flag;}
复制代码

冷启动与空数据

冷启动是指服务的第一次上线或者redis在零缓存下重新启动时,这时,任何的缓存都未加载,或者之前加载过,现在因为意外已经不存在了。这时,代理锁会过期,SETNX命令成功,加锁成功的线程会同步数据库数据到redis,这样业务数据KEY就不再为空了。如果同步过程出现失败,锁会在2秒后自动过期,新的线程会继续接任这项未完成的使命。如果业务数据加载完成,那么就随即延迟代理锁的寿命为1小时,这样1小时之后才会触发同步。整个流程是异步的,用户请求的线程只会读取业务数据KEY,有则返回,无则为空。也就是说,接口只在冷启动的几秒内是返回为空的,这是可以接受的,因为冷启动只在新业务上线或者redis内存无法恢复这些极为特殊的时间点才会出现。

空数据是指数据库的内容是原本就是空的。根据上面的设计思路,可以得出结论,如果数据库内容为空,那么业务数据KEY是空的,也就是nil,不存储占位符,因为代理KEY已经起到占位符的作用了。这一点来看,一个简简单单的代理KEY,可以起到防止缓存击穿、防止同步阻塞、占位符等作用。

后续

可能会更新一些抽奖、秒杀活动的实现方法。

© 版权声明
THE END
喜欢就支持一下吧
点赞0 分享