源码分析 Datax 任务拆分逻辑

2021 的第三篇文章

内容简述

这篇文章将通过源码去了解 Datax 是如何对任务进行逻辑拆分的。

拆分逻辑

Datax 对于任务的拆分,实际上也是为了将任务拆到最小颗粒度便于控制和执行.下面是一张官方的流程图

具体的拆分流程如下:

  1. Datax 根据不同的源端切分策略,将 Job 切分成多个小的 Task(子任务),以便于并发执行。Task 便是DataX 作业的最小单元,每一个Task都会负责一部分数据的同步工作。
  2. 便是DataX 切分多个 Task 之后,Job 会调用 Scheduler 模块,根据配置的并发数据量,将拆分成的 Task 重新组合,组装成 TaskGroup (任务组)。每一个 TaskGroup 负责以一定的并发运行完毕分配好的所有 Task,默认单个任务组的并发数量为 5。
  3. 每一个 Task 都由 TaskGroup 负责启动,Task启动后,会固定启动 Reader—>Channel—>Writer 的线程来完成任务同步工作。
  4. DataX 作业运行起来之后, Job 监控并等待多个 TaskGroup 模块任务完成,等待所有 TaskGroup 任务完成后Job成功退出。否则异常退出,进程退出值非 0

接着我们根据流程去讲解

源码解析

reader 拆分的流程

以下一个 Mysql 数据库同步抽取数据到本地的作业的配置文件。

{
    "job": {
        "setting": {
            "speed": {
                 "channel": 3
            },
            "errorLimit": {
                "record": 0,
                "percentage": 0.02
            }
        },
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "username": "root",
                        "password": "root",
                        "column": [
                            "id",
                            "name"
                        ],
                        "splitPk": "db_id",
                        "connection": [
                            {
                                "table": [
                                    "table"
                                ],
                                "jdbcUrl": [
     "jdbc:mysql://127.0.0.1:3306/database"
                                ]
                            }
                        ]
                    }
                }  .
            }
        ]
    }
}
复制代码

又或者是通过自定义 SQL 语句的方式进行连接。这两种方式通过 isTableMode 来区分。

 "content": [
    {
        "reader": {
            "name": "mysqlreader",
            "parameter": {
                "username": "root",
                "password": "root",
                "connection": [
                    {
                        "querySql": [
                            "select db_id,on_line_flag from db_info where db_id < 10;"
                        ],
                        "jdbcUrl": [
                            "jdbc:mysql://bad_ip:3306/database",
                            "jdbc:mysql://127.0.0.1:bad_port/database",
                            "jdbc:mysql://127.0.0.1:3306/database"
                        ]
                    }
                ]
            }
        }
    }
]
复制代码

在上篇文章中,我们讲 JobContainer 是 Job 执行器,负责全局拆分、调度、前置语句和后置语句等工作的工作单元。这样我们可以知道拆分的代码的位置了

    public void start() {
        //...
        this. .totalStage = this.s .plit();     //切分
        //...
    }
复制代码

调用了 split 方法后,实际上是调用 reader 的 split 方法(这里以 MySqlReader 为例)

    public List<Configuration> split(int adviceNumber) {
        return this.commonRdbmsReaderJob.split(this.originalConfig, adviceNumber);
    }
复制代码

而 reader 调用的实际是交付给 ReaderSplitUtil 的 split。由于代码有点长,我分段讲解。

首先判断 isTableMode 来判断是否需要拆分表.isTableMode=TRUE 代表通过配置方式获取数据; isTableMode=FALSE 代表是通过自定义 SQL

当 isTableMode=TRUE,需要根据表的数量、eachTableShouldSplittedNumber 进一步计算

    List<String> tables = connConf.getList(Key.TABLE, String.class);
    String splitPk = originalSliceConfig.getString(Key.SPLIT_PK, null); //①

    boolean needSplitTable = eachTableShouldSplittedNumber > 1
            && StringUtils.isNotBlank(splitPk);  //②
    if (needSplitTable) {
        //...
    } else {
        //...
    }
复制代码

上面的代码对应编号的逻辑解读是:

  1. 获取配置中的表格数组以及切割的主键字段
  2. 如果 eachTableShouldSplittedNumber > 1 && splitPk 不为空,说明需要进行切分,反之如果 eachTableShouldSplittedNumber 的数量等于或 splitPk 为空,说明可以直接以整表数据发送或不用通过 splitPk 来切分

