Spring Batch Read from DB and Write to File
Spring Batch是SpringSource和Accenture联合开发的,主要解决轻量级的大数据批处理。下面附上一个架构图,方便理解。
显然,在我们的开发过程中,我们主要关注ItemReader, ItemProcessor, ItemWriter, 数据的读写包括文件与数据库, 对于中间的环节ItemProcessor,也是至关重要的,也就是说:读来的数据的处理逻辑就在此,处理了之后再进入写的环节。当然我们可以重写ItemReader, ItemProcessor, ItemWriter.
好了,下面进入Spring Batch的示例环节。
- Spring的相关配置文件:DB相关的属性文件database.properties,DB相关的配置文件applicationDatabase.xml(包括ProxoolDataSource, DataSourceTransactionManager), Spring Batch的基本配置文件applicationContext.xml, 业务相关的配置文件migrationSimpleJob.xml
database.properties
# JDBC connection settings (Oracle thin driver).
jdbc.connection.driverClassName=oracle.jdbc.driver.OracleDriver
# NOTE(review): the credentials appear both inside the URL and in the two
# separate keys below - confirm whether both are really required.
jdbc.connection.url=jdbc:oracle:thin:<username>/<password>@100.111.86.250:1521:<SID>
jdbc.connection.username=<username>
jdbc.connection.password=<password>

# Proxool connection-pool settings.
# Oracle needs a FROM clause, so the keep-alive probe selects from DUAL.
proxool.houseKeepingTestSql=SELECT CURRENT_DATE FROM DUAL
proxool.prototypeCount=0
# The key is spelled "hourseKeeping..." on purpose: applicationDatabase.xml
# resolves ${proxool.hourseKeepingSleepTime} with exactly this spelling.
proxool.hourseKeepingSleepTime=30000
proxool.maximumActiveTime=300000
proxool.maximumConnectionLifetime=14400000
proxool.minimumConnectionCount=5
proxool.maximumConnectionCount=15
proxool.statistics=15s,10m,1d
proxool.alias=pool_dbname
proxool.simultaneous-build-throttle=50
applicationDatabase.xml
<?xml version="1.0" encoding="UTF-8"?>
<!--
  Database infrastructure for the batch job: property placeholders, the
  Proxool-backed DataSource, the JDBC transaction manager and a transaction advice.
-->
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:aop="http://www.springframework.org/schema/aop"
       xmlns:mvc="http://www.springframework.org/schema/mvc"
       xmlns:tx="http://www.springframework.org/schema/tx"
       xmlns:batch="http://www.springframework.org/schema/batch"
       xmlns:p="http://www.springframework.org/schema/p"
       xsi:schemaLocation="
           http://www.springframework.org/schema/beans   http://www.springframework.org/schema/beans/spring-beans.xsd
           http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
           http://www.springframework.org/schema/aop     http://www.springframework.org/schema/aop/spring-aop.xsd
           http://www.springframework.org/schema/mvc     http://www.springframework.org/schema/mvc/spring-mvc.xsd
           http://www.springframework.org/schema/tx      http://www.springframework.org/schema/tx/spring-tx.xsd
           http://www.springframework.org/schema/batch   http://www.springframework.org/schema/batch/spring-batch.xsd">

    <!-- Resolves the ${...} placeholders used below from conf/database.properties. -->
    <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
        <property name="locations">
            <list>
                <value>conf/database.properties</value>
            </list>
        </property>
    </bean>

    <!-- Pooled DataSource; every value comes from database.properties. -->
    <bean id="dataSource" class="org.logicalcobwebs.proxool.ProxoolDataSource">
        <!-- JDBC connection -->
        <property name="driver" value="${jdbc.connection.driverClassName}"/>
        <property name="driverUrl" value="${jdbc.connection.url}"/>
        <property name="user" value="${jdbc.connection.username}"/>
        <property name="password" value="${jdbc.connection.password}"/>
        <!-- Pool tuning -->
        <property name="houseKeepingTestSql" value="${proxool.houseKeepingTestSql}"/>
        <property name="prototypeCount" value="${proxool.prototypeCount}"/>
        <!-- The placeholder key is spelled "hourseKeeping..." to match the properties file. -->
        <property name="houseKeepingSleepTime" value="${proxool.hourseKeepingSleepTime}"/>
        <property name="maximumActiveTime" value="${proxool.maximumActiveTime}"/>
        <property name="maximumConnectionLifetime" value="${proxool.maximumConnectionLifetime}"/>
        <property name="minimumConnectionCount" value="${proxool.minimumConnectionCount}"/>
        <property name="maximumConnectionCount" value="${proxool.maximumConnectionCount}"/>
        <property name="statistics" value="${proxool.statistics}"/>
        <property name="alias" value="${proxool.alias}"/>
        <property name="simultaneousBuildThrottle" value="${proxool.simultaneous-build-throttle}"/>
    </bean>

    <!-- Transaction manager bound to the pooled DataSource. -->
    <bean id="transactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
        <property name="dataSource" ref="dataSource"/>
    </bean>

    <!--
      Transaction rules keyed by method-name pattern.
      NOTE(review): this file declares the advice but contains no advisor or pointcut
      that applies it; confirm whether it is wired up elsewhere.
    -->
    <tx:advice id="transactionAdvice" transaction-manager="transactionManager">
        <tx:attributes>
            <tx:method name="add*" propagation="REQUIRED"/>
            <tx:method name="get*" propagation="REQUIRED"/>
            <tx:method name="edit*" propagation="REQUIRED"/>
            <tx:method name="delete*" propagation="REQUIRED"/>
            <!-- Catch-all rule: read-only and configured never to roll back. -->
            <tx:method name="*" no-rollback-for="Throwable" read-only="true"/>
        </tx:attributes>
    </tx:advice>

