前言

书接上回,继续来聊一聊DataX源码,在上篇文章中我们已经对DataX的整体架构以及运行流程有了一个比较细致的了解,这篇文章我们将更深层次的研究DataX在调度方面的细节。

调度流程解析

确认最终任务需要的channel数量

注:channel是子任务数据传输的内存模型,后续文章将详细剖析,在这里可以暂且认为就是任务分片数量

在任务周期中含有一个split()阶段,在这个阶段做了两件事情:

  1. 通过配置项计算出建议的并发channel数量
  2. 执行reader插件中的的实际切片逻辑,并根据数量切分configuration,请注意,这一步计算出的数量可能小于第一步配置的并发数

所以在真正调度阶段,需要根据split()阶段中计算的两个值,计算出最终的channel数量

image-20220427120240736

通过channel数量分配taskGroup

在计算出真正需要的channel数量之后,根据每个TaskGroup应该被分配任务的个数,计算TaskGroup的个数:

image-20220427144829662

由上图可知,任务的分配是由JobAssignUtil去进行,而且从方法名称assignFairly也可以知晓,分配的逻辑是公平分配,使用的Round Robin算法,轮询分配到每个TaskGroup中,在此就简单贴一下分配的核心源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
/**
* <pre>
* 需要实现的效果通过例子来说是:
* a 库上有表:0, 1, 2
* a 库上有表:3, 4
* c 库上有表:5, 6, 7
*
* 如果有 4个 taskGroup
* 则 assign 后的结果为:
* taskGroup-0: 0, 4,
* taskGroup-1: 3, 6,
* taskGroup-2: 5, 2,
* taskGroup-3: 1, 7
* </pre>
* @param resourceMarkAndTaskIdMap 对所有任务进行编号的map
* @param jobConfiguration job的配置
* @param taskGroupNumber 一共需要的taskGroup的数量
* @return taskGroup的配置
*/
private static List<Configuration> doAssign(LinkedHashMap<String, List<Integer>> resourceMarkAndTaskIdMap, Configuration jobConfiguration, int taskGroupNumber) {

// 获取job配置
List<Configuration> contentConfig = jobConfiguration.getListConfiguration(CoreConstant.DATAX_JOB_CONTENT);

// 初始化一份taskGroup配置
Configuration taskGroupTemplate = jobConfiguration.clone();

// 将配置中的content配置项去掉
taskGroupTemplate.remove(CoreConstant.DATAX_JOB_CONTENT);

// 初始化taskGroup配置
List<Configuration> result = new LinkedList<Configuration>();
List<List<Configuration>> taskGroupConfigList = new ArrayList<List<Configuration>>(taskGroupNumber);
for (int i = 0; i < taskGroupNumber; i++) {
taskGroupConfigList.add(new LinkedList<Configuration>());
}

int mapValueMaxLength = -1;

// 获取任务编号,并给每个任务一个标记,每个任务的标记都一致,用一个map保存,标记 -> 编号
List<String> resourceMarks = new ArrayList<String>();
for (Map.Entry<String, List<Integer>> entry : resourceMarkAndTaskIdMap.entrySet()) {
resourceMarks.add(entry.getKey());
if (entry.getValue().size() > mapValueMaxLength) {
mapValueMaxLength = entry.getValue().size();
}
}

// 核心Round Robin分配逻辑
int taskGroupIndex = 0;
for (int i = 0; i < mapValueMaxLength; i++) {
for (String resourceMark : resourceMarks) {
if (resourceMarkAndTaskIdMap.get(resourceMark).size() > 0) {
int taskId = resourceMarkAndTaskIdMap.get(resourceMark).get(0);
taskGroupConfigList.get(taskGroupIndex % taskGroupNumber).add(contentConfig.get(taskId));
taskGroupIndex++;

resourceMarkAndTaskIdMap.get(resourceMark).remove(0);
}
}
}

Configuration tempTaskGroupConfig;
for (int i = 0; i < taskGroupNumber; i++) {
tempTaskGroupConfig = taskGroupTemplate.clone();
tempTaskGroupConfig.set(CoreConstant.DATAX_JOB_CONTENT, taskGroupConfigList.get(i));
tempTaskGroupConfig.set(CoreConstant.DATAX_CORE_CONTAINER_TASKGROUP_ID, i);

result.add(tempTaskGroupConfig);
}

return result;
}

启动调度

ok,已经确定完了taskGroup的个数以及每个taskGroup的channel数,接下来就到了真正启动任务的环节:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
// 初始化运行模式
ExecuteMode executeMode = null;

