Spring Batch — AsyncItemProcessor and AsyncItemWriter

4 min read · Jun 12, 2021


In this tutorial, we’ll see how to use AsyncItemProcessor and AsyncItemWriter. Asynchronous processors help you scale the processing of items. In the asynchronous processor use case, an AsyncItemProcessor serves as a dispatcher, executing the logic of the delegate ItemProcessor for each item on a new thread. Instead of the processed item itself, it returns a Future, which is passed to the AsyncItemWriter to be written once the result is available.

Therefore, you can increase performance by using asynchronous item processing, basically letting you implement fork-join scenarios. The AsyncItemWriter gathers the results and writes back the chunk as soon as all the results become available.

The delegate property refers to your ItemProcessor bean, and the taskExecutor property refers to the TaskExecutor of your choice.

schema-mysql.sql — The database schema script.

Customer — Holds Customer domain information.

CustomerRowMapper — Implements RowMapper, an interface used by JdbcTemplate for mapping rows of a ResultSet on a per-row basis. Implementations of this interface perform the actual work of mapping each row to a result object, but don’t need to worry about exception handling. SQLExceptions will be caught and handled by the calling JdbcTemplate.

JobConfiguration — Holds all the beans necessary for running the batch job.

JdbcPagingItemReader — Reads data from the database one page at a time.

public class JobConfiguration {
private JobBuilderFactory jobBuilderFactory;

private StepBuilderFactory stepBuilderFactory;

private DataSource dataSource;

public JdbcPagingItemReader<Customer> customerPagingItemReader(){
// reading database records using JDBC in a paging fashion
JdbcPagingItemReader<Customer> reader = new JdbcPagingItemReader<>();
reader.setRowMapper(new CustomerRowMapper());

// Sort Keys
Map<String, Order> sortKeys = new HashMap<>();
sortKeys.put("id", Order.ASCENDING);

// MySQL implementation of a PagingQueryProvider using database specific features.
MySqlPagingQueryProvider queryProvider = new MySqlPagingQueryProvider();
queryProvider.setSelectClause("id, firstName, lastName, birthdate");
queryProvider.setFromClause("from customer");


return reader;

public ItemProcessor itemProcessor(){
return new ItemProcessor<Customer, Customer>() {

public Customer process(Customer item) throws Exception {
Thread.sleep(new Random().nextInt(10));
return Customer.builder().id(item.getId()).firstName(item.getFirstName())

public AsyncItemProcessor asyncItemProcessor() throws Exception{
AsyncItemProcessor<Customer, Customer> asyncItemProcessor = new AsyncItemProcessor<>();
asyncItemProcessor.setTaskExecutor(new SimpleAsyncTaskExecutor());
return asyncItemProcessor;

public JdbcBatchItemWriter<Customer> customerItemWriter(){
JdbcBatchItemWriter<Customer> writer = new JdbcBatchItemWriter<>();
writer.setSql("INSERT INTO NEW_CUSTOMER VALUES (:id, :firstName, :lastName, :birthdate)");
writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>());

return writer;

public AsyncItemWriter<Customer> asyncItemWriter() throws Exception{
AsyncItemWriter<Customer> asyncItemWriter = new AsyncItemWriter<>();
return asyncItemWriter;

public Step step1() throws Exception {
return stepBuilderFactory.get("step1")

public Job job() throws Exception {
return jobBuilderFactory.get("job")





-- Sample data: 20 rows for the `test`.`customer` table (columns id, firstName, lastName, birthdate).
-- NOTE(review): birthdate values are written as 'DD-MM-YYYY HH:MM:SS' strings; this presumably
-- relies on the column being a text type rather than DATETIME — confirm against schema-mysql.sql.
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('1', 'John', 'Doe', '10-10-1952 10:10:10');
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('2', 'Amy', 'Eugene', '05-07-1985 17:10:00');
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('3', 'Laverne', 'Mann', '11-12-1988 10:10:10');
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('4', 'Janice', 'Preston', '19-02-1960 10:10:10');
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('5', 'Pauline', 'Rios', '29-08-1977 10:10:10');
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('6', 'Perry', 'Burnside', '10-03-1981 10:10:10');
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('7', 'Todd', 'Kinsey', '14-12-1998 10:10:10');
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('8', 'Jacqueline', 'Hyde', '20-03-1983 10:10:10');
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('9', 'Rico', 'Hale', '10-10-2000 10:10:10');
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('10', 'Samuel', 'Lamm', '11-11-1999 10:10:10');
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('11', 'Robert', 'Coster', '10-10-1972 10:10:10');
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('12', 'Tamara', 'Soler', '02-01-1978 10:10:10');
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('13', 'Justin', 'Kramer', '19-11-1951 10:10:10');
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('14', 'Andrea', 'Law', '14-10-1959 10:10:10');
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('15', 'Laura', 'Porter', '12-12-2010 10:10:10');
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('16', 'Michael', 'Cantu', '11-04-1999 10:10:10');
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('17', 'Andrew', 'Thomas', '04-05-1967 10:10:10');
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('18', 'Jose', 'Hannah', '16-09-1950 10:10:10');
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('19', 'Valerie', 'Hilbert', '13-06-1966 10:10:10');
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('20', 'Patrick', 'Durham', '12-10-1978 10:10:10');

import java.util.Date;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

public class AsyncItemProcessorItemWriterApplication implements CommandLineRunner{
private JobLauncher jobLauncher;

private Job job;

public static void main(String[] args) {, args);

public void run(String... args) throws Exception {
JobParameters jobParameters = new JobParametersBuilder()
.addString("JobId", String.valueOf(System.currentTimeMillis()))
.addDate("date", new Date())

JobExecution execution =, jobParameters);
System.out.println("STATUS :: "+execution.getStatus());

Now simply run the Spring Batch application. It will load all customer data from the customer table into the new_customer table.




Written by Prateek

Java Developer and enthusiast

No responses yet