王鹏,2017 年至今加入去哪儿机票事业部服务平台,主要负责机票后服务相关业务,以及服务平台数据同步服务和基础索引服务的重构升级。
背景
在互联网时代,每天都会产生海量的数据,去哪儿网成立至今,积累了海量的用户出行数据,每一次旅行机票记录,酒店记录,门票记录等等,这些数据都存储到数据库中,现在流行的 Mysql 数据库,一般情况下单表容量在 1kw 以下是最佳状态,如果将所有的历史记录都存储到 DB 中,那么每张表的大小将超过 10 亿级,根据不同维度的查询将是一个梦魇,比如通过订单号查询,根据用户加状态查询等等,数据库将不堪重负,慢查询可能导致整个库不可提供服务。常用的解决方案是把这些数据存储到一个备用的数据库或者异构的数据结构中,来提升我们的查询效率。数据同步系统就是做这件事的,将数据从一个数据源导入另外一个数据源,提供同构或者异构、低延迟的的数据的同步,提供高可用的数据同步服务,并且保证数据最终一致性。
数据同步的开源方案
1.Databus
Databus 是一个低延迟、可靠的、支持事务的、保持一致性的数据同步系统。Databus 通过挖掘数据库日志的方式,将数据库变更实时、可靠的从数据库拉取出来,业务可以通过定制化 client 实时获取变更并进行其他业务逻辑。
几个特点:
- 来源独立:Databus 支持多种数据来源的变更抓取,包括 Oracle 和 MySQL。
- 可扩展、高度可用:Databus 能扩展到支持数千消费者和事务数据来源,同时保持高度可用性。
- 事务按序提交:Databus 能保持来源数据库中的事务完整性,并按照事务分组和来源的提交顺序交付变更事件。
- 低延迟、支持多种订阅机制:数据源变更完成后,Databus 能在毫秒级内将事务提交给消费者。同时,消费者使用 Databus 中的服务器端过滤功能,可以只获取自己需要的特定数据。
- 无限回溯:对消费者支持无限回溯能力,例如当消费者需要产生数据的完整拷贝时,它不会对数据库产生任何额外负担。当消费者的数据大大落后于来源数据库时,也可以使用该功能。
2.Canal
Canal 是阿里巴巴的一个开源项目,基于 java 实现,通过模拟成为 Mysql 的 slave 的方式,监听 Mysql 的 binlog 日志来获取数据,binlog 设置为 row 模式以后,不仅能获取到执行的每一个增删改的脚本,同时还能获取到修改前和修改后的数据,基于这个特性,canal 就能高性能的获取到 Mysql 数据数据的变更。
几个特点 :
- 来源只支持 Mysql。
- 扩展性,Canal 可以将数据发送到 Kafka,或者 MQ 中这样可以由开发者自定义如何分配数据。
- 低延迟,ms 级别的延迟同步,满足接近实时系统的同步需求。
- 事务性,支持通过不同方式解析和订阅 MySQL binlog,例如通过 GTID。
- 支持回溯,取决于 binglog 存储的时间,可以反查位点进行重新同步。
对比 | Canal | Databus |
---|---|---|
支持来源 | Mysql | Mysql Oracle |
扩展性 | 发消息通用消息接口 | 多消费者支持 |
延迟 | 低 | 低 |
事务性 | 支持 | 支持 |
支持回溯 | 支持调整位点 | 支持 |
复杂度 | 相对简单 | 复杂 |
通过对比可以发现,Canal 和 Databus 在主体功能方面并没有太大区别,因为在当前的环境下不存在 Oracle 数据库同步的问题,所以我们选型选择了 Canal 作为数据同步的一个技术解决方案,另外相对于 Databus 的配置和部署实施 Canal 也是相对简单的。
老系统的历史问题
老系统的问题很多,如何重构?
核心流程:
-
DB 产生的 Binlog 通过 Canal 传输到 Databus(内部自研的,与开源同名)中,Databus 又分为生产者和消费者两个大的模块。Binlog 的接受者即 Databus 的生产者,这个模块的主要作用是解析 Binlog 数据并且转化为一个通用的数据结构 DataRow。当然Databus也可以接受其他类型的数据源,比如公司的 QMQ、Kafka 等数据源。
-
转化的 DataRow 通过 Kafka 发送到消费者,消费者模块主要是进行聚合操作,这些数据传递给异步的数据存储系统,比如 ES、Hive 等等。
-
存储系统现在选择的是 ES 作为最终的存储介质,ES 本身具有分布式,高可用,存储海量数据并且提供快速查询。并且提供主备两个集群支持高可用和负载均衡。
-
整体的配置以及调度由 ZK 完成,包括 Canal 的主备,Databus 的生产者和消费者的上下线,以及索引的元数据管理都是在 ZK 中配置管理。
-
查询服务,Qgalaxy 这个系统主要是提供了业务线查询的 API,统一的管理业务线的查询,每个业务线或者应用接入访问 ES 集群都需要通过这个服务,统一的进行鉴权,流量的切换以及限流。
通过架构的简化图
-
Databus 的生产者是单点的,非高可用的。这是非常严重的问题,如果存在某台机器宕机的情况,则某些索引的同步服务将暂停,导致故障。消费者端写入到某些 Kafka 或者 ES 的数据都是写死的,没有做成配置的,每次新增索引都要进行开发。切换不同的测试环境也十分麻烦。
-
Canal 的版本过低,老系统使用的是 1.0.19,开源版本已经更新到 1.0.25,存在一些已知的 Bug,并且配置都在一个文档里面记录了,同步那些数据库的地址,实例以及用户名密码等信息,运维十分的困难,必须知道每个索引对应的库才能找到对应的机器操作运维。
-
所有的配置文件都存储在了 ZK 中,比如 Databus 的生产者消费者的初始化数据,连接那些 Kafka 的 Topic,接受那些消息,写入到那些索引中这些配置信息如果 ZK 不可用的情况下,是无法启动某个生产者或者消费者造成故障。
-
全量同步的问题,Databus 的全量同步需要硬编码去实现,没有做到灵活的根据库来配置。
-
查询系统的问题,Qgalaxy 的代码耦合了非常多的业务层的逻辑在里面,导致代码十分臃肿,另外存在通过 Jar 包方式直连 ES 的代码,这让查询层形同虚设,不利于整体集群的管理和限流。
-
存储层部署混乱,重要索引和其他索引混合部署,如果其他的不重要索引出现问题,核心索引会受到比较大影响。核心索引数据量巨大,不合理的分片导致查询效率缓慢。索引的版本过低内存管理不好,导致频繁的 GC。
针对于以上问题提出来重构的整体架构如下:
- 针对 Canal 的问题,在新的架构里引入了 Otter。
Otter 是基于 Canal 开源产品,获取数据库增量日志数据的一套系统。Otter 将整个数据传输过程抽象为四个阶段:
数据 on-Fly,尽可能不落地,更快的进行数据同步。(开启 node loadBalancer 算法,如果 Node 节点 S+ETL 落在不同的 Node 上,数据会有个网络传输过程)
- Otter 的引入解决了 Databus 生产者模块单点的问题,多个 Node 节点可以做到负载均衡和失败转移。同时也解决了,Canal 的配置分布到不同的文件,不便于统一管理的问题,Otter 的控制台集合了对于所有数据源的管理、同步关系的管理(Canal 的配置管理,数据库的配置管理)、主从的管理(Canal 的主从,DB 同步的主从管理)、映射表的管理等。
- 针对于问题 3,引入 Otter 可以把部分 Databus 在生产者端的配置移动到了 Otter 的配置中,这里需要对于 Otter 做一些改造,比如:唯一 id:Rowkey 的生成,以及配置发送到那个消息主题和分组。
- 针对问题 4,查询的问题,主要采用重新写了一个 ES 的通用网关 ES Getway,这个网关和 ES 的版本无关,这样对于不同的 ES 版本,只是提供相应的 DSL 语句的传输以及类型的转化,不提供特定的解析,这部分放到客户端的 Jar 去做兼容。
- 针对于问题 5,全量导入的问题,引入了开源的 Datax 框架作为导入的一个通用解决方案。
- 针对于问题 6,ES 集群部署混乱的问题,采用了核心集群和非核心集群的分组,在配置对应集群的元数据的时候就将非核心集群进行分组,部署的时候核心集群和非核心集群的机器隔离。
Otter 的改造
Otter 的整个目录结构分为三个部分:Node 是实际上进行数据同步的工程, Manager 是对 Node 节点进行管理,数据统计,Node 节点之间的协调,基本信息的同步等,Share 主要是公用的部分,接口抽象等。
新数据同步系统的改动点:
- Load 阶段发送消息给 Kafka,这部分的代码改造涉及两个点,唯一 ID 的生成策略以及发送 Kafka 消息。首先要在 DataMediaPair 这个类里面增加最后一列 partitionKey,配置那些数据表的列是要作为 Kafka的PartitionKey,比如订单表的 orderid 这个字段就是主键列,发送消息的时候就要根据这个字段的值作为 PartitionKey 发送,这样的目的是为了保证同一张表的同一个 ID 的数据变更都是在一个 Partition下,消费的时候可以保证顺序性。选择 PartitionKey 的原则一般选择业务的主键,比如每张表都有自己的主键 ID,也有业务层面的唯一 ID,尽量选取业务层面的唯一 ID 作为 PartitionKey,这样可以保证一个业务的顺序性。
public class DataMediaPair
{
private Long id;
private Long pipelineId;// 同步任务id
private DataMedia source;//源
private DataMedia target;// 目标
private Long pullWeight;// 介质A中获取数据的权重
private Long pushWeight;// 介质B中写入数据的权重
private ExtensionData resolverData;// 关联数据解析类
private ExtensionData filterData; // filter解析类
private ColumnPairMode columnPairMode = ColumnPairMode.INCLUDE;
private List<ColumnPair> columnPairs = new ArrayList<ColumnPair>();
private List<ColumnGroup> columnGroups = new ArrayList<ColumnGroup>();
private Date gmtCreate; private Date gmtModified;
private String subEventType; // 定义的事件类型
private Long kafkaDataMediaId;// 比如,还想额外同步数据到其他数据元
private boolean primaryTable;// 是否为主表
private String groovy; // groovy 脚本
private String esIndex;// es index
private String partitionKey; // partition key
}
复制代码
- 在 Load 阶段发送消息
Load 阶段的函数其实就是根据权重来分配处理数据,并且对于相同的 PK 做聚合,降低处理量,目前 Otter 支持的的方式主要是写入另外一个数据库,所以在下边的代码中 doLoad 方法主要是多线程写入另外一个数据库中。而这里不需要写入另外一个数据库中,只需要发送消息即可,改造这个方法即可。
public class DbLoadAction implements InitializingBean, DisposableBean {
/**
* 返回结果为已处理成功的记录
* RowBatch
*/ public DbLoadContext load(RowBatch rowBatch, WeightController controller) {
{
WeightBuckets<EventData> buckets = buildWeightBuckets(context, datas);
List<Long> weights = buckets.weights();
controller.start(weights); // weights可能为空,也得调用start方法
if (CollectionUtils.isEmpty(datas)) {
logger.info("##no eventdata for load");
} // 按权重构建数据对象
// 处理数据
for (int i = 0; i < weights.size(); i++) {
Long weight = weights.get(i);
controller.await(weight.intValue()); // 处理同一个weight下的数据
List<EventData> items = buckets.getItems(weight);
logger.debug("##start load for weight:" + weight); // 预处理下数据 // 进行一次数据合并,合并相同pk的多次I/U/D操作
items = DbLoadMerger.merge(items); // 按I/U/D进行归并处理
DbLoadData loadData = new DbLoadData();
doBefore(items, context, loadData); // 执行load操作
doLoad(context, loadData);
controller.single(weight.intValue());
logger.debug("##end load for weight:" + weight);
}
}
复制代码
这里面的有个很重要的事情是:RowBatch rowBatch 这个对象,这个对象是针对于某个数据库实例的 Binlog 集合,而不是针对某个表或者某个库的 Binlog 集合。这块如果不搞清楚会引发顺序性的问题。比如分库分表的场景下,每个实例可能包含多个库,那么他们同步的 DataMedia 就是不同的,要根据不同的数据内容去获取,而不是获取一个就可以了。
List<EventData> datas = rowBatch.getDatas(); //这里获取ParitionKey的写法就是错误认为是一个数据库的Schma发出的Binlog
复制代码
DataMediaSource source = ConfigHelper.findDataMedia(context.getPipeline(), datas.get(0).getTableId()).getSource();
DataMediaPair dataMediaPair = ConfigHelper.findDataMediaPair(context.getPipeline(), datas.get(0).getPairId());
String partitionKey = dataMediaPair.getPartitionKey();
复制代码
DTS 聚合层
整个的聚合层设计分为两部分:
-
仿照 Otter 的 Manager 和 Node 模式,DTS 系统中也分为 SETL 四个阶段,这四个阶段都在 Node 节点的中执行,Manager 主要是维护 Node 节点的启停,状态的查询。
-
业务聚合的问题,机票订单表由几个子表组成:订单主表,支付信息表,航段信息以及乘客信息表,订单的整体数据是这些子表的数据聚合而成,这些子表的数据可能是一对多也可能是一对一,聚合层的设计采用 XML 配置的方式去配置,定义如下:
<Config>
<!-- 定义索引最外层使用的父表 -->
<parentTable name="order">
<!-- 父表的rowKey定义,数据库列名及其数据类型 -->
<!-- d保证rowKey相同的数据会顺序消费,并写入同一个ES doc-->
<rowKeyConfig column="order_no"columnType="string"/>
<!-- 定义第一级的子表1,需要指明是否是叶子级别的表(默认值是true,不写isLeaf代表就是叶子级别的表), rowType代表当前表和父表的关系,array为1对多,map为1对1-->
<childTablename="sub_order"isLeaf="false"rowType="array">
<!-- 第一级的子表的rowKey -->
<rowKeyConfigcolumn="order_no"columnType="string"/>
<!-- 定义第二级 -->
<childTablename="product"rowType="map"tinyint1="male,is_valid">
<!-- 第二级的子表的rowKey -->
<rowKeyConfigcolumn="product_no"parentColumn="sub_order_no"columnType="string"dependent="true"/>
</childTable>
</childTable>
<!-- 定义第一级的子表2 -->
<childTablename="ext_info"rowType="map">
<!-- 第一级的子表的rowKey -->
<rowKeyConfigcolumn="order_no"columnType="string"/>
</childTable>
</parentTable>
</CommonConfig>
复制代码
一个父表和多个子表的XML配置映射关系,核心就是配置关联的外键,父表 order ,子表为 suborder,子表 rowkey :orderno 这个字段就是子表和父表关联的字段,通过这个字段可以找到这俩表的映射关系。如何设计一套程序结构将这些映射关系进行存储和使用呢?
表关系抽象接口
//一个表关系的整体抽象接口
public interface TableDataRow {
//获取Table的名字
String getTableName();
//array 还是 map 一对多还是一对一
RowType getType();
//获取关联键的查询SQL
String getSql();
//子表的list
List<ChildTableDataRow> getChildren();
//关联的外键
RowKey getRowKey();
}
复制代码
主表子表的实现
public class RootTableDataRow implements TableDataRow
{
//表名
private String tableName;
//类型
private RowType type;
// select * from private String sql;
//子表的集合
private List<ChildTableDataRow> children;
//关联外键
private RowKey rowKey;
//分库分表 可能对应多个名字
private Set<String> dbNames;
}
public class ChildTableDataRow implements TableDataRow
{
private String tableName;
private RowType type;
private String sql;
//查询的where 条件 比如 id=10
private WhereEntry whereEntry;
private List<ChildTableDataRow> children;
//主表的关联关系
private TableDataRow parent;
private RowKey rowKey;
}
复制代码
最核心的代码就是 Rowkey 的生成逻辑,一个结构的所有子表的 Rowkey 必须和主表是一致的,那么就需要一个递归的算法来实现这种复杂的 Rowkey 生成。
/**
* 获取rowKey $ 之后的字段内容
*
* @param tableDataRow
* @param thisTableRowKeyValue
* @param schemaName
* @return */
public String getRootRowKeyValue(TableDataRow tableDataRow, String thisTableRowKeyValue, String schemaName, CollectConfig collectConfig) {
// 没有父类,则直接返回rowKey的value
if (!tableDataRow.getRowKey().isDepend()) {
return thisTableRowKeyValue;
}
// 如果是最上一级,直接返回
if (tableDataRow instanceof RootTableDataRow) {
return thisTableRowKeyValue;
}
// 是child节点,查找其父的rowKey的value
ChildTableDataRow child = (ChildTableDataRow) tableDataRow;
String parentRowKeyValue = getParentRowKeyValue(schemaName, thisTableRowKeyValue, child, collectConfig);
TableDataRow parent = child.getParent();
//如果子类则进行递归循环
return getRootRowKeyValue(parent, parentRowKeyValue, schemaName, collectConfig);
}
复制代码
通过以上我们就实现了关系映射和存储,在后边的程序中调用相关的函数,并且构建好对应的结构即可。\
通用 ES 网关的架构
ES 网关的主要作用是封装访问 ES 的读写 API,并且对访问做鉴权、流控等等。\
主要包含以下几个模块:
- 工单申请模块和鉴权模块,App 访问 ES 网关的申请,申请后会发送给用户一个授权码,保证访问和数据安全性。
- 配置模块,主要配置一些 ES 集群的元数据信息,比如集群包含哪些索引,ES 的版本,分组,以及权限,索引的基础信息,类型,映射关系等等。
- 写入模块,提供了同步写入 Dubbo 的方式,和异步写入的方式 Kafka 消息的方式,写入数据,同步写入更多的满足实时性的需要,异步写入满足不同业务的诉求,以及补数的诉求。
- 读取模块,提供常用的查询 API ,比如 Count 查询,普通查询,以及深层分页查询:Scroll 查询等基础查询功能。
- 流量切换模块,针对于主备集群运维升级,进行流量的切换,如果没有升级则进行相应的负载均衡。
- 限流模块,主要是用了 Hystrix 框架,限制写入的 QPS,避免大流量压垮 ES 集群,这里不仅有请求数的限制,而且还有请求大小的限制。
总结
整体的数据同步系统分为三个大的部分,Otter 部分,主要是同步数据库的 Binlog 数据到 Kafka,DTS 部分主要是聚合业务的数据,将子表的数据以及主表的数据聚合成为一个集合数据,网关部分主要是负责对整体查询和写入做统一的封装,提供写入和查询的 API,通过池化技术和异步的方式提高性能,并且提供切换流量和限流的功能,希望这篇文章可以给大家做数据同步提供一些启示。