Spring Batch Read from DB and Write to File

- 日理万妓 2022-08-07 04:41 334阅读 0赞
  1. Spring Batch 是由 SpringSource 和 Accenture 联合开发的,主要解决轻量级的大数据批处理。下面附上一个架构图,方便理解。

Center

  1. 显然,在我们的开发过程中,我们主要关注ItemReader, ItemProcessor, ItemWriter, 数据的读写包括文件与数据库, 对于中间的环节ItemProcessor,也是至关重要的,也就是说:读来的数据的处理逻辑就在此,处理了之后再进入写的环节。当然我们可以重写ItemReader, ItemProcessor, ItemWriter.
  2. 好了,下面进入Spring Batch的示例环节。
  1. Spring的相关配置文件:DB相关的属性文件database.properties,DB相关的配置文件applicationDatabase.xml(包括ProxoolDataSource, DataSourceTransactionManager), Spring Batch的基本配置文件applicationContext.xml, 业务相关的配置文件migrationSimpleJob.xml

database.properties

  # JDBC connection settings; replace the <...> placeholders before use.
  jdbc.connection.driverClassName=oracle.jdbc.driver.OracleDriver
  jdbc.connection.url=jdbc:oracle:thin:<username>/<password>@100.111.86.250:1521:<SID>
  jdbc.connection.username=<username>
  jdbc.connection.password=<password>

  # Proxool connection-pool settings (bound to ProxoolDataSource in applicationDatabase.xml).
  # Oracle requires a FROM clause, so the connection test query selects from DUAL.
  proxool.houseKeepingTestSql=SELECT CURRENT_DATE FROM DUAL
  proxool.prototypeCount=0
  # NOTE: the key is spelled "hourse..." on purpose; applicationDatabase.xml
  # resolves ${proxool.hourseKeepingSleepTime} under exactly this name.
  proxool.hourseKeepingSleepTime=30000
  proxool.maximumActiveTime=300000
  proxool.maximumConnectionLifetime=14400000
  proxool.minimumConnectionCount=5
  proxool.maximumConnectionCount=15
  proxool.statistics=15s,10m,1d
  proxool.alias=pool_dbname
  proxool.simultaneous-build-throttle=50

applicationDatabase.xml

  <?xml version="1.0" encoding="UTF-8"?>
  <beans xmlns="http://www.springframework.org/schema/beans"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xmlns:context="http://www.springframework.org/schema/context"
         xmlns:aop="http://www.springframework.org/schema/aop"
         xmlns:mvc="http://www.springframework.org/schema/mvc"
         xmlns:tx="http://www.springframework.org/schema/tx"
         xmlns:batch="http://www.springframework.org/schema/batch"
         xmlns:p="http://www.springframework.org/schema/p"
         xsi:schemaLocation="
           http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans.xsd
           http://www.springframework.org/schema/context
           http://www.springframework.org/schema/context/spring-context.xsd
           http://www.springframework.org/schema/aop
           http://www.springframework.org/schema/aop/spring-aop.xsd
           http://www.springframework.org/schema/mvc
           http://www.springframework.org/schema/mvc/spring-mvc.xsd
           http://www.springframework.org/schema/tx
           http://www.springframework.org/schema/tx/spring-tx.xsd
           http://www.springframework.org/schema/batch
           http://www.springframework.org/schema/batch/spring-batch.xsd">

    <!-- Resolves the ${...} placeholders below from conf/database.properties. -->
    <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
      <property name="locations">
        <list>
          <value>conf/database.properties</value>
        </list>
      </property>
    </bean>

    <!-- Proxool-pooled data source; every setting comes from database.properties. -->
    <bean id="dataSource" class="org.logicalcobwebs.proxool.ProxoolDataSource">
      <property name="driver" value="${jdbc.connection.driverClassName}"/>
      <property name="driverUrl" value="${jdbc.connection.url}"/>
      <property name="user" value="${jdbc.connection.username}"/>
      <property name="password" value="${jdbc.connection.password}"/>
      <property name="houseKeepingTestSql" value="${proxool.houseKeepingTestSql}"/>
      <property name="prototypeCount" value="${proxool.prototypeCount}"/>
      <!-- The placeholder key is spelled "hourse..." to match database.properties. -->
      <property name="houseKeepingSleepTime" value="${proxool.hourseKeepingSleepTime}"/>
      <property name="maximumActiveTime" value="${proxool.maximumActiveTime}"/>
      <property name="maximumConnectionLifetime" value="${proxool.maximumConnectionLifetime}"/>
      <property name="minimumConnectionCount" value="${proxool.minimumConnectionCount}"/>
      <property name="maximumConnectionCount" value="${proxool.maximumConnectionCount}"/>
      <property name="statistics" value="${proxool.statistics}"/>
      <property name="alias" value="${proxool.alias}"/>
      <property name="simultaneousBuildThrottle" value="${proxool.simultaneous-build-throttle}"/>
    </bean>

    <!-- JDBC transaction manager bound to the pooled data source. -->
    <bean id="transactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
      <property name="dataSource" ref="dataSource"/>
    </bean>

    <!--
      Transaction attributes keyed by method-name pattern. This file only declares
      the advice; no pointcut or advisor that applies it appears here.
    -->
    <tx:advice id="transactionAdvice" transaction-manager="transactionManager">
      <tx:attributes>
        <tx:method name="add*" propagation="REQUIRED"/>
        <tx:method name="get*" propagation="REQUIRED"/>
        <tx:method name="edit*" propagation="REQUIRED"/>
        <tx:method name="delete*" propagation="REQUIRED"/>
        <!-- Fallback for all other methods: read-only, never rolled back. -->
        <tx:method name="*" no-rollback-for="Throwable" read-only="true"/>
      </tx:attributes>
    </tx:advice>

  </beans>

