前言 书接上回,继续来聊一聊DataX源码,在上篇文章中我们已经对DataX的整体架构以及运行流程有了一个比较细致的了解,这篇文章我们将更深层次的研究DataX在调度方面的细节。
调度流程解析 确认最终任务需要的channel数量 注:channel是子任务数据传输的内存模型,后续文章将详细剖析,在这里可以暂且认为就是任务分片数量
在任务周期中含有一个split()阶段,在这个阶段做了两件事情:
通过配置项计算出建议的并发channel数量 执行reader插件中的的实际切片逻辑,并根据数量切分configuration,请注意,这一步计算出的数量可能小于第一步配置的并发数 所以在真正调度阶段,需要根据split()阶段中计算的两个值,计算出最终的channel数量
通过channel数量分配taskGroup 在计算出真正需要的channel数量之后,根据每个TaskGroup应该被分配任务的个数,计算TaskGroup的个数:
由上图可知,任务的分配是由JobAssignUtil
去进行,而且从方法名称assignFairly
也可以知晓,分配的逻辑是公平分配,使用的Round Robin算法,轮询分配到每个TaskGroup中,在此就简单贴一下分配的核心源码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 private static List<Configuration> doAssign (LinkedHashMap<String, List<Integer>> resourceMarkAndTaskIdMap, Configuration jobConfiguration, int taskGroupNumber) { List<Configuration> contentConfig = jobConfiguration.getListConfiguration(CoreConstant.DATAX_JOB_CONTENT); Configuration taskGroupTemplate = jobConfiguration.clone(); taskGroupTemplate.remove(CoreConstant.DATAX_JOB_CONTENT); List<Configuration> result = new LinkedList<Configuration>(); List<List<Configuration>> taskGroupConfigList = new ArrayList<List<Configuration>>(taskGroupNumber); for (int i = 0 ; i < taskGroupNumber; i++) { taskGroupConfigList.add(new LinkedList<Configuration>()); } int mapValueMaxLength = -1 ; List<String> resourceMarks = new ArrayList<String>(); for (Map.Entry<String, List<Integer>> entry : resourceMarkAndTaskIdMap.entrySet()) { resourceMarks.add(entry.getKey()); if (entry.getValue().size() > mapValueMaxLength) { mapValueMaxLength = entry.getValue().size(); } } int taskGroupIndex = 0 ; for (int i = 0 ; i < mapValueMaxLength; i++) { for (String resourceMark : resourceMarks) { if (resourceMarkAndTaskIdMap.get(resourceMark).size() > 0 ) { int taskId = resourceMarkAndTaskIdMap.get(resourceMark).get(0 ); taskGroupConfigList.get(taskGroupIndex % taskGroupNumber).add(contentConfig.get(taskId)); taskGroupIndex++; resourceMarkAndTaskIdMap.get(resourceMark).remove(0 ); } } } Configuration tempTaskGroupConfig; for (int i = 0 ; i < taskGroupNumber; i++) { tempTaskGroupConfig = taskGroupTemplate.clone(); tempTaskGroupConfig.set(CoreConstant.DATAX_JOB_CONTENT, taskGroupConfigList.get(i)); tempTaskGroupConfig.set(CoreConstant.DATAX_CORE_CONTAINER_TASKGROUP_ID, i); result.add(tempTaskGroupConfig); } return result; }
启动调度 ok,已经确定完了taskGroup的个数以及每个taskGroup的channel数,接下来就到了真正启动任务的环节:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 ExecuteMode executeMode = null ; AbstractScheduler scheduler; try { executeMode = ExecuteMode.STANDALONE; scheduler = initStandaloneScheduler(this .configuration); for (Configuration taskGroupConfig : taskGroupConfigs) { taskGroupConfig.set(CoreConstant.DATAX_CORE_CONTAINER_JOB_MODE, executeMode.getValue()); } if (executeMode == ExecuteMode.LOCAL || executeMode == ExecuteMode.DISTRIBUTE) { if (this .jobId <= 0 ) { throw DataXException.asDataXException(FrameworkErrorCode.RUNTIME_ERROR, "在[ local | distribute ]模式下必须设置jobId,并且其值 > 0 ." ); } } LOG.info("Running by {} Mode." , executeMode); this .startTransferTimeStamp = System.currentTimeMillis(); scheduler.schedule(taskGroupConfigs); this .endTransferTimeStamp = System.currentTimeMillis(); } catch (Exception e) { LOG.error("运行scheduler 模式[{}]出错." , executeMode); this .endTransferTimeStamp = System.currentTimeMillis(); throw DataXException.asDataXException( FrameworkErrorCode.RUNTIME_ERROR, e); } this .checkLimit();
整个任务从scheduler.schedule(taskGroupConfigs)
处被启动,在这个方法中,又调用了startAllTaskGroup(configurations)
:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 @Override public void startAllTaskGroup (List<Configuration> configurations) { this .taskGroupContainerExecutorService = Executors .newFixedThreadPool(configurations.size()); for (Configuration taskGroupConfiguration : configurations) { TaskGroupContainerRunner taskGroupContainerRunner = newTaskGroupContainerRunner(taskGroupConfiguration); this .taskGroupContainerExecutorService.execute(taskGroupContainerRunner); } this .taskGroupContainerExecutorService.shutdown(); }
实际上DataX底层对于每个taskGroup都启动了一个线程TaskGroupContainerRunner
,采用线程池的方式实现并发操作
调度子单位解析 TaskGroupContainerRunner TaskGroupContainerRunner是线程子单位,作为最上层的封装,直接提交到线程池中执行:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 public class TaskGroupContainerRunner implements Runnable { private TaskGroupContainer taskGroupContainer; private State state; public TaskGroupContainerRunner (TaskGroupContainer taskGroup) { this .taskGroupContainer = taskGroup; this .state = State.SUCCEEDED; } @Override public void run () { try { Thread.currentThread().setName( String.format("taskGroup-%d" , this .taskGroupContainer.getTaskGroupId())); this .taskGroupContainer.start(); this .state = State.SUCCEEDED; } catch (Throwable e) { this .state = State.FAILED; throw DataXException.asDataXException( FrameworkErrorCode.RUNTIME_ERROR, e); } } public TaskGroupContainer getTaskGroupContainer () { return taskGroupContainer; } public State getState () { return state; } public void setState (State state) { this .state = state; } }
从代码上看,真正核心的并发子单位是TaskGroupContainer
TaskContainer 来到真正核心的TaskContainer中,这里真正启动了任务,TaskContainer主要做了以下事情:
注册task
启动task,每个子任务也就是最小的并发单位的执行器是TaskExecutor
循环监控task状态,如果出现失败会进行重试
TaskExecutor
从这个类里面可以看见,做核心传输逻辑的变量是两个线程,一个写的线程,一个读的线程,在doStart方法中,启动了这两个线程:
至此,整个任务被完全启动了起来。
总结 DataX整个调度依赖于java底层线程池,它对任务进行分片后并将子任务使用Round Robin算法划分到各个任务组,以一个任务组为基本线程放进线程池并启动;同时一个子任务也包含两个线程去实现写读的流程,DataX能实现精准的流控在于它底层对分片的控制,至此,DataX的全部调度流程概括如下:
根据流控、并发配置确定分片数量 根据分片数量确定TaskGroup数量 通过Round Robin算法分配task至TaskGroup 启动TaskGroup 每个TaskGroup启动多个TaskExecutor TaskExecutor启动ReaderThread和WriterThread 下篇文章我们将聊一聊DataX的单个数据分片之间的数据传输原理,也就是TaskExecutor中的ReaderThread和WriterThread之间如何交换数据,敬请期待,我们下期再见!