In this article, we’ll learn how to implement the ClassifierCompositeItemWriter to write the data to multiple destinations.
ClassifierCompositeItemWriter<Customer>
Calls one of a collection of ItemWriters for each item, based on a router pattern implemented through the provided Classifier. The implementation is thread-safe if all delegates are thread-safe.
JobConfiguration.java
import java.io.File;
import java.util.HashMap;
import java.util.Map;
import javax.sql.DataSource;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.database.JdbcPagingItemReader;
import org.springframework.batch.item.database.Order;
import org.springframework.batch.item.database.support.MySqlPagingQueryProvider;
import org.springframework.batch.item.file.FlatFileItemWriter;
import org.springframework.batch.item.support.ClassifierCompositeItemWriter;
import org.springframework.batch.item.xml.StaxEventItemWriter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.FileSystemResource;
import org.springframework.oxm.xstream.XStreamMarshaller;
import com.example.aggregator.CustomLineAggregator;
import com.example.classifier.CustomerClassifier;
import com.example.mapper.CustomerRowMapper;
import com.example.model.Customer;
@Configuration
public class JobConfiguration {
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Autowired
private DataSource dataSource;
@Bean
public JdbcPagingItemReader<Customer> customerPagingItemReader(){
// reading database records using JDBC in a paging fashion
JdbcPagingItemReader<Customer> reader = new JdbcPagingItemReader<>();
reader.setDataSource(this.dataSource);
reader.setFetchSize(1000);
reader.setRowMapper(new CustomerRowMapper());
// Sort Keys
Map<String, Order> sortKeys = new HashMap<>();
sortKeys.put("id", Order.ASCENDING);
// MySQL implementation of a PagingQueryProvider using database specific features.
MySqlPagingQueryProvider queryProvider = new MySqlPagingQueryProvider();
queryProvider.setSelectClause("id, firstName, lastName, birthdate");
queryProvider.setFromClause("from customer");
queryProvider.setSortKeys(sortKeys);
reader.setQueryProvider(queryProvider);
return reader;
}
@Bean
public FlatFileItemWriter<Customer> jsonItemWriter() throws Exception{
String customerOutputPath = File.createTempFile("customerOutput", ".out").getAbsolutePath();
System.out.println(">> Output Path = "+customerOutputPath);
FlatFileItemWriter<Customer> writer = new FlatFileItemWriter<>();
writer.setLineAggregator(new CustomLineAggregator());
writer.setResource(new FileSystemResource(customerOutputPath));
writer.afterPropertiesSet();
return writer;
}
@Bean
public StaxEventItemWriter<Customer> xmlItemWriter() throws Exception{
String customerOutputPath = File.createTempFile("customerOutput", ".out").getAbsolutePath();
System.out.println(">> Output Path = "+customerOutputPath);
Map<String, Class> aliases = new HashMap<>();
aliases.put("customer", Customer.class);
XStreamMarshaller marshaller = new XStreamMarshaller();
marshaller.setAliases(aliases);
// StAX and Marshaller for serializing object to XML.
StaxEventItemWriter<Customer> writer = new StaxEventItemWriter<>();
writer.setRootTagName("customers");
writer.setMarshaller(marshaller);
writer.setResource(new FileSystemResource(customerOutputPath));
writer.afterPropertiesSet();
return writer;
}
@Bean
public ClassifierCompositeItemWriter<Customer> classifierCustomerCompositeItemWriter() throws Exception{
ClassifierCompositeItemWriter<Customer> compositeItemWriter = new ClassifierCompositeItemWriter<>();
compositeItemWriter.setClassifier(new CustomerClassifier(xmlItemWriter(), jsonItemWriter()));
return compositeItemWriter;
}
@Bean
public Step step1() throws Exception {
return stepBuilderFactory.get("step1")
.<Customer, Customer> chunk(10)
.reader(customerPagingItemReader())
.writer(classifierCustomerCompositeItemWriter())
.stream(xmlItemWriter())
.stream(jsonItemWriter())
.build();
}
@Bean
public Job job() throws Exception {
return jobBuilderFactory.get("job")
.start(step1())
.build();
}
}
CustomerClassifier.java
public class CustomerClassifier implements Classifier<Customer, ItemWriter<? super Customer>>{
private static final long serialVersionUID = 1L;
private ItemWriter<Customer> evenItemWriter;
private ItemWriter<Customer> oddItemWriter;
public CustomerClassifier(ItemWriter<Customer> evenItemWriter, ItemWriter<Customer> oddItemWriter) {
this.evenItemWriter = evenItemWriter;
this.oddItemWriter = oddItemWriter;
}
@Override
public ItemWriter<? super Customer> classify(Customer customer) {
return customer.getId() % 2 == 0 ? evenItemWriter : oddItemWriter;
}}
Customer.java
@Data
@AllArgsConstructor
@Builder
@NoArgsConstructor
public class Customer {
private Long id;
private String firstName;
private String lastName;
private String birthdate;}
CustomerRowMapper.java
public class CustomerRowMapper implements RowMapper<Customer> {
@Override
public Customer mapRow(ResultSet rs, int rowNum) throws SQLException {
return Customer.builder().id(rs.getLong("id"))
.firstName(rs.getString("firstName"))
.lastName(rs.getString("lastName"))
.birthdate(rs.getString("birthdate")).build();
}}
CustomLineAggregator.java
public class CustomLineAggregator implements LineAggregator<Customer> {
private ObjectMapper objectMapper = new ObjectMapper();
@Override
public String aggregate(Customer item) {
try {
return objectMapper.writeValueAsString(item);
} catch (Exception e) {
throw new RuntimeException("Unable to serialize Customer", e);
}
}}
ClassifierWrittingMultipleDestinationsApplication.java
@SpringBootApplication
@EnableBatchProcessing
public class ClassifierWrittingMultipleDestinationsApplication implements CommandLineRunner {
@Autowired
private JobLauncher jobLauncher;
@Autowired
private Job job;
public static void main(String[] args) {
SpringApplication.run(ClassifierWrittingMultipleDestinationsApplication.class, args);
}
@Override
public void run(String... args) throws Exception {
JobParameters jobParameters = new JobParametersBuilder()
.addString("JobId", String.valueOf(System.currentTimeMillis()))
.addDate("date", new Date())
.addLong("time",System.currentTimeMillis()).toJobParameters();
JobExecution execution = jobLauncher.run(job, jobParameters);
System.out.println("STATUS :: "+execution.getStatus());
}}
application.properties
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
spring.datasource.url=jdbc:mysql://localhost:3306/test
spring.datasource.username=root
spring.datasource.password=rootspring.batch.initialize-schema=always
schema.sql
CREATE TABLE `test`.`customer` (
`id` MEDIUMINT(8) UNSIGNED NOT NULL AUTO_INCREMENT,
`firstName` VARCHAR(255) NULL,
`lastName` VARCHAR(255) NULL,
`birthdate` VARCHAR(255) NULL,
PRIMARY KEY (`id`)) AUTO_INCREMENT=1;
data.sql
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('1', 'John', 'Doe', '10-10-1952 10:10:10');
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('2', 'Amy', 'Eugene', '05-07-1985 17:10:00');
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('3', 'Laverne', 'Mann', '11-12-1988 10:10:10');
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('4', 'Janice', 'Preston', '19-02-1960 10:10:10');
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('5', 'Pauline', 'Rios', '29-08-1977 10:10:10');
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('6', 'Perry', 'Burnside', '10-03-1981 10:10:10');
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('7', 'Todd', 'Kinsey', '14-12-1998 10:10:10');
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('8', 'Jacqueline', 'Hyde', '20-03-1983 10:10:10');
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('9', 'Rico', 'Hale', '10-10-2000 10:10:10');
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('10', 'Samuel', 'Lamm', '11-11-1999 10:10:10');
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('11', 'Robert', 'Coster', '10-10-1972 10:10:10');
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('12', 'Tamara', 'Soler', '02-01-1978 10:10:10');
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('13', 'Justin', 'Kramer', '19-11-1951 10:10:10');
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('14', 'Andrea', 'Law', '14-10-1959 10:10:10');
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('15', 'Laura', 'Porter', '12-12-2010 10:10:10');
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('16', 'Michael', 'Cantu', '11-04-1999 10:10:10');
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('17', 'Andrew', 'Thomas', '04-05-1967 10:10:10');
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('18', 'Jose', 'Hannah', '16-09-1950 10:10:10');
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('19', 'Valerie', 'Hilbert', '13-06-1966 10:10:10');
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('20', 'Patrick', 'Durham', '12-10-1978 10:10:10');
Output: Two response files have been created, and it means we’re able to write data to multiple destinations.
>> Output Path = C:\Users\user\AppData\Local\Temp\1\customerOutput5824034651210161854.out>> Output Path = C:\Users\user\AppData\Local\Temp\1\customerOutput8870745056369564522.out
JSON Data
{"id":1,"firstName":"John","lastName":"Doe","birthdate":"10-10-1952 10:10:10"}
{"id":2,"firstName":"Amy","lastName":"Eugene","birthdate":"05-07-1985 17:10:00"}
{"id":3,"firstName":"Laverne","lastName":"Mann","birthdate":"11-12-1988 10:10:10"}
{"id":4,"firstName":"Janice","lastName":"Preston","birthdate":"19-02-1960 10:10:10"}
{"id":5,"firstName":"Pauline","lastName":"Rios","birthdate":"29-08-1977 10:10:10"}
{"id":6,"firstName":"Perry","lastName":"Burnside","birthdate":"10-03-1981 10:10:10"}
{"id":7,"firstName":"Todd","lastName":"Kinsey","birthdate":"14-12-1998 10:10:10"}
{"id":8,"firstName":"Jacqueline","lastName":"Hyde","birthdate":"20-03-1983 10:10:10"}
{"id":9,"firstName":"Rico","lastName":"Hale","birthdate":"10-10-2000 10:10:10"}
{"id":10,"firstName":"Samuel","lastName":"Lamm","birthdate":"11-11-1999 10:10:10"}
{"id":11,"firstName":"Robert","lastName":"Coster","birthdate":"10-10-1972 10:10:10"}
{"id":12,"firstName":"Tamara","lastName":"Soler","birthdate":"02-01-1978 10:10:10"}
{"id":13,"firstName":"Justin","lastName":"Kramer","birthdate":"19-11-1951 10:10:10"}
{"id":14,"firstName":"Andrea","lastName":"Law","birthdate":"14-10-1959 10:10:10"}
{"id":15,"firstName":"Laura","lastName":"Porter","birthdate":"12-12-2010 10:10:10"}
{"id":16,"firstName":"Michael","lastName":"Cantu","birthdate":"11-04-1999 10:10:10"}
{"id":17,"firstName":"Andrew","lastName":"Thomas","birthdate":"04-05-1967 10:10:10"}
{"id":18,"firstName":"Jose","lastName":"Hannah","birthdate":"16-09-1950 10:10:10"}
{"id":19,"firstName":"Valerie","lastName":"Hilbert","birthdate":"13-06-1966 10:10:10"}{"id":20,"firstName":"Patrick","lastName":"Durham","birthdate":"12-10-1978 10:10:10"}
customerOutput5824034651210161854.out
<?xml version="1.0" encoding="UTF-8"?>
2
<customers>
3
<customer>
4
<id>1</id>
5
<firstName>John</firstName>
6
<lastName>Doe</lastName>
7
<birthdate>10-10-1952 10:10:10</birthdate>
8
</customer>
9
<customer>
10
<id>2</id>
11
<firstName>Amy</firstName>
12
<lastName>Eugene</lastName>
13
<birthdate>05-07-1985 17:10:00</birthdate>
14
</customer>
15
<customer>
16
<id>3</id>
17
<firstName>Laverne</firstName>
18
<lastName>Mann</lastName>
19
<birthdate>11-12-1988 10:10:10</birthdate>
20
</customer>
21
<customer>
22
<id>4</id>
23
<firstName>Janice</firstName>
24
<lastName>Preston</lastName>
25
<birthdate>19-02-1960 10:10:10</birthdate>
26
</customer>
27
<customer>
28
<id>5</id>
29
<firstName>Pauline</firstName>
30
<lastName>Rios</lastName>
31
<birthdate>29-08-1977 10:10:10</birthdate>
32
</customer>
33
<customer>
34
<id>6</id>
35
<firstName>Perry</firstName>
36
<lastName>Burnside</lastName>
37
<birthdate>10-03-1981 10:10:10</birthdate>
38
</customer>
39
<customer>
40
<id>7</id>
41
<firstName>Todd</firstName>
42
<lastName>Kinsey</lastName>
43
<birthdate>14-12-1998 10:10:10</birthdate>
44
</customer>
45
<customer>
46
<id>8</id>
47
<firstName>Jacqueline</firstName>
48
<lastName>Hyde</lastName>
49
<birthdate>20-03-1983 10:10:10</birthdate>
50
</customer>
51
<customer>
52
<id>9</id>
53
<firstName>Rico</firstName>
54
<lastName>Hale</lastName>
55
<birthdate>10-10-2000 10:10:10</birthdate>
56
</customer>
57
<customer>
58
<id>10</id>
59
<firstName>Samuel</firstName>
60
<lastName>Lamm</lastName>
61
<birthdate>11-11-1999 10:10:10</birthdate>
62
</customer>
63
<customer>
64
<id>11</id>
65
<firstName>Robert</firstName>
66
<lastName>Coster</lastName>
67
<birthdate>10-10-1972 10:10:10</birthdate>
68
</customer>
69
<customer>
70
<id>12</id>
71
<firstName>Tamara</firstName>
72
<lastName>Soler</lastName>
73
<birthdate>02-01-1978 10:10:10</birthdate>
74
</customer>
75
<customer>
76
<id>13</id>
77
<firstName>Justin</firstName>
78
<lastName>Kramer</lastName>
79
<birthdate>19-11-1951 10:10:10</birthdate>
80
</customer>
81
<customer>
82
<id>14</id>
83
<firstName>Andrea</firstName>
84
<lastName>Law</lastName>
85
<birthdate>14-10-1959 10:10:10</birthdate>
86
</customer>
87
<customer>
88
<id>15</id>
89
<firstName>Laura</firstName>
90
<lastName>Porter</lastName>
91
<birthdate>12-12-2010 10:10:10</birthdate>
92
</customer>
93
<customer>
94
<id>16</id>
95
<firstName>Michael</firstName>
96
<lastName>Cantu</lastName>
97
<birthdate>11-04-1999 10:10:10</birthdate>
98
</customer>
99
<customer>
100
<id>17</id>
101
<firstName>Andrew</firstName>
102
<lastName>Thomas</lastName>
103
<birthdate>04-05-1967 10:10:10</birthdate>
104
</customer>
105
<customer>
106
<id>18</id>
107
<firstName>Jose</firstName>
108
<lastName>Hannah</lastName>
109
<birthdate>16-09-1950 10:10:10</birthdate>
110
</customer>
111
<customer>
112
<id>19</id>
113
<firstName>Valerie</firstName>
114
<lastName>Hilbert</lastName>
115
<birthdate>13-06-1966 10:10:10</birthdate>
116
</customer>
117
<customer>
118
<id>20</id>
119
<firstName>Patrick</firstName>
120
<lastName>Durham</lastName>
121
<birthdate>12-10-1978 10:10:10</birthdate>
122
</customer>
123</customers>