SpringBoot-SpringBatch 比眉伴天荒 2022-03-25 16:19 263阅读 0赞 ## *SpringBatch简介* ## SpringBatch是一个轻量级的综合性批处理框架,可用于开发企业信息系统中那些至关重要的数据批量处理业务. Spring Batch基于POJO和Spring框架,相当容易上手使用,让开发者很容易地访问和利用企业级服务.Spring Batch不是调度(scheduling)框架.因为已经有很多非常好的企业级调度框架,包括商业性质的和开源的,例如Quartz, Tivoli, Control-M等.它是为了与调度程序一起协作完成任务而设计的,而不是用来取代调度框架的. SpringBatch提供了大量的,可重用的功能,这些功能对大数据处理来说是必不可少的,包括 日志/跟踪(tracing),事务管理,任务处理(processing)统计,任务重启, 忽略(skip),和资源管理等功能。 此外还提供了许多高级服务和特性,使之能够通过优化(optimization ) 和分片技术(partitioningtechniques)来高效地执行超大型数据集的批处理任务。 SpringBatch是一个具有高可扩展性的框架,简单的批处理,或者复杂的大数据批处理作业都可以通过Spring Batch框架来实现。 ## SpringBoot整合SpringBatch ## * pom文件的springbatch依赖,同时加如mysql的依赖: <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-batch</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-jdbc</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <scope>runtime</scope> </dependency> * 在resource文件夹中新建一个csv文件,用来保存所需要进行批处理的数据: ![11636334-a753d438a043eba7][] 这里写图片描述 * 在数据库里新建一张person表: CREATE TABLE person( id int PRIMARY KEY AUTO_INCREMENT, name VARCHAR(20), age int, nation VARCHAR(20), address VARCHAR(20) ); * application.yml文件指定数据库: spring: datasource: driver-class-name: com.mysql.jdbc.Driver url: jdbc:mysql://127.0.0.1:3306/basedb username: root password: root * 新建Person实体类,这里不说了,直接说springbatch配置类: @Configuration @EnableBatchProcessing public class CsvBatchConfig { /** * ItemReader定义,用来读取数据 * 1,使用FlatFileItemReader读取文件 * 2,使用FlatFileItemReader的setResource方法设置csv文件的路径 * 3,对此对cvs文件的数据和领域模型类做对应映射 * @return * @throws Exception */ @Bean public ItemReader<Person> reader()throws Exception { FlatFileItemReader<Person> reader = new FlatFileItemReader<>(); reader.setResource(new ClassPathResource("person.csv")); reader.setLineMapper(new DefaultLineMapper<Person>(){ { setLineTokenizer(new DelimitedLineTokenizer(){ { setNames(new String[]{"name","age","nation","address"}); } }); setFieldSetMapper(new BeanWrapperFieldSetMapper<Person>(){ { setTargetType(Person.class); }}); } }); return reader; } /** * ItemProcessor定义,用来处理数据 * @return */ @Bean public ItemProcessor<Person,Person> processor(){ //使用我们自定义的ItemProcessor的实现CsvItemProcessor CsvItemProcessor processor = new CsvItemProcessor(); //为processor指定校验器为CsvBeanValidator() processor.setValidator(csvBeanValidator()); return processor; } /** * ItemWriter定义,用来输出数据 * spring能让容器中已有的Bean以参数的形式注入,Spring Boot已经为我们定义了dataSource * @param dataSource * @return */ @Bean public ItemWriter<Person> writer(@Qualifier("dataSource") DataSource dataSource){ JdbcBatchItemWriter<Person> writer = new JdbcBatchItemWriter<>(); //我们使用JDBC批处理的JdbcBatchItemWriter来写数据到数据库 writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>()); String sql = "insert into person "+" (name,age,nation,address) " +" values(:name,:age,:nation,:address)"; //在此设置要执行批处理的SQL语句 writer.setSql(sql); writer.setDataSource(dataSource); return writer; } /** * JobRepository,用来注册Job的容器 * jobRepositor的定义需要dataSource和transactionManager,Spring Boot已为我们自动配置了 * 这两个类,Spring可通过方法注入已有的Bean * @param dataSource * @param transactionManager * @return * @throws Exception */ @Bean public JobRepository jobRepository(@Qualifier("dataSource") DataSource dataSource, PlatformTransactionManager transactionManager)throws Exception{ JobRepositoryFactoryBean jobRepositoryFactoryBean = new JobRepositoryFactoryBean(); jobRepositoryFactoryBean.setDataSource(dataSource); jobRepositoryFactoryBean.setTransactionManager(transactionManager); jobRepositoryFactoryBean.setDatabaseType(DatabaseType.MYSQL.name()); return jobRepositoryFactoryBean.getObject(); } /** * JobLauncher定义,用来启动Job的接口 * @param dataSource * @param transactionManager * @return * @throws Exception */ @Bean public SimpleJobLauncher jobLauncher(@Qualifier("dataSource") DataSource dataSource, PlatformTransactionManager transactionManager)throws Exception{ SimpleJobLauncher jobLauncher = new SimpleJobLauncher(); jobLauncher.setJobRepository(jobRepository(dataSource, transactionManager)); return jobLauncher; } /** * Job定义,我们要实际执行的任务,包含一个或多个Step * @param jobBuilderFactory * @param s1 * @return */ @Bean public Job importJob(JobBuilderFactory jobBuilderFactory, Step s1){ return jobBuilderFactory.get("importJob") .incrementer(new RunIdIncrementer()) .flow(s1)//为Job指定Step .end() .listener(csvJobListener())//绑定监听器csvJobListener .build(); } /** *step步骤,包含ItemReader,ItemProcessor和ItemWriter * @param stepBuilderFactory * @param reader * @param writer * @param processor * @return */ @Bean public Step step1(StepBuilderFactory stepBuilderFactory, ItemReader<Person> reader, ItemWriter<Person> writer, ItemProcessor<Person,Person> processor){ return stepBuilderFactory .get("step1") .<Person,Person>chunk(65000)//批处理每次提交65000条数据 .reader(reader)//给step绑定reader .processor(processor)//给step绑定processor .writer(writer)//给step绑定writer .build(); } @Bean public CsvJobListener csvJobListener(){ return new CsvJobListener(); } @Bean public Validator<Person> csvBeanValidator(){ return new CsvBeanValidator<Person>(); } } * 自定义一个校验器: public class CsvItemProcessor extends ValidatingItemProcessor<Person> { @Override public Person process(Person item) throws ValidationException { /** * 需要执行super.process(item)才会调用自定义校验器 */ super.process(item); /** * 对数据进行简单的处理,若民族为汉族,则数据转换为01,其余转换为02 */ if (item.getNation().equals("汉族")) { item.setNation("01"); } else { item.setNation("02"); } return item; } } * 还有数据校验类: public class CsvBeanValidator<T> implements Validator<T>,InitializingBean{ private javax.validation.Validator validator; @Override public void validate(T value) throws ValidationException { /** * 使用Validator的validate方法校验数据 */ Set<ConstraintViolation<T>> constraintViolations = validator.validate(value); if (constraintViolations.size() > 0) { StringBuilder message = new StringBuilder(); for (ConstraintViolation<T> constraintViolation : constraintViolations) { message.append(constraintViolation.getMessage() + "\n"); } throw new ValidationException(message.toString()); } } /** * 使用JSR-303的Validator来校验我们的数据,在此进行JSR-303的Validator的初始化 * @throws Exception */ @Override public void afterPropertiesSet() throws Exception { ValidatorFactory validatorFactory = Validation.buildDefaultValidatorFactory(); validator = validatorFactory.usingContext().getValidator(); } } [11636334-a753d438a043eba7]: /images/20220325/90c9368e39e942938a5e87f816278a3e.png
还没有评论,来说两句吧...