In this example, we'll use an ItemStreamReader to read data from multiple flat files of different types, combine the results into one model, and write the output to a CSV file.
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build descriptor for the composite flat-file reader demo. -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<!-- Spring Boot parent POM; dependencies below that omit a version rely on its dependency management. -->
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.7.1</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<!-- Project coordinates. -->
<!-- NOTE(review): artifactId and name mention csv-to-xml / composite-item-writer, while this demo is about a composite reader; confirm the naming. -->
<groupId>com.example</groupId>
<artifactId>xml-spring-batch-csv-to-xml</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>spring-batch-composite-item-writer</name>
<description>Demo project for Spring Boot</description>
<properties>
<!-- Java language level used for the build. -->
<java.version>1.8</java.version>
</properties>
<dependencies>
<!-- Spring Batch starter: provides the batch item reader/writer APIs used by the demo classes. -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-batch</artifactId>
</dependency>
<!-- NOTE(review): spring-oxm, javafaker and the JAXB artifacts below are not referenced by the code shown in this example; confirm they are still needed. -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-oxm</artifactId>
</dependency>
<dependency>
<groupId>com.github.javafaker</groupId>
<artifactId>javafaker</artifactId>
<version>1.0.2</version>
</dependency>
<!-- NOTE(review): jaxb-core is pinned to 2.3.0.1 while jaxb-api and jaxb-impl use 2.3.1; confirm the mismatch is intentional. -->
<dependency>
<groupId>com.sun.xml.bind</groupId>
<artifactId>jaxb-core</artifactId>
<version>2.3.0.1</version>
</dependency>
<dependency>
<groupId>javax.xml.bind</groupId>
<artifactId>jaxb-api</artifactId>
<version>2.3.1</version>
</dependency>
<dependency>
<groupId>com.sun.xml.bind</groupId>
<artifactId>jaxb-impl</artifactId>
<version>2.3.1</version>
</dependency>
<!-- Lombok is declared optional and is also excluded from the Spring Boot plugin packaging below. -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<!-- Test-scoped dependencies. -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.batch</groupId>
<artifactId>spring-batch-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<!-- Spring Boot Maven plugin, configured to leave Lombok out of the artifact it builds. -->
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<excludes>
<exclude>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>
</project>
CompositeItemStreamReader — a custom reader class that implements ItemStreamReader, the convenience interface that combines ItemStream and ItemReader.
package com.example;
import java.util.ArrayList;
import java.util.List;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemStream;
import org.springframework.batch.item.ItemStreamException;
import org.springframework.batch.item.ItemStreamReader;
/**
 * Reader that, on every {@link #read()} call, pulls one item from each registered
 * {@link ItemStreamReader} and hands the collected items to a {@link UnifyingItemsMapper},
 * returning whatever single object the mapper produces.
 *
 * <p>The {@link ItemStream} lifecycle calls ({@code open}, {@code update}, {@code close})
 * are broadcast to all registered readers in registration order.
 *
 * <p>Both {@link #setItemReaderStreams(List)} and {@link #setUnifyingMapper(UnifyingItemsMapper)}
 * must be called before the reader is used; no null checks are performed here.
 *
 * @param <T> type of the unified item produced by the mapper
 */
public class CompositeItemStreamReader<T> implements ItemStreamReader<T> {

    /** Registered ItemStreamReaders. */
    private List<ItemStreamReader<?>> itemReaderStreams;

    /** Mandatory Unifying Mapper Implementation. */
    private UnifyingItemsMapper<T> unifyingMapper;

    /**
     * Reads one item from every registered reader and delegates to the mapper.
     *
     * <p>The list passed to the mapper has one entry per registered reader, in registration
     * order; an entry is {@code null} when the corresponding reader returned {@code null}.
     *
     * @return the object returned by the unifying mapper (may be {@code null})
     * @throws Exception if any delegate reader or the mapper fails
     */
    @Override
    public T read() throws Exception {
        // Typed, pre-sized list instead of the raw ArrayList: one slot per delegate reader.
        List<Object> items = new ArrayList<>(itemReaderStreams.size());
        for (ItemStreamReader<?> itemReaderStream : itemReaderStreams) {
            items.add(itemReaderStream.read());
        }
        // delegate to mapper
        return unifyingMapper.mapItems(items);
    }

    /**
     * Broadcast the call to update to all registered readers.
     *
     * @param executionContext context handed unchanged to every delegate
     */
    @Override
    public void update(ExecutionContext executionContext) {
        for (ItemStream itemStream : itemReaderStreams) {
            itemStream.update(executionContext);
        }
    }

    /**
     * Broadcast the call to close to all registered readers.
     *
     * <p>Every reader is closed even when an earlier one fails, so a single failing delegate
     * cannot leave the remaining ones open. The first failure is rethrown after all readers
     * have been attempted; later failures are attached to it as suppressed exceptions.
     *
     * @throws ItemStreamException if a delegate reports a failure while closing
     */
    @Override
    public void close() throws ItemStreamException {
        RuntimeException firstFailure = null;
        for (ItemStream itemStream : itemReaderStreams) {
            try {
                itemStream.close();
            } catch (RuntimeException e) {
                // Keep going so the remaining delegates still get closed.
                if (firstFailure == null) {
                    firstFailure = e;
                } else {
                    firstFailure.addSuppressed(e);
                }
            }
        }
        if (firstFailure != null) {
            throw firstFailure;
        }
    }

    /**
     * Broadcast the call to open to all registered readers.
     *
     * @param executionContext context handed unchanged to every delegate
     * @throws ItemStreamException if a delegate fails to open
     */
    @Override
    public void open(ExecutionContext executionContext) throws ItemStreamException {
        for (ItemStream itemStream : itemReaderStreams) {
            itemStream.open(executionContext);
        }
    }

    /**
     * Sets the mapper that turns the per-reader items into one unified item.
     *
     * @param mapper the unifying mapper used by {@link #read()}
     */
    public void setUnifyingMapper(UnifyingItemsMapper<T> mapper) {
        this.unifyingMapper = mapper;
    }

    /**
     * Register ItemStreamReaders.
     *
     * @param itemReaderStreams delegate readers, consulted in list order
     */
    public void setItemReaderStreams(List<ItemStreamReader<?>> itemReaderStreams) {
        this.itemReaderStreams = itemReaderStreams;
    }
}
UnifyingItemsMapper.java
package com.example;
import java.util.List;
/**
 * Strategy that combines the items read from several sources into one unified item.
 *
 * <p>Declared as a functional interface so that implementations can be supplied as
 * lambdas or method references as well as named classes.
 *
 * @param <T> type of the unified item
 */
@FunctionalInterface
public interface UnifyingItemsMapper<T> {

    /**
     * Maps the given items to a single unified item.
     *
     * @param items the items to combine; element types are not constrained
     * @return the unified item
     * @throws Exception if the items cannot be mapped
     */
    T mapItems(List<?> items) throws Exception;
}
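DefaultUnifyingStringItemsMapper.java — the default implementation. Note: spring-context.xml refers to it as com.example.DefaultUnifyingStringItemsMapper, so the file needs a `package com.example;` declaration, which the listing below omits.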
import java.util.List;
import org.springframework.util.CollectionUtils;
/**
 * {@link UnifyingItemsMapper} that concatenates the string form of every non-null item,
 * in list order and without any separator.
 */
public class DefaultUnifyingStringItemsMapper implements UnifyingItemsMapper<String> {

    /**
     * Prints the incoming list to standard output, then joins its non-null elements.
     *
     * @param items the items to concatenate; may be {@code null} or empty
     * @return the concatenation, or {@code null} when the list is null/empty or nothing
     *         was appended
     */
    @Override
    public String mapItems(List<?> items) throws Exception {
        System.out.println(items);
        // Nothing to join: same result as an all-null list.
        if (CollectionUtils.isEmpty(items)) {
            return null;
        }
        StringBuilder joined = new StringBuilder();
        for (Object element : items) {
            if (element == null) {
                continue;
            }
            joined.append(element);
        }
        return joined.length() == 0 ? null : joined.toString();
    }
}
spring-context.xml
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch.xsd
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
<!-- Spring context for the composite flat-file reader job: infrastructure beans, the job definition, the composite reader with its two delegates, and the CSV writer. -->
<!-- JobRepository and JobLauncher are configuration/setup classes -->
<!-- NOTE(review): MapJobRepositoryFactoryBean is deprecated in recent Spring Batch 4.x releases; confirm it is acceptable for the Batch version brought in by the build. -->
<bean id="jobRepository" class="org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean" />
<bean id="transactionManager" class="org.springframework.batch.support.transaction.ResourcelessTransactionManager"/>
<bean id="jobLauncher" class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
<property name="jobRepository" ref="jobRepository" />
</bean>
<!-- Single-step job: a chunk-oriented step that reads through compositeItemReader, writes through itemWriter, and commits every 5 items. -->
<job id="compositeFlatFileReaderJob" xmlns="http://www.springframework.org/schema/batch">
<step id="compositeFlatFileReaderStep">
<tasklet>
<chunk
reader="compositeItemReader"
writer="itemWriter"
commit-interval="5">
</chunk>
<listeners>
<listener ref="stepListener" />
</listeners>
</tasklet>
</step>
</job>
<!-- Composite reader: wired with the default string mapper and the two flat-file readers, in the order itemReader1 then itemReader2. -->
<bean id="compositeItemReader" class="com.example.CompositeItemStreamReader" scope="step" >
<property name="unifyingMapper">
<bean class="com.example.DefaultUnifyingStringItemsMapper" />
</property>
<property name="itemReaderStreams">
<list>
<ref bean="itemReader1" />
<ref bean="itemReader2" />
</list>
</property>
</bean>
<!-- NOTE(review): com.example.MyStepListener is not shown in this example; it must exist on the classpath for the context to load. -->
<bean id="stepListener" class="com.example.MyStepListener" />
<!-- First delegate: reads classpath:employee.csv with a pass-through line mapper. -->
<!-- NOTE(review): no linesToSkip is configured on either reader, so the CSV header rows are presumably read as data; confirm this is intended. -->
<bean id="itemReader1" class="org.springframework.batch.item.file.FlatFileItemReader" scope="step">
<property name="name" value="itemReader1" />
<property name="resource" value="classpath:employee.csv" />
<property name="lineMapper">
<bean class="org.springframework.batch.item.file.mapping.PassThroughLineMapper" />
</property>
<property name="strict" value="true" />
</bean>
<!-- Second delegate: same configuration as itemReader1, but reading classpath:department.csv. -->
<bean id="itemReader2" class="org.springframework.batch.item.file.FlatFileItemReader" scope="step">
<property name="name" value="itemReader2" />
<property name="resource" value="classpath:department.csv" />
<property name="lineMapper">
<bean class="org.springframework.batch.item.file.mapping.PassThroughLineMapper" />
</property>
<property name="strict" value="true" />
</bean>
<!-- Writer: targets file:csv/output.csv with shouldDeleteIfExists enabled and a pass-through line aggregator. -->
<bean id="itemWriter" class="org.springframework.batch.item.file.FlatFileItemWriter" scope="step">
<property name="resource" value="file:csv/output.csv" />
<property name="shouldDeleteIfExists" value="true" />
<property name="lineAggregator">
<bean class="org.springframework.batch.item.file.transform.PassThroughLineAggregator"/>
</property>
</bean>
</beans>
employee.csv
empId,firstName
1,John
department.csv
deptId, deptName, empId
1, Engineering,1