XxlJob源码解析

以你之姓@ 2023-06-12 04:30 155阅读 0赞

git地址:https://github.com/xuxueli/xxl-job

版本:v2.1.1

执行器初始化流程

在容器初始化好xxlJobExecutor后会执行其initMethod,也就是start方法

  1. @Configuration
  2. public class XxlJobConfig {
  3. private Logger logger = LoggerFactory.getLogger(XxlJobConfig.class);
  4. @Value("${xxl.job.admin.addresses}")
  5. private String adminAddresses;
  6. @Value("${xxl.job.executor.appname}")
  7. private String appName;
  8. @Value("${xxl.job.executor.ip}")
  9. private String ip;
  10. @Value("${xxl.job.executor.port}")
  11. private int port;
  12. @Value("${xxl.job.accessToken}")
  13. private String accessToken;
  14. @Value("${xxl.job.executor.logpath}")
  15. private String logPath;
  16. @Value("${xxl.job.executor.logretentiondays}")
  17. private int logRetentionDays;
  18. @Bean(initMethod = "start", destroyMethod = "destroy")
  19. public XxlJobSpringExecutor xxlJobExecutor() {
  20. logger.info(">>>>>>>>>>> xxl-job config init.");
  21. XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
  22. xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
  23. xxlJobSpringExecutor.setAppName(appName);
  24. xxlJobSpringExecutor.setIp(ip);
  25. xxlJobSpringExecutor.setPort(port);
  26. xxlJobSpringExecutor.setAccessToken(accessToken);
  27. xxlJobSpringExecutor.setLogPath(logPath);
  28. xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);
  29. return xxlJobSpringExecutor;
  30. }
  31. /**
  32. * 针对多网卡、容器内部署等情况,可借助 "spring-cloud-commons" 提供的 "InetUtils" 组件灵活定制注册IP;
  33. *
  34. * 1、引入依赖:
  35. * <dependency>
  36. * <groupId>org.springframework.cloud</groupId>
  37. * <artifactId>spring-cloud-commons</artifactId>
  38. * <version>${version}</version>
  39. * </dependency>
  40. *
  41. * 2、配置文件,或者容器启动变量
  42. * spring.cloud.inetutils.preferred-networks: 'xxx.xxx.xxx.'
  43. *
  44. * 3、获取IP
  45. * String ip_ = inetUtils.findFirstNonLoopbackHostInfo().getIpAddress();
  46. */
  47. }

这是start方法的一些启动步骤

启动分为3步,

1.初始化作业执行器,并且全部缓存起来

初始化容器作业,就是从spring上下文中获取到使用了所有使用了@JobHandler注解的bean,然后根据这个注解的value作为key,实例作为value缓存到名为jobHandlerRepository的map中

2.刷新GLUE工厂

3.调用父类启动方法

  1. XxlJobSpringExecutor.java
  2. @Override
  3. public void start() throws Exception {
  4. // 1.初始化作业执行器,并且全部缓存起来
  5. initJobHandlerRepository(applicationContext);
  6. // 2.刷新GLUE工厂
  7. GlueFactory.refreshInstance(1);
  8. // 3.调用父类启动方法
  9. super.start();
  10. }
  11. private void initJobHandlerRepository(ApplicationContext applicationContext){
  12. if (applicationContext == null) {
  13. return;
  14. }
  15. // init job handler action
  16. //获得所有用到了@JobHandler注解的bean
  17. Map<String, Object> serviceBeanMap = applicationContext.getBeansWithAnnotation(JobHandler.class);
  18. if (serviceBeanMap!=null && serviceBeanMap.size()>0) {
  19. for (Object serviceBean : serviceBeanMap.values()) {
  20. if (serviceBean instanceof IJobHandler){
  21. String name = serviceBean.getClass().getAnnotation(JobHandler.class).value();
  22. IJobHandler handler = (IJobHandler) serviceBean;
  23. if (loadJobHandler(name) != null) {
  24. throw new RuntimeException("xxl-job jobhandler["+ name +"] naming conflicts.");
  25. }
  26. //把所有执行器bean缓存起来
  27. registJobHandler(name, handler);
  28. }
  29. }
  30. }
  31. }

把加了@JobHandler注解的bean全部缓存到jobHandlerRepository中

  1. XxlJobExecutor.java
  2. // ---------------------- job handler repository ----------------------
  3. private static ConcurrentMap<String, IJobHandler> jobHandlerRepository = new ConcurrentHashMap<String, IJobHandler>();
  4. public static IJobHandler registJobHandler(String name, IJobHandler jobHandler){
  5. logger.info(">>>>>>>>>>> xxl-job register jobhandler success, name:{}, jobHandler:{}", name, jobHandler);
  6. return jobHandlerRepository.put(name, jobHandler);
  7. }
  8. public static IJobHandler loadJobHandler(String name){
  9. return jobHandlerRepository.get(name);
  10. }

下面就是父类的start方法的一个流程

1.会先初始化本地日志路径,

2.初始化调度中心的地址列表,创建好adminBiz实例,调度中心客户端

3.启动日志文件清理的线程

4.启动回调子任务的线程

5.初始化执行服务器

6.初始化Rpc提供程序

下面详细看看这个第4步

  1. XxlJobExecutor.java
  2. /**
  3. * ---------------------- start + stop ----------------------
  4. * @throws Exception
  5. */
  6. public void start() throws Exception {
  7. // 1.init logpath 初始化本地日志路径
  8. XxlJobFileAppender.initLogPath(logPath);
  9. // 初始化调用者,管理客户端
  10. // 2.初始化调度中心的地址列表,创建好adminBiz实例,调度中心客户端
  11. initAdminBizList(adminAddresses, accessToken);
  12. //3. 初始化作业日志文件清理线程
  13. JobLogFileCleanThread.getInstance().start(logRetentionDays);
  14. // 4.初始化触发回调线程,执行子任务
  15. TriggerCallbackThread.getInstance().start();
  16. // 5.初始化执行服务器
  17. port = port>0?port: NetUtil.findAvailablePort(9999);
  18. ip = (ip!=null&&ip.trim().length()>0)?ip: IpUtil.getIp();
  19. //6.初始化Rpc提供程序
  20. initRpcProvider(ip, port, appName, accessToken);
  21. }

这边主要启动了2个线程,并且都设置为了守护线程,就是说这2个线程不会影响jvm的退出

