Spring Batch — AsyncItemProcessor and AsyncItemWriter

Prateek
4 min readJun 12, 2021

--

In this tutorial, we’ll see how to use the AsyncItemProcessor and the AsyncItemWriter. Asynchronous processors help you scale the processing of items. In the asynchronous processor use case, an AsyncItemProcessor serves as a dispatcher, executing the logic of the delegate ItemProcessor for an item on a new thread. Once the item completes, the Future is passed to the AsyncItemWriter to be written.

Therefore, you can increase performance by using asynchronous item processing, basically letting you implement fork-join scenarios. The AsyncItemWriter gathers the results and writes back the chunk as soon as all the results become available.

The delegate property refers to your ItemProcessor bean, and the taskExecutor property refers to the TaskExecutor of your choice.

schema-mysql.sql

Customer.java — Holds Customer domain information

CustomerRowMapper.java — An interface used by JdbcTemplate for mapping rows of a ResultSet on a per-row basis. Implementations of this interface perform the actual work of mapping each row to a result object, but don’t need to worry about exception handling. SQLExceptions will be caught and handled by the calling JdbcTemplate.

JobConfiguration — Holds all the beans necessary for running the batch job.

JdbcPagingItemReader — Reads the data from the database one page at a time.

/**
 * Spring Batch configuration for a single-step job that copies rows from the
 * {@code customer} table into {@code NEW_CUSTOMER}.
 *
 * <p>Items are read page by page over JDBC, processed through an
 * {@link AsyncItemProcessor} (which hands each item to a {@code TaskExecutor} and
 * returns a {@code Future}), and written through an {@link AsyncItemWriter} that
 * forwards the resolved items to a {@link JdbcBatchItemWriter}.
 */
@Configuration
public class JobConfiguration {

    /** Items per chunk; also used as the reader's page size so one page feeds one chunk. */
    private static final int CHUNK_SIZE = 1000;

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    private DataSource dataSource;

    /**
     * Builds the reader that pages through the {@code customer} table ordered by {@code id}.
     *
     * @return a paging JDBC reader that maps each row with {@link CustomerRowMapper}
     */
    @Bean
    public JdbcPagingItemReader<Customer> customerPagingItemReader() {
        // reading database records using JDBC in a paging fashion
        JdbcPagingItemReader<Customer> reader = new JdbcPagingItemReader<>();
        reader.setDataSource(this.dataSource);
        reader.setFetchSize(CHUNK_SIZE);
        // Fetch size is only a JDBC driver hint; the page size is what bounds each paging query.
        // Aligning it with the chunk size avoids issuing many small queries per chunk.
        reader.setPageSize(CHUNK_SIZE);
        reader.setRowMapper(new CustomerRowMapper());

        // Sort Keys: paging needs a deterministic order, here ascending by id.
        Map<String, Order> sortKeys = new HashMap<>();
        sortKeys.put("id", Order.ASCENDING);

        // MySQL implementation of a PagingQueryProvider using database specific features.
        MySqlPagingQueryProvider queryProvider = new MySqlPagingQueryProvider();
        queryProvider.setSelectClause("id, firstName, lastName, birthdate");
        queryProvider.setFromClause("from customer");
        queryProvider.setSortKeys(sortKeys);

        reader.setQueryProvider(queryProvider);

        return reader;
    }

    /**
     * Builds the delegate processor that does the per-item work.
     *
     * <p>It sleeps for a random 0-9 ms and then returns a field-by-field copy of
     * the incoming customer.
     *
     * @return the synchronous processor wrapped by {@link #asyncItemProcessor()}
     */
    @Bean
    public ItemProcessor<Customer, Customer> itemProcessor() {
        return new ItemProcessor<Customer, Customer>() {

            @Override
            public Customer process(Customer item) throws Exception {
                // This runs on executor threads; ThreadLocalRandom avoids allocating
                // a new Random instance for every single item.
                Thread.sleep(java.util.concurrent.ThreadLocalRandom.current().nextInt(10));
                return Customer.builder()
                        .id(item.getId())
                        .firstName(item.getFirstName())
                        .lastName(item.getLastName())
                        .birthdate(item.getBirthdate())
                        .build();
            }
        };
    }

    /**
     * Wraps {@link #itemProcessor()} so that each item is processed on a thread
     * supplied by the task executor.
     *
     * @return the asynchronous processor, already initialized
     * @throws Exception if {@code afterPropertiesSet()} rejects the configuration
     */
    @Bean
    public AsyncItemProcessor<Customer, Customer> asyncItemProcessor() throws Exception {
        AsyncItemProcessor<Customer, Customer> asyncItemProcessor = new AsyncItemProcessor<>();
        asyncItemProcessor.setDelegate(itemProcessor());
        // NOTE(review): SimpleAsyncTaskExecutor starts a new thread per task and has no
        // concurrency limit by default; consider a bounded executor for large chunks.
        asyncItemProcessor.setTaskExecutor(new SimpleAsyncTaskExecutor());
        asyncItemProcessor.afterPropertiesSet();
        return asyncItemProcessor;
    }

    /**
     * Builds the JDBC batch writer that inserts customers into {@code NEW_CUSTOMER}.
     *
     * <p>The named parameters in the SQL are resolved from the bean properties of
     * each {@link Customer}.
     *
     * @return the batch writer, already initialized
     */
    @Bean
    public JdbcBatchItemWriter<Customer> customerItemWriter() {
        JdbcBatchItemWriter<Customer> writer = new JdbcBatchItemWriter<>();
        writer.setDataSource(dataSource);
        writer.setSql("INSERT INTO NEW_CUSTOMER VALUES (:id, :firstName, :lastName, :birthdate)");
        writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>());
        writer.afterPropertiesSet();

