springBoot中使用elasticjob

刺骨的言语ヽ痛彻心扉 2023-01-18 04:07 291阅读 0赞

elasticjob

执行体中最重要的两个参数是;分片总数,分片项。
分片总数:集群的节点总数(类似于银行办理业务窗口的总柜台数)
分片项:把每个排序的话,默认从0开始,最大值不会超过分片总数,业务受理时会拿到的业务编号(类似于你去银行柜台办理业务,你拿到的排队编号)

假设我们规定排队总长度只有10个,但是启动的服务节点只有三个,那么elasticjob会给每个节点根据zookeeper实际节点数取余N%3;
那么三个节点可能受理的编号情况是,
节点1包含0,1,2
节点2包含3,4,5
节点3包含6,7,8,9

这样每个节点只取它自己受理部分的编号,就不会出现重复消费和遗漏的问题了。

比如总分片数为4,两个节点,每五秒执行一次,一分钟后在启动第二个节点
他们的执行打印情况是

节点1:

  1. 分片项 ShardingItem: 0 | 运行时间: 14:18:40 | 线程ID: 58 | 分片参数: C
  2. 分片项 ShardingItem: 1 | 运行时间: 14:18:40 | 线程ID: 59 | 分片参数: D
  3. 分片项 ShardingItem: 2 | 运行时间: 14:18:40 | 线程ID: 60 | 分片参数: null
  4. 分片项 ShardingItem: 3 | 运行时间: 14:18:40 | 线程ID: 61 | 分片参数: null
  5. 分片项 ShardingItem: 0 | 运行时间: 14:20:50 | 线程ID: 72 | 分片参数: C
  6. 分片项 ShardingItem: 1 | 运行时间: 14:20:50 | 线程ID: 73 | 分片参数: D
  7. 分片项 ShardingItem: 0 | 运行时间: 14:20:55 | 线程ID: 60 | 分片参数: C

节点2

  1. 分片项 ShardingItem: 2 | 运行时间: 14:56:10 | 线程ID: 66 | 分片参数: null
  2. 分片项 ShardingItem: 3 | 运行时间: 14:56:10 | 线程ID: 67 | 分片参数: null
  3. 分片项 ShardingItem: 3 | 运行时间: 14:56:15 | 线程ID: 69 | 分片参数: null
  4. 分片项 ShardingItem: 2 | 运行时间: 14:56:15 | 线程ID: 68 | 分片参数: null

可以看出节点1一开始是全部负责,后面把2,3分给了节点2处理,这样两个节点各处理两个

总数固定的情况下,随着集群数量的增加,每个节点拥有的分片项也会有变化

  1. @Component
  2. public class MySimpleJob implements SimpleJob {
  3. //@Autowired
  4. //private OrderService orderService;//业务实现接口
  5. @Override
  6. public void execute(ShardingContext shardingContext) {
  7. log.info("我是分片项:"+shardingContext.getShardingItem()+",总分片数是:"+
  8. shardingContext.getShardingTotalCount());
  9. }
  10. }

springBoot中使用

生成一个springboot项目
https://start.spring.io/

pom

  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  3. xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
  4. <modelVersion>4.0.0</modelVersion>
  5. <parent>
  6. <groupId>org.springframework.boot</groupId>
  7. <artifactId>spring-boot-starter-parent</artifactId>
  8. <version>2.4.5</version>
  9. <relativePath/> <!-- lookup parent from repository --> </parent> <groupId>com.bamboo</groupId> <artifactId>elastic-job-demo</artifactId> <version>0.0.1-SNAPSHOT</version> <name>elastic-job-demo</name> <description>Demo project for Spring Boot</description>
  10. <properties>
  11. <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  12. <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
  13. <java.version>1.8</java.version>
  14. </properties>
  15. <dependencies>
  16. <dependency>
  17. <groupId>org.springframework.boot</groupId>
  18. <artifactId>spring-boot-starter</artifactId>
  19. </dependency>
  20. <dependency>
  21. <groupId>org.springframework.boot</groupId>
  22. <artifactId>spring-boot-starter-web</artifactId>
  23. </dependency>
  24. <dependency>
  25. <groupId>org.springframework.boot</groupId>
  26. <artifactId>spring-boot-starter-test</artifactId>
  27. <scope>test</scope>
  28. </dependency>
  29. <dependency>
  30. <groupId>com.dangdang</groupId>
  31. <artifactId>elastic-job-lite-spring</artifactId>
  32. <version>2.1.5</version>
  33. </dependency>
  34. <dependency>
  35. <artifactId>elastic-job-lite-core</artifactId>
  36. <groupId>com.dangdang</groupId>
  37. <version>2.1.5</version>
  38. </dependency>
  39. </dependencies>
  40. <build>
  41. <plugins>
  42. <plugin>
  43. <groupId>org.springframework.boot</groupId>
  44. <artifactId>spring-boot-maven-plugin</artifactId>
  45. </plugin>
  46. </plugins>
  47. </build>
  48. </project>