</beans>
applicationContext.xml
<?xml version="1.0" encoding="UTF-8"?>
<!--
  Core Spring Batch infrastructure: job repository, job launcher and an
  abstract parent definition that concrete jobs inherit from.
-->
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:aop="http://www.springframework.org/schema/aop"
       xmlns:mvc="http://www.springframework.org/schema/mvc"
       xmlns:tx="http://www.springframework.org/schema/tx"
       xmlns:batch="http://www.springframework.org/schema/batch"
       xmlns:p="http://www.springframework.org/schema/p"
       xsi:schemaLocation="
           http://www.springframework.org/schema/beans   http://www.springframework.org/schema/beans/spring-beans.xsd
           http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
           http://www.springframework.org/schema/aop     http://www.springframework.org/schema/aop/spring-aop.xsd
           http://www.springframework.org/schema/mvc     http://www.springframework.org/schema/mvc/spring-mvc.xsd
           http://www.springframework.org/schema/tx      http://www.springframework.org/schema/tx/spring-tx.xsd
           http://www.springframework.org/schema/batch   http://www.springframework.org/schema/batch/spring-batch.xsd">

    <!-- Provides the dataSource and transactionManager beans. -->
    <import resource="applicationDatabase.xml"/>

    <!-- Map-based job repository factory; job metadata is not written to the database tables. -->
    <bean id="jobRepository"
          class="org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean">
        <property name="transactionManager" ref="transactionManager"/>
    </bean>

    <!-- Launcher used by the main class; no task executor is configured here. -->
    <bean id="jobLauncher"
          class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
        <property name="jobRepository" ref="jobRepository"/>
    </bean>

    <!-- Abstract template: concrete jobs reference it through parent="simpleJob". -->
    <bean id="simpleJob"
          class="org.springframework.batch.core.job.SimpleJob"
          abstract="true">
        <property name="jobRepository" ref="jobRepository"/>
    </bean>

</beans>
migrationSimpleJob.xml
<?xml version="1.0" encoding="UTF-8"?>
<!--
  Job definition: read MANAGE_OBJECTS rows through a JDBC cursor, pass each row
  through ManagedObjectProcessor and marshal the result into export/cmData.xml.