1个线程是实时去消费阻塞队列里面的回调参数,另一个线程主要是负责运行重试回调失败的调度任务,30秒执行一次

  1. TriggerCallbackThread.java
  2. public void start() {
  3. // valid
  4. if (XxlJobExecutor.getAdminBizList() == null) {
  5. logger.warn(">>>>>>>>>>> xxl-job, executor callback config fail, adminAddresses is null.");
  6. return;
  7. }
  8. // callback
  9. triggerCallbackThread = new Thread(new Runnable() {
  10. @Override
  11. public void run() {
  12. // 正常回调
  13. while(!toStop){
  14. try {
  15. //这里采用了阻塞队列,可以看出,当服务中心发送任务到此队列,就会被消费
  16. HandleCallbackParam callback = getInstance().callBackQueue.take();
  17. if (callback != null) {
  18. // 回调参数集合
  19. List<HandleCallbackParam> callbackParamList = new ArrayList<HandleCallbackParam>();
  20. int drainToNum = getInstance().callBackQueue.drainTo(callbackParamList);
  21. callbackParamList.add(callback);
  22. // 回调,如果发生错误将把错误写入日志文件然后重试
  23. if (callbackParamList!=null && callbackParamList.size()>0) {
  24. doCallback(callbackParamList);
  25. }
  26. }
  27. } catch (Exception e) {
  28. if (!toStop) {
  29. logger.error(e.getMessage(), e);
  30. }
  31. }
  32. }
  33. // 最后的回调
  34. try {
  35. List<HandleCallbackParam> callbackParamList = new ArrayList<HandleCallbackParam>();
  36. int drainToNum = getInstance().callBackQueue.drainTo(callbackParamList);
  37. if (callbackParamList!=null && callbackParamList.size()>0) {
  38. doCallback(callbackParamList);
  39. }
  40. } catch (Exception e) {
  41. if (!toStop) {
  42. logger.error(e.getMessage(), e);
  43. }
  44. }
  45. logger.info(">>>>>>>>>>> xxl-job, executor callback thread destory.");
  46. }
  47. });
  48. triggerCallbackThread.setDaemon(true);
  49. triggerCallbackThread.setName("xxl-job, executor TriggerCallbackThread");
  50. triggerCallbackThread.start();
  51. // 重试
  52. triggerRetryCallbackThread = new Thread(new Runnable() {
  53. @Override
  54. public void run() {
  55. while(!toStop){
  56. try {
  57. //重试失败回调文件
  58. retryFailCallbackFile();
  59. } catch (Exception e) {
  60. if (!toStop) {
  61. logger.error(e.getMessage(), e);
  62. }
  63. }
  64. try {
  65. //每30s执行一次
  66. TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);
  67. } catch (InterruptedException e) {
  68. if (!toStop) {
  69. logger.error(e.getMessage(), e);
  70. }
  71. }
  72. }
  73. logger.info(">>>>>>>>>>> xxl-job, executor retry callback thread destory.");
  74. }
  75. });
  76. triggerRetryCallbackThread.setDaemon(true);
  77. //调用重试回调线程
  78. triggerRetryCallbackThread.start();
  79. }
  80. public void toStop(){
  81. toStop = true;
  82. // stop callback, interrupt and wait
  83. if (triggerCallbackThread != null) { // support empty admin address
  84. triggerCallbackThread.interrupt();
  85. try {
  86. triggerCallbackThread.join();
  87. } catch (InterruptedException e) {
  88. logger.error(e.getMessage(), e);
  89. }
  90. }
  91. // stop retry, interrupt and wait
  92. if (triggerRetryCallbackThread != null) {
  93. triggerRetryCallbackThread.interrupt();
  94. try {
  95. triggerRetryCallbackThread.join();
  96. } catch (InterruptedException e) {
  97. logger.error(e.getMessage(), e);
  98. }
  99. }
  100. }
  101. /**
  102. * 进行回调,如果发生错误将重试
  103. * @param callbackParamList
  104. */
  105. private void doCallback(List<HandleCallbackParam> callbackParamList){
  106. boolean callbackRet = false;
  107. // callback, will retry if error
  108. for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) {
  109. try {
  110. ReturnT<String> callbackResult = adminBiz.callback(callbackParamList);
  111. if (callbackResult!=null && ReturnT.SUCCESS_CODE == callbackResult.getCode()) {
  112. callbackLog(callbackParamList, "<br>----------- xxl-job job callback finish.");
  113. callbackRet = true;
  114. break;
  115. } else {
  116. callbackLog(callbackParamList, "<br>----------- xxl-job job callback fail, callbackResult:" + callbackResult);
  117. }
  118. } catch (Exception e) {
  119. callbackLog(callbackParamList, "<br>----------- xxl-job job callback error, errorMsg:" + e.getMessage());
  120. }
  121. }
  122. //如果回调失败,就写入失败回调文件,错误日志回调线程会继续重试
  123. if (!callbackRet) {
  124. //写入失败回调文件
  125. appendFailCallbackFile(callbackParamList);
  126. }
  127. }
  128. TriggerCallbackThread.java
  129. /**
  130. * 进行回调,如果发生错误将重试
  131. * @param callbackParamList
  132. */
  133. private void doCallback(List<HandleCallbackParam> callbackParamList){
  134. boolean callbackRet = false;
  135. // 回调,如果出错将重试
  136. for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) {
  137. try {
  138. ReturnT<String> callbackResult = adminBiz.callback(callbackParamList);
  139. if (callbackResult!=null && ReturnT.SUCCESS_CODE == callbackResult.getCode()) {
  140. callbackLog(callbackParamList, "<br>----------- xxl-job job callback finish.");
  141. callbackRet = true;
  142. break;
  143. } else {
  144. callbackLog(callbackParamList, "<br>----------- xxl-job job callback fail, callbackResult:" + callbackResult);
  145. }
  146. } catch (Exception e) {
  147. callbackLog(callbackParamList, "<br>----------- xxl-job job callback error, errorMsg:" + e.getMessage());
  148. }
  149. }
  150. //如果回调失败,就写入失败回调文件,错误日志回调线程会继续重试
  151. if (!callbackRet) {
  152. //写入失败回调文件
  153. appendFailCallbackFile(callbackParamList);
  154. }
  155. }
  156. AdminBizImpl.java
  157. /**
  158. * 回调
  159. * @param callbackParamList 回调参数集合
  160. * @return
  161. */
  162. @Override
  163. public ReturnT<String> callback(List<HandleCallbackParam> callbackParamList) {
  164. for (HandleCallbackParam handleCallbackParam: callbackParamList) {
  165. ReturnT<String> callbackResult = callback(handleCallbackParam);
  166. logger.debug(">>>>>>>>> JobApiController.callback {}, handleCallbackParam={}, callbackResult={}",
  167. (callbackResult.getCode()==IJobHandler.SUCCESS.getCode()?"success":"fail"), handleCallbackParam, callbackResult);
  168. }
  169. return ReturnT.SUCCESS;
  170. }

循环调度子任务,并保存日志

  1. AdminBizImpl.java
  2. private ReturnT<String> callback(HandleCallbackParam handleCallbackParam) {
  3. // valid log item
  4. XxlJobLog log = xxlJobLogDao.load(handleCallbackParam.getLogId());
  5. if (log == null) {
  6. return new ReturnT<String>(ReturnT.FAIL_CODE, "log item not found.");
  7. }
  8. if (log.getHandleCode() > 0) {
  9. return new ReturnT<String>(ReturnT.FAIL_CODE, "log repeate callback."); // avoid repeat callback, trigger child job etc
  10. }
  11. // 调度子任务
  12. String callbackMsg = null;
  13. if (IJobHandler.SUCCESS.getCode() == handleCallbackParam.getExecuteResult().getCode()) {
  14. //根据jodId查询XxlJobInfo
  15. XxlJobInfo xxlJobInfo = xxlJobInfoDao.loadById(log.getJobId());
  16. if (xxlJobInfo!=null && xxlJobInfo.getChildJobId()!=null && xxlJobInfo.getChildJobId().trim().length()>0) {
  17. callbackMsg = "<br><br><span style=\"color:#00c0ef;\" > >>>>>>>>>>>"+ I18nUtil.getString("jobconf_trigger_child_run") +"<<<<<<<<<<< </span><br>";
  18. //获得子任务id的数组
  19. String[] childJobIds = xxlJobInfo.getChildJobId().split(",");
  20. for (int i = 0; i < childJobIds.length; i++) {
  21. //校验
  22. int childJobId = (childJobIds[i]!=null && childJobIds[i].trim().length()>0 && isNumeric(childJobIds[i]))?Integer.valueOf(childJobIds[i]):-1;
  23. if (childJobId > 0) {
  24. //调度子任务
  25. JobTriggerPoolHelper.trigger(childJobId, TriggerTypeEnum.PARENT, -1, null, null);
  26. ReturnT<String> triggerChildResult = ReturnT.SUCCESS;
  27. // add msg
  28. callbackMsg += MessageFormat.format(I18nUtil.getString("jobconf_callback_child_msg1"),
  29. (i+1),
  30. childJobIds.length,
  31. childJobIds[i],
  32. (triggerChildResult.getCode()==ReturnT.SUCCESS_CODE?I18nUtil.getString("system_success"):I18nUtil.getString("system_fail")),
  33. triggerChildResult.getMsg());
  34. } else {
  35. callbackMsg += MessageFormat.format(I18nUtil.getString("jobconf_callback_child_msg2"),
  36. (i+1),
  37. childJobIds.length,
  38. childJobIds[i]);
  39. }
  40. }
  41. }
  42. }
  43. // 处理msg
  44. StringBuffer handleMsg = new StringBuffer();
  45. if (log.getHandleMsg()!=null) {
  46. handleMsg.append(log.getHandleMsg()).append("<br>");
  47. }
  48. if (handleCallbackParam.getExecuteResult().getMsg() != null) {
  49. handleMsg.append(handleCallbackParam.getExecuteResult().getMsg());
  50. }
  51. if (callbackMsg != null) {
  52. handleMsg.append(callbackMsg);
  53. }
  54. // 成功, 保存日志
  55. log.setHandleTime(new Date());
  56. log.setHandleCode(handleCallbackParam.getExecuteResult().getCode());
  57. log.setHandleMsg(handleMsg.toString());
  58. xxlJobLogDao.updateHandleInfo(log);
  59. return ReturnT.SUCCESS;
  60. }

然后看看第6步