application.properties

  1. server.port= 8082
  2. # zookeeper config
  3. regCenter.serverList=localhost:2181
  4. regCenter.namespace=demo009
  5. # 每隔20秒执行
  6. mailSendJob.cron=0/5 * * * * ?
  7. # 总分片数
  8. shardingCategory.shardingTotalCount=10
  9. shardingCategory.shardingItemParameters=0=A,1=B,2=C,3=D,4=E,5=F,6=G,7=H,8=I,9=J
  10. simpleJob.cron= 0/5 * * * * ?
  11. simpleJob.shardingTotalCount= 4
  12. simpleJob.shardingItemParameters= 0=A,1=B,0=C,1=D

job

zookeeper配置信息初始化

  1. package com.bamboo.elasticjobdemo.config;
  2. import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperConfiguration;
  3. import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter;
  4. import org.springframework.beans.factory.annotation.Value;
  5. import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
  6. import org.springframework.context.annotation.Bean;
  7. import org.springframework.context.annotation.Configuration;
  8. @Configuration
  9. @ConditionalOnExpression("'${regCenter.serverList}'.length() > 0")
  10. public class ZookeeperRegistryCenterConfig {
  11. @Bean(initMethod = "init")
  12. public ZookeeperRegistryCenter regCenter(@Value("${regCenter.serverList}") final String serverList,
  13. @Value("${regCenter.namespace}") final String namespace) {
  14. return new ZookeeperRegistryCenter(new ZookeeperConfiguration(serverList, namespace));
  15. }
  16. }

job配置初始化

  1. package com.bamboo.elasticjobdemo.config;
  2. import com.bamboo.elasticjobdemo.job.MySimpleJob;
  3. import com.dangdang.ddframe.job.api.simple.SimpleJob;
  4. import com.dangdang.ddframe.job.config.JobCoreConfiguration;
  5. import com.dangdang.ddframe.job.config.simple.SimpleJobConfiguration;
  6. import com.dangdang.ddframe.job.lite.api.JobScheduler;
  7. import com.dangdang.ddframe.job.lite.config.LiteJobConfiguration;
  8. import com.dangdang.ddframe.job.lite.spring.api.SpringJobScheduler;
  9. import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter;
  10. import org.springframework.beans.factory.annotation.Value;
  11. import org.springframework.context.annotation.Bean;
  12. import org.springframework.context.annotation.Configuration;
  13. import javax.annotation.Resource;
  14. @Configuration
  15. public class MySimpleJobConfig {
  16. @Resource
  17. private ZookeeperRegistryCenter regCenter;
  18. /** * 自己实现的job * */
  19. @Bean
  20. public SimpleJob simpleJob() {
  21. return new MySimpleJob();
  22. }
  23. /** * 将自己实现的job加入调度中执行 * */
  24. @Bean(initMethod = "init")
  25. public JobScheduler simpleJobScheduler(final SimpleJob simpleJob,
  26. @Value("${simpleJob.cron}") final String cron,
  27. @Value("${simpleJob.shardingTotalCount}") final int shardingTotalCount,
  28. @Value("${simpleJob.shardingItemParameters}") final String shardingItemParameters) {
  29. return new SpringJobScheduler(simpleJob, regCenter,
  30. getLiteJobConfiguration(simpleJob.getClass(), cron, shardingTotalCount, shardingItemParameters));
  31. }
  32. /** * 作业的配置 * */
  33. private LiteJobConfiguration getLiteJobConfiguration(final Class<? extends SimpleJob> jobClass, final String cron,
  34. final int shardingTotalCount, final String shardingItemParameters) {
  35. return LiteJobConfiguration.newBuilder(
  36. new SimpleJobConfiguration(
  37. JobCoreConfiguration.newBuilder(jobClass.getName(), cron, shardingTotalCount)
  38. .shardingItemParameters(shardingItemParameters).build(), jobClass.getCanonicalName())).overwrite(true).build();
  39. }
  40. }

