ElasticJob3.0整合SpringBoot,ElasticJob-Lite【ElasticJob入门篇】

素颜马尾好姑娘i 2022-10-11 06:11 334阅读 0赞

文章目录

    • 一、前言
      • 1-1、什么是ElasticJob
      • 1-2、其它
    • 二、使用
      • 2-1、作业
        • 2-1-1、普通作业
        • 2-1-2、数据流作业
        • 2-1-3、脚本作业
        • 2-1-4、HTTP作业(3.0.0-beta 提供)
      • 2-2、作业调度(基于SpringBoot)
        • 2-2-1、导入pom文件
        • 2-2-2、配置文件
        • 2-2-3、分片数量
        • 2-2-4、手动调用
      • 2-3、配置错误处理策略
        • 2-3-1、引入pom
        • 2-3-2、修改配置文件
        • 2-3-3、使用
        • 2-3-4、其它
      • 2-4、作业监听器
    • 三、运维平台配置
      • 3-1、安装部署
        • 3-1-1、下载
        • 3-1-2、启动
        • 3-1-3、使用
    • 四、视频地址
    • 五、源代码获取

一、前言

1-1、什么是ElasticJob

我们之前在SpringBoot项目里面使用定时任务,先是开启定时任务 @EnableScheduling,然后使用 @Scheduled(cron = "*/1 * * * * ?"),这样使用起来很简单,也没有什么问题。

但思考一下这样的场景,如果一个服务已经满足不了我们的需求,这时候应该怎么办呢?我们很容易想到集群,部署多份。

一般我们的服务部署多份只需要前面加一个负载的功能就行,毕竟每个服务提供的是一样的服务。但是定时任务则不行,比如我们A服务的fun方法运行到一半,然后B服务的fun开始运行,这样肯定会出现问题的。

而我们的ElasticJob则可以很好的去解决这个问题,并且可以很好的支持水平扩容/缩容。当然了它还附带了其它的功能比如好看好用的操作界面、异常通知(邮件、企业微信、钉钉)等。

1-2、其它

ElasticJob的使用是依赖zookeeper的(多个服务之间肯定是要通信才可以做到上面的功能),这个就自己去安装一下了,如果只是测试可以安装一个windows版本很简单。

ElasticJob里面提供两个模块,ElasticJob-Lite、ElasticJob-Cloud,我们下面只介绍ElasticJob-Lite的使用。

官方文档

二、使用

如果修改yml文件不生效,可有如下两个办法

  • 改一下zookeeper的名命空间
  • 配置文件job下面添加 overwrite: true

2-1、作业

作业其实就是定时任务,每一个作业就是一个定时任务。

作业的种类有多种简单作业数据流作业脚本作业HTTP作业(3.0.0-beta 提供)

2-1-1、普通作业

普通作业只需要实现SimpleJob接口,然后重写execute方法(也是用的最多的作业)

  1. import org.apache.shardingsphere.elasticjob.api.ShardingContext;
  2. import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
  3. import org.springframework.stereotype.Component;
  4. @Component
  5. public class MyElasticJob implements SimpleJob {
  6. @Override
  7. public void execute(ShardingContext context) {
  8. System.out.println(context.getShardingTotalCount() + " " + context.getShardingItem());
  9. }
  10. }

2-1-2、数据流作业

MyDataflowJob

  1. import com.xdx97.elasticjob.bean.XdxBean;
  2. import org.apache.shardingsphere.elasticjob.api.ShardingContext;
  3. import org.apache.shardingsphere.elasticjob.dataflow.job.DataflowJob;
  4. import org.springframework.stereotype.Component;
  5. import java.util.ArrayList;
  6. import java.util.List;
  7. @Component
  8. public class MyDataflowJob implements DataflowJob<XdxBean> {
  9. @Override
  10. public List<XdxBean> fetchData(ShardingContext shardingContext) {
  11. List<XdxBean> foos = new ArrayList<>();
  12. double random = Math.random();
  13. System.out.println("fetchData------ " + random);
  14. if (random > 0.5){
  15. XdxBean foo = new XdxBean();
  16. foo.setName("小道仙");
  17. foos.add(foo);
  18. }
  19. return foos;
  20. }
  21. @Override
  22. public void processData(ShardingContext shardingContext, List<XdxBean> list) {
  23. System.out.println("来了processData------");
  24. }
  25. }