6.初始化Rpc提供程序

  1. XxlJobExecutor.java
  2. /**
  3. * 初始化rpc提供者
  4. * @param ip
  5. * @param port
  6. * @param appName
  7. * @param accessToken
  8. * @throws Exception
  9. */
  10. private void initRpcProvider(String ip, int port, String appName, String accessToken) throws Exception {
  11. // init, provider factory
  12. String address = IpUtil.getIpPort(ip, port);
  13. Map<String, String> serviceRegistryParam = new HashMap<String, String>();
  14. serviceRegistryParam.put("appName", appName);
  15. serviceRegistryParam.put("address", address);
  16. xxlRpcProviderFactory = new XxlRpcProviderFactory();
  17. xxlRpcProviderFactory.initConfig(NetEnum.NETTY_HTTP, Serializer.SerializeEnum.HESSIAN.getSerializer(), ip, port, accessToken, ExecutorServiceRegistry.class, serviceRegistryParam);
  18. // 添加服务
  19. xxlRpcProviderFactory.addService(ExecutorBiz.class.getName(), null, new ExecutorBizImpl());
  20. // 启动Rpc提供工厂
  21. xxlRpcProviderFactory.start();
  22. }

这里会调用XxlJobExecutor的注册方法

  1. XxlRpcProviderFactory.java
  2. public void start() throws Exception {
  3. this.serviceAddress = IpUtil.getIpPort(this.ip, this.port);
  4. this.server = (Server)this.netType.serverClass.newInstance();
  5. this.server.setStartedCallback(new BaseCallback() {
  6. public void run() throws Exception {
  7. if (XxlRpcProviderFactory.this.serviceRegistryClass != null) {
  8. XxlRpcProviderFactory.this.serviceRegistry = (ServiceRegistry)XxlRpcProviderFactory.this.serviceRegistryClass.newInstance();
  9. XxlRpcProviderFactory.this.serviceRegistry.start(XxlRpcProviderFactory.this.serviceRegistryParam);
  10. if (XxlRpcProviderFactory.this.serviceData.size() > 0) {
  11. XxlRpcProviderFactory.this.serviceRegistry.registry(XxlRpcProviderFactory.this.serviceData.keySet(), XxlRpcProviderFactory.this.serviceAddress);
  12. }
  13. }
  14. }
  15. });
  16. this.server.setStopedCallback(new BaseCallback() {
  17. public void run() {
  18. if (XxlRpcProviderFactory.this.serviceRegistry != null) {
  19. if (XxlRpcProviderFactory.this.serviceData.size() > 0) {
  20. XxlRpcProviderFactory.this.serviceRegistry.remove(XxlRpcProviderFactory.this.serviceData.keySet(), XxlRpcProviderFactory.this.serviceAddress);
  21. }
  22. XxlRpcProviderFactory.this.serviceRegistry.stop();
  23. XxlRpcProviderFactory.this.serviceRegistry = null;
  24. }
  25. }
  26. });
  27. this.server.start(this);
  28. }

调用任务的注册和删除

  1. XxlJobExecutor.java
  2. //执行器启动初始化的时候回调用这里
  3. public static class ExecutorServiceRegistry extends ServiceRegistry {
  4. @Override
  5. public void start(Map<String, String> param) {
  6. // 启动注册
  7. ExecutorRegistryThread.getInstance().start(param.get("appName"), param.get("address"));
  8. }
  9. @Override
  10. public void stop() {
  11. // 停止注册
  12. ExecutorRegistryThread.getInstance().toStop();
  13. }
  14. @Override
  15. public boolean registry(Set<String> keys, String value) {
  16. return false;
  17. }
  18. @Override
  19. public boolean remove(Set<String> keys, String value) {
  20. return false;
  21. }
  22. @Override
  23. public Map<String, TreeSet<String>> discovery(Set<String> keys) {
  24. return null;
  25. }
  26. @Override
  27. public TreeSet<String> discovery(String key) {
  28. return null;
  29. }
  30. }

执行器注册表线程,具体注册方法,

启动一个ExecutorRegistryThread守护线程,每30秒去注册或更新一次

  1. ExecutorRegistryThread.java
  2. public void start(final String appName, final String address){
  3. // valid
  4. if (appName==null || appName.trim().length()==0) {
  5. logger.warn(">>>>>>>>>>> xxl-job, executor registry config fail, appName is null.");
  6. return;
  7. }
  8. if (XxlJobExecutor.getAdminBizList() == null) {
  9. logger.warn(">>>>>>>>>>> xxl-job, executor registry config fail, adminAddresses is null.");
  10. return;
  11. }
  12. registryThread = new Thread(new Runnable() {
  13. @Override
  14. public void run() {
  15. // registry
  16. while (!toStop) {
  17. try {
  18. RegistryParam registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), appName, address);
  19. for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) {
  20. try {
  21. ReturnT<String> registryResult = adminBiz.registry(registryParam);
  22. if (registryResult!=null && ReturnT.SUCCESS_CODE == registryResult.getCode()) {
  23. registryResult = ReturnT.SUCCESS;
  24. logger.debug(">>>>>>>>>>> xxl-job registry success, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
  25. break;
  26. } else {
  27. logger.info(">>>>>>>>>>> xxl-job registry fail, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
  28. }
  29. } catch (Exception e) {
  30. logger.info(">>>>>>>>>>> xxl-job registry error, registryParam:{}", registryParam, e);
  31. }
  32. }
  33. } catch (Exception e) {
  34. if (!toStop) {
  35. logger.error(e.getMessage(), e);
  36. }
  37. }
  38. try {
  39. if (!toStop) {
  40. //30秒注册一次
  41. TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);
  42. }
  43. } catch (InterruptedException e) {
  44. if (!toStop) {
  45. logger.warn(">>>>>>>>>>> xxl-job, executor registry thread interrupted, error msg:{}", e.getMessage());
  46. }
  47. }
  48. }
  49. // registry remove
  50. try {
  51. RegistryParam registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), appName, address);
  52. for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) {
  53. try {
  54. ReturnT<String> registryResult = adminBiz.registryRemove(registryParam);
  55. if (registryResult!=null && ReturnT.SUCCESS_CODE == registryResult.getCode()) {
  56. registryResult = ReturnT.SUCCESS;
  57. logger.info(">>>>>>>>>>> xxl-job registry-remove success, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
  58. break;
  59. } else {
  60. logger.info(">>>>>>>>>>> xxl-job registry-remove fail, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
  61. }
  62. } catch (Exception e) {
  63. if (!toStop) {
  64. logger.info(">>>>>>>>>>> xxl-job registry-remove error, registryParam:{}", registryParam, e);
  65. }
  66. }
  67. }
  68. } catch (Exception e) {
  69. if (!toStop) {
  70. logger.error(e.getMessage(), e);
  71. }
  72. }
  73. logger.info(">>>>>>>>>>> xxl-job, executor registry thread destory.");
  74. }
  75. });
  76. registryThread.setDaemon(true);
  77. registryThread.setName("xxl-job, executor ExecutorRegistryThread");
  78. registryThread.start();
  79. }

调度器启动流程

  1. /**
  2. * @author xuxueli 2018-10-28 00:18:17
  3. */
  4. @Component
  5. //xxlJobAdminConfig先被初始化
  6. @DependsOn("xxlJobAdminConfig")
  7. public class XxlJobScheduler implements InitializingBean, DisposableBean {
  8. private static final Logger logger = LoggerFactory.getLogger(XxlJobScheduler.class);
  9. @Override
  10. public void afterPropertiesSet() throws Exception {
  11. // init i18n
  12. initI18n();
  13. /**
  14. * 1.调度任务注册监控助手运行,每30s运行一次
  15. * 主要监听90秒之内没有更新信息的注册机器删除掉
  16. * 然后查询90s以内有更新的机器列表
  17. * 并且把这些机器的最新ip更新到XxlJobGroup表
  18. * 多个地址以逗号分隔
  19. */
  20. JobRegistryMonitorHelper.getInstance().start();
  21. /**
  22. * 2.调度任务失败监控助手运行,每10s运行一次
  23. * 主要负责处理失败的调度任务重新执行,并且更新调度日志
  24. */
  25. JobFailMonitorHelper.getInstance().start();
  26. // 3.初始化Rpc提供者
  27. initRpcProvider();
  28. // 4.执行调度任务
  29. JobScheduleHelper.getInstance().start();
  30. logger.info(">>>>>>>>> init xxl-job admin success.");
  31. }
  32. }

