Nacos 是阿里巴巴开源的集分布式配置中心、分布式注册中心为一体的分布式解决方案。
它的优点:
- 提供命令空间,方便管理不同环境的配置;
- 提供web界面,方便管理配置和服务;
- 支持配置版本管理,回滚;
- 支持服务管理,手动上线、下线服务。
等等其他优点。
1 如何使用 Nacos 自动更新配置
1.1 配置自动更新的两种方式
第一种方式
-
属性使用
@Value
注解 -
类使用
@RefreshScope
注解
@RefreshScope
@RequestMapping("config")
public class ConfigController {
@Value("${useLocalCache:false}")
private boolean useLocalCache;
}
复制代码
第二种方式
- 使用
@NacosValue
注解,自动更新配置成true
@Controller
@RequestMapping("config")
public class ConfigController {
@NacosValue(value = "${useLocalCache:false}", autoRefreshed = true)
private boolean useLocalCache;
@RequestMapping(value = "/get", method = GET)
@ResponseBody
public boolean get() {
return useLocalCache;
}
}
复制代码
2 Nacos 配置更新源码分析
先上图
具体步骤
1、第一步(更新数据库)比较简单,就是判断是新增配置还是修改配置,然后修改或者新增数据库响应信息(根据使用的数据库是内嵌derby还是mysql使用不同的实现)。
2、比较复杂的是通知(更新文件系统中的文件)
2.1 通过发布订阅模式,发布配置变更事件
2.2 订阅者接收到消息后,调用controller请求(communication/dataChange)
2.3 这个controller请求,启动一个异步任务,这个异步任务更新配置数据到指定的配置文件(nacos目录下)
之后通过发布-订阅模式通知其他服务,其他服务接收到通知之后,更新配置。
PS:
1、controller 请求http://ip:port/nacos/v1/cs/communication/dataChange?dataId=example&group=DEFAULT_GROUP
2、配置存储有两个地方,一个是在文件系统中,另一个是配置的数据库中(可能是内嵌的derby数据库和MySQL数据库)
3、一些配置基本信息,比如配置文件的 MD5 值、dataId、groupName等,会保存在 ConcurrentHashMap 存储的缓存的
源码1: 添加异步任务更新配置文件代码
/**
* Add DumpTask to TaskManager, it will execute asynchronously.
*/
public void dump(String dataId, String group, String tenant, long lastModified, String handleIp, boolean isBeta) {
String groupKey = GroupKey2.getKey(dataId, group, tenant);
String taskKey = String.join("+", dataId, group, tenant, String.valueOf(isBeta));
dumpTaskMgr.addTask(taskKey, new DumpTask(groupKey, lastModified, handleIp, isBeta));
DUMP_LOG.info("[dump-task] add task. groupKey={}, taskKey={}", groupKey, taskKey);
}
复制代码
然后通过persistService
读取数据库中的相应记录
源码2: 读取数据库中的最新配置数据
ConfigInfo cf = persistService.findConfigInfo(dataId, group, tenant);
复制代码
将读取到的内容,写入到本地文件nacos/distribution/target/nacos-server-1.4.2/nacos/data/config-data/${GROUP_NAME}/${dataId}
中,并更新配置文件的md5值。
源码3: 保存配置文件到文件目录中,并更新文件的 MD5 值
/**
* Save config file and update md5 value in cache.
*
* @param dataId dataId string value.
* @param group group string value.
* @param tenant tenant string value.
* @param content content string value.
* @param lastModifiedTs lastModifiedTs.
* @param type file type.
* @return dumpChange success or not.
*/
public static boolean dump(String dataId, String group, String tenant, String content, long lastModifiedTs,
String type) {
String groupKey = GroupKey2.getKey(dataId, group, tenant);
CacheItem ci = makeSure(groupKey);
ci.setType(type);
final int lockResult = tryWriteLock(groupKey);
assert (lockResult != 0);
if (lockResult < 0) {
DUMP_LOG.warn("[dump-error] write lock failed. {}", groupKey);
return false;
}
try {
final String md5 = MD5Utils.md5Hex(content, Constants.ENCODE);
if (md5.equals(ConfigCacheService.getContentMd5(groupKey))) {
DUMP_LOG.warn("[dump-ignore] ignore to save cache file. groupKey={}, md5={}, lastModifiedOld={}, "
+ "lastModifiedNew={}", groupKey, md5, ConfigCacheService.getLastModifiedTs(groupKey),
lastModifiedTs);
} else if (!PropertyUtil.isDirectRead()) {
// 保存数据到文件中
DiskUtil.saveToDisk(dataId, group, tenant, content);
}
updateMd5(groupKey, md5, lastModifiedTs);
return true;
} catch (IOException ioe) {
DUMP_LOG.error("[dump-exception] save disk error. " + groupKey + ", " + ioe.toString(), ioe);
if (ioe.getMessage() != null) {
String errMsg = ioe.getMessage();
if (NO_SPACE_CN.equals(errMsg) || NO_SPACE_EN.equals(errMsg) || errMsg.contains(DISK_QUATA_CN) || errMsg
.contains(DISK_QUATA_EN)) {
// Protect from disk full.
FATAL_LOG.error("磁盘满自杀退出", ioe);
System.exit(0);
}
}
return false;
} finally {
releaseWriteLock(groupKey);
}
}
/**
* Save configuration information to disk.
*/
public static void saveToDisk(String dataId, String group, String tenant, String content) throws IOException {
File targetFile = targetFile(dataId, group, tenant);
FileUtils.writeStringToFile(targetFile, content, Constants.ENCODE);
}
复制代码
3 源码亮点
3.1 发布订阅模式
发布订阅模式使用于一个事件发生需要通知多个订阅者的场景。实际开发中,也能会有不同的名字,大家要能够识别,比如:Subject-Observer(经典例子)、Publisher-Subscriber(Nacos)、Producer-Consumer(Kafka)、Dispatcher-Listener。
发布变更事件
/**
* Request publisher publish event Publishers load lazily, calling publisher.
*
* @param eventType class Instances type of the event type.
* @param event event instance.
*/
private static boolean publishEvent(final Class<? extends Event> eventType, final Event event) {
if (ClassUtils.isAssignableFrom(SlowEvent.class, eventType)) {
return INSTANCE.sharePublisher.publish(event);
}
final String topic = ClassUtils.getCanonicalName(eventType);
EventPublisher publisher = INSTANCE.publisherMap.get(topic);
if (publisher != null) {
return publisher.publish(event);
}
LOGGER.warn("There are no [{}] publishers for this event, please register", topic);
return false;
}
复制代码
循环订阅者列表,通知订阅者。
/**
* Receive and notifySubscriber to process the event.
*
* @param event {@link Event}.
*/
void receiveEvent(Event event) {
final long currentEventSequence = event.sequence();
// Notification single event listener
for (Subscriber subscriber : subscribers) {
// Whether to ignore expiration events
if (subscriber.ignoreExpireEvent() && lastEventSequence > currentEventSequence) {
LOGGER.debug("[NotifyCenter] the {} is unacceptable to this subscriber, because had expire",
event.getClass());
continue;
}
// Because unifying smartSubscriber and subscriber, so here need to think of compatibility.
// Remove original judge part of codes.
notifySubscriber(subscriber, event);
}
}
复制代码
通过线程池处理订阅者任务,因为这里订阅者需要调用HTTP请求来处理更新,所以通过线程池可以解耦。
@Override
public void notifySubscriber(final Subscriber subscriber, final Event event) {
LOGGER.debug("[NotifyCenter] the {} will received by {}", event, subscriber);
final Runnable job = new Runnable() {
@Override
public void run() {
subscriber.onEvent(event);
}
};
final Executor executor = subscriber.executor();
if (executor != null) {
executor.execute(job);
} else {
try {
job.run();
} catch (Throwable e) {
LOGGER.error("Event callback exception : {}", e);
}
}
}
复制代码
3.2 任务管理器
这里存在竞争资源—单个配置文件,也就是确定Group下的dataId文件或者数据库记录。
- 失败重试
/**
* process tasks in execute engine.
*/
protected void processTasks() {
Collection<Object> keys = getAllTaskKeys();
for (Object taskKey : keys) {
AbstractDelayTask task = removeTask(taskKey);
if (null == task) {
continue;
}
NacosTaskProcessor processor = getProcessor(taskKey);
if (null == processor) {
getEngineLog().error("processor not found for task, so discarded. " + task);
continue;
}
try {
// ReAdd task if process failed
if (!processor.process(task)) {
retryFailedTask(taskKey, task);
}
} catch (Throwable e) {
getEngineLog().error("Nacos task execute error : " + e.toString(), e);
retryFailedTask(taskKey, task);
}
}
}
复制代码
3.3. 添加任务处理并发
使用重入锁ReentrantLock
protected final ReentrantLock lock = new ReentrantLock();
@Override
public void addTask(Object key, AbstractDelayTask newTask) {
lock.lock();
try {
AbstractDelayTask existTask = tasks.get(key);
if (null != existTask) {
newTask.merge(existTask);
}
tasks.put(key, newTask);
} finally {
lock.unlock();
}
}
复制代码
- 缓存任务处理器对象,需要的时候直接通过本地缓存获取
private final ConcurrentHashMap<Object, NacosTaskProcessor> taskProcessors = new ConcurrentHashMap<Object, NacosTaskProcessor>();
复制代码
3.4 一个接口多个实现,根据条件选择
如果要实现多种不同数据库的操作的时候,Condition
是非常有用的,比如这里要支持内嵌的derby数据库,也要支持MySQL,通过Condition可以根据需要使用不同的数据库实现。
传统企业里面有些业务系统需要支持多种不同的数据库,不同客户现场可能会使用不同的数据库,通过这种方式可以减少定制、减少到现场由于客户数据库不同而临时进行定制开发。
@Conditional(value = ConditionOnEmbeddedStorage.class)
@Component
public class EmbeddedStoragePersistServiceImpl implements PersistService {
复制代码
@Conditional(value = ConditionOnExternalStorage.class)
@Component
public class ExternalStoragePersistServiceImpl implements PersistService {
复制代码
如果本文对你有帮助,欢迎关注我的公众号 【哥妞】 ,带你深入 JAVA 的世界~