XdxBean

  1. public class XdxBean {
  2. private String name;
  3. public String getName() {
  4. return name;
  5. }
  6. public void setName(String name) {
  7. this.name = name;
  8. }
  9. }

运行结果:

  1. fetchData------ 0.13745888666984807
  2. fetchData------ 0.2922741337641118
  3. fetchData------ 0.7834818165147507
  4. 来了processData------
  5. fetchData------ 0.8177868853353837
  6. 来了processData------
  7. fetchData------ 0.14076346085285385

注:Math.random() 产生的数据在0-1之间。

从上面运行的结果,我们可以得出结论,所谓的数据流作业其实也是一个定时任务,只不过当这个定时任务产生数据的时候,就会携带数据去调用processData()方法

2-1-3、脚本作业

感觉用的不多和时间关系就不研究了

2-1-4、HTTP作业(3.0.0-beta 提供)

感觉用的不多和时间关系就不研究了

2-2、作业调度(基于SpringBoot)

ElasticJob提供了三种使用方法,基于Java基于SpringBoot基于配置文件。三种都介绍也很麻烦,原理都一样,我这里就只介绍SpringBoot的。

2-2-1、导入pom文件

  1. <!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter -->
  2. <dependency>
  3. <groupId>org.springframework.boot</groupId>
  4. <artifactId>spring-boot-starter</artifactId>
  5. <version>2.2.0.RELEASE</version>
  6. </dependency>
  7. <!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-web -->
  8. <dependency>
  9. <groupId>org.springframework.boot</groupId>
  10. <artifactId>spring-boot-starter-web</artifactId>
  11. <version>2.2.0.RELEASE</version>
  12. </dependency>
  13. <dependency>
  14. <groupId>org.apache.shardingsphere.elasticjob</groupId>
  15. <artifactId>elasticjob-lite-spring-boot-starter</artifactId>
  16. <version>3.0.0-RC1</version>
  17. </dependency>

这里有个坑,官方文档里面只说我们有和SpringBoot整合,但是没有提供完整的pom文件,导致卡了我好久。

这个starter里面有数据库相关的连接,我们只是简单测试,不想配置数据源的话,改一下启动类注解即可

  1. @SpringBootApplication(exclude = { DataSourceAutoConfiguration.class })

2-2-2、配置文件

  1. server:
  2. port: 8085
  3. elasticjob:
  4. regCenter:
  5. #zookeeper 的ip:port
  6. serverLists: 127.0.0.1:2181
  7. #名命空间,自己定义就好了
  8. namespace: my-job4
  9. jobs:
  10. #你的这个定时任务名称,自定义名称
  11. myElasticJob:
  12. #定时任务的全路径名
  13. elasticJobClass: com.elastic.job.MyElasticJob
  14. #定时任务执行的cron表达式
  15. cron: 0/5 * * * * ?
  16. #分片数量
  17. shardingTotalCount: 10

2-2-3、分片数量

它是水平扩展的核心,比如上面定义了10个片(对应的片名是0-9),假设我们的定时任务是每1分钟执行一次,定时方法是execute。

当我们只有一台服务器的时候,那么每1分钟会调用十次execute(每次调用的时候分片名(0-9)都不一样)。

当我们有两台服务器的时候,那么每1分钟A、B服务器各自调用五次execute(每次调用的时候分片名(A0-4,B5-9)都不一样)

基于上面这个说明,我们可以以此类推,当有三台服务器的时候A(3个),B(3个),C(4个),这样水平扩展就很容易了。