调度任务注册监控助手运行

  1. JobRegistryMonitorHelper.java
  2. public void start(){
  3. registryThread = new Thread(new Runnable() {
  4. @Override
  5. public void run() {
  6. while (!toStop) {
  7. try {
  8. // auto registry group
  9. //执行器地址类型:0=自动注册、1=手动录入 查询所有类型为自动注册的执行器
  10. List<XxlJobGroup> groupList = XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().findByAddressType(0);
  11. if (groupList!=null && !groupList.isEmpty()) {
  12. // remove dead address (admin/executor)
  13. //删除无效地址(管理员/执行者)
  14. //删除 90秒之内没有更新信息的注册机器, 90秒没有心跳信息返回,代表机器已经出现问题,故移除
  15. List<Integer> ids = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findDead(RegistryConfig.DEAD_TIMEOUT);
  16. if (ids!=null && ids.size()>0) {
  17. XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().removeDead(ids);
  18. }
  19. // 刷新在线地址(管理员/执行者)
  20. HashMap<String, List<String>> appAddressMap = new HashMap<String, List<String>>();
  21. // 查询在90秒之内有过更新的机器列表
  22. List<XxlJobRegistry> list = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findAll(RegistryConfig.DEAD_TIMEOUT);
  23. if (list != null) {
  24. //循环注册机器列表, 根据执行器不同,将这些机器列表区分拿出来,然后放到集合里面
  25. for (XxlJobRegistry item: list) {
  26. if (RegistryConfig.RegistType.EXECUTOR.name().equals(item.getRegistryGroup())) {
  27. String appName = item.getRegistryKey();
  28. List<String> registryList = appAddressMap.get(appName);
  29. if (registryList == null) {
  30. registryList = new ArrayList<String>();
  31. }
  32. if (!registryList.contains(item.getRegistryValue())) {
  33. registryList.add(item.getRegistryValue());
  34. }
  35. appAddressMap.put(appName, registryList);
  36. }
  37. }
  38. }
  39. // fresh group address
  40. //遍历执行器列表
  41. for (XxlJobGroup group: groupList) {
  42. // 通过执行器的APP_NAME 拿出他下面的集群机器地址
  43. List<String> registryList = appAddressMap.get(group.getAppName());
  44. String addressListStr = null;
  45. if (registryList!=null && !registryList.isEmpty()) {
  46. Collections.sort(registryList);
  47. addressListStr = "";
  48. for (String item:registryList) {
  49. addressListStr += item + ",";
  50. }
  51. addressListStr = addressListStr.substring(0, addressListStr.length()-1);
  52. }
  53. //设置执行器地址列表,多地址逗号分隔(手动录入)
  54. group.setAddressList(addressListStr);
  55. XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().update(group);
  56. }
  57. }
  58. } catch (Exception e) {
  59. if (!toStop) {
  60. logger.error(">>>>>>>>>>> xxl-job, job registry monitor thread error:{}", e);
  61. }
  62. }
  63. try {
  64. //每30s运行一次
  65. TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);
  66. } catch (InterruptedException e) {
  67. if (!toStop) {
  68. logger.error(">>>>>>>>>>> xxl-job, job registry monitor thread error:{}", e);
  69. }
  70. }
  71. }
  72. logger.info(">>>>>>>>>>> xxl-job, job registry monitor thread stop");
  73. }
  74. });
  75. //设置为守护线程
  76. registryThread.setDaemon(true);
  77. registryThread.setName("xxl-job, admin JobRegistryMonitorHelper");
  78. //启动线程
  79. registryThread.start();
  80. }

2.调度任务失败监控助手运行