所以当结果需要切分的话,就先对表格的数量判断

    if (tables.size() == 1) { //①
        Integer splitFactor = originalSliceConfig.getInt(Key.SPLIT_FACTOR, Constant.SPLIT_FACTOR);
        eachTableShouldSplittedNumber = eachTableShouldSplittedNumber * splitFactor;
    }
    for (String table : tables) { //②
        tempSlice = sliceConfig.clone();
        tempSlice.set(Key.TABLE, table);

        List<Configuration> splittedSlices = SingleTableSplitUtil
                .splitSingleTable(tempSlice, eachTableShouldSplittedNumber);

        splittedConfigs.addAll(splittedSlices);
    }
复制代码

代码①是获取切割因子,通过 eachTableShouldSplittedNumber*splitFactor 得到切割数
代码②是循环 tables,然后直接使用 eachTableShouldSplittedNumber 来对每个 table 的数据切割

如果 isTableMode=FALSE,说明结果不需要切分,可以赋值 QUERY_SQL 了

    for (String table : tables) {
        tempSlice = sliceConfig.clone();
        tempSlice.set(Key.TABLE, table);
        String queryColumn = HintUtil.buildQueryColumn(jdbcUrl, table, column);
        tempSlice.set(Key.QUERY_SQL, SingleTableSplitUtil.buildQuerySql(queryColumn, table, where)); //拆分查询
        splittedConfigs.add(tempSlice);
    }
复制代码

在上面看到,实际拆分调用的是 SingleTableSplitUtil 的 splitSingleTable 方法。由于代码过长,我这里仅仅贴上代码逻辑。拆分的逻辑如下:

  1. 首先获取 splitPkName、column、table、where 等配置信息。
  2. 调用 SingleTablesSplitUtil#getPkRange 方法获取抓取数据的范围。。
  3. 判断 splitPkName 是 long 还是字符串,调用不同的方法进行拆分。
  4. 最后通过拆分结果进行赋值 QUERY_SQL。
  5. 最后考虑到有可能 splitPkName is null 的情况,所以拼接一个 Configuration 进行补充。

writer 拆分的流程

writer 相对于 reader 来说要简单一些.实际上的逻辑分为单表以及多表,实际上的逻辑如下:

  1. 获取表的数量。如果是单表,就循环 channelNumber 生成对应数量的 Configuration。
  2. 如果是多链接多表的情况下,需要对应循环链接再循环表,生成对应的 Configuration。

合并配置文件

在 reader 以及 writer 切分好的情况下,开始合并任务配置。为啥要合并?因为在 Datax 的切分后,每个任务都是单独的一个任务,所以它们必须拥有全部配置信息。

    List<Configuration> contentConfig = mergeReaderAndWriterTaskConfigs(
            readerTaskConfigs, writerTaskConfigs, transformerList);
复制代码

合并的代码在 JobContainer#mergeReaderAndWriterTaskConfigs

    private List<Configuration> mergeReaderAndWriterTaskConfigs(
            List<Configuration> readerTasksConfigs,
            List<Configuration> writerTasksConfigs,
            List<Configuration> transformerConfigs) {
    //①
    if (readerTasksConfigs.size() != writerTasksConfigs.size()) {
        throw DataXException.asDataXException(
                FrameworkErrorCode.PLUGIN_SPLIT_ERROR,
                String.format("reader切分的task数目[%d]不等于writer切分的task数目[%d].",
                        readerTasksConfigs.size(), writerTasksConfigs.size())
        );
    }
    //②
    List<Configuration> contentConfigs = new ArrayList<Configuration>();
    for (int i = 0; i < readerTasksConfigs.size(); i++) {
        Configuration taskConfig = Configuration.newDefault();
        taskConfig.set(CoreConstant.JOB_READER_NAME,
                this.readerPluginName);
        taskConfig.set(CoreConstant.JOB_READER_PARAMETER,
                readerTasksConfigs.get(i));
        taskConfig.set(CoreConstant.JOB_WRITER_NAME,
                this.writerPluginName);
        taskConfig.set(CoreConstant.JOB_WRITER_PARAMETER,
                writerTasksConfigs.get(i));

        if(transformerConfigs!=null && transformerConfigs.size()>0){
            taskConfig.set(CoreConstant.JOB_TRANSFORMER, transformerConfigs);
        }

        taskConfig.set(CoreConstant.TASK_ID, i);
        contentConfigs.add(taskConfig);
    }

    return contentConfigs;
}
复制代码

代码①,检查是否 writer 和 reader 的任务切割后相同
代码②,设置 readerPlughinName、writerPluiginName、readConfig、writerConfig 等

文末

Datax 的任务拆分的逻辑的目的就是为了将更大的取数逻辑拆分到更加容易控制的颗粒度来执行,所以并不是非常的复杂。在实现一些同样性质的需求也可以考虑如此拆分。

完 🙂

© 版权声明
THE END
喜欢就支持一下吧
点赞0 分享