这是我参与更文挑战的第9天,活动详情查看: 更文挑战
对于线程池和锁机制以及redis的线上环境使用,可能很多初中级开发不经常遇到,一时不知道应用场景,以及怎么用,正好,这是这几天在做上传知识文件数据处理过程中遇到的问题,需求大概是对于对于上传的文件按行进行分析,每行数据会转为对应的DTO类,每个文件可能几十M或者几百M,对于文本文件,这也算一个大文件了,这也是第一次线上接触线程池的用法
规则数据检查问题/
首先就是对上传数据分析的请求数量做一个拦截,而这由于JVM是独立的,所以这里用redis来存储分析的队列数量,我们都知道对于大数据文件进行分析是非常耗费系统资源的,所以这的队列值得根据项目的具体部署多少机器来合理设置,为了不影响正常逻辑运行,我们项目目前是20
// RedisKeyConstants.CACHE_QUEUE为redis中list集合的key
Long Size = this.redisTemplate.opsForList().size(RedisKeyConstants.CACHE_QUEUE);
复制代码
这里得出来的是进入redis队列数,而对于防止重复提交请求通过往数据库插入记录来判重这里就不过多讨论,肯定是要做的,再根据自己的请求组成一个key,我们这边是根据系统标识和请求ID来做key,与队列中的key做比较,如果存在直接返回,不存在就插入进去
Boolean hasKey = this.redisTemplate.opsForHash().hasKey(RedisKeyConstants.CACHE_MAP, cacheTask);
//向队列中插入此次请求(左)
this.redisTemplate.opsForList().leftPush(RedisKeyConstants.CACHE_QUEUE, cacheTask);
//向hashmap传值
//RedisKeyConstants.CACHE_MAP是redis的批量处理key
//cacheTask是请求的key
//dtoJson是请求key的value值(我们是传入的全体值)
this.redisTemplate.opsForHash().put(RedisKeyConstants.CACHE_MAP, cacheTask, dtoJson);
复制代码
对于请求hasKey 的返回就是说如果redis存在此请求,而此时我同一个请求又进来了,这明显应该拦截住的,所以我们只能认为redis处理出问题了,所以我们得清除这条,而重新处理新的请求,由于我们是分析返回处理结果的,所有此处删除的是分析的结果,此处删除有一定的实际逻辑控制,看需求来添加
String key = StrUtil.format(ANALY_COUNT_REPORT,DTO.getSysCode(),DTO.getTaskId());redisTemplate.delete(key);
复制代码
到这总该开发线程池逻辑了吧,No,No,No,就在写着写着过程中,我发现了令我狂欢的分布式锁,我爽了一下,在网上逛了一圈,并没有说太多这方面的,我只能自己看源码,
接着上面的分析路径走,请求已进来了,我们要判断可不可以分析了;
如果集群有分析正在进行,则禁止发布任务,否则发布任务(题外话:这里我们线上并没有用zk订阅发布,而是采用redis,并不是说我们没有采用zk这个技术,而是一群大佬的分析报告和成本预算总结redis足够了,没必要让zk再来接手这块),
// 获取分布式锁tryLock = lock.tryLock(RedisKeyConstants.TASK_LOCK, AnalyzeConstants.xxx_LOCK);
复制代码
简单解释一下这段代码,首先根据自定义的key以及以及你的请求客户端,我们没有复用代码,所有此处根据需求直接写死了,这里的调用的expireTime默认五分钟时间(我们系统自定义的),具体逻辑大概就是获取你的主机连接,如果你是cluster就走集群逻辑否则走单机的eval();然后获取分布式锁,看到下面的LOCK_LUA参数了吗,那个就是lua脚本(我们可以自己百度,StringBuffer拼接即可),加锁失败就抛出异常;
下面的一点获取锁代码是我们自己封装的,大家估计看不到,我就尽量语言描述一下,但底层都一样
public boolean tryLock(String lockKey, String reqId, int expireTime) {
boolean flag =false;
Long res = redisTemplate.execute(new RedisCallback<Long>() {
public Long RedisCheck(RedisConnection connection) throws DataAccessException {
Object nativeConnection = connection.getNativeConnection();
if (nativeConnection instanceof JedisCluster) {
//cluster
return (Long) ((JedisCluster) nativeConnection).eval(LOCK_LUA, Collections.singletonList(lockKey),
Arrays.asList(reqId, Integer.toString(expireTime)));
} else if (nativeConnection instanceof Jedis) {
//single
return (Long) ((Jedis) nativeConnection).eval(LOCK_LUA, Collections.singletonList(lockKey),
Arrays.asList(reqId, Integer.toString(expireTime)));
}
return null;
}
)};
//此处返回1L表示获取成功
}
复制代码
还记得我们之前leftpush的任务吗,现在我们需要将队列中的任务取出来
this.redisTemplate.opsForList().rightPop(RedisKeyConstants.CACHE_QUEUE);
复制代码
还记得我们当时put的map吗?依然取出来,因为我们的请求参数在那里
DTO = (String) this.redisTemplate.opsForHash().get(RedisKeyConstants.CACHE_MAP,cacheTask);
复制代码
取出过后删除该map
然后获取路径下所有文件列表并一下缓存进redis,如果文件实在过多,那就分批次传入,
redisTemplate.opsForList().leftPushAll(currentRedisKey, tmpList);
复制代码
此处可将数据文件过程插入库表;并将分析状态置为正在运行,同时将分析任务发布出去,供集群进行分析,最后,别忘记释放分布式锁,最后通过线程池化执行该任务,这里通过配置监听发布订阅bean.xml通过org.springframework.data.redis.listener.adapter.MessageListenerAdapter
<bean id="xxxxxx" class="org.springframework.data.redis.listener.adapter.MessageListenerAdapter">
<constructor-arg index="0" ref="类名"/>
<constructor-arg index="1" value="方法名"/>
</bean>
复制代码
这里的redis具体逻辑结束了,下篇说说逻辑结束后线程池如何执行的
本来想把线程池一并写了,但感觉篇幅会很多,拆开。。。。。。
完结/
以上是我截取的一部分逻辑,
干就完了,奥利给
好啦,今天的不开心就止于此吧,明天依旧光芒万丈发啊,宝贝!