每10s运行一次, 主要负责处理失败的调度任务重新执行,并且更新调度日志

  1. JobFailMonitorHelper.java
  2. public void start(){
  3. monitorThread = new Thread(new Runnable() {
  4. @Override
  5. public void run() {
  6. // 监控
  7. while (!toStop) {
  8. try {
  9. // 从队列中拿出所有失败的 jobLogIds
  10. List<Long> failLogIds = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().findFailJobLogIds(1000);
  11. if (failLogIds!=null && !failLogIds.isEmpty()) {
  12. for (long failLogId: failLogIds) {
  13. // lock log
  14. //alarm_status从0修改为-1(从默认修改为锁定状态) 告警状态:0-默认、-1=锁定状态 1-无需告警、2-告警成功、3-告警失败
  15. int lockRet = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateAlarmStatus(failLogId, 0, -1);
  16. //没有更新到就跳过此次循环
  17. if (lockRet < 1) {
  18. continue;
  19. }
  20. XxlJobLog log = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().load(failLogId);
  21. XxlJobInfo info = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().loadById(log.getJobId());
  22. // 1、失败重试监视器的次数如果大于0就继续执行
  23. if (log.getExecutorFailRetryCount() > 0) {
  24. //重新调度任务
  25. JobTriggerPoolHelper.trigger(log.getJobId(), TriggerTypeEnum.RETRY, (log.getExecutorFailRetryCount()-1), log.getExecutorShardingParam(), log.getExecutorParam());
  26. String retryMsg = "<br><br><span style=\"color:#F39C12;\" > >>>>>>>>>>>"+ I18nUtil.getString("jobconf_trigger_type_retry") +"<<<<<<<<<<< </span><br>";
  27. log.setTriggerMsg(log.getTriggerMsg() + retryMsg);
  28. //更新XxlJobLog
  29. XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateTriggerInfo(log);
  30. }
  31. // 2、故障警报监视器
  32. int newAlarmStatus = 0; // 告警状态:0-默认、-1=锁定状态、1-无需告警、2-告警成功、3-告警失败
  33. //判断报警邮箱是否为空
  34. if (info!=null && info.getAlarmEmail()!=null && info.getAlarmEmail().trim().length()>0) {
  35. boolean alarmResult = true;
  36. try {
  37. //发送邮件失败警报
  38. alarmResult = failAlarm(info, log);
  39. } catch (Exception e) {
  40. alarmResult = false;
  41. logger.error(e.getMessage(), e);
  42. }
  43. newAlarmStatus = alarmResult?2:3;
  44. } else {
  45. //无需告警
  46. newAlarmStatus = 1;
  47. }
  48. //修改告警状态字段
  49. XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateAlarmStatus(failLogId, -1, newAlarmStatus);
  50. }
  51. //每10s运行一次
  52. TimeUnit.SECONDS.sleep(10);
  53. } catch (Exception e) {
  54. if (!toStop) {
  55. logger.error(">>>>>>>>>>> xxl-job, job fail monitor thread error:{}", e);
  56. }
  57. }
  58. }
  59. logger.info(">>>>>>>>>>> xxl-job, job fail monitor thread stop");
  60. }
  61. });
  62. monitorThread.setDaemon(true);
  63. monitorThread.setName("xxl-job, admin JobFailMonitorHelper");
  64. monitorThread.start();
  65. }

4.执行调度任务的具体方法

有2个线程,

1.个是执行任务线程,差不多每5s执行一次

2.执行超过5秒需要执行任务的线程(时间轮执行线程),每1秒执行一次

  1. 超时未调度(超过调度时间5秒)的任务不再执行,修改下次执行时间。
  1. 超过调度时间但未超时(超过5秒之内)的任务,立即放入执行线程池,再修改执行时间,接着判断下次执行时间若在5秒之内,加入timewheel的map后再次修改下次执行时间。
  1. 调度时间在未来5秒之内的(预读5s),根据5秒内即将执行的任务的执行时间的秒数,将其放到ringData的map里面去,key为下次执行的时间戳除以1000然后取模60,value为任务id,并根据表达式修改下次执行时间。

时间轮执行线程定时每1秒执行一次

  1. 删除并取出当前秒数的list和前一秒的list立即放入执行线程池。(往前取1秒防止前1秒的任务未执行,比如当前秒数是59,如果57的任务执行时间大于1秒,可能58的任务就没有被执行过,所以59秒的时候取58和59的任务防止这种情况)。

    JobScheduleHelper.java
    public void start(){

    1. // 工作计划线程
    2. scheduleThread = new Thread(new Runnable() {
    3. @Override
    4. public void run() {
    5. try {
    6. TimeUnit.MILLISECONDS.sleep(5000 - System.currentTimeMillis()%1000 );
    7. } catch (InterruptedException e) {
    8. if (!scheduleThreadToStop) {
    9. logger.error(e.getMessage(), e);
    10. }
    11. }
    12. logger.info(">>>>>>>>> init xxl-job admin scheduler success.");
    13. //调度线程是否停止
    14. while (!scheduleThreadToStop) {
    15. // Scan Job
    16. long start = System.currentTimeMillis();
    17. Connection conn = null;
    18. Boolean connAutoCommit = null;
    19. PreparedStatement preparedStatement = null;
    20. boolean preReadSuc = true;
    21. try {
    22. conn = XxlJobAdminConfig.getAdminConfig().getDataSource().getConnection();
    23. connAutoCommit = conn.getAutoCommit();
    24. conn.setAutoCommit(false);
    25. preparedStatement = conn.prepareStatement( "select * from xxl_job_lock where lock_name = 'schedule_lock' for update" );
    26. preparedStatement.execute();
    27. // tx start
    28. // 1、pre read
    29. long nowTime = System.currentTimeMillis();
    30. //查询所有下次执行时间小于当前时间+5s的XxlJobInfo集合
    31. List<XxlJobInfo> scheduleList = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleJobQuery(nowTime + PRE_READ_MS);
    32. if (scheduleList!=null && scheduleList.size()>0) {
    33. // 2、push time-ring
    34. for (XxlJobInfo jobInfo: scheduleList) {
    35. // time-ring jump
    36. //当前时间是否大于jobInfo的下一个触发时间+5s
    37. if (nowTime > jobInfo.getTriggerNextTime() + PRE_READ_MS) {
    38. // 2.1、trigger-expire > 5s:pass && make next-trigger-time
    39. // fresh next 通过cron获取下次执行时间
    40. Date nextValidTime = new CronExpression(jobInfo.getJobCron()).getNextValidTimeAfter(new Date());
    41. if (nextValidTime != null) {
    42. //最后触发时间set为下次触发时间
    43. jobInfo.setTriggerLastTime(jobInfo.getTriggerNextTime());
    44. //set下次触发时间
    45. jobInfo.setTriggerNextTime(nextValidTime.getTime());
    46. } else {
    47. jobInfo.setTriggerStatus(0);
    48. jobInfo.setTriggerLastTime(0);
    49. jobInfo.setTriggerNextTime(0);
    50. }
    51. //当前时间是否大于jobInfo的下一个触发时间,此时已经不会超过触发时间5s了
    52. } else if (nowTime > jobInfo.getTriggerNextTime()) {
    53. // 2.2、trigger-expire < 5s:direct-trigger && make next-trigger-time
    54. CronExpression cronExpression = new CronExpression(jobInfo.getJobCron());
    55. long nextTime = cronExpression.getNextValidTimeAfter(new Date()).getTime();
    56. // 1、触发
    57. JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.CRON, -1, null, null);
    58. logger.debug(">>>>>>>>>>> xxl-job, shecule push trigger : jobId = " + jobInfo.getId() );
    59. // 2、fresh next
    60. jobInfo.setTriggerLastTime(jobInfo.getTriggerNextTime());
    61. jobInfo.setTriggerNextTime(nextTime);
  1. // next-trigger-time in 5s, pre-read again
  2. if (jobInfo.getTriggerNextTime() - nowTime < PRE_READ_MS) {
  3. // 1、make ring second
  4. int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);
  5. // 2、push time ring
  6. pushTimeRing(ringSecond, jobInfo.getId());
  7. // 3、fresh next
  8. Date nextValidTime = new CronExpression(jobInfo.getJobCron()).getNextValidTimeAfter(new Date(jobInfo.getTriggerNextTime()));
  9. if (nextValidTime != null) {
  10. jobInfo.setTriggerLastTime(jobInfo.getTriggerNextTime());
  11. jobInfo.setTriggerNextTime(nextValidTime.getTime());
  12. } else {
  13. jobInfo.setTriggerStatus(0);
  14. jobInfo.setTriggerLastTime(0);
  15. jobInfo.setTriggerNextTime(0);
  16. }
  17. }
  18. } else {
  19. // 2.3、trigger-pre-read:time-ring trigger && make next-trigger-time
  20. // 1、make ring second
  21. int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);
  22. // 2、push time ring
  23. pushTimeRing(ringSecond, jobInfo.getId());
  24. // 3、fresh next
  25. Date nextValidTime = new CronExpression(jobInfo.getJobCron()).getNextValidTimeAfter(new Date(jobInfo.getTriggerNextTime()));
  26. if (nextValidTime != null) {
  27. jobInfo.setTriggerLastTime(jobInfo.getTriggerNextTime());
  28. jobInfo.setTriggerNextTime(nextValidTime.getTime());
  29. } else {
  30. jobInfo.setTriggerStatus(0);
  31. jobInfo.setTriggerLastTime(0);
  32. jobInfo.setTriggerNextTime(0);
  33. }
  34. }
  35. }
  36. // 3、更新触发完后的XxlJobInfo
  37. for (XxlJobInfo jobInfo: scheduleList) {
  38. XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleUpdate(jobInfo);
  39. }
  40. } else {
  41. preReadSuc = false;
  42. }
  43. // tx stop
  44. } catch (Exception e) {
  45. if (!scheduleThreadToStop) {
  46. logger.error(">>>>>>>>>>> xxl-job, JobScheduleHelper#scheduleThread error:{}", e);
  47. }
  48. } finally {
  49. // commit
  50. if (conn != null) {
  51. try {
  52. conn.commit();
  53. } catch (SQLException e) {
  54. if (!scheduleThreadToStop) {
  55. logger.error(e.getMessage(), e);
  56. }
  57. }
  58. try {
  59. conn.setAutoCommit(connAutoCommit);
  60. } catch (SQLException e) {
  61. if (!scheduleThreadToStop) {
  62. logger.error(e.getMessage(), e);
  63. }
  64. }
  65. try {
  66. conn.close();
  67. } catch (SQLException e) {
  68. if (!scheduleThreadToStop) {
  69. logger.error(e.getMessage(), e);
  70. }
  71. }
  72. }
  73. // close PreparedStatement
  74. if (null != preparedStatement) {
  75. try {
  76. preparedStatement.close();
  77. } catch (SQLException ignore) {
  78. if (!scheduleThreadToStop) {
  79. logger.error(ignore.getMessage(), ignore);
  80. }
  81. }
  82. }
  83. }
  84. long cost = System.currentTimeMillis()-start;
  85. // Wait seconds, align second
  86. if (cost < 1000) { // scan-overtime, not wait
  87. try {
  88. // pre-read period: success > scan each second; fail > skip this period;
  89. TimeUnit.MILLISECONDS.sleep((preReadSuc?1000:PRE_READ_MS) - System.currentTimeMillis()%1000);
  90. } catch (InterruptedException e) {
  91. if (!scheduleThreadToStop) {
  92. logger.error(e.getMessage(), e);
  93. }
  94. }
  95. }
  96. }
  97. logger.info(">>>>>>>>>>> xxl-job, JobScheduleHelper#scheduleThread stop");
  98. }
  99. });
  100. scheduleThread.setDaemon(true);
  101. scheduleThread.setName("xxl-job, admin JobScheduleHelper#scheduleThread");
  102. scheduleThread.start();
  103. // ring thread
  104. ringThread = new Thread(new Runnable() {
  105. @Override
  106. public void run() {
  107. // align second
  108. try {
  109. TimeUnit.MILLISECONDS.sleep(1000 - System.currentTimeMillis()%1000 );
  110. } catch (InterruptedException e) {
  111. if (!ringThreadToStop) {
  112. logger.error(e.getMessage(), e);
  113. }
  114. }
  115. while (!ringThreadToStop) {
  116. try {
  117. // second data
  118. List<Integer> ringItemData = new ArrayList<>();
  119. int nowSecond = Calendar.getInstance().get(Calendar.SECOND); // 避免处理耗时太长,跨过刻度,向前校验一个刻度;
  120. for (int i = 0; i < 2; i++) {
  121. List<Integer> tmpData = ringData.remove( (nowSecond+60-i)%60 );
  122. if (tmpData != null) {
  123. ringItemData.addAll(tmpData);
  124. }
  125. }
  126. // ring trigger
  127. logger.debug(">>>>>>>>>>> xxl-job, time-ring beat : " + nowSecond + " = " + Arrays.asList(ringItemData) );
  128. if (ringItemData!=null && ringItemData.size()>0) {
  129. // do trigger
  130. for (int jobId: ringItemData) {
  131. // 触发任务
  132. JobTriggerPoolHelper.trigger(jobId, TriggerTypeEnum.CRON, -1, null, null);
  133. }
  134. // clear
  135. ringItemData.clear();
  136. }
  137. } catch (Exception e) {
  138. if (!ringThreadToStop) {
  139. logger.error(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread error:{}", e);
  140. }
  141. }
  142. // next second, align second
  143. try {
  144. TimeUnit.MILLISECONDS.sleep(1000 - System.currentTimeMillis()%1000);
  145. } catch (InterruptedException e) {
  146. if (!ringThreadToStop) {
  147. logger.error(e.getMessage(), e);
  148. }
  149. }
  150. }
  151. logger.info(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread stop");
  152. }
  153. });
  154. ringThread.setDaemon(true);
  155. ringThread.setName("xxl-job, admin JobScheduleHelper#ringThread");
  156. ringThread.start();
  157. }

具体的调度方法

  1. JobTriggerPoolHelper.java
  2. /**
  3. * @param jobId
  4. * @param triggerType
  5. * @param failRetryCount
  6. * >=0: use this param
  7. * <0: use param from job info config
  8. * @param executorShardingParam
  9. * @param executorParam
  10. * null: use job param
  11. * not null: cover job param
  12. */
  13. public static void trigger(int jobId, TriggerTypeEnum triggerType, int failRetryCount, String executorShardingParam, String executorParam) {
  14. helper.addTrigger(jobId, triggerType, failRetryCount, executorShardingParam, executorParam);
  15. }
  16. /**
  17. * 添加调度器
  18. * @param jobId 任务id
  19. * @param triggerType 调度类型
  20. * @param failRetryCount 失败重试次数
  21. * @param executorShardingParam 任务分片参数
  22. * @param executorParam 任务参数
  23. */
  24. public void addTrigger(final int jobId, final TriggerTypeEnum triggerType, final int failRetryCount, final String executorShardingParam, final String executorParam) {
  25. // 选择线程池
  26. ThreadPoolExecutor triggerPool_ = fastTriggerPool;
  27. AtomicInteger jobTimeoutCount = jobTimeoutCountMap.get(jobId);
  28. if (jobTimeoutCount!=null && jobTimeoutCount.get() > 10) { // job-timeout 10 times in 1 min
  29. triggerPool_ = slowTriggerPool;
  30. }
  31. // 调度
  32. triggerPool_.execute(new Runnable() {
  33. @Override
  34. public void run() {
  35. long start = System.currentTimeMillis();
  36. try {
  37. // do trigger
  38. XxlJobTrigger.trigger(jobId, triggerType, failRetryCount, executorShardingParam, executorParam);
  39. } catch (Exception e) {
  40. logger.error(e.getMessage(), e);
  41. } finally {
  42. // check timeout-count-map
  43. long minTim_now = System.currentTimeMillis()/60000;
  44. if (minTim != minTim_now) {
  45. minTim = minTim_now;
  46. jobTimeoutCountMap.clear();
  47. }
  48. // incr timeout-count-map
  49. long cost = System.currentTimeMillis()-start;
  50. if (cost > 500) { // ob-timeout threshold 500ms
  51. AtomicInteger timeoutCount = jobTimeoutCountMap.putIfAbsent(jobId, new AtomicInteger(1));
  52. if (timeoutCount != null) {
  53. timeoutCount.incrementAndGet();
  54. }
  55. }
  56. }
  57. }
  58. });
  59. }

判断是不是分片,如果是分片就循环执行器地址列表(系统注册)一个一个服务器去调用

  1. XxlJobTrigger.java
  2. /**
  3. * trigger job
  4. *
  5. * @param jobId 任务id
  6. * @param triggerType 触发类型
  7. * @param failRetryCount 失败重试次数
  8. * >=0: use this param
  9. * <0: use param from job info config
  10. * @param executorShardingParam 任务分片参数
  11. * @param executorParam 任务参数
  12. * null: use job param
  13. * not null: cover job param
  14. */
  15. public static void trigger(int jobId, TriggerTypeEnum triggerType, int failRetryCount, String executorShardingParam, String executorParam) {
  16. // 根据id查询XxlJobInfo
  17. XxlJobInfo jobInfo = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().loadById(jobId);
  18. if (jobInfo == null) {
  19. logger.warn(">>>>>>>>>>>> trigger fail, jobId invalid,jobId={}", jobId);
  20. return;
  21. }
  22. //设置任务参数
  23. if (executorParam != null) {
  24. jobInfo.setExecutorParam(executorParam);
  25. }
  26. //失败重试次数
  27. int finalFailRetryCount = failRetryCount>=0?failRetryCount:jobInfo.getExecutorFailRetryCount();
  28. //查询执行器
  29. XxlJobGroup group = XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().load(jobInfo.getJobGroup());
  30. // 分片参数
  31. int[] shardingParam = null;
  32. if (executorShardingParam!=null){
  33. String[] shardingArr = executorShardingParam.split("/");
  34. if (shardingArr.length==2 && isNumeric(shardingArr[0]) && isNumeric(shardingArr[1])) {
  35. shardingParam = new int[2];
  36. shardingParam[0] = Integer.valueOf(shardingArr[0]);
  37. shardingParam[1] = Integer.valueOf(shardingArr[1]);
  38. }
  39. }
  40. //判断执行器路由策略是不是分片广播
  41. if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST==ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null)
  42. && group.getRegistryList()!=null && !group.getRegistryList().isEmpty()
  43. && shardingParam==null) {
  44. //获取多个执行去,循环去执行
  45. for (int i = 0; i < group.getRegistryList().size(); i++) {
  46. processTrigger(group, jobInfo, finalFailRetryCount, triggerType, i, group.getRegistryList().size());
  47. }
  48. } else {
  49. if (shardingParam == null) {
  50. shardingParam = new int[]{0, 1};
  51. }
  52. //执行
  53. processTrigger(group, jobInfo, finalFailRetryCount, triggerType, shardingParam[0], shardingParam[1]);
  54. }
  55. }
  56. XxlJobTrigger.java
  57. /**
  58. * @param group job group, registry list may be empty
  59. * @param jobInfo
  60. * @param finalFailRetryCount
  61. * @param triggerType
  62. * @param index sharding index
  63. * @param total sharding index
  64. */
  65. private static void processTrigger(XxlJobGroup group, XxlJobInfo jobInfo, int finalFailRetryCount, TriggerTypeEnum triggerType, int index, int total){
  66. // 获取阻塞处理策略,默认是串行
  67. ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(jobInfo.getExecutorBlockStrategy(), ExecutorBlockStrategyEnum.SERIAL_EXECUTION); // block strategy
  68. // 获取执行器路由策略,默认是null
  69. ExecutorRouteStrategyEnum executorRouteStrategyEnum = ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null); // route strategy
  70. //分片参数
  71. String shardingParam = (ExecutorRouteStrategyEnum.SHARDING_BROADCAST==executorRouteStrategyEnum)?String.valueOf(index).concat("/").concat(String.valueOf(total)):null;
  72. // 1、保存XxlJobLog日志
  73. XxlJobLog jobLog = new XxlJobLog();
  74. jobLog.setJobGroup(jobInfo.getJobGroup());
  75. jobLog.setJobId(jobInfo.getId());
  76. jobLog.setTriggerTime(new Date());
  77. XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().save(jobLog);
  78. logger.debug(">>>>>>>>>>> xxl-job trigger start, jobId:{}", jobLog.getId());
  79. // 2、初始化触发器参数
  80. TriggerParam triggerParam = new TriggerParam();
  81. //任务id
  82. triggerParam.setJobId(jobInfo.getId());
  83. //执行器,任务Handler名称
  84. triggerParam.setExecutorHandler(jobInfo.getExecutorHandler());
  85. //执行器,任务参数
  86. triggerParam.setExecutorParams(jobInfo.getExecutorParam());
  87. // 阻塞处理策略
  88. triggerParam.setExecutorBlockStrategy(jobInfo.getExecutorBlockStrategy());
  89. // 任务执行超时时间,单位秒
  90. triggerParam.setExecutorTimeout(jobInfo.getExecutorTimeout());
  91. //日志id
  92. triggerParam.setLogId(jobLog.getId());
  93. //日志保存时间
  94. triggerParam.setLogDateTim(jobLog.getTriggerTime().getTime());
  95. // GLUE类型 #com.xxl.job.core.glue.GlueTypeEnum
  96. triggerParam.setGlueType(jobInfo.getGlueType());
  97. // GLUE源代码
  98. triggerParam.setGlueSource(jobInfo.getGlueSource());
  99. // GLUE更新时间
  100. triggerParam.setGlueUpdatetime(jobInfo.getGlueUpdatetime().getTime());
  101. //设置执行器列表的索引
  102. triggerParam.setBroadcastIndex(index);
  103. //设置执行器列表的总数
  104. triggerParam.setBroadcastTotal(total);
  105. // 3、初始化地址
  106. String address = null;
  107. ReturnT<String> routeAddressResult = null;
  108. //判断执行器地址列表是否为空
  109. if (group.getRegistryList()!=null && !group.getRegistryList().isEmpty()) {
  110. //判断是否分片策略
  111. if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST == executorRouteStrategyEnum) {
  112. if (index < group.getRegistryList().size()) {
  113. //获取执行器地址
  114. address = group.getRegistryList().get(index);
  115. } else {
  116. address = group.getRegistryList().get(0);
  117. }
  118. } else {
  119. routeAddressResult = executorRouteStrategyEnum.getRouter().route(triggerParam, group.getRegistryList());
  120. if (routeAddressResult.getCode() == ReturnT.SUCCESS_CODE) {
  121. address = routeAddressResult.getContent();
  122. }
  123. }
  124. } else {
  125. routeAddressResult = new ReturnT<String>(ReturnT.FAIL_CODE, I18nUtil.getString("jobconf_trigger_address_empty"));
  126. }
  127. // 4、触发远程执行器
  128. ReturnT<String> triggerResult = null;
  129. if (address != null) {
  130. //执行
  131. triggerResult = runExecutor(triggerParam, address);
  132. } else {
  133. triggerResult = new ReturnT<String>(ReturnT.FAIL_CODE, null);
  134. }
  135. // 5、收集触发器信息
  136. StringBuffer triggerMsgSb = new StringBuffer();
  137. triggerMsgSb.append(I18nUtil.getString("jobconf_trigger_type")).append(":").append(triggerType.getTitle());
  138. triggerMsgSb.append("<br>").append(I18nUtil.getString("jobconf_trigger_admin_adress")).append(":").append(IpUtil.getIp());
  139. triggerMsgSb.append("<br>").append(I18nUtil.getString("jobconf_trigger_exe_regtype")).append(":")
  140. .append( (group.getAddressType() == 0)?I18nUtil.getString("jobgroup_field_addressType_0"):I18nUtil.getString("jobgroup_field_addressType_1") );
  141. triggerMsgSb.append("<br>").append(I18nUtil.getString("jobconf_trigger_exe_regaddress")).append(":").append(group.getRegistryList());
  142. triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_executorRouteStrategy")).append(":").append(executorRouteStrategyEnum.getTitle());
  143. if (shardingParam != null) {
  144. triggerMsgSb.append("("+shardingParam+")");
  145. }
  146. triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_executorBlockStrategy")).append(":").append(blockStrategy.getTitle());
  147. triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_timeout")).append(":").append(jobInfo.getExecutorTimeout());
  148. triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_executorFailRetryCount")).append(":").append(finalFailRetryCount);
  149. triggerMsgSb.append("<br><br><span style=\"color:#00c0ef;\" > >>>>>>>>>>>"+ I18nUtil.getString("jobconf_trigger_run") +"<<<<<<<<<<< </span><br>")
  150. .append((routeAddressResult!=null&&routeAddressResult.getMsg()!=null)?routeAddressResult.getMsg()+"<br><br>":"").append(triggerResult.getMsg()!=null?triggerResult.getMsg():"");
  151. // 6、保存日志触发器信息
  152. jobLog.setExecutorAddress(address);
  153. //执行器任务Handler名称
  154. jobLog.setExecutorHandler(jobInfo.getExecutorHandler());
  155. //执行器任务参数
  156. jobLog.setExecutorParam(jobInfo.getExecutorParam());
  157. //分片参数
  158. jobLog.setExecutorShardingParam(shardingParam);
  159. //最终失败重试计数
  160. jobLog.setExecutorFailRetryCount(finalFailRetryCount);
  161. //jobLog.setTriggerTime();
  162. //触发器结果的状态码
  163. jobLog.setTriggerCode(triggerResult.getCode());
  164. //触发器信息
  165. jobLog.setTriggerMsg(triggerMsgSb.toString());
  166. XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateTriggerInfo(jobLog);
  167. logger.debug(">>>>>>>>>>> xxl-job trigger end, jobId:{}", jobLog.getId());
  168. }
  169. XxlJobTrigger.java
  170. /**
  171. * 运行执行器
  172. * @param triggerParam
  173. * @param address
  174. * @return
  175. */
  176. public static ReturnT<String> runExecutor(TriggerParam triggerParam, String address){
  177. ReturnT<String> runResult = null;
  178. try {
  179. ExecutorBiz executorBiz = XxlJobScheduler.getExecutorBiz(address);
  180. runResult = executorBiz.run(triggerParam);
  181. } catch (Exception e) {
  182. logger.error(">>>>>>>>>>> xxl-job trigger error, please check if the executor[{}] is running.", address, e);
  183. runResult = new ReturnT<String>(ReturnT.FAIL_CODE, ThrowableUtil.toString(e));
  184. }
  185. StringBuffer runResultSB = new StringBuffer(I18nUtil.getString("jobconf_trigger_run") + ":");
  186. runResultSB.append("<br>address:").append(address);
  187. runResultSB.append("<br>code:").append(runResult.getCode());
  188. runResultSB.append("<br>msg:").append(runResult.getMsg());
  189. runResult.setMsg(runResultSB.toString());
  190. return runResult;
  191. }