具体的job

  1. package com.bamboo.elasticjobdemo.job;
  2. import com.dangdang.ddframe.job.api.ShardingContext;
  3. import com.dangdang.ddframe.job.api.simple.SimpleJob;
  4. import java.text.SimpleDateFormat;
  5. import java.util.Date;
  6. public class MySimpleJob implements SimpleJob {
  7. @Override
  8. public void execute(ShardingContext context) {
  9. String str = String.format("分片总数%S\t分片项%S\t分片参数%S\tjob名称%S\t",context.getShardingTotalCount(),context.getShardingItem(),
  10. context.getShardingParameter(),context.getJobName()
  11. );
  12. System.out.println(str);
  13. switch (context.getShardingItem()){
  14. case 0:
  15. System.out.println(context.getShardingItem()+"参数:"+context.getShardingParameter());
  16. break;
  17. case 1:
  18. System.out.println(context.getShardingItem()+"参数:"+context.getShardingParameter());
  19. break;
  20. case 2:
  21. System.out.println(context.getShardingItem()+"参数:"+context.getShardingParameter());
  22. break;
  23. case 3:
  24. System.out.println(context.getShardingItem()+"参数:"+context.getShardingParameter());
  25. break;
  26. default:
  27. System.out.println("defalt参数:"+context.getShardingParameter());
  28. break;
  29. }
  30. System.out.println(
  31. String.format("分片项 ShardingItem: %s | 运行时间: %s | 线程ID: %s | 分片参数: %s ",
  32. context.getShardingItem(),
  33. new SimpleDateFormat("HH:mm:ss").format(new Date()),
  34. Thread.currentThread().getId(),
  35. context.getShardingParameter())
  36. );
  37. }
  38. }

启动application,一分钟后修改端口号在启动一个实例
这个时候你观察两个控制台打印的信息,就会和上面讲述的一致

实际业务逻辑处理

在实际业务处理中待处理的数据比如状态为待支付=0并且支付时间pay_time<now()的数据要根据节点取余动态的分给不同的节点处理,而不能像之前的定时任务去掉%部分的筛选条件,会造成多个节点多次同时处理的情况

  1. select * from t_order where pay_time < #{now} and status =0 and id% #{TotalCount} =#{ShardingItem}

因为同一个节点拥有的多个分片项就会有几个线程同时执行,为了每个节点的各个分配项执行线程任务时不冲突,必须加上具体的分片项值

这样业务部分就可以写出如下方式

  1. @Override
  2. public void execute(ShardingContext shardingContext) {
  3. Date now = New Date();
  4. List<Order> orderList = orderService.getExpireOrder(now,shardingContext.getShardingTotalCount(),shardingContext.getShardingItem());
  5. if(orders !=null && orders.size()>0){
  6. List<Long> orderIds = orderList.stream().map(Order::getId).collect(Collectors.toList())
  7. orderService.cancelOrder(orderIds,updateTime,status,updateUser,updateNow);
  8. }
  9. }

警告

elasticjob已经5年没有更新了,能不用建议尽量不要用

如果一定要用elasticjob,建议单独创建一个子项目,否则当大量依赖jar时,会出现版本冲突,特别是新项目依赖新版本的spring cloud等其他基础组件时,你可能无法解决版本问题,以至于项目无法启动。

参考

【30分钟未支付订单自动取消】SpringBoot整合Elastic-Job分布式定时任务框架
https://blog.csdn.net/qq\_32370913/article/details/106572649
任务调度 ElasticJob 视频教程全集
https://www.bilibili.com/video/BV1Dt411g7E6?p=2

发表评论

表情:
评论列表 (有 0 条评论,291人围观)

还没有评论,来说两句吧...

相关阅读

    相关 elasticjob详解

    我这篇博文主要是讲如何从源码上玩转一个框架,而不是简单的使用,简单的使用看看别的帖子即可,我这篇主要阐述深度的技术 问题 1 GitHub官方网址          [h

    相关 springboot整合elasticjob

    任务调度来源 考虑下面的几种场景: 每天凌晨1点,需要对系统的订单表数据根据SKU的维度进行汇总并生成报表 每隔半个小时,需要将数据库中的那些超时未支付的订