-->
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:aop="http://www.springframework.org/schema/aop"
       xmlns:mvc="http://www.springframework.org/schema/mvc"
       xmlns:tx="http://www.springframework.org/schema/tx"
       xmlns:batch="http://www.springframework.org/schema/batch"
       xmlns:p="http://www.springframework.org/schema/p"
       xsi:schemaLocation="
           http://www.springframework.org/schema/beans   http://www.springframework.org/schema/beans/spring-beans.xsd
           http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
           http://www.springframework.org/schema/aop     http://www.springframework.org/schema/aop/spring-aop.xsd
           http://www.springframework.org/schema/mvc     http://www.springframework.org/schema/mvc/spring-mvc.xsd
           http://www.springframework.org/schema/tx      http://www.springframework.org/schema/tx/spring-tx.xsd
           http://www.springframework.org/schema/batch   http://www.springframework.org/schema/batch/spring-batch.xsd">

    <!-- Provides jobRepository, jobLauncher, the simpleJob parent and the database beans. -->
    <import resource="applicationContext.xml"/>

    <!--
      Row mapper. The class name uses the com.nokia.migration.nasda base package,
      the same one used by the processor bean below and by the launcher class.
    -->
    <bean id="managedObjectMapper" class="com.nokia.migration.nasda.mapper.ManagedObjectMapper"/>

    <!-- Reader: streams the four mapped columns of MANAGE_OBJECTS. -->
    <bean id="itemReader" class="org.springframework.batch.item.database.JdbcCursorItemReader" scope="step">
        <property name="dataSource" ref="dataSource"/>
        <property name="sql" value="SELECT CO_GID, CO_OC_ID, CO_NAME, CO_DN FROM MANAGE_OBJECTS"/>
        <property name="rowMapper" ref="managedObjectMapper"/>
    </bean>

    <!-- Processor: the hook for per-item business logic. -->
    <bean id="itemProcessor" scope="step" class="com.nokia.migration.nasda.processor.ManagedObjectProcessor"/>

    <!-- Writer: marshals each item as a child of the <cmData> root element. -->
    <bean id="itemWriter" class="org.springframework.batch.item.xml.StaxEventItemWriter">
        <property name="marshaller" ref="managedObjectMarshaller"/>
        <property name="resource" value="file:export/cmData.xml"/>
        <property name="rootTagName" value="cmData"/>
    </bean>

    <!-- JAXB marshaller bound to the ManagedObject bean class (same base package as above). -->
    <bean id="managedObjectMarshaller" class="org.springframework.oxm.jaxb.Jaxb2Marshaller">
        <property name="classesToBeBound" value="com.nokia.migration.nasda.bean.ManagedObject"/>
    </bean>

    <!-- Single-step job; commit-interval="2" sets the chunk size to two items. -->
    <batch:job id="migrationSimpleJob" job-repository="jobRepository" parent="simpleJob">
        <batch:step id="migrationStep">
            <batch:tasklet transaction-manager="transactionManager">
                <batch:chunk reader="itemReader"
                             processor="itemProcessor"
                             writer="itemWriter"
                             commit-interval="2"/>
            </batch:tasklet>
        </batch:step>
    </batch:job>

</beans>
Bean类,关联DB Table中的字段
package com.bean;
import javax.xml.bind.annotation.XmlAccessOrder;
import javax.xml.bind.annotation.XmlAccessorOrder;
import javax.xml.bind.annotation.XmlRootElement;@XmlRootElement(name = “managedObject”)
@XmlAccessorOrder(XmlAccessOrder.UNDEFINED)
public class ManagedObject {private long gid;
private long cid;
private String name;
private String dn;
public long getGid() {
return this.gid;
}
public long getCid() {
return this.cid;
}
public String getName() {
return this.name;
}
public String getDn() {
return this.dn;
}
public void setGid(long gid) {
this.gid = gid;
}
public void setCid(long cid) {
this.cid = cid;
}
public void setName(String name) {
this.name = name;
}
public void setDn(String dn) {
this.dn = dn;
}
@Override
public String toString() {
return this.getDn();
}
}
Mapper类,用于关联DB Table和Bean类的纽带
package com.nokia.migration.nasda.mapper;

import java.sql.ResultSet;
import java.sql.SQLException;

import org.springframework.jdbc.core.RowMapper;

import com.nokia.migration.nasda.bean.ManagedObject;

/**
 * Maps one row of the MANAGE_OBJECTS query result onto a {@link ManagedObject}.
 *
 * <p>The interface is parameterised with {@code ManagedObject} so that callers
 * get a typed result instead of a raw {@code Object}.
 */
public class ManagedObjectMapper implements RowMapper<ManagedObject> {

    /**
     * Builds a {@link ManagedObject} from the current row.
     *
     * @param resultSet result set positioned on the row to map
     * @param rowNum    index of the current row (not used by this mapper)
     * @return a new bean populated from CO_GID, CO_OC_ID, CO_NAME and CO_DN
     * @throws SQLException if a column cannot be read
     */
    @Override
    public ManagedObject mapRow(ResultSet resultSet, int rowNum) throws SQLException {
        ManagedObject managedObject = new ManagedObject();
        managedObject.setGid(resultSet.getLong("CO_GID"));
        managedObject.setCid(resultSet.getLong("CO_OC_ID"));
        managedObject.setName(resultSet.getString("CO_NAME"));
        managedObject.setDn(resultSet.getString("CO_DN"));
        return managedObject;
    }
}
重写ItemReader, ItemProcessor, ItemWriter, 在此只重写了ItemProcessor作为示例
package com.processor;
import org.springframework.batch.item.ItemProcessor;
import com.nokia.migration.nasda.bean.ManagedObject;
/**
@author shengshu
/
public class ManagedObjectProcessor implements ItemProcessor{ @Override
public ManagedObject process(ManagedObject mo) throws Exception {System.out.println(mo.toString());
return mo;
}
}
程序启动类
package com.nokia.migration.nasda;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.FileSystemXmlApplicationContext;

/**
 * Command-line entry point: loads the job configuration, runs
 * {@code migrationSimpleJob} once and prints the resulting execution.
 */