1.先加载线程中的执行器,然后和缓存的执行器做比较,如果不一样就以缓存的执行器为主,缓存的执行器就是 执行器服务初始化的时候放入的

2.然后判断运行模式,校验执行器,执行器阻止策略

3.然后把TriggerParam数据推送到triggerQueue队列

并且通过工作线程jobThread实时去监控队列,取出来执行

4.如果jobThread工作线程为空就去创建一个工作线程,并且缓存到jobThreadRepository的map里面,key为调度任务的id并启动工作线程

  1. ExecutorBizImpl.java
  2. @Override
  3. public ReturnT<String> run(TriggerParam triggerParam) {
  4. // load old:jobHandler + jobThread
  5. JobThread jobThread = XxlJobExecutor.loadJobThread(triggerParam.getJobId());
  6. IJobHandler jobHandler = jobThread!=null?jobThread.getHandler():null;
  7. String removeOldReason = null;
  8. // valid:jobHandler + jobThread
  9. GlueTypeEnum glueTypeEnum = GlueTypeEnum.match(triggerParam.getGlueType());
  10. //判断运行模式是不是BEAN
  11. if (GlueTypeEnum.BEAN == glueTypeEnum) {
  12. // 去缓冲中根据执行器名字获取一个实例
  13. IJobHandler newJobHandler = XxlJobExecutor.loadJobHandler(triggerParam.getExecutorHandler());
  14. // 校验执行器
  15. if (jobThread!=null && jobHandler != newJobHandler) {
  16. // 如果执行器不一致,就把旧的jobThread和jobHandler置空
  17. removeOldReason = "change jobhandler or glue type, and terminate the old job thread.";
  18. jobThread = null;
  19. jobHandler = null;
  20. }
  21. // 校验执行器
  22. if (jobHandler == null) {
  23. jobHandler = newJobHandler;
  24. if (jobHandler == null) {
  25. return new ReturnT<String>(ReturnT.FAIL_CODE, "job handler [" + triggerParam.getExecutorHandler() + "] not found.");
  26. }
  27. }
  28. } else if (GlueTypeEnum.GLUE_GROOVY == glueTypeEnum) {
  29. // valid old jobThread
  30. if (jobThread != null &&
  31. !(jobThread.getHandler() instanceof GlueJobHandler
  32. && ((GlueJobHandler) jobThread.getHandler()).getGlueUpdatetime()==triggerParam.getGlueUpdatetime() )) {
  33. // change handler or gluesource updated, need kill old thread
  34. removeOldReason = "change job source or glue type, and terminate the old job thread.";
  35. jobThread = null;
  36. jobHandler = null;
  37. }
  38. // valid handler
  39. if (jobHandler == null) {
  40. try {
  41. IJobHandler originJobHandler = GlueFactory.getInstance().loadNewInstance(triggerParam.getGlueSource());
  42. jobHandler = new GlueJobHandler(originJobHandler, triggerParam.getGlueUpdatetime());
  43. } catch (Exception e) {
  44. logger.error(e.getMessage(), e);
  45. return new ReturnT<String>(ReturnT.FAIL_CODE, e.getMessage());
  46. }
  47. }
  48. } else if (glueTypeEnum!=null && glueTypeEnum.isScript()) {
  49. // valid old jobThread
  50. if (jobThread != null &&
  51. !(jobThread.getHandler() instanceof ScriptJobHandler
  52. && ((ScriptJobHandler) jobThread.getHandler()).getGlueUpdatetime()==triggerParam.getGlueUpdatetime() )) {
  53. // change script or gluesource updated, need kill old thread
  54. removeOldReason = "change job source or glue type, and terminate the old job thread.";
  55. jobThread = null;
  56. jobHandler = null;
  57. }
  58. // valid handler
  59. if (jobHandler == null) {
  60. jobHandler = new ScriptJobHandler(triggerParam.getJobId(), triggerParam.getGlueUpdatetime(), triggerParam.getGlueSource(), GlueTypeEnum.match(triggerParam.getGlueType()));
  61. }
  62. } else {
  63. return new ReturnT<String>(ReturnT.FAIL_CODE, "glueType[" + triggerParam.getGlueType() + "] is not valid.");
  64. }
  65. // 执行器阻止策略
  66. if (jobThread != null) {
  67. ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(triggerParam.getExecutorBlockStrategy(), null);
  68. if (ExecutorBlockStrategyEnum.DISCARD_LATER == blockStrategy) {
  69. // discard when running
  70. if (jobThread.isRunningOrHasQueue()) {
  71. return new ReturnT<String>(ReturnT.FAIL_CODE, "block strategy effect:"+ExecutorBlockStrategyEnum.DISCARD_LATER.getTitle());
  72. }
  73. } else if (ExecutorBlockStrategyEnum.COVER_EARLY == blockStrategy) {
  74. // kill running jobThread
  75. if (jobThread.isRunningOrHasQueue()) {
  76. removeOldReason = "block strategy effect:" + ExecutorBlockStrategyEnum.COVER_EARLY.getTitle();
  77. jobThread = null;
  78. }
  79. } else {
  80. // 串行执行
  81. }
  82. }
  83. // replace thread (new or exists invalid)
  84. //替换掉工作线程
  85. if (jobThread == null) {
  86. //如果为空就注册一个工作线程
  87. jobThread = XxlJobExecutor.registJobThread(triggerParam.getJobId(), jobHandler, removeOldReason);
  88. }
  89. // 数据推送到队列
  90. ReturnT<String> pushResult = jobThread.pushTriggerQueue(triggerParam);
  91. return pushResult;
  92. }