// 初始化调度器
AbstractScheduler scheduler;
try {
// 在这里可以看到,DataX官方写死了运行模式,只支持STANDALONE模式,不支持分布式
executeMode = ExecuteMode.STANDALONE;

// 初始化调度器
scheduler = initStandaloneScheduler(this.configuration);

// 设置运行模式
for (Configuration taskGroupConfig : taskGroupConfigs) {
taskGroupConfig.set(CoreConstant.DATAX_CORE_CONTAINER_JOB_MODE, executeMode.getValue());
}

// 看到这里我彻底笑了 ^_^ ^_^ ^_^
if (executeMode == ExecuteMode.LOCAL || executeMode == ExecuteMode.DISTRIBUTE) {
if (this.jobId <= 0) {
throw DataXException.asDataXException(FrameworkErrorCode.RUNTIME_ERROR,
"在[ local | distribute ]模式下必须设置jobId,并且其值 > 0 .");
}
}

LOG.info("Running by {} Mode.", executeMode);

// 记录任务开始时间
this.startTransferTimeStamp = System.currentTimeMillis();

// 开始调度
scheduler.schedule(taskGroupConfigs);

// 记录任务结束时间
this.endTransferTimeStamp = System.currentTimeMillis();
} catch (Exception e) {
LOG.error("运行scheduler 模式[{}]出错.", executeMode);
this.endTransferTimeStamp = System.currentTimeMillis();
throw DataXException.asDataXException(
FrameworkErrorCode.RUNTIME_ERROR, e);
}

// 检查任务执行情况
this.checkLimit();

整个任务从scheduler.schedule(taskGroupConfigs)处被启动,在这个方法中,又调用了startAllTaskGroup(configurations)

image-20220427180441170

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Override
public void startAllTaskGroup(List<Configuration> configurations) {
// 生成线程池,对于线程池的个数是由taskGroup的数量进行控制
this.taskGroupContainerExecutorService = Executors
.newFixedThreadPool(configurations.size());

// 注册taskGroup任务到线程池中并启动
for (Configuration taskGroupConfiguration : configurations) {
TaskGroupContainerRunner taskGroupContainerRunner = newTaskGroupContainerRunner(taskGroupConfiguration);
this.taskGroupContainerExecutorService.execute(taskGroupContainerRunner);
}
// 关闭线程池提交线程的入口
this.taskGroupContainerExecutorService.shutdown();
}

实际上DataX底层对于每个taskGroup都启动了一个线程TaskGroupContainerRunner,采用线程池的方式实现并发操作

调度子单位解析

TaskGroupContainerRunner

TaskGroupContainerRunner是线程子单位,作为最上层的封装,直接提交到线程池中执行:

image-20220427182500027

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public class TaskGroupContainerRunner implements Runnable {

private TaskGroupContainer taskGroupContainer;

private State state;

public TaskGroupContainerRunner(TaskGroupContainer taskGroup) {
this.taskGroupContainer = taskGroup;
this.state = State.SUCCEEDED;
}

@Override
public void run() {
try {
Thread.currentThread().setName(
String.format("taskGroup-%d", this.taskGroupContainer.getTaskGroupId()));
// 启动真正的并发子单位
this.taskGroupContainer.start();
this.state = State.SUCCEEDED;
} catch (Throwable e) {
this.state = State.FAILED;
throw DataXException.asDataXException(
FrameworkErrorCode.RUNTIME_ERROR, e);
}
}

public TaskGroupContainer getTaskGroupContainer() {
return taskGroupContainer;
}

public State getState() {
return state;
}

public void setState(State state) {
this.state = state;
}
}

从代码上看,真正核心的并发子单位是TaskGroupContainer

TaskContainer

来到真正核心的TaskContainer中,这里真正启动了任务,TaskContainer主要做了以下事情:

  1. 注册task

    image-20220427183425530

  2. 启动task,每个子任务也就是最小的并发单位的执行器是TaskExecutor

    image-20220427183838995

  3. 循环监控task状态,如果出现失败会进行重试

    image-20220427183937709

TaskExecutor

image-20220427185430963

从这个类里面可以看见,做核心传输逻辑的变量是两个线程,一个写的线程,一个读的线程,在doStart方法中,启动了这两个线程:

image-20220427191129968

至此,整个任务被完全启动了起来。

总结

DataX整个调度依赖于java底层线程池,它对任务进行分片后并将子任务使用Round Robin算法划分到各个任务组,以一个任务组为基本线程放进线程池并启动;同时一个子任务也包含两个线程去实现写读的流程,DataX能实现精准的流控在于它底层对分片的控制,至此,DataX的全部调度流程概括如下:

  1. 根据流控、并发配置确定分片数量
  2. 根据分片数量确定TaskGroup数量
  3. 通过Round Robin算法分配task至TaskGroup
  4. 启动TaskGroup
  5. 每个TaskGroup启动多个TaskExecutor
  6. TaskExecutor启动ReaderThread和WriterThread

下篇文章我们将聊一聊DataX的单个数据分片之间的数据传输原理,也就是TaskExecutor中的ReaderThread和WriterThread之间如何交换数据,敬请期待,我们下期再见!