applicationContext.xml

  <?xml version="1.0" encoding="UTF-8"?>
  <beans xmlns="http://www.springframework.org/schema/beans"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xmlns:context="http://www.springframework.org/schema/context"
         xmlns:aop="http://www.springframework.org/schema/aop"
         xmlns:mvc="http://www.springframework.org/schema/mvc"
         xmlns:tx="http://www.springframework.org/schema/tx"
         xmlns:batch="http://www.springframework.org/schema/batch"
         xmlns:p="http://www.springframework.org/schema/p"
         xsi:schemaLocation="
           http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans.xsd
           http://www.springframework.org/schema/context
           http://www.springframework.org/schema/context/spring-context.xsd
           http://www.springframework.org/schema/aop
           http://www.springframework.org/schema/aop/spring-aop.xsd
           http://www.springframework.org/schema/mvc
           http://www.springframework.org/schema/mvc/spring-mvc.xsd
           http://www.springframework.org/schema/tx
           http://www.springframework.org/schema/tx/spring-tx.xsd
           http://www.springframework.org/schema/batch
           http://www.springframework.org/schema/batch/spring-batch.xsd">

    <!-- Brings in dataSource and transactionManager. -->
    <import resource="applicationDatabase.xml"/>

    <!-- Job repository built by the map-based factory bean. -->
    <bean id="jobRepository"
          class="org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean">
      <property name="transactionManager" ref="transactionManager"/>
    </bean>

    <!-- Launcher looked up by name ("jobLauncher") from MigrationJobLauncher. -->
    <bean id="jobLauncher" class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
      <property name="jobRepository" ref="jobRepository"/>
    </bean>

    <!-- Abstract parent definition that concrete jobs inherit via parent="simpleJob". -->
    <bean id="simpleJob" class="org.springframework.batch.core.job.SimpleJob" abstract="true">
      <property name="jobRepository" ref="jobRepository"/>
    </bean>

  </beans>