工作线程的run方法

执行调度任务的工作线程

实时去取triggerQueue队列里面的调度任务,然后执行

循环30次并且队列没数据就清空一次jobThreadRepository

  1. JobThread.java
  2. @Override
  3. public void run() {
  4. // init
  5. try {
  6. handler.init();
  7. } catch (Throwable e) {
  8. logger.error(e.getMessage(), e);
  9. }
  10. // execute
  11. while(!toStop){
  12. running = false;
  13. //空闲时间
  14. idleTimes++;
  15. TriggerParam triggerParam = null;
  16. ReturnT<String> executeResult = null;
  17. try {
  18. // to check toStop signal, we need cycle, so wo cannot use queue.take(), instand of poll(timeout)
  19. //获取队列里的任务,设置3秒钟超时
  20. triggerParam = triggerQueue.poll(3L, TimeUnit.SECONDS);
  21. if (triggerParam!=null) {
  22. running = true;
  23. idleTimes = 0;
  24. triggerLogIdSet.remove(triggerParam.getLogId());
  25. // 日志路径名称, 比如 "logPath/yyyy-MM-dd/9999.log"
  26. String logFileName = XxlJobFileAppender.makeLogFileName(new Date(triggerParam.getLogDateTim()), triggerParam.getLogId());
  27. XxlJobFileAppender.contextHolder.set(logFileName);
  28. //设置分片索引和总数
  29. ShardingUtil.setShardingVo(new ShardingUtil.ShardingVO(triggerParam.getBroadcastIndex(), triggerParam.getBroadcastTotal()));
  30. XxlJobLogger.log("<br>----------- xxl-job job execute start -----------<br>----------- Param:" + triggerParam.getExecutorParams());
  31. //判断是否调度超时
  32. if (triggerParam.getExecutorTimeout() > 0) {
  33. // limit timeout
  34. Thread futureThread = null;
  35. try {
  36. final TriggerParam triggerParamTmp = triggerParam;
  37. FutureTask<ReturnT<String>> futureTask = new FutureTask<ReturnT<String>>(new Callable<ReturnT<String>>() {
  38. @Override
  39. public ReturnT<String> call() throws Exception {
  40. return handler.execute(triggerParamTmp.getExecutorParams());
  41. }
  42. });
  43. futureThread = new Thread(futureTask);
  44. futureThread.start();
  45. //调度结果
  46. executeResult = futureTask.get(triggerParam.getExecutorTimeout(), TimeUnit.SECONDS);
  47. } catch (TimeoutException e) {
  48. XxlJobLogger.log("<br>----------- xxl-job job execute timeout");
  49. XxlJobLogger.log(e);
  50. executeResult = new ReturnT<String>(IJobHandler.FAIL_TIMEOUT.getCode(), "job execute timeout ");
  51. } finally {
  52. futureThread.interrupt();
  53. }
  54. } else {
  55. // 仅仅执行任务
  56. executeResult = handler.execute(triggerParam.getExecutorParams());
  57. }
  58. if (executeResult == null) {
  59. executeResult = IJobHandler.FAIL;
  60. } else {
  61. executeResult.setMsg(
  62. (executeResult!=null&&executeResult.getMsg()!=null&&executeResult.getMsg().length()>50000)
  63. ?executeResult.getMsg().substring(0, 50000).concat("...")
  64. :executeResult.getMsg());
  65. executeResult.setContent(null); // limit obj size
  66. }
  67. XxlJobLogger.log("<br>----------- xxl-job job execute end(finish) -----------<br>----------- ReturnT:" + executeResult);
  68. } else {
  69. //超过一定次数,清空线程,并设置JobThread的stop停止标识位,终止轮询。也就是30秒空轮询
  70. if (idleTimes > 30) {
  71. if(triggerQueue.size() == 0) { // avoid concurrent trigger causes jobId-lost
  72. XxlJobExecutor.removeJobThread(jobId, "excutor idel times over limit.");
  73. }
  74. }
  75. }
  76. } catch (Throwable e) {
  77. if (toStop) {
  78. XxlJobLogger.log("<br>----------- JobThread toStop, stopReason:" + stopReason);
  79. }
  80. StringWriter stringWriter = new StringWriter();
  81. e.printStackTrace(new PrintWriter(stringWriter));
  82. String errorMsg = stringWriter.toString();
  83. executeResult = new ReturnT<String>(ReturnT.FAIL_CODE, errorMsg);
  84. XxlJobLogger.log("<br>----------- JobThread Exception:" + errorMsg + "<br>----------- xxl-job job execute end(error) -----------");
  85. } finally {
  86. if(triggerParam != null) {
  87. // callback handler info
  88. if (!toStop) {
  89. // commonm
  90. TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(), triggerParam.getLogDateTim(), executeResult));
  91. } else {
  92. // is killed
  93. ReturnT<String> stopResult = new ReturnT<String>(ReturnT.FAIL_CODE, stopReason + " [job running,killed]");
  94. TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(), triggerParam.getLogDateTim(), stopResult));
  95. }
  96. }
  97. }
  98. }
  99. // callback trigger request in queue
  100. while(triggerQueue !=null && triggerQueue.size()>0){
  101. TriggerParam triggerParam = triggerQueue.poll();
  102. if (triggerParam!=null) {
  103. // is killed
  104. ReturnT<String> stopResult = new ReturnT<String>(ReturnT.FAIL_CODE, stopReason + " [job not executed, in the job queue, killed.]");
  105. TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(), triggerParam.getLogDateTim(), stopResult));
  106. }
  107. }
  108. // 销毁
  109. try {
  110. handler.destroy();
  111. } catch (Throwable e) {
  112. logger.error(e.getMessage(), e);
  113. }
  114. logger.info(">>>>>>>>>>> xxl-job JobThread stoped, hashCode:{}", Thread.currentThread());
  115. }

发表评论

表情:
评论列表 (有 0 条评论,155人围观)

还没有评论,来说两句吧...

相关阅读

    相关 webpack

    webpack 源码解析 序言 项目上在使用webpack,感叹真是神器,既然是神器,就想探知究竟。 总览 webpack整体是一个插件架构,所有的功能都以

    相关 HashMap

    来不及整理电子版,先献丑把笔记本拍几张,随后整理。 有人问,什么年代了,还手写笔记,哈哈,如果不亲自手写一遍,我是真心记不住。很多API不用知道工作原理 一样可以使用,所以

    相关 HashMap

    源码博客相关博客写了那么多,突然想起来都没有写一篇我们日常开发中最常见的HashMap,今天简单补一下! HashMap简介: `HashMap` 是应用更加广泛的哈希

    相关 hashMap

    源码来自jdk:1.8,和其他jdk版本可能有少许差异。 一.hashMap的实现原理     hashMap底层是一个有Node组成的数组,每个Node都有一个key

    相关 HashMap

    本文的所有图片以及源码解析内容都来自于微信公众号<java知音>,原文作者:阿进的写字台。此处仅是对该公众号分享的内容进行一下消化吸收,不作传播。想要阅读原文,可以关注这个公众