ElasticJob简单使用

Bertha 。 2022-10-09 00:53 377阅读 0赞

ElasticJob单点使用

任务类

  1. public class BackupJob implements SimpleJob {
  2. public void execute(ShardingContext shardingContext) {
  3. String selectSql = "select * from resume where state='未归档' limit 1";
  4. List<Map<String, Object>> list =
  5. JdbcUtil.executeQuery(selectSql);
  6. if(list == null || list.size() == 0) {
  7. return;
  8. }
  9. Map<String, Object> stringObjectMap = list.get(0);
  10. long id = (long) stringObjectMap.get("id");
  11. String name = (String) stringObjectMap.get("name");
  12. String education = (String)
  13. stringObjectMap.get("education");
  14. // 打印出这条记录
  15. System.out.println("======>>>id:" + id + " name:" +
  16. name + " education:" + education);
  17. // 更改状态
  18. String updateSql = "update resume set state='已归档' where id=?";
  19. JdbcUtil.executeUpdate(updateSql,id);
  20. // 归档这条记录
  21. String insertSql = "insert into resume_bak select * from resume where id=?";
  22. JdbcUtil.executeUpdate(insertSql,id);
  23. }
  24. }

主要的任务就是将未归档的数据整理到归档的表中,表结构一样 执行类

  1. public class JobMain {
  2. public static void main(String[] args) {
  3. //初始化注册中心
  4. ZookeeperConfiguration zookeeperConfiguration = new ZookeeperConfiguration("127.0.0.1:2181","data-job");
  5. CoordinatorRegistryCenter coordinatorRegistryCenter= new ZookeeperRegistryCenter(zookeeperConfiguration);
  6. coordinatorRegistryCenter.init();
  7. //创建任务
  8. JobCoreConfiguration jobCoreConfiguration = JobCoreConfiguration.newBuilder("data-job","*/2 * * * * ?",1).build();
  9. SimpleJobConfiguration simpleJobConfiguration = new SimpleJobConfiguration(jobCoreConfiguration,BackupJob.class.getName());
  10. //执行任务
  11. new JobScheduler(coordinatorRegistryCenter, LiteJobConfiguration.newBuilder(simpleJobConfiguration).build()).init();
  12. }
  13. }

这种情况下,启动两个任务类只会有一个在执行任务。但是当一个任务停止之后,另一个任务会立马开始接着执行任务,相当于其他中间件中的主备切换。但是这里的主备切换是依托zk进行的

多节点分布式任务调度

修改执行类的代码为

  1. public class JobMain {
  2. public static void main(String[] args) {
  3. //初始化注册中心
  4. ZookeeperConfiguration zookeeperConfiguration = new ZookeeperConfiguration("127.0.0.1:2181","data-job");
  5. CoordinatorRegistryCenter coordinatorRegistryCenter= new ZookeeperRegistryCenter(zookeeperConfiguration);
  6. coordinatorRegistryCenter.init();
  7. //创建任务
  8. JobCoreConfiguration jobCoreConfiguration = JobCoreConfiguration.newBuilder("data-job","*/2 * * * * ?",3).build();
  9. SimpleJobConfiguration simpleJobConfiguration = new SimpleJobConfiguration(jobCoreConfiguration,BackupJob.class.getName());
  10. //执行任务
  11. new JobScheduler(coordinatorRegistryCenter, LiteJobConfiguration.newBuilder(simpleJobConfiguration).overwrite(true).build()).init();
  12. }
  13. }

除了修改分片数还需要在执行任务的类中执行相应的分片参数,另外需要注意的是仅仅增加分票策略是不生效的,必须同时配置分片参数。另外如果使用同一个job来做执行的话。需要增加overwrite为true 执行器代码为 ``` public class BackupJob implements SimpleJob { public void execute(ShardingContext shardingContext) { int shardingitem = shardingContext.getShardingItem(); System.out.println(“当前分片”+shardingitem); String shardingParamter = shardingContext.getShardingParameter(); System.out.println(shardingParamter); String selectSql = “select * from resume where state=’未归档’ and name=’”+shardingParamter+”‘ limit 1”; List> list = JdbcUtil.executeQuery(selectSql); if(list == null || list.size() == 0) { return; } Map stringObjectMap = list.get(0); long id = (long) stringObjectMap.get(“id”); String name = (String) stringObjectMap.get(“name”); String education = (String) stringObjectMap.get(“education”); // 打印出这条记录 System.out.println(“======>>>id:” + id + “ name:” + name + “ education:” + education); // 更改状态 String updateSql = “update resume set state=’已归档’ where id=?”; JdbcUtil.executeUpdate(updateSql,id); // 归档这条记录 String insertSql = “insert into resume_bak select * from resume where id=?”; JdbcUtil.executeUpdate(insertSql,id); }

} ``` 测试结果为,当执行器未全部启动时,所有分片在一个执行器上运行。当三个执行器都启动时,会平均分配到三个执行器。

demo代码地址为https://github.com/zhendiao/deme-code/tree/main/schedule_job

欢迎搜索关注本人的公众号【微瞰技术】,以及总结的分类面试题https://github.com/zhendiao/JavaInterview

file

发表评论

表情:
评论列表 (有 0 条评论,377人围观)

还没有评论,来说两句吧...

相关阅读

    相关 elasticjob详解

    我这篇博文主要是讲如何从源码上玩转一个框架,而不是简单的使用,简单的使用看看别的帖子即可,我这篇主要阐述深度的技术 问题 1 GitHub官方网址          [h

    相关 springboot整合elasticjob

    任务调度来源 考虑下面的几种场景: 每天凌晨1点,需要对系统的订单表数据根据SKU的维度进行汇总并生成报表 每隔半个小时,需要将数据库中的那些超时未支付的订