migrationSimpleJob.xml

  <?xml version="1.0" encoding="UTF-8"?>
  <beans xmlns="http://www.springframework.org/schema/beans"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xmlns:context="http://www.springframework.org/schema/context"
         xmlns:aop="http://www.springframework.org/schema/aop"
         xmlns:mvc="http://www.springframework.org/schema/mvc"
         xmlns:tx="http://www.springframework.org/schema/tx"
         xmlns:batch="http://www.springframework.org/schema/batch"
         xmlns:p="http://www.springframework.org/schema/p"
         xsi:schemaLocation="
           http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans.xsd
           http://www.springframework.org/schema/context
           http://www.springframework.org/schema/context/spring-context.xsd
           http://www.springframework.org/schema/aop
           http://www.springframework.org/schema/aop/spring-aop.xsd
           http://www.springframework.org/schema/mvc
           http://www.springframework.org/schema/mvc/spring-mvc.xsd
           http://www.springframework.org/schema/tx
           http://www.springframework.org/schema/tx/spring-tx.xsd
           http://www.springframework.org/schema/batch
           http://www.springframework.org/schema/batch/spring-batch.xsd">

    <!-- Brings in jobRepository, jobLauncher, the abstract simpleJob and the data source. -->
    <import resource="applicationContext.xml"/>

    <!--
      Bean class names below follow the package declarations of the Java sources
      shown in this article (com.bean, com.mapper, com.processor).
      NOTE(review): adjust them if the classes live in a different package in your project.
    -->

    <!-- Row mapper: one MANAGE_OBJECTS row to one ManagedObject. -->
    <bean id="managedObjectMapper" class="com.mapper.ManagedObjectMapper"/>

    <!-- Reader: runs the SELECT through a JDBC cursor and maps each row. -->
    <bean id="itemReader" class="org.springframework.batch.item.database.JdbcCursorItemReader" scope="step">
      <property name="dataSource" ref="dataSource"/>
      <property name="sql" value="SELECT CO_GID, CO_OC_ID, CO_NAME, CO_DN FROM MANAGE_OBJECTS"/>
      <property name="rowMapper" ref="managedObjectMapper"/>
    </bean>

    <!-- Processor: the custom ItemProcessor shown in this article. -->
    <bean id="itemProcessor" scope="step" class="com.processor.ManagedObjectProcessor"/>

    <!-- Writer: marshals each item into export/cmData.xml under the cmData root tag. -->
    <bean id="itemWriter" class="org.springframework.batch.item.xml.StaxEventItemWriter">
      <property name="marshaller" ref="managedObjectMarshaller"/>
      <property name="resource" value="file:export/cmData.xml"/>
      <property name="rootTagName" value="cmData"/>
    </bean>

    <!-- JAXB marshaller bound to the annotated ManagedObject bean. -->
    <bean id="managedObjectMarshaller" class="org.springframework.oxm.jaxb.Jaxb2Marshaller">
      <property name="classesToBeBound" value="com.bean.ManagedObject"/>
    </bean>

    <!-- Single-step job: read, process, write, committing every 2 items. -->
    <batch:job id="migrationSimpleJob" job-repository="jobRepository" parent="simpleJob">
      <batch:step id="migrationStep">
        <batch:tasklet transaction-manager="transactionManager">
          <batch:chunk reader="itemReader"
                       processor="itemProcessor"
                       writer="itemWriter"
                       commit-interval="2"/>
        </batch:tasklet>
      </batch:step>
    </batch:job>

  </beans>
  1. Bean类,关联DB Table中的字段

    package com.bean;

    import javax.xml.bind.annotation.XmlAccessOrder;
    import javax.xml.bind.annotation.XmlAccessorOrder;
    import javax.xml.bind.annotation.XmlRootElement;

    @XmlRootElement(name = “managedObject”)
    @XmlAccessorOrder(XmlAccessOrder.UNDEFINED)
    public class ManagedObject {

    1. private long gid;
    2. private long cid;
    3. private String name;
    4. private String dn;
    5. public long getGid() {
    6. return this.gid;
    7. }
    8. public long getCid() {
    9. return this.cid;
    10. }
    11. public String getName() {
    12. return this.name;
    13. }
    14. public String getDn() {
    15. return this.dn;
    16. }
    17. public void setGid(long gid) {
    18. this.gid = gid;
    19. }
    20. public void setCid(long cid) {
    21. this.cid = cid;
    22. }
    23. public void setName(String name) {
    24. this.name = name;
    25. }
    26. public void setDn(String dn) {
    27. this.dn = dn;
    28. }
    29. @Override
    30. public String toString() {
    31. return this.getDn();
    32. }

    }

  1. Mapper类,用于关联DB Table和Bean类的纽带

    package com.mapper;

    import java.sql.ResultSet;
    import java.sql.SQLException;

    import org.springframework.jdbc.core.RowMapper;

    import com.nokia.migration.nasda.bean.ManagedObject;

    public class ManagedObjectMapper implements RowMapper {

    1. @Override
    2. public ManagedObject mapRow(ResultSet resultSet, int rowNum) throws SQLException {
    3. ManagedObject managedObject = new ManagedObject();
    4. managedObject.setGid(resultSet.getLong("CO_GID"));
    5. managedObject.setCid(resultSet.getLong("CO_OC_ID"));
    6. managedObject.setName(resultSet.getString("CO_NAME"));
    7. managedObject.setDn(resultSet.getString("CO_DN"));
    8. return managedObject;
    9. }

    }

  1. 重写ItemReader, ItemProcessor, ItemWriter, 在此只重写了ItemProcessor作为示例

    package com.processor;

    import org.springframework.batch.item.ItemProcessor;

    import com.nokia.migration.nasda.bean.ManagedObject;

    /**

    • @author shengshu
      /
      public class ManagedObjectProcessor implements ItemProcessor {

      @Override
      public ManagedObject process(ManagedObject mo) throws Exception {

      1. System.out.println(mo.toString());
      2. return mo;

      }

      }

  1. 程序启动类

    package com.nokia.migration.nasda;

    import org.springframework.batch.core.Job;
    import org.springframework.batch.core.JobExecution;
    import org.springframework.batch.core.JobParameters;
    import org.springframework.batch.core.launch.JobLauncher;
    import org.springframework.context.ApplicationContext;
    import org.springframework.context.support.FileSystemXmlApplicationContext;

    public class MigrationJobLauncher {

    1. public static void main(String[] args) {
    2. @SuppressWarnings("resource")
    3. ApplicationContext context = new FileSystemXmlApplicationContext("conf/migrationSimpleJob.xml");
    4. JobLauncher launcher = (JobLauncher) context.getBean("jobLauncher");
    5. Job job = (Job) context.getBean("migrationSimpleJob");
    6. try {
    7. JobExecution result = launcher.run(job, new JobParameters());
    8. System.out.println(result.toString());
    9. } catch (Exception e) {
    10. e.printStackTrace();
    11. }
    12. }

    }

Eclipse环境相关

  • pom.xml


    4.0.0

    com.nokia.migration
    Migration-OSS5-NetAct8
    1.0
    jar

    Migration-OSS5-NetAct8
    http://maven.apache.org


    UTF-8
    3.0.3.RELEASE
    4.1.4.RELEASE
    1.2.17
    4.11
    1.7
    1.7




    org.springframework
    spring-beans
    ${spring.version}


    org.springframework
    spring-context
    ${spring.version}


    org.springframework
    spring-core
    ${spring.version}


    org.springframework
    spring-aop
    ${spring.version}


    org.mybatis
    mybatis-spring
    1.2.2


    org.mybatis
    mybatis
    3.2.8


    org.springframework.batch
    spring-batch-core
    ${spring.batch.version}


    org.springframework.batch
    spring-batch-infrastructure
    ${spring.batch.version}


    org.springframework
    spring-jdbc
    ${spring.version}


    org.springframework
    spring-oxm
    ${spring.version}



    com.oracle
    ojdbc5
    11.1.0.1



    org.logicalcobwebs
    proxool
    0.9.0RC3



    dom4j
    dom4j
    1.6.1



    org.apache.commons
    commons-lang3
    3.1


    commons-io
    commons-io
    2.4



    junit
    junit
    test
    ${junit.version}






    org.apache.maven.plugins
    maven-shade-plugin


    package

    shade




    Migration-OSS5-NetAct8
    true


    com.nokia.migration.nasda.MigrationJobLauncher






  • create.sql

    -- Drops MANAGE_OBJECTS if the current user already owns it, then creates it again.
    DECLARE
      TABLE_COUNT NUMBER;
    BEGIN
      SELECT COUNT(1) INTO TABLE_COUNT FROM USER_TABLES WHERE TABLE_NAME = 'MANAGE_OBJECTS';

      IF TABLE_COUNT > 0 THEN
        EXECUTE IMMEDIATE 'DROP TABLE MANAGE_OBJECTS';
      END IF;

      EXECUTE IMMEDIATE 'CREATE TABLE MANAGE_OBJECTS(
    CO_GID NUMBER NOT NULL,
    CO_OC_ID NUMBER NOT NULL,
    CO_NAME VARCHAR2(80),
    CO_DN VARCHAR2(256),
    CONSTRAINT CO_GID_PK PRIMARY KEY (CO_GID)
    )';
    END;
    /
    -- The "/" above ends the PL/SQL block when the script is run from SQL*Plus;
    -- without it the COMMIT below would be read as part of the block.

    COMMIT;

    -- ==============================================================================
    -- Fills MANAGE_OBJECTS with MAXRECORD rows of random test data.
    -- NOTE(review): CO_GID is the primary key and is drawn at random, so a duplicate
    -- key error is possible on large runs; confirm this is acceptable for test data.
    DECLARE
      MAXRECORD CONSTANT INT := 1000000;
    BEGIN
      -- The FOR loop declares IRECORD implicitly; no separate variable is needed.
      FOR IRECORD IN 1..MAXRECORD
      LOOP
        INSERT
        INTO MANAGE_OBJECTS
          (
            CO_GID,
            CO_OC_ID,
            CO_NAME,
            CO_DN
          )
        VALUES
          (
            ROUND(DBMS_RANDOM.VALUE(520355000000000000,520355999999999999)),
            ROUND(DBMS_RANDOM.VALUE(16,9999)),
            DBMS_RANDOM.STRING('U', 4),
            'PLMN-MIG/'||DBMS_RANDOM.STRING('U', 4)||'-'||ROUND(DBMS_RANDOM.VALUE(1,9999))||'/'||DBMS_RANDOM.STRING('U', 4)||'-'||ROUND(DBMS_RANDOM.VALUE(1,9999))
          );
      END LOOP;
      DBMS_OUTPUT.PUT_LINE('Insert Successfully');
      COMMIT;
    END;
    /

  • 目录结构

Center 1

发表评论

表情:
评论列表 (有 0 条评论,334人围观)

还没有评论,来说两句吧...

相关阅读