elasticjob接入方式和管理端使用

Dear 丶 2022-06-06 21:12 470阅读 0赞

1.elasticjob接入方式

1.1 加入依赖

  1. <dependency>
  2. <groupId>com.dangdang</groupId>
  3. <artifactId>elastic-job-lite-core</artifactId>
  4. <version>2.1.5</version>
  5. </dependency>
  6. <dependency>
  7. <groupId>com.dangdang</groupId>
  8. <artifactId>elastic-job-lite-spring</artifactId>
  9. <version>2.1.5</version>
  10. </dependency>

1.2 实现任务逻辑

1.2.1 spring的方式

通过实现SimpleJob接口,最终会定时调用execute方法

  1. package com.company.job;
  2. /** * quartz-memory-project Created by caowenyi on 2017/9/27 . */
  3. @Slf4j
  4. public class SampleJob implements SimpleJob {
  5. @Override
  6. public void execute(ShardingContext shardingContext) {
  7. Stopwatch stopwatch = Stopwatch.createStarted();
  8. log.info("开始Sample任务的运行");
  9. String jobParameter = shardingContext.getJobParameter();
  10. long callCostTime = stopwatch.elapsed(TimeUnit.SECONDS);
  11. log.info("结束Sample任务的运行 任务参数={} 花费时间={}秒", jobParameter, callCostTime);
  12. }
  13. }

配置任务的zookeeper注册中心:applicationContext-zk.xml

参数说明:
- zk.hosts: 具体环境对应的zookeeper服务器地址。

  • zk.namespace:命名空间,用于在管理端区分不同的也业务方的不同环境。建议命令方式:业务方_具体环境。 如:crm_bi_task
  • base-sleep-time-milliseconds、max-sleep-time-milliseconds、max-retries:设置client与zookeeper连接丢失时,进行重连的策略。
    ExponentialBackoffRetry(int baseSleepTimeMs, int maxRetries, int maxSleepMs) 。时间间隔 = baseSleepTimeMs * Math.max(1, random.nextInt(1 << (retryCount + 1)))

    <?xml version=”1.0” encoding=”UTF-8”?>




配置任务的调度信息:applicationContext-job.xml

参数说明:
- id:bean的名字
- registry-center-ref:注册中心bean
- cron: 定时调度的cron表达式
- sharding-total-count:任务切片数量。一般只需要设置为1
- sharding-item-parameters:任务分片参数。使用0=A即可。
- description:任务的用途描述
- event-trace-rdb-data-source:数据源bean对象,将任务的执行记录放到数据库中。如果不需要看任务的执行历史结果可以不用设置该值。注意:线上环境需要先建表,因为job应用一般没有建表的权限,测试环境一般可以自动建表。
- class:任务执行类

  1. <beans xmlns="http://www.springframework.org/schema/beans"
  2. xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  3. xmlns:job="http://www.xxxx.com/schema/ddframe/job"
  4. xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.xxxx.com/schema/ddframe/job http://www.xxxx.com/schema/ddframe/job/job.xsd">
  5. <job:simple id="sampleJob" registry-center-ref="regCenter" cron="0 30 * * * ?"
  6. sharding-total-count="1" sharding-item-parameters="0=A" description="demo任务"
  7. event-trace-rdb-data-source="dataSource" class="com.company.job.SampleJob"/>
  8. </beans>

最后将两个job加入到spring的beanfactory.

spring工程加入方式

  1. <context-param>
  2. <param-name>contextConfigLocation</param-name>
  3. <param-value>
  4. classpath*:applicationContext-job.xml
  5. classpath*:applicationContext-zk.xml
  6. </param-value>
  7. </context-param>

springboot工程加入方式

  1. package com.company.job;
  2. import org.springframework.context.annotation.Configuration;
  3. import org.springframework.context.annotation.ImportResource;
  4. /** * Created by caowenyi on 2017/7/28. */
  5. @Configuration
  6. @ImportResource(locations = {
  7. "classpath:applicationContext-zk.xml", "classpath:applicationContext-job.xml"})
  8. public class JobConfig {
  9. }

ps:后续开发新的job只需要实现SimpleJob接口、将job的配置信息加入xx-job.xml中

