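Module Dependencies and Module Encapsulation in Detail
Preface
The previous article gave an overall analysis of the system and briefly introduced what each module does. This article goes into the implementation of the important features in each module.
I. Version Selection
- Spring Boot is 2.1.4, so Spring Cloud uses the matching Greenwich release train (Greenwich.RELEASE). The Spring Cloud version must match the Spring Boot version, otherwise the project will not start properly; the full compatibility table is easy to find online.
- Nacos is 1.4, which can be downloaded from the official site.
- RocketMQ in the test environment is the Windows build, version 4.3.0. Installation guides are easy to find online; for a standalone setup, remember to adjust the configuration file.
- Redis is 4.0.
- MySQL 5.7 or later is recommended.
- MongoDB 4.0 or later is recommended.
- nginx is 1.18; any version that supports WebSocket will do.
II. Module Dependencies
1. Overall structure
Parent project pom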
......
<modules>
<module>common-ws-starter</module>
<module>pc-wx-spider</module>
<module>java-wx-spider</module>
<module>redis-ws-starter</module>
<module>db-ws-starter</module>
<module>sql-wx-spider</module>
<module>rocketmq-ws-starter</module>
<module>mobile-wx-spider</module>
</modules>
......
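Screenshot (image not included):
2. Dependencies
Modules whose names end in starter are starters I wrapped myself, and modules whose names end in spider are the runnable Spring Boot applications. If starters are unfamiliar, get to know the Spring Boot startup process first.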
- common-ws-starter
No dependency on other modules.
- db-ws-starter
No dependency on other modules.
- redis-ws-starter
No dependency on other modules.
- rocketmq-ws-starter
No dependency on other modules.
- sql-wx-spider
Depends on common-ws-starter and db-ws-starter.
<!-- common module -->
<dependency>
<groupId>com.wx.spider</groupId>
<artifactId>common-ws-starter</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<!-- MySQL (data source) module -->
<dependency>
<groupId>com.wx.spider</groupId>
<artifactId>db-ws-starter</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
- pc-wx-spider
Depends on common-ws-starter, redis-ws-starter, and rocketmq-ws-starter.
<!-- common module -->
<dependency>
<groupId>com.wx.spider</groupId>
<artifactId>common-ws-starter</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<!-- Redis module -->
<dependency>
<groupId>com.wx.spider</groupId>
<artifactId>redis-ws-starter</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<!-- MQ module -->
<dependency>
<groupId>com.wx.spider</groupId>
<artifactId>rocketmq-ws-starter</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
- mobile-wx-spider
Depends on common-ws-starter, redis-ws-starter, and rocketmq-ws-starter.
<!-- common module -->
<dependency>
<groupId>com.wx.spider</groupId>
<artifactId>common-ws-starter</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<!-- Redis module -->
<dependency>
<groupId>com.wx.spider</groupId>
<artifactId>redis-ws-starter</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<!-- MQ module -->
<dependency>
<groupId>com.wx.spider</groupId>
<artifactId>rocketmq-ws-starter</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
- java-wx-spider
Depends on common-ws-starter and rocketmq-ws-starter.
<!-- common module -->
<dependency>
<groupId>com.wx.spider</groupId>
<artifactId>common-ws-starter</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<!-- MQ module -->
<dependency>
<groupId>com.wx.spider</groupId>
<artifactId>rocketmq-ws-starter</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
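III. Functional Modules in Detail
This section explains the key features inside each module.
1. common-ws-starter
The common module holds models, POJOs, VOs, utilities and the like that are shared by several modules. Its util package also wraps some system-specific helpers, such as parsing the JSON returned by packet capture; none of it is hard to follow. One thing to note: the entity classes mapped to MongoDB and MySQL need annotations on their fields, so the corresponding dependencies have to be declared in pom.xml. Modules that pull in this module must then exclude Spring Boot's default auto-configuration for those dependencies, otherwise startup may fail once your own wrappers are in place.
2. db-ws-starter
Custom data source module. It implements dynamic switching between multiple data sources with a custom annotation plus AOP and contains no business logic. MyBatis-Plus does ship its own dynamic data source annotation, but it is not very flexible, so I wrapped the data sources myself to prepare for adding distributed transactions and database/table sharding later.
An enum restricts the data source names: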
package com.wx.spider.db.constant;
/**
* @author:feng
* @create:2021-04-16 15:59
*/
public enum DataSourceKey {
core, sec
}
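More data sources can be configured here as the business requires. Two are configured for now, and they must match the configuration in bootstrap.yml.
Custom @DataSource annotation and its aspect
Adding this annotation in a business logic class makes the annotated code operate on the data source named by its name attribute automatically.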
package com.wx.spider.db.annotation;
import java.lang.annotation.*;
/**
* @author:feng
* @create:2021-04-16 15:54
*/
@Target({ElementType.METHOD, ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface DataSource {
// data source name; must match a DataSourceKey constant
String name();
}
The aspect implementation:
package com.wx.spider.db.aop;
import com.wx.spider.db.annotation.DataSource;
import com.wx.spider.db.constant.DataSourceKey;
import com.wx.spider.db.uitl.DataSourceHolder;
import lombok.extern.slf4j.Slf4j;
import org.aspectj.lang.JoinPoint;
import org.aspectj.lang.annotation.After;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Before;
import org.springframework.core.annotation.Order;
/**
* @author:feng
* @create:2021-04-16 15:55
*/
@Slf4j
@Aspect
@Order(-1) // make sure this aspect runs before @Transactional
public class DataSourceAOP {
@Before("@annotation(ds)")
public void changeDataSource(JoinPoint point, DataSource ds) throws Throwable {
String dsId = ds.name();
try {
DataSourceKey dataSourceKey = DataSourceKey.valueOf(dsId);
DataSourceHolder.setDataSourceKey(dataSourceKey);
} catch (Exception e) {
log.error("Data source [{}] does not exist, using the default data source > {}", ds.name(), point.getSignature());
}
}
@After("@annotation(ds)")
public void restoreDataSource(JoinPoint point, DataSource ds) {
log.debug("Revert DataSource : {} > {}", ds.name(), point.getSignature());
DataSourceHolder.clearDataSourceKey();
}
}
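The name lookup in the aspect above can be tried out without Spring. The following is a minimal sketch, not the author's code: `DataSourceResolveSketch` and `ArticleService` are hypothetical names, and the annotation and enum are re-declared locally so the example is self-contained. It resolves the annotation by reflection, preferring the method-level annotation, then the class-level one, and falls back to `core` when the name is absent or unknown. The class-level fallback is an assumption added for illustration: the `@annotation(ds)` pointcut shown above only matches annotations placed on methods.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;

public class DataSourceResolveSketch {

    public enum DataSourceKey { core, sec }

    // Local copy of the annotation so the sketch compiles on its own.
    @Target({ElementType.METHOD, ElementType.TYPE})
    @Retention(RetentionPolicy.RUNTIME)
    public @interface DataSource {
        String name();
    }

    // Hypothetical service: a class-level default plus method-level overrides.
    @DataSource(name = "sec")
    public static class ArticleService {
        @DataSource(name = "core")
        public void saveArticle() { }

        public void saveLog() { }

        @DataSource(name = "missing")
        public void saveUnknown() { }
    }

    // Method-level annotation first, then class-level; unknown or absent names fall back to core.
    public static DataSourceKey resolve(Class<?> type, String methodName) {
        Method method;
        try {
            method = type.getMethod(methodName);
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException(e);
        }
        DataSource ds = method.getAnnotation(DataSource.class);
        if (ds == null) {
            ds = type.getAnnotation(DataSource.class);
        }
        if (ds == null) {
            return DataSourceKey.core;
        }
        try {
            return DataSourceKey.valueOf(ds.name());
        } catch (IllegalArgumentException e) {
            // Same idea as the aspect: an unknown name ends up on the default data source.
            return DataSourceKey.core;
        }
    }

    public static void main(String[] args) {
        System.out.println(resolve(ArticleService.class, "saveArticle"));
        System.out.println(resolve(ArticleService.class, "saveLog"));
        System.out.println(resolve(ArticleService.class, "saveUnknown"));
    }
}
```

If class-level use of the annotation is wanted in the real aspect, the pointcut would likely need to cover type-level annotations as well (for example with `@within`), which is a design choice left to the module.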
package com.wx.spider.db.uitl;
import com.wx.spider.db.constant.DataSourceKey;
/**
* @author:feng
* @create:2021-04-16 16:00
*/
public class DataSourceHolder {
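// Uses a ThreadLocal; for downstream microservice calls a semaphore is recommended instead
private static final ThreadLocal<DataSourceKey> dataSourceKey = new ThreadLocal<>();
// Get the data source key bound to the current thread
public static DataSourceKey getDataSourceKey() {
return dataSourceKey.get();
}
// Bind a data source key to the current thread
public static void setDataSourceKey(DataSourceKey type) {
dataSourceKey.set(type);
}
// Clear the data source key of the current thread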
public static void clearDataSourceKey() {
dataSourceKey.remove();
}
}
package com.wx.spider.db.uitl;
import com.wx.spider.db.constant.DataSourceKey;
import org.springframework.jdbc.datasource.lookup.AbstractRoutingDataSource;
import javax.sql.DataSource;
import java.util.HashMap;
import java.util.Map;
/**
* @author:feng
* @create:2021-04-16 16:00
*/
public class DynamicDataSource extends AbstractRoutingDataSource {
private Map<Object, Object> datasources;
public DynamicDataSource() {
datasources = new HashMap<>();
super.setTargetDataSources(datasources);
}
public <T extends DataSource> void addDataSource(DataSourceKey key, T data) {
datasources.put(key, data);
}
@Override
protected Object determineCurrentLookupKey() {
return DataSourceHolder.getDataSourceKey();
}
}
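How the holder and the routing data source cooperate can be shown in plain Java. This is a rough sketch under stated assumptions rather than the module's code: `RoutingSketch`, `withDataSource`, and the `core-db` / `sec-db` strings are hypothetical stand-ins for the real `javax.sql.DataSource` targets. It shows the same pattern: bind a key to the current thread, look up the target by that key, and always clear the key afterwards.

```java
import java.util.EnumMap;
import java.util.Map;
import java.util.function.Supplier;

public class RoutingSketch {

    public enum DataSourceKey { core, sec }

    // Same role as DataSourceHolder: one key per thread.
    private static final ThreadLocal<DataSourceKey> CURRENT = new ThreadLocal<>();

    // Stand-ins for the real DataSource targets; the names are hypothetical.
    private static final Map<DataSourceKey, String> TARGETS = new EnumMap<>(DataSourceKey.class);
    static {
        TARGETS.put(DataSourceKey.core, "core-db");
        TARGETS.put(DataSourceKey.sec, "sec-db");
    }

    // Mirrors the lookup: no key on the thread means the default (core) target.
    public static String currentTarget() {
        DataSourceKey key = CURRENT.get();
        return TARGETS.get(key == null ? DataSourceKey.core : key);
    }

    // Set the key, run the work, and clear the key even if the work throws.
    public static <T> T withDataSource(DataSourceKey key, Supplier<T> work) {
        CURRENT.set(key);
        try {
            return work.get();
        } finally {
            CURRENT.remove();
        }
    }

    public static void main(String[] args) {
        System.out.println(withDataSource(DataSourceKey.sec, RoutingSketch::currentTarget));
        System.out.println(currentTarget());
    }
}
```

One consequence of clearing with `remove()` is worth keeping in mind: if an annotated method calls another annotated method, the inner call's cleanup also drops the outer key, so the rest of the outer method runs on the default data source. A stack of keys per thread would be one way around that, if nesting turns out to matter.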
Auto-configuration is registered through META-INF/spring.factories
spring.factories:
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
com.wx.spider.db.DataSourceAutoConfig
DataSourceAutoConfig:
package com.wx.spider.db;
import com.alibaba.druid.spring.boot.autoconfigure.DruidDataSourceAutoConfigure;
import com.alibaba.druid.spring.boot.autoconfigure.DruidDataSourceBuilder;
import com.baomidou.mybatisplus.autoconfigure.MybatisPlusAutoConfiguration;
import com.wx.spider.db.aop.DataSourceAOP;
import com.wx.spider.db.constant.DataSourceKey;
import com.wx.spider.db.uitl.DynamicDataSource;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.AutoConfigureBefore;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.context.annotation.Primary;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import javax.sql.DataSource;
/**
* @author:feng
* @create:2021-04-16 15:54
*/
@Configuration
@Import(DataSourceAOP.class)
@AutoConfigureBefore(value={DruidDataSourceAutoConfigure.class, MybatisPlusAutoConfiguration.class})
@ConditionalOnProperty(name = {"spring.datasource.dynamic.enable"}, matchIfMissing = false, havingValue = "true")
public class DataSourceAutoConfig {
// Create the data sources
// Every module that pulls in db-core needs a core database
@Bean
@ConfigurationProperties(prefix = "spring.datasource.druid.core")
public DataSource dataSourceCore(){
return DruidDataSourceBuilder.create().build();
}
// The second data source
@Bean
@ConfigurationProperties(prefix = "spring.datasource.druid.sec")
public DataSource dataSourceSec(){
return DruidDataSourceBuilder.create().build();
}
@Primary
@Bean // only the dynamic data source needs to be exposed as the primary DataSource in the Spring context
public DataSource dataSource() {
System.out.println("--------------------------------dynamic data source-------------------------------------");
DynamicDataSource dataSource = new DynamicDataSource();
DataSource coreDataSource = dataSourceCore();
DataSource secDataSource = dataSourceSec();
dataSource.addDataSource(DataSourceKey.core, coreDataSource);
dataSource.addDataSource(DataSourceKey.sec, secDataSource);
dataSource.setDefaultTargetDataSource(coreDataSource);
return dataSource;
}
@Bean // put the data source under Spring transaction management
public DataSourceTransactionManager transactionManager(@Qualifier("dataSource") DataSource dataSource) {
return new DataSourceTransactionManager(dataSource);
}
}
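What the annotations do:
- @Import(DataSourceAOP.class) imports the aspect;
- @AutoConfigureBefore(value={DruidDataSourceAutoConfigure.class, MybatisPlusAutoConfiguration.class}) makes this configuration run before the built-in data source auto-configuration;
- @ConditionalOnProperty(name = {"spring.datasource.dynamic.enable"}, matchIfMissing = false, havingValue = "true") lets you switch this configuration on or off through a property.
3. redis-ws-starter
This module is a wrapper around spring-boot-starter-data-redis. It provides a convenient Redis utility class and Redisson utilities, while the StringRedisTemplate and RedisTemplate classes from spring-boot-starter-data-redis remain available. It supports standalone, cluster, and sentinel configurations.
Core class: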
package com.wx.spider.redis;
import com.wx.spider.redis.serizlizer.RedisObjectSerializer;
import com.wx.spider.redis.util.RedisUtil;
import io.lettuce.core.RedisClient;
import io.lettuce.core.cluster.ClusterClientOptions;
import io.lettuce.core.cluster.ClusterTopologyRefreshOptions;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.AutoConfigureBefore;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.autoconfigure.data.redis.RedisProperties;
import org.springframework.boot.autoconfigure.data.redis.RedisProperties.Sentinel;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.cache.CacheManager;
import org.springframework.cache.annotation.EnableCaching;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.core.io.Resource;
import org.springframework.data.redis.cache.RedisCacheConfiguration;
import org.springframework.data.redis.cache.RedisCacheManager;
import org.springframework.data.redis.cache.RedisCacheWriter;
import org.springframework.data.redis.connection.*;
import org.springframework.data.redis.connection.lettuce.LettuceClientConfiguration;
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
import org.springframework.data.redis.connection.lettuce.LettucePoolingClientConfiguration;
import org.springframework.data.redis.core.HashOperations;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.serializer.RedisSerializationContext;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import org.springframework.util.ReflectionUtils;
import java.io.IOException;
import java.io.InputStream;
import java.lang.reflect.Method;
import java.time.Duration;
import java.util.*;
/**
* @author:feng
* @create:2021-04-14 16:29
*/
@Configuration
@EnableCaching
@SuppressWarnings("all")
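@AutoConfigureBefore(org.springframework.boot.autoconfigure.data.redis.RedisAutoConfiguration.class) // ordering only applies to auto-configuration classes; RedisTemplate is not one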
@EnableConfigurationProperties(RedissonProperties.class)
public class RedisAutoConfig {
@Autowired(required = false)
private RedissonProperties redissonProperties;
@Autowired
private RedisProperties redisProperties;
@Autowired
private ApplicationContext ctx;
@Bean(destroyMethod = "destroy")
@ConditionalOnClass(RedisClient.class)
public LettuceConnectionFactory lettuceConnectionFactory(GenericObjectPoolConfig genericObjectPoolConfig) {
Method clusterMethod = ReflectionUtils.findMethod(RedisProperties.class, "getCluster");
Method timeoutMethod = ReflectionUtils.findMethod(RedisProperties.class, "getTimeout");
Object timeoutValue = ReflectionUtils.invokeMethod(timeoutMethod, redisProperties);
RedisConfiguration redisConfiguration = null;
LettuceClientConfiguration clientConfig = null;
if (redisProperties.getSentinel() != null) {
// Sentinel configuration
Method nodesMethod = ReflectionUtils.findMethod(Sentinel.class, "getNodes");
Object nodesValue = ReflectionUtils.invokeMethod(nodesMethod, redisProperties.getSentinel());
String[] nodes = null;
Set<String> sentinelHostAndPorts = new HashSet<>();
if (nodesValue instanceof String) {
nodes = convert(Arrays.asList(((String) nodesValue).split(",")));
sentinelHostAndPorts.addAll(Arrays.asList(((String) nodesValue).split(",")));
} else {
nodes = convert((List<String>) nodesValue);
sentinelHostAndPorts.addAll((List<String>) nodesValue);
}
redisConfiguration = new RedisSentinelConfiguration(redisProperties.getSentinel().getMaster(),
sentinelHostAndPorts);
((RedisSentinelConfiguration) redisConfiguration)
.setPassword(RedisPassword.of(redisProperties.getPassword()));
((RedisSentinelConfiguration) redisConfiguration).setDatabase(redisProperties.getDatabase());
clientConfig = LettucePoolingClientConfiguration.builder().commandTimeout(redisProperties.getTimeout())
.poolConfig(genericObjectPoolConfig).build();
} else if (clusterMethod != null && ReflectionUtils.invokeMethod(clusterMethod, redisProperties) != null) {
// Cluster configuration
List<String> clusterNodes = redisProperties.getCluster().getNodes();
Set<RedisNode> nodes = new HashSet<RedisNode>();
clusterNodes.forEach(address -> nodes
.add(new RedisNode(address.split(":")[0].trim(), Integer.valueOf(address.split(":")[1]))));
redisConfiguration = new RedisClusterConfiguration();
((RedisClusterConfiguration) redisConfiguration).setClusterNodes(nodes);
((RedisClusterConfiguration) redisConfiguration)
.setPassword(RedisPassword.of(redisProperties.getPassword()));
/**
* ClusterTopologyRefreshOptions enables adaptive and periodic topology refresh. Without adaptive refresh,
* a change in the Redis cluster topology leads to connection errors.
*/
ClusterTopologyRefreshOptions topologyRefreshOptions = ClusterTopologyRefreshOptions.builder()
// enable adaptive refresh for the selected triggers
.enableAdaptiveRefreshTrigger(ClusterTopologyRefreshOptions.RefreshTrigger.MOVED_REDIRECT,
ClusterTopologyRefreshOptions.RefreshTrigger.PERSISTENT_RECONNECTS)
// alternatively, enable every adaptive trigger (MOVED, ASK, PERSISTENT_RECONNECTS, ...)
// .enableAllAdaptiveRefreshTriggers()
// adaptive refresh timeout (library default: 30 seconds)
.adaptiveRefreshTriggersTimeout(Duration.ofSeconds(25))
// enable periodic refresh (off by default; the library's default period is 60 seconds)
.enablePeriodicRefresh(Duration.ofSeconds(20))
// .enablePeriodicRefresh(Duration.ofSeconds(2)) is equivalent to
// .enablePeriodicRefresh().refreshPeriod(Duration.ofSeconds(2))
.build();
.build();
clientConfig = LettucePoolingClientConfiguration.builder().commandTimeout(redisProperties.getTimeout())
.poolConfig(genericObjectPoolConfig)
.clientOptions(
ClusterClientOptions.builder().topologyRefreshOptions(topologyRefreshOptions).build())
// pass the app ID as the client name so the connection is easy to spot in Redis monitoring
// .clientName(appName + "_lettuce")
.build();
} else {
// Standalone configuration
redisConfiguration = new RedisStandaloneConfiguration();
((RedisStandaloneConfiguration) redisConfiguration).setDatabase(redisProperties.getDatabase());
((RedisStandaloneConfiguration) redisConfiguration).setHostName(redisProperties.getHost());
((RedisStandaloneConfiguration) redisConfiguration).setPort(redisProperties.getPort());
((RedisStandaloneConfiguration) redisConfiguration)
.setPassword(RedisPassword.of(redisProperties.getPassword()));
clientConfig = LettucePoolingClientConfiguration.builder().commandTimeout(redisProperties.getTimeout())
.poolConfig(genericObjectPoolConfig).build();
}
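if (redisProperties.isSsl()) {
// NOTE: isUseSsl() only reads the flag and does not enable SSL; to turn SSL on,
// call useSsl() on the LettucePoolingClientConfiguration builder in the branches above
clientConfig.isUseSsl();
}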
LettuceConnectionFactory factory = new LettuceConnectionFactory(redisConfiguration, clientConfig);
return factory;
}
/**
* GenericObjectPoolConfig: connection pool configuration
*/
@Bean
public GenericObjectPoolConfig genericObjectPoolConfig() {
GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig();
poolConfig.setMaxIdle(redisProperties.getLettuce().getPool().getMaxIdle());
poolConfig.setMinIdle(redisProperties.getLettuce().getPool().getMinIdle());
poolConfig.setMaxTotal(redisProperties.getLettuce().getPool().getMaxActive());
poolConfig.setMaxWaitMillis(redisProperties.getLettuce().getPool().getMaxWait().toMillis()); // setMaxWaitMillis expects milliseconds
Duration timeOut = redisProperties.getTimeout();
Duration shutdownTimeout = redisProperties.getLettuce().getShutdownTimeout();
return poolConfig;
}
@Bean
public CacheManager cacheManager(LettuceConnectionFactory lettuceConnectionFactory ) {
RedisCacheConfiguration redisCacheConfiguration = RedisCacheConfiguration.defaultCacheConfig();
redisCacheConfiguration = redisCacheConfiguration.entryTtl(Duration.ofMinutes(30L)) // default cache TTL: 30 minutes
.disableCachingNullValues() // do not cache null values
.serializeKeysWith(RedisSerializationContext.SerializationPair.fromSerializer(RedisSerializer.string())) // key serializer
.serializeValuesWith(
RedisSerializationContext.SerializationPair.fromSerializer(RedisSerializer.java())); // value serializer
return RedisCacheManager.builder(RedisCacheWriter.nonLockingRedisCacheWriter(lettuceConnectionFactory))
.cacheDefaults(redisCacheConfiguration).build();
}
/**
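* RedisTemplate for Redis Cluster (currently disabled: the bean annotations below are commented out)
*/
// @Primary
// @Bean("redisTemplate")
// Without this property the bean is not created; for a single Redis instance, keep this annotation commented out
// @ConditionalOnProperty(name = "spring.redis.cluster.nodes", matchIfMissing = false)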
public RedisTemplate<String, Object> getRedisTemplate(LettuceConnectionFactory lettuceConnectionFactory) {
RedisTemplate<String, Object> redisTemplate = new RedisTemplate<String, Object>();
redisTemplate.setConnectionFactory(lettuceConnectionFactory);
RedisSerializer stringSerializer = new StringRedisSerializer();
RedisSerializer redisObjectSerializer = new RedisObjectSerializer();
redisTemplate.setKeySerializer(stringSerializer); // key serializer
redisTemplate.setHashKeySerializer(stringSerializer); // hash key serializer
redisTemplate.setValueSerializer(redisObjectSerializer); // value serializer
redisTemplate.setHashValueSerializer(redisObjectSerializer); // hash value serializer
redisTemplate.afterPropertiesSet();
return redisTemplate;
}
/**
* RedisTemplate for a single Redis node
*/
@Primary
@Bean("redisTemplate")
@ConditionalOnProperty(name = "spring.redis.host", matchIfMissing = true)
public RedisTemplate<String, Object> getSingleRedisTemplate(LettuceConnectionFactory lettuceConnectionFactory) {
RedisTemplate<String, Object> redisTemplate = new RedisTemplate<String, Object>();
RedisSerializer redisObjectSerializer = new RedisObjectSerializer();
redisTemplate.setConnectionFactory(lettuceConnectionFactory);
redisTemplate.setKeySerializer(new StringRedisSerializer()); // key serializer
redisTemplate.setValueSerializer(redisObjectSerializer); // value serializer
redisTemplate.setHashValueSerializer(redisObjectSerializer);
redisTemplate.afterPropertiesSet();
return redisTemplate;
}
@Bean
public HashOperations<String, String, String> hashOperations(StringRedisTemplate stringRedisTemplate) {
return stringRedisTemplate.opsForHash();
}
/**
* Redis utility class
*/
@Bean("redisUtil")
public RedisUtil redisUtil(LettuceConnectionFactory lettuceConnectionFactory,
StringRedisTemplate stringRedisTemplate, HashOperations<String, String, String> hashOperations) {
RedisUtil redisUtil = new RedisUtil(lettuceConnectionFactory, stringRedisTemplate, hashOperations);
return redisUtil;
}
@Bean(destroyMethod = "shutdown")
@ConditionalOnProperty(name = "spring.redis.redisson.enable", matchIfMissing = false, havingValue = "true")
@ConditionalOnMissingBean(RedissonClient.class)
public RedissonClient redissonClient() throws IOException {
Config config = null;
Method clusterMethod = ReflectionUtils.findMethod(RedisProperties.class, "getCluster");
Method timeoutMethod = ReflectionUtils.findMethod(RedisProperties.class, "getTimeout");
Object timeoutValue = ReflectionUtils.invokeMethod(timeoutMethod, redisProperties);
int timeout;
if (null == timeoutValue) {
timeout = 60000;
} else if (!(timeoutValue instanceof Integer)) {
Method millisMethod = ReflectionUtils.findMethod(timeoutValue.getClass(), "toMillis");
timeout = ((Long) ReflectionUtils.invokeMethod(millisMethod, timeoutValue)).intValue();
} else {
timeout = (Integer) timeoutValue;
}
// spring.redis.redisson.config=classpath:redisson.yaml
if (redissonProperties.getConfig() != null) {
try {
InputStream is = getConfigStream();
config = Config.fromJSON(is);
} catch (IOException e) {
// trying next format
try {
InputStream is = getConfigStream();
config = Config.fromYAML(is);
} catch (IOException ioe) {
throw new IllegalArgumentException("Can't parse config", ioe);
}
}
} else if (redisProperties.getSentinel() != null) {
// Sentinel configuration
Method nodesMethod = ReflectionUtils.findMethod(Sentinel.class, "getNodes");
Object nodesValue = ReflectionUtils.invokeMethod(nodesMethod, redisProperties.getSentinel());
String[] nodes;
if (nodesValue instanceof String) {
nodes = convert(Arrays.asList(((String) nodesValue).split(",")));
} else {
nodes = convert((List<String>) nodesValue);
}
config = new Config();
config.useSentinelServers().setMasterName(redisProperties.getSentinel().getMaster())
.addSentinelAddress(nodes).setDatabase(redisProperties.getDatabase()).setConnectTimeout(timeout)
.setPassword(redisProperties.getPassword());
} else if (clusterMethod != null && ReflectionUtils.invokeMethod(clusterMethod, redisProperties) != null) {
// Cluster configuration
Object clusterObject = ReflectionUtils.invokeMethod(clusterMethod, redisProperties);
Method nodesMethod = ReflectionUtils.findMethod(clusterObject.getClass(), "getNodes");
List<String> nodesObject = (List) ReflectionUtils.invokeMethod(nodesMethod, clusterObject);
String[] nodes = convert(nodesObject);
config = new Config();
config.useClusterServers().addNodeAddress(nodes).setConnectTimeout(timeout)
.setPassword(redisProperties.getPassword());
} else {
// Default Redisson configuration for a standalone server
config = new Config();
String prefix = "redis://";
Method method = ReflectionUtils.findMethod(RedisProperties.class, "isSsl");
if (method != null && (Boolean) ReflectionUtils.invokeMethod(method, redisProperties)) {
prefix = "rediss://";
}
config.useSingleServer().setAddress(prefix + redisProperties.getHost() + ":" + redisProperties.getPort())
.setConnectTimeout(timeout).setDatabase(redisProperties.getDatabase())
.setPassword(redisProperties.getPassword());
}
return Redisson.create(config);
}
private String[] convert(List<String> nodesObject) {
List<String> nodes = new ArrayList<String>(nodesObject.size());
for (String node : nodesObject) {
if (!node.startsWith("redis://") && !node.startsWith("rediss://")) {
nodes.add("redis://" + node);
} else {
nodes.add(node);
}
}
return nodes.toArray(new String[nodes.size()]);
}
private InputStream getConfigStream() throws IOException {
Resource resource = ctx.getResource(redissonProperties.getConfig());
InputStream is = resource.getInputStream();
return is;
}
}
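The pool settings in this class are read from `java.time.Duration` values, and a pool setter such as `setMaxWaitMillis` takes milliseconds, so the unit of the conversion matters. The snippet below is a small illustration of that point only: `PoolWaitSketch` and the 1500 ms value are hypothetical, and no Redis classes are involved.

```java
import java.time.Duration;

public class PoolWaitSketch {

    // Converts a configured wait time to the millisecond value a pool setter expects.
    public static long toPoolWaitMillis(Duration maxWait) {
        return maxWait.toMillis();
    }

    public static void main(String[] args) {
        Duration maxWait = Duration.ofMillis(1500); // hypothetical configured value
        // getSeconds() drops the sub-second part, so it is not a substitute for toMillis().
        System.out.println("getSeconds = " + maxWait.getSeconds());
        System.out.println("toMillis   = " + toPoolWaitMillis(maxWait));
    }
}
```

For a 1500 ms wait, `getSeconds()` yields 1 while `toMillis()` yields 1500, so passing the former to a millisecond setter would shorten the wait to a single millisecond.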
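4. rocketmq-ws-starter
This module is a wrapper around rocketmq-spring-boot-starter. The consumer wrapper that ships with it does not expose the number of times a message has been redelivered, so I wrapped the consumer myself: crawling can fail for many unpredictable reasons, and the retry count makes it possible to write an error log once a message has failed repeatedly.
Core code:
A custom annotation configures the consumer's message model, topic, thread counts, and so on.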
package com.wx.spider.rocketmq.annotation;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import java.lang.annotation.*;
/**
* @author:feng
* @create:2021-04-21 15:17
*/
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface WsRocketMQConsumer {
String consumerGroup() default "";
String topic();
String selectorExpression() default "*";
int consumeThreadMax() default 32;
int consumeThreadMin() default 8;
int consumeMessageBatchMaxSize() default 1;
String nameServer() default "";
MessageModel messageModel() default MessageModel.CLUSTERING;
}
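Consumer listeners are created by extending an abstract class. Subclasses add their own business logic and can read the number of consumption attempts from MessageExt.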
package com.wx.spider.rocketmq.consumer;
import com.wx.spider.rocketmq.annotation.WsRocketMQConsumer;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.springframework.beans.factory.annotation.Value;
import javax.annotation.PostConstruct;
/**
* @author:feng
* @create:2021-04-21 12:21
* Abstract base class for consumers
*/
@Slf4j
public abstract class BaseRocketMQConsumer {
@Value("${rocketmq.name-server}")
private String namesrvAddr;
@Value("${rocketmq.consumer.group}")
private String groupName;
@PostConstruct
public void init() {
WsRocketMQConsumer annotation = this.getClass().getAnnotation(WsRocketMQConsumer.class);
String nameServer = annotation.nameServer();
if(nameServer != null && !"".equals(nameServer)){
namesrvAddr = nameServer;
}
String consumerGroup = annotation.consumerGroup();
if(consumerGroup != null && !"".equals(consumerGroup)){
groupName = consumerGroup;
}
String topicStr = annotation.topic();
String selectorExpression = annotation.selectorExpression();
int consumeThreadMin = annotation.consumeThreadMin();
int consumeThreadMax = annotation.consumeThreadMax();
MessageModel messageModel = annotation.messageModel();
int consumeMessageBatchMaxSize = annotation.consumeMessageBatchMaxSize();
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(groupName);
consumer.setNamesrvAddr(namesrvAddr);
consumer.setConsumeThreadMin(consumeThreadMin);
consumer.setConsumeThreadMax(consumeThreadMax);
consumer.setMessageModel(messageModel);
try {
consumer.subscribe(topicStr, selectorExpression);
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
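consumer.registerMessageListener((MessageListenerConcurrently) (list, context) -> {
try {
for (MessageExt messageExt : list) {
String tag = messageExt.getTags();
String topic = messageExt.getTopic();
// Process every message in the batch; stop at the first one that is not consumed successfully
ConsumeConcurrentlyStatus status = consume(tag, topic, messageExt);
if (status != ConsumeConcurrentlyStatus.CONSUME_SUCCESS) {
return status;
}
}
} catch (Exception e) {
log.error("Failed to consume message", e);
// retry later
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
// consumed successfully
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});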
consumer.setConsumeMessageBatchMaxSize(consumeMessageBatchMaxSize);
consumer.start();
log.info("RocketMQ consumer started");
} catch (Exception e) {
log.error("Failed to start RocketMQ consumer", e);
}
}
protected abstract ConsumeConcurrentlyStatus consume(String tag, String topic, MessageExt messageExt);
}
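What a subclass might do with the retry count can be sketched in plain Java. This is an illustration under stated assumptions, not the project's consumer: `RetryCountSketch`, `MAX_RETRIES`, and `ERROR_LOG` are hypothetical, and the `Status` enum stands in for `ConsumeConcurrentlyStatus` so the example runs without the RocketMQ client. In a real subclass the `reconsumeTimes` argument would come from `messageExt.getReconsumeTimes()`.

```java
import java.util.ArrayList;
import java.util.List;

public class RetryCountSketch {

    // Stand-in for ConsumeConcurrentlyStatus from rocketmq-client.
    public enum Status { CONSUME_SUCCESS, RECONSUME_LATER }

    // Hypothetical limit; pick whatever suits the crawl job.
    public static final int MAX_RETRIES = 3;

    // Stand-in for an error log or a failure table.
    public static final List<String> ERROR_LOG = new ArrayList<>();

    // reconsumeTimes plays the role of messageExt.getReconsumeTimes().
    public static Status handle(String body, int reconsumeTimes, boolean crawlSucceeded) {
        if (crawlSucceeded) {
            return Status.CONSUME_SUCCESS;
        }
        if (reconsumeTimes >= MAX_RETRIES) {
            // Record the failure and acknowledge so the broker stops redelivering.
            ERROR_LOG.add("gave up after " + reconsumeTimes + " retries: " + body);
            return Status.CONSUME_SUCCESS;
        }
        return Status.RECONSUME_LATER;
    }

    public static void main(String[] args) {
        System.out.println(handle("article-1", 0, false));
        System.out.println(handle("article-1", MAX_RETRIES, false));
        System.out.println(ERROR_LOG);
    }
}
```

Acknowledging the message after the limit is one possible policy; another would be to keep returning the retry status and let the broker's own redelivery limit route the message to a dead-letter queue.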
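Summary
This article explained the relationships between the modules and how the functional modules (starters) are wrapped, so the functional structure of the project should now be roughly clear. The following articles move on to the business logic and describe how article information is obtained automatically through packet capture and JS injection.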