基于上面的分片功能,我们的作业也需要修改一下

  1. public void execute(ShardingContext context) {
  2. System.out.println(context.getShardingTotalCount() + " " + context.getShardingItem());
  3. switch (context.getShardingItem()) {
  4. case 0:
  5. // do something by sharding item 0
  6. break;
  7. case 1:
  8. // do something by sharding item 1
  9. break;
  10. case 2:
  11. // do something by sharding item 2
  12. break;
  13. // case n: ...
  14. }
  15. }

假如你把下面的测试代码复制五份,然后一次启动(稍等间隔一会),你会发现每个服务器所分得得片区在变化。依次关闭服务也会发现这样的规律。

2-2-4、手动调用

比如我们需要临时去调用一下这个定时任务,但是它得间隔时间是几个小时亦或者是更久。

官方给的操作如下,但是我尝试了很久依旧没有解决错误,所以我按照基于Java的调用方式实现了一下
在这里插入图片描述

实现代码如下:

  1. import org.apache.shardingsphere.elasticjob.api.JobConfiguration;
  2. import org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.OneOffJobBootstrap;
  3. import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter;
  4. import org.apache.shardingsphere.elasticjob.reg.zookeeper.ZookeeperConfiguration;
  5. import org.apache.shardingsphere.elasticjob.reg.zookeeper.ZookeeperRegistryCenter;
  6. import org.springframework.web.bind.annotation.GetMapping;
  7. import org.springframework.web.bind.annotation.RestController;
  8. @RestController
  9. public class OneOffJobController {
  10. @GetMapping("/execute")
  11. public String executeOneOffJob() {
  12. OneOffJobBootstrap jobBootstrap = new OneOffJobBootstrap(createRegistryCenter(), new MyElasticJob(), createJobConfiguration());
  13. // 可多次调用一次性调度
  14. jobBootstrap.execute();
  15. return "success";
  16. }
  17. private static CoordinatorRegistryCenter createRegistryCenter() {
  18. CoordinatorRegistryCenter regCenter = new ZookeeperRegistryCenter(new ZookeeperConfiguration("127.0.0.1:2181", "my-job4"));
  19. regCenter.init();
  20. return regCenter;
  21. }
  22. private static JobConfiguration createJobConfiguration() {
  23. // 创建作业配置
  24. return JobConfiguration.newBuilder("myElasticJob", 10).cron("").build();
  25. }
  26. }

其实上面这个可以做成通用,参数都是用外部传参的方式。至于job对象,我们可以传递一个字符串然后通过反射来创建一个对象。

  1. Class classType = Class.forName("com.elastic.job.MyElasticJob");
  2. ElasticJob obj = (ElasticJob)classType.newInstance();

等后面配置了运维界面,发现不需要如此麻烦,直接在运维界面点一下即可(不需要你提供接口)

2-3、配置错误处理策略

就是当我们定时任务执行处了异常,要做些什么处理。官方提供了下面的6种策略,我这里就演示一个邮件通知策略,有兴趣或有需求的可以去研究一下别的策略。























































错误处理策略名称 说明 是否内置 是否默认 是否需要额外配置
记录日志策略 记录作业异常日志,但不中断作业执行
抛出异常策略 抛出系统异常并中断作业执行
忽略异常策略 忽略系统异常且不中断作业执行
邮件通知策略 发送邮件消息通知,但不中断作业执行
企业微信通知策略 发送企业微信消息通知,但不中断作业执行
钉钉通知策略 发送钉钉消息通知,但不中断作业执行

2-3-1、引入pom

  1. <dependency>
  2. <groupId>org.apache.shardingsphere.elasticjob</groupId>
  3. <artifactId>elasticjob-error-handler-email</artifactId>
  4. <version>3.0.0-RC1</version>
  5. </dependency>