public class MigrationJobLauncher {

    /** Job configuration file, resolved by FileSystemXmlApplicationContext. */
    private static final String JOB_CONFIG = "conf/migrationSimpleJob.xml";

    /**
     * Starts the Spring context, runs the job and closes the context again.
     *
     * @param args command-line arguments (ignored)
     */
    public static void main(String[] args) {
        // try-with-resources closes the context once the job has been run,
        // instead of leaving it open and suppressing the resource warning.
        try (FileSystemXmlApplicationContext context = new FileSystemXmlApplicationContext(JOB_CONFIG)) {
            runJob(context);
        }
    }

    /** Looks up the launcher and the job in the context and executes the job with empty parameters. */
    private static void runJob(ApplicationContext context) {
        JobLauncher launcher = (JobLauncher) context.getBean("jobLauncher");
        Job job = (Job) context.getBean("migrationSimpleJob");

        try {
            JobExecution result = launcher.run(job, new JobParameters());
            System.out.println(result.toString());
        } catch (Exception e) {
            // Same reporting as before: print the failure and return normally.
            e.printStackTrace();
        }
    }
}
Eclipse环境相关
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<!--
  Maven build descriptor, reconstructed from a listing whose XML tags were lost.
  All values are the original ones; element names marked NOTE(review) are
  inferred from their position and should be confirmed.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.nokia.migration</groupId>
  <artifactId>Migration-OSS5-NetAct8</artifactId>
  <version>1.0</version>
  <packaging>jar</packaging>

  <name>Migration-OSS5-NetAct8</name>
  <url>http://maven.apache.org</url>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <!-- spring.version, spring.batch.version and junit.version are referenced below. -->
    <spring.batch.version>3.0.3.RELEASE</spring.batch.version>
    <spring.version>4.1.4.RELEASE</spring.version>
    <!-- NOTE(review): property name inferred; no dependency in this file uses it. -->
    <log4j.version>1.2.17</log4j.version>
    <junit.version>4.11</junit.version>
    <!-- NOTE(review): names inferred for the two "1.7" values (compiler source/target). -->
    <maven.compiler.source>1.7</maven.compiler.source>
    <maven.compiler.target>1.7</maven.compiler.target>
  </properties>

  <dependencies>
    <!-- Spring core -->
    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-beans</artifactId>
      <version>${spring.version}</version>
    </dependency>
    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-context</artifactId>
      <version>${spring.version}</version>
    </dependency>
    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-core</artifactId>
      <version>${spring.version}</version>
    </dependency>
    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-aop</artifactId>
      <version>${spring.version}</version>
    </dependency>

    <!-- MyBatis -->
    <dependency>
      <groupId>org.mybatis</groupId>
      <artifactId>mybatis-spring</artifactId>
      <version>1.2.2</version>
    </dependency>
    <dependency>
      <groupId>org.mybatis</groupId>
      <artifactId>mybatis</artifactId>
      <version>3.2.8</version>
    </dependency>

    <!-- Spring Batch -->
    <dependency>
      <groupId>org.springframework.batch</groupId>
      <artifactId>spring-batch-core</artifactId>
      <version>${spring.batch.version}</version>
    </dependency>
    <dependency>
      <groupId>org.springframework.batch</groupId>
      <artifactId>spring-batch-infrastructure</artifactId>
      <version>${spring.batch.version}</version>
    </dependency>

    <!-- JDBC and object/XML mapping -->
    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-jdbc</artifactId>
      <version>${spring.version}</version>
    </dependency>
    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-oxm</artifactId>
      <version>${spring.version}</version>
    </dependency>

    <!-- Database driver and connection pool -->
    <dependency>
      <groupId>com.oracle</groupId>
      <artifactId>ojdbc5</artifactId>
      <version>11.1.0.1</version>
    </dependency>
    <dependency>
      <groupId>org.logicalcobwebs</groupId>
      <artifactId>proxool</artifactId>
      <version>0.9.0RC3</version>
    </dependency>

    <!-- Utilities -->
    <dependency>
      <groupId>dom4j</groupId>
      <artifactId>dom4j</artifactId>
      <version>1.6.1</version>
    </dependency>
    <dependency>
      <groupId>org.apache.commons</groupId>
      <artifactId>commons-lang3</artifactId>
      <version>3.1</version>
    </dependency>
    <dependency>
      <groupId>commons-io</groupId>
      <artifactId>commons-io</artifactId>
      <version>2.4</version>
    </dependency>

    <!-- Test -->
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>${junit.version}</version>
      <scope>test</scope>
    </dependency>
  </dependencies>

  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <executions>
          <execution>
            <phase>package</phase>
            <goals>
              <goal>shade</goal>
            </goals>
            <configuration>
              <finalName>Migration-OSS5-NetAct8</finalName>
              <!-- NOTE(review): element name inferred for the bare "true" value. -->
              <shadedArtifactAttached>true</shadedArtifactAttached>
              <transformers>
                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                  <mainClass>com.nokia.migration.nasda.MigrationJobLauncher</mainClass>
                </transformer>
                <!--
                  Several Spring jars ship their own META-INF/spring.handlers and
                  META-INF/spring.schemas; they must be concatenated, otherwise the
                  shaded jar cannot resolve the tx and batch XML namespaces.
                -->
                <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                  <resource>META-INF/spring.handlers</resource>
                </transformer>
                <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                  <resource>META-INF/spring.schemas</resource>
                </transformer>
              </transformers>
            </configuration>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