2 非spring方式接入

  1. package com.dangdang.job;
  2. import com.alibaba.druid.pool.DruidDataSource;
  3. import com.dangdang.ddframe.job.config.JobCoreConfiguration;
  4. import com.dangdang.ddframe.job.config.dataflow.DataflowJobConfiguration;
  5. import com.dangdang.ddframe.job.config.simple.SimpleJobConfiguration;
  6. import com.dangdang.ddframe.job.event.JobEventConfiguration;
  7. import com.dangdang.ddframe.job.event.rdb.JobEventRdbConfiguration;
  8. import com.dangdang.ddframe.job.lite.api.JobScheduler;
  9. import com.dangdang.ddframe.job.lite.api.listener.ElasticJobListener;
  10. import com.dangdang.ddframe.job.lite.config.LiteJobConfiguration;
  11. import com.dangdang.ddframe.job.reg.base.CoordinatorRegistryCenter;
  12. import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperConfiguration;
  13. import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter;
  14. import lombok.extern.slf4j.Slf4j;
  15. /** * Created by caowenyi on 2017/7/11. */
  16. @Slf4j
  17. public class Main {
  18. private static final String zkHosts = "192.168.5.59:2181,192.168.5.61:2181,192.168.5.66:2181";
  19. //private static final String zkHosts = "192.168.16.146:2181,192.168.16.147:2181,192.168.16.148:2181";
  20. private static final String zkNameSpace = "zk-elastic-job";
  21. public static void main(String[] args) {
  22. log.info("Main starting ..........................");
  23. SimpleJobConfiguration simpleJobConfiguration = new SimpleJobConfiguration(
  24. JobCoreConfiguration.newBuilder("simpleElasticJob", "* 0/30 * * * ?", 3)
  25. .shardingItemParameters("0=A,1=B,2=C").failover(true).build(),
  26. "com.dangdang.job.MySimpleJob");
  27. LiteJobConfiguration simpleJobRootConfiguration =
  28. LiteJobConfiguration.newBuilder(simpleJobConfiguration).build();
  29. DataflowJobConfiguration dataflowJobConfiguration = new
  30. DataflowJobConfiguration(JobCoreConfiguration.newBuilder("dataFlowElasticJob", "0/10 * * * * ?",
  31. 3) .shardingItemParameters("0=A,1=B,2=C").build(), "com.dangdang.job.MyDataFlowJob", false);
  32. LiteJobConfiguration dataflowJobRootConfiguration =
  33. LiteJobConfiguration.newBuilder(dataflowJobConfiguration).build();
  34. ZookeeperConfiguration zookeeperConfiguration = new ZookeeperConfiguration(zkHosts, zkNameSpace);
  35. zookeeperConfiguration.setBaseSleepTimeMilliseconds(1000);
  36. zookeeperConfiguration.setMaxSleepTimeMilliseconds(3000);
  37. zookeeperConfiguration.setMaxRetries(3);
  38. CoordinatorRegistryCenter coordinatorRegistryCenter =
  39. new ZookeeperRegistryCenter(zookeeperConfiguration);
  40. coordinatorRegistryCenter.init();
  41. DruidDataSource dataSource = new DruidDataSource();
  42. dataSource.setName("localdb");
  43. dataSource.setUrl(
  44. "jdbc:mysql://localhost:3306/elastic_job?autoReconnect=true&useUnicode=true&characterEncoding=UTF-8");
  45. dataSource.setUsername("cweeyii");
  46. dataSource.setPassword("cweeyii");
  47. dataSource.setDriverClassName("com.mysql.jdbc.Driver");
  48. JobEventConfiguration jobEventConfiguration = new JobEventRdbConfiguration(dataSource);
  49. ElasticJobListener jobListener = new MyJobAllShardListener();
  50. new JobScheduler(coordinatorRegistryCenter, simpleJobRootConfiguration, jobEventConfiguration,
  51. jobListener).init();
  52. new JobScheduler(coordinatorRegistryCenter, dataflowJobRootConfiguration, jobEventConfiguration).init();
  53. log.info("Main finished ..........................");
  54. }
  55. }

流程上总体分为三步:

  1. 初始化配置中心(zookeeper)
  2. 配置任务执行信息并注册到zookeeper
  3. 初始化quartz调度主线程进行调度

3.elasticjob管理端的使用

管理端环境地址:http://xxxxx:8899/ 用户名:root 密码:root

只用于查看任务执行情况,没有操作权限的用户和密码:guest guest

线上环境:http://xxxxxx:8899/ root权限用户和密码,找悦同申请

guest用户还是可用

3.1 管理端的配置

  • 首先可以在右上角点击switch language选择为中文

image

  • 添加namespace配置

    1. 命名空间:配置文件中的zk.namespace
    2. 注册中心名称:最好和配置文件中的命名空间一致
    3. 注册中心地址:配置文件中的zk.hosts
    4. 登录凭证:zookeeper是否设置登录凭证(一般都没有设置)
      image
  • 配置完成点击提交,点击连接,变为已连接状态
  • 作业操作选项下,进行作业操作
    image
  1. 1. 修改:支持修改调度任务的调度信息、传入参数等
  2. ![image][image 3]
  3. 大部分情况下只需要修改:cron表达式和自定义参数
  4. 2. 服务器维度
  5. ![image][image 4]
  6. 主要提供了:禁止某个服务器不能进行任务的调度,已经服务器状态信息等
  7. 3. 添加数据追踪源设置(用于获取数据库中记录的任务执行历史状态)

需要配置了event-trace-rdb-data-source属性

  1. <job:simple id="sampleJob" registry-center-ref="regCenter" cron="0 30 * * * ?"
  2. sharding-total-count="1" sharding-item-parameters="0=A" description="demo任务"
  3. event-trace-rdb-data-source="dataSource" class="com.company.job.SampleJob"/>

image

  1. 查看任务执行历史轨迹

可以查看任务的执行情况,并支持过滤搜索,支持时间排序等
image

  1. 查看历史执行状态(不常用)

主要是查看任务执行的历史运行状态

  1. 任务下线

    • 首先需要将任务从job服务器中删除(只需要删除xml中任务的配置即可)
    • 再从elasticjob管理端对应的namespace中删除已经下线的任务
      image

4.开发的注意事项

  1. 进行任务类名的重命名,需要在xml中新建新的任务id,因为类名是不可更改的
  2. 进行管理端的任务调度时,需要先连接到具体namespace

发表评论

表情:
评论列表 (有 0 条评论,470人围观)

还没有评论,来说两句吧...

相关阅读

    相关 elasticjob详解

    我这篇博文主要是讲如何从源码上玩转一个框架,而不是简单的使用,简单的使用看看别的帖子即可,我这篇主要阐述深度的技术 问题 1 GitHub官方网址          [h

    相关 springboot整合elasticjob

    任务调度来源 考虑下面的几种场景: 每天凌晨1点,需要对系统的订单表数据根据SKU的维度进行汇总并生成报表 每隔半个小时,需要将数据库中的那些超时未支付的订

    相关 美团CAT客户接入方式

    接着CAT服务端安装部署之后,下一步就是考虑客户端应用如何接入CAT的问题了。在这里我们以一个最简单的demo应用(hello world类型)来讲解接入步骤: 很重要的步