分布式任务调度平台XXL-JOB
#
目录
前言
一、系统组成
二、架构图
三、调度模块
3.1 调度中心HA(集群)
3.2 调度线程池
3.3 并行调度
3.4 过期处理策略
3.5 执行器
四、任务详解
五、xxl-jbo开发
maven 配置
xxl-job 配置
BEAN模式(类形式)
步骤一:执行器项目中,开发Job类:
BEAN模式(方法形式)
步骤一:执行器项目中,开发Job方法:
分片广播任务
总结
前言
XXL-JOB是一个分布式任务调度平台,其核心设计目标是开发迅速、学习简单、轻量级、易扩展。现已开放源代码并接入多家公司线上产品线,开箱即用。
一、系统组成
- 调度模块(调度中心):
负责管理调度信息,按照调度配置发出调度请求,自身不承担业务代码。调度系统与任务解耦,提高了系统可用性和稳定性,同时调度系统性能不再受限于任务模块;
支持可视化、简单且动态的管理调度信息,包括任务新建,更新,删除,GLUE开发和任务报警等,所有上述操作都会实时生效,同时支持监控调度结果以及执行日志,支持执行器Failover。 - 执行模块(执行器):
负责接收调度请求并执行任务逻辑。任务模块专注于任务的执行等操作,开发和维护更加简单和高效;
接收“调度中心”的执行请求、终止请求和日志请求等。
二、架构图
三、调度模块
3.1 调度中心HA(集群)
基于数据库的集群方案,数据库选用Mysql;集群分布式并发环境中进行定时任务调度时,会在各个节点上报任务,存到数据库中,执行时会从数据库中取出触发器来执行,如果触发器的名称和执行时间相同,则只有一个节点去执行此任务。
3.2 调度线程池
调度采用线程池方式实现,避免单线程因阻塞而引起任务调度延迟。
3.3 并行调度
XXL-JOB调度模块默认采用并行机制,在多线程调度的情况下,调度模块被阻塞的几率很低,大大提高了调度系统的承载量。
XXL-JOB的不同任务之间并行调度、并行执行。
XXL-JOB的单个任务,针对多个执行器是并行运行的,针对单个执行器是串行执行的。同时支持任务终止。
3.4 过期处理策略
任务调度错过触发时间时的处理策略:
- 可能原因:服务重启;调度线程被阻塞,线程被耗尽;上次调度持续阻塞,下次调度被错过;
处理策略:
- 过期超5s:本次忽略,当前时间开始计算下次触发时间
- 过期5s内:立即触发一次,当前时间开始计算下次触发时间``
3.5 执行器
执行器实际上是一个内嵌的Server,默认端口9999(配置项:xxl.job.executor.port)。
在项目启动时,执行器会通过“@JobHandler”识别Spring容器中“Bean模式任务”,以注解的value属性为key管理起来。
“执行器”接收到“调度中心”的调度请求时,如果任务类型为“Bean模式”,将会匹配Spring容器中的“Bean模式任务”,然后调用其execute方法,执行任务逻辑。如果任务类型为“GLUE模式”,将会加载GLue代码,实例化Java对象,注入依赖的Spring服务(注意:Glue代码中注入的Spring服务,必须存在与该“执行器”项目的Spring容器中),然后调用execute方法,执行任务逻辑。
四、任务详解
路由策略:当执行器集群部署时,提供丰富的路由策略,包括;
FIRST(第一个):固定选择第一个机器;
LAST(最后一个):固定选择最后一个机器;
ROUND(轮询):;
RANDOM(随机):随机选择在线的机器;
CONSISTENT_HASH(一致性HASH):每个任务按照Hash算法固定选择某一台机器,且所有任务均匀散列在不同机器上。
LEAST_FREQUENTLY_USED(最不经常使用):使用频率最低的机器优先被选举;
LEAST_RECENTLY_USED(最近最久未使用):最久未使用的机器优先被选举;
FAILOVER(故障转移):按照顺序依次进行心跳检测,第一个心跳检测成功的机器选定为目标执行器并发起调度;
BUSYOVER(忙碌转移):按照顺序依次进行空闲检测,第一个空闲检测成功的机器选定为目标执行器并发起调度;
SHARDING_BROADCAST(分片广播):广播触发对应集群中所有机器执行一次任务,同时系统自动传递分片参数;可根据分片参数开发分片任务;
子任务:每个任务都拥有一个唯一的任务ID(任务ID可以从任务列表获取),当本任务执行结束并且执行成功时,将会触发子任务ID所对应的任务的一次主动调度。
调度过期策略:
- 忽略:调度过期后,忽略过期的任务,从当前时间开始重新计算下次触发时间;
- 立即执行一次:调度过期后,立即执行一次,并从当前时间开始重新计算下次触发时间;
阻塞处理策略:调度过于密集执行器来不及处理时的处理策略;
单机串行(默认):调度请求进入单机执行器后,调度请求进入FIFO队列并以串行方式运行;
丢弃后续调度:调度请求进入单机执行器后,发现执行器存在运行的调度任务,本次请求将会被丢弃并标记为失败;
覆盖之前调度:调度请求进入单机执行器后,发现执行器存在运行的调度任务,将会终止运行中的调度任务并清空队列,然后运行本地调度任务;
任务超时时间:支持自定义任务超时时间,任务运行超时将会主动中断任务;
失败重试次数;支持自定义任务失败重试次数,当任务失败时将会按照预设的失败重试次数主动进行重试;
五、xxl-jbo开发
maven 配置
<!--xxl-job-->
<dependency>
<groupId>com.xuxueli</groupId>
<artifactId>xxl-job-core</artifactId>
<version>2.2.0</version>
</dependency>
xxl-job 配置
/**
* xxl-job 配置
*
* @author yangyanping
* @date 2023-05-30
*/
@Slf4j
@Configuration
public class XxlJobConfig {
@Value("${xxl.job.admin.addresses}")
private String adminAddresses;
@Value("${xxl.job.executor.appname}")
private String appName;
@Value("${xxl.job.executor.port}")
private int port;
@Value("${xxl.job.accessToken}")
private String accessToken;
@Bean
public XxlJobSpringExecutor xxlJobExecutor() {
XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
xxlJobSpringExecutor.setAppname(appName);
xxlJobSpringExecutor.setPort(port);
xxlJobSpringExecutor.setAccessToken(accessToken);
return xxlJobSpringExecutor;
}
}
BEAN模式(类形式)
Bean模式任务,支持基于类的开发方式,每个任务对应一个Java类。
- 优点:不限制项目环境,兼容性好。即使是无框架项目,如main方法直接启动的项目也可以提供支持,可以参考示例项目 “xxl-job-executor-sample-frameless”;
缺点:
- 每个任务需要占用一个Java类,造成类的浪费;
- 不支持自动扫描任务并注入到执行器容器,需要手动注入。
步骤一:执行器项目中,开发Job类:
开发步骤:
1、继承”IJobHandler”:“com.xxl.job.core.handler.IJobHandler”;
2、注册到Spring容器:添加“@Component”注解,被Spring容器扫描为Bean实例;
3、注册到执行器工厂:添加“@JobHandler(value=”自定义jobhandler名称”)”注解,注解value值对应的是调度中心新建任务JobHandler属性的值。
4、执行日志:需要通过 “XxlJobLogger.log” 打印执行日志;
@Component
@JobHandler(value="demoJobHandler")
public class DemoJobHandler extends IJobHandler {
@Override
public ReturnT<String> execute(String param) throws Exception {
XxlJobLogger.log("XXL-JOB, Hello World.");
for (int i = 0; i < 5; i++) {
XxlJobLogger.log("beat at:" + i);
TimeUnit.SECONDS.sleep(2);
}
return ReturnT.SUCCESS;
}
}
BEAN模式(方法形式)
Bean模式任务,支持基于方法的开发方式,每个任务对应一个方法。
优点:
- 每个任务只需要开发一个方法,并添加”@XxlJob”注解即可,更加方便、快速。
- 支持自动扫描任务并注入到执行器容器。
- 缺点:要求Spring容器环境;
基于方法开发的任务,底层会生成JobHandler代理,和基于类的方式一样,任务也会以JobHandler的形式存在于执行器任务容器中。
步骤一:执行器项目中,开发Job方法:
1、任务开发:在Spring Bean实例中,开发Job方法;
2、注解配置:为Job方法添加注解 "@XxlJob(value="自定义jobhandler名称", init = "JobHandler初始化方法", destroy = "JobHandler销毁方法")",注解value值对应的是调度中心新建任务的JobHandler属性的值。
3、执行日志:需要通过 "XxlJobHelper.log" 打印执行日志;
4、任务结果:默认任务结果为 "成功" 状态,不需要主动设置;如有诉求,比如设置任务结果为失败,可以通过 "XxlJobHelper.handleFail/handleSuccess" 自主设置任务结果;
@Component
public class DemoJobHandler {
@Override
@XxlJob("demoJobHandler")
public ReturnT<String> execute(String param) throws Exception {
XxlJobLogger.log("XXL-JOB, Hello World.");
for (int i = 0; i < 5; i++) {
XxlJobLogger.log("beat at:" + i);
TimeUnit.SECONDS.sleep(2);
}
return ReturnT.SUCCESS;
}
}
分片广播任务
/**
* 分片广播任务
*/
@Service
public class ShardingJobHandler {
@Override
@XxlJob("shardingJobHandler")
public ReturnT<String> execute(String param) throws Exception {
// 分片参数
ShardingUtil.ShardingVO shardingVO = ShardingUtil.getShardingVo();
int shardIndex = shardingVO.getIndex();
int shardTotal = shardingVO.getTotal();
log.info("分片参数:当前分片序号 = {}, 总分片数 = {}", shardingVO.getIndex(), shardingVO.getTotal());
// 业务逻辑
List<Task> tasks = buildTasks(param);
for (int k = 0; k < tasks.size(); k++) {
int mod = k % shardTotal;
if (shardIndex != mod) {
continue;
}
final Task task = tasks.get(k);
threadPool.execute(() -> {
executeTask(task);
});
}
return ReturnT.SUCCESS;
}
}
总结
对于并发场景不是特别高的系统来说,xxl-job配置部署简单易用,不需要引入多余的组件,同时提供了可视化的控制台,使用起来非常友好,是一个比较好的选择。
XXL开源社区 | 首页
还没有评论,来说两句吧...