分布式调度平台XXL-JOB源码分析-调度中心

亦凉 2023-08-17 15:38 187阅读 0赞

架构图

753271-20190924135844231-171876118.png

上图是我们要进行源码分析的2.1版本的整体架构图。其分为两大块,调度中心和执行器,本文先分析调度中心,也就是xxl-job-admin这个包的代码。

关键bean

在application.properties配置正确的数据库连接信息后,直接启动XxlJobAdminApplication即可。

配置类XxlJobAdminConfig,里面维护了一些调度中心端的配置数据。

XxlJobScheduler这个组件实现了InitializingBean接口,所以spring容器在初始化的时候会调用afterPropertiesSet方法,此方法如下:

753271-20190924101900837-1702053608.png

第一步国际化相关。

第二步监控相关。

第三步失败重试相关。

第四步启动admin端服务,接收注册请求等。

第五步JobScheduleHelper调度器,死循环,在xxl_job_info表里取将要执行的任务,更新下次执行时间的,调用JobTriggerPoolHelper类,来给执行器发送调度任务的

JobScheduleHelper

这个类就是死循环从xxl_job_info表中取出未来5秒内要执行的任务,进行调度分发。

753271-20190924144907026-1368534067.png

启动了两个守护线程,先来看scheduleThread。

753271-20190924150125371-1846795726.png

死循环内的代码如上图,首先利用for update语句进行获取任务的资格锁定,再去获取未来5秒内即将要执行的任务。

753271-20190924150352334-2129024656.png

753271-20190924150322278-649373045.png

展开遍历任务的逻辑代码,有三个分支

753271-20190924151330565-2125652558.png

第一个分支当前任务的触发时间已经超时5秒以上了,不在执行,直接计算下一次触发时间。

753271-20190924153617775-1304815222.png

第二个分支为触发时间已满足,利用JobTriggerPoolHelper这个类进行任务调度,之后判断下一次执行时间如果在5秒内,进行此任务数据的缓存,处理逻辑与第三个分支一样。

753271-20190924154200623-655562073.png

对触发时间秒数进行60取模,跟进pushTimeRing方法

753271-20190924155418344-706085786.png

ringData是以0到59的整数为key,以jobId集合为value的Map集合。这个集合数据的处理逻辑,就在我们第二个守护线程ringThread中。

  1. 1 while (!ringThreadToStop) {
  2. 2 try {
  3. 3 // second data
  4. 4 List<Integer> ringItemData = new ArrayList<>();
  5. 5 int nowSecond = Calendar.getInstance().get(Calendar.SECOND); // 避免处理耗时太长,跨过刻度,向前校验一个刻度;
  6. 6 for (int i = 0; i < 2; i++) {
  7. 7 List<Integer> tmpData = ringData.remove( (nowSecond+60-i)%60 );
  8. 8 if (tmpData != null) {
  9. 9 ringItemData.addAll(tmpData);
  10. 10 }
  11. 11 }
  12. 12 // ring trigger
  13. 13 logger.debug(">>>>>>>>>>> xxl-job, time-ring beat : " + nowSecond + " = " + Arrays.asList(ringItemData) );
  14. 14 if (ringItemData!=null && ringItemData.size()>0) {
  15. 15 // do trigger
  16. 16 for (int jobId: ringItemData) {
  17. 17 // do trigger
  18. 18 JobTriggerPoolHelper.trigger(jobId, TriggerTypeEnum.CRON, -1, null, null);
  19. 19 }
  20. 20 // clear
  21. 21 ringItemData.clear();
  22. 22 }
  23. 23 } catch (Exception e) {
  24. 24 if (!ringThreadToStop) {
  25. 25 logger.error(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread error:{}", e);
  26. 26 }
  27. 27 }
  28. 28 // next second, align second
  29. 29 try {
  30. 30 TimeUnit.MILLISECONDS.sleep(1000 - System.currentTimeMillis()%1000);
  31. 31 } catch (InterruptedException e) {
  32. 32 if (!ringThreadToStop) {
  33. 33 logger.error(e.getMessage(), e);
  34. 34 }
  35. 35 }
  36. 36 }

根据当前秒数刻度和前一个刻度进行时间轮的任务获取,之后和上文一样,利用JobTriggerPoolHelper进行任务调度。

时序图

753271-20190924194359472-16107611.png

JobTriggerPoolHelper

如前文所述,不管是scheduleThread还是ringThread,最后完成任务调度的都是JobTriggerPoolHelper.trigger方法,这个类有两个线程池fastTriggerPool和slowTriggerPool,顾名思义,分别是执行较快任务和较慢任务的,后查官方文档,如下:

753271-20190924104757901-1796015968.png

minTim属性,作用待明确

jobTimeoutCountMap属性,计数,key为jobId,value使用AtomicInteger计数。

helper静态变量指向自己本身,提供外部静态方法调用。

753271-20190924105720607-393540803.png

重要方法,向两种线程池其中之一提交调度任务,进行调度,引出XxlJobTrigger这个类,一路跟进去

753271-20190924112835765-354928850.png

753271-20190924113032158-120384122.png

继续跟进

753271-20190924113055497-1907427470.png

至此,完成执行器的任务调度。

时序图

753271-20190924213930801-512581316.png

接收注册和心跳请求

753271-20190926153634669-808587215.png

转载于:https://www.cnblogs.com/jiangyang/p/11576931.html

发表评论

表情:
评论列表 (有 0 条评论,187人围观)

还没有评论,来说两句吧...

相关阅读