2-3-2、修改配置文件

  1. server:
  2. port: 8085
  3. elasticjob:
  4. regCenter:
  5. #zookeeper 的ip:port
  6. serverLists: 127.0.0.1:2181
  7. #名命空间,自己定义就好了
  8. namespace: my-job5
  9. jobs:
  10. #你的这个定时任务名称,自定义名称
  11. oneSimpleJob:
  12. #定时任务的全路径名
  13. elasticJobClass: com.xdx97.elasticjob.job.OneSimpleJob
  14. #定时任务执行的cron表达式
  15. cron: 0/30 * * * * ?
  16. #分片数量
  17. shardingTotalCount: 1
  18. jobErrorHandlerType: EMAIL
  19. overwrite: true
  20. props:
  21. email:
  22. #邮件服务器地址
  23. host: smtp.126.com
  24. #邮件服务器端口
  25. port: 465
  26. #邮件服务器用户名
  27. username: xxxxxxx
  28. #邮件服务器密码
  29. password: xxxxx
  30. #是否启用 SSL 加密传输
  31. useSsl: true
  32. #邮件主题
  33. subject: ElasticJob error message
  34. #发送方邮箱地址
  35. from: xxxxx@126.com
  36. #接收方邮箱地址
  37. to: xxx@qq.com
  38. #抄送邮箱地址
  39. cc: xxxxxx
  40. #密送邮箱地址
  41. bcc: xxxxx
  42. # 是否开启调试模式
  43. debug: false

2-3-3、使用

我们只需要在上面的定时任务里面抛出一个异常,就会收到邮件了。

2-3-4、其它

这里面有一个坑,看官方文档里面我们以为这个propsjobs是一个等级的其实不是

在这里插入图片描述

如果你要配置多个定时任务,并且每个定时任务的通知对象都不一样,可以把上面配置多份,毕竟每个props属于每个定时任务。

2-4、作业监听器

ElasticJob-Lite 提供作业监听器,用于在任务执行前和执行后执行监听的方法。 监听器分为每台作业节点均执行的常规监听器和分布式场景中仅单一节点执行的分布式监听器。 本章节将详细介绍他们的使用方式。

在作业依赖(DAG)功能开发完成之后,可能会考虑删除作业监听器功能。

作业监听器文档这块并未提供springboot的方式,我自己尝试了许久也不成功,提了issue暂时没回复,就先这样吧。

三、运维平台配置

如果说上面的分布式分片定时任务让我觉得它牛逼,那么这个运维平台就真的让我眼前一亮了。
在这里插入图片描述

3-1、安装部署

这个ui界面是单独的界面,它和我们具体的服务没有任何关联。所以完全不需要修改之前的任何代码,我们只需要下载它,并且启动它就好了。

3-1-1、下载

https://www.apache.org/dyn/closer.cgi/shardingsphere/elasticjob-ui-3.0.0-RC1/apache-shardingsphere-elasticjob-3.0.0-RC1-lite-ui-bin.tar.gz

3-1-2、启动

这里说一下,如果你直接在window下解压启动会报一个错(找不到启动类),这个是因为解压的问题。 (压缩好的包,我也放在下面源代码里面了)

我的解决办法是把压缩包上传到linux上,然后解压后再拉下来。

  1. tar zxvf apache-shardingsphere-elasticjob-3.0.0-RC1-lite-ui-bin.tar

3-1-3、使用

浏览器打开 http://localhost:8088/,用户名/密码:root/root

然后去全局配置>注册中心配置,配置上你的zookeeper地址即可

具体操作都很简单,只是点击按钮就不说了

四、视频地址

https://www.bilibili.com/video/BV19L411p7qR

五、源代码获取

关注下面微信公众号,回复关键字获取:elasticJobLiteDemo

在这里插入图片描述

发表评论

表情:
评论列表 (有 0 条评论,334人围观)

还没有评论,来说两句吧...

相关阅读

    相关 elasticjob详解

    我这篇博文主要是讲如何从源码上玩转一个框架,而不是简单的使用,简单的使用看看别的帖子即可,我这篇主要阐述深度的技术 问题 1 GitHub官方网址          [h

    相关 springboot整合elasticjob

    任务调度来源 考虑下面的几种场景: 每天凌晨1点,需要对系统的订单表数据根据SKU的维度进行汇总并生成报表 每隔半个小时,需要将数据库中的那些超时未支付的订