</project>
create.sql
-- Recreates the MANAGE_OBJECTS table: drops it when it already exists in the
-- current schema, then creates it again with CO_GID as the primary key.
DECLARE
    TABLE_COUNT NUMBER;
BEGIN
    SELECT COUNT(1)
      INTO TABLE_COUNT
      FROM USER_TABLES
     WHERE TABLE_NAME = 'MANAGE_OBJECTS';

    IF TABLE_COUNT > 0 THEN
        EXECUTE IMMEDIATE 'DROP TABLE MANAGE_OBJECTS';
    END IF;

    -- DDL has to go through EXECUTE IMMEDIATE inside a PL/SQL block.
    EXECUTE IMMEDIATE 'CREATE TABLE MANAGE_OBJECTS(
        CO_GID   NUMBER NOT NULL,
        CO_OC_ID NUMBER NOT NULL,
        CO_NAME  VARCHAR2(80),
        CO_DN    VARCHAR2(256),
        CONSTRAINT CO_GID_PK PRIMARY KEY (CO_GID)
    )';
END;
/

COMMIT;
-- ==============================================================================
-- Fills MANAGE_OBJECTS with MAXRECORD rows of random test data.
-- Run with SET SERVEROUTPUT ON to see the final message.
DECLARE
    MAXRECORD CONSTANT INT := 1000000;
BEGIN
    -- The FOR loop declares its own index, so no separate variable is needed.
    FOR IRECORD IN 1..MAXRECORD
    LOOP
        -- CO_GID is random but also the primary key. Over one million rows a
        -- collision is likely, so a duplicate key is retried with a fresh
        -- random value instead of aborting the whole load.
        LOOP
            BEGIN
                INSERT
                INTO MANAGE_OBJECTS
                    (
                        CO_GID,
                        CO_OC_ID,
                        CO_NAME,
                        CO_DN
                    )
                VALUES
                    (
                        ROUND(DBMS_RANDOM.VALUE(520355000000000000,520355999999999999)),
                        ROUND(DBMS_RANDOM.VALUE(16,9999)),
                        DBMS_RANDOM.STRING('U', 4),
                        'PLMN-MIG/'||DBMS_RANDOM.STRING('U', 4)||'-'||ROUND(DBMS_RANDOM.VALUE(1,9999))||'/'||DBMS_RANDOM.STRING('U', 4)||'-'||ROUND(DBMS_RANDOM.VALUE(1,9999))
                    );
                EXIT; -- row stored, continue with the next record
            EXCEPTION
                WHEN DUP_VAL_ON_INDEX THEN
                    NULL; -- CO_GID already used: try again
            END;
        END LOOP;
    END LOOP;

    DBMS_OUTPUT.PUT_LINE('Insert Successfully');
    COMMIT;
END;
/
- 目录结构
还没有评论,来说两句吧...