        return writer;
    }

    /**
     * Wraps {@link #customerItemWriter()} so the step can write the {@code Future}s
     * produced by the asynchronous processor.
     *
     * @return the asynchronous writer, already initialized
     * @throws Exception if {@code afterPropertiesSet()} rejects the configuration
     */
    @Bean
    public AsyncItemWriter<Customer> asyncItemWriter() throws Exception {
        AsyncItemWriter<Customer> asyncItemWriter = new AsyncItemWriter<>();
        asyncItemWriter.setDelegate(customerItemWriter());
        asyncItemWriter.afterPropertiesSet();
        return asyncItemWriter;
    }

    /**
     * Defines the chunk-oriented step: page reader, then async processor, then async writer.
     *
     * <p>The chunk is typed as {@code <Customer, Future<Customer>>} because the async
     * processor outputs a {@code Future} per item; with explicit types no unchecked
     * suppression is needed.
     *
     * @return the configured step named {@code step1}
     * @throws Exception if one of the async components fails to initialize
     */
    @Bean
    public Step step1() throws Exception {
        return stepBuilderFactory.get("step1")
                .<Customer, java.util.concurrent.Future<Customer>>chunk(CHUNK_SIZE)
                .reader(customerPagingItemReader())
                .processor(asyncItemProcessor())
                .writer(asyncItemWriter())
                .build();
    }

    /**
     * Defines the job, which consists of {@link #step1()} only.
     *
     * @return the configured job named {@code job}
     * @throws Exception if the step cannot be built
     */
    @Bean
    public Job job() throws Exception {
        return jobBuilderFactory.get("job")
                .start(step1())
                .build();
    }
}

application.properties

# JDBC connection settings: MySQL schema "test" on localhost:3306, SSL disabled.
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
spring.datasource.url=jdbc:mysql://localhost:3306/test?useSSL=false
# NOTE(review): root/root credentials are hard-coded; presumably for a local demo only.
spring.datasource.username=root
spring.datasource.password=root

# Always run Spring Batch's schema initialization at startup.
spring.batch.initialize-schema=always

# Do not auto-launch jobs at startup; the application's CommandLineRunner launches the job itself.
spring.batch.job.enabled=false

data-mysql.sql

-- Seed data: inserts 20 rows (ids 1 to 20) into `test`.`customer`.
-- NOTE(review): birthdate literals are written day-first ('dd-MM-yyyy HH:mm:ss').
-- If the column is a MySQL DATE/DATETIME type, this is not the standard
-- 'YYYY-MM-DD hh:mm:ss' literal format; confirm against the table schema and row mapper.
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('1', 'John', 'Doe', '10-10-1952 10:10:10');
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('2', 'Amy', 'Eugene', '05-07-1985 17:10:00');
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('3', 'Laverne', 'Mann', '11-12-1988 10:10:10');
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('4', 'Janice', 'Preston', '19-02-1960 10:10:10');
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('5', 'Pauline', 'Rios', '29-08-1977 10:10:10');
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('6', 'Perry', 'Burnside', '10-03-1981 10:10:10');
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('7', 'Todd', 'Kinsey', '14-12-1998 10:10:10');
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('8', 'Jacqueline', 'Hyde', '20-03-1983 10:10:10');
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('9', 'Rico', 'Hale', '10-10-2000 10:10:10');
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('10', 'Samuel', 'Lamm', '11-11-1999 10:10:10');
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('11', 'Robert', 'Coster', '10-10-1972 10:10:10');
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('12', 'Tamara', 'Soler', '02-01-1978 10:10:10');
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('13', 'Justin', 'Kramer', '19-11-1951 10:10:10');
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('14', 'Andrea', 'Law', '14-10-1959 10:10:10');
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('15', 'Laura', 'Porter', '12-12-2010 10:10:10');
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('16', 'Michael', 'Cantu', '11-04-1999 10:10:10');
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('17', 'Andrew', 'Thomas', '04-05-1967 10:10:10');
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('18', 'Jose', 'Hannah', '16-09-1950 10:10:10');
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('19', 'Valerie', 'Hilbert', '13-06-1966 10:10:10');
INSERT INTO `test`.`customer` (`id`, `firstName`, `lastName`, `birthdate`) VALUES ('20', 'Patrick', 'Durham', '12-10-1978 10:10:10');

AsyncItemProcessorItemWriterApplication.java

import java.util.Date;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * Spring Boot entry point that enables batch processing and launches the
 * injected {@link Job} once the application context has started.
 */
@SpringBootApplication
@EnableBatchProcessing
public class AsyncItemProcessorItemWriterApplication implements CommandLineRunner {

    @Autowired
    private JobLauncher jobLauncher;

    @Autowired
    private Job job;

    /**
     * Boots the Spring application.
     *
     * @param args command-line arguments forwarded to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplication.run(AsyncItemProcessorItemWriterApplication.class, args);
    }

    /**
     * Launches the job with time-based parameters and prints the resulting status.
     *
     * @param args command-line arguments (not read here)
     * @throws Exception if the launcher fails to run the job
     */
    @Override
    public void run(String... args) throws Exception {
        // Parameter values come from the current time, so they differ from launch to launch.
        JobParametersBuilder parameters = new JobParametersBuilder();
        parameters.addString("JobId", String.valueOf(System.currentTimeMillis()));
        parameters.addDate("date", new Date());
        parameters.addLong("time", System.currentTimeMillis());

        JobExecution outcome = jobLauncher.run(job, parameters.toJobParameters());
        System.out.println("STATUS :: " + outcome.getStatus());
    }
}

Now simply run the Spring Batch application; it will load all of the customer data from the customer table into the new_customer table.

--

--

Prateek
Prateek

Written by Prateek

Java Developer and enthusiast

No responses yet