Spring Batch — Multithreaded Steps

In this tutorial, we’ll learn how to implement the Multithreaded Steps in the Spring Batch App. To improve the performance of the app or read large volume of data — we often choose “parallel programming” or “partition” or “multithreaded steps” or “Async Programming”.

Lets begin —

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.2.2.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.example</groupId>
<artifactId>multithreadedStep</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<name>multithreadedStep</name>
<description>Demo project for Spring Boot</description>

<properties>
<java.version>1.8</java.version>
</properties>

<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-batch</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jdbc</artifactId>
</dependency>

<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.2</version>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.batch</groupId>
<artifactId>spring-batch-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>

Customer.java — Holds the Customer details.

@AllArgsConstructor
@NoArgsConstructor
@Builder
@Data
public class Customer {
private Long id;
private String firstName;
private String lastName;
private String birthdate;
}

CustomerRowMapper.java — Interface that is used to map data obtained from a FieldSet into an object.

public class CustomerRowMapper implements RowMapper<Customer> {

@Override
public Customer mapRow(ResultSet rs, int rowNum) throws SQLException {
//@// @formatter:off
return Customer.builder()
.id(rs.getLong("id"))
.firstName(rs.getString("firstName"))
.lastName(rs.getString("lastName"))
.birthdate(rs.getString("birthdate"))
.build();
// @formatter:on
}
}

JobConfigurations.java — Note keep the chunk size and fetch size a same value to be able to process it in the same transaction.

SimpleAsyncTaskExecutor TaskExecutor implementation that fires up a new Thread for each task, executing it asynchronously.

Supports limiting concurrent threads through the “concurrencyLimit” bean property. By default, the number of concurrent threads is unlimited.

NOTE: This implementation does not reuse threads! Consider a thread-pooling TaskExecutor implementation instead, in particular for executing a large number of short-lived tasks.

import java.util.HashMap;
import java.util.Map;

import javax.sql.DataSource;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.database.JdbcPagingItemReader;
import org.springframework.batch.item.database.Order;
import org.springframework.batch.item.database.support.MySqlPagingQueryProvider;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.SimpleAsyncTaskExecutor;

import com.example.domain.Customer;
import com.example.mapper.CustomerRowMapper;

@Configuration
public class JobConfiguration {
@Autowired
private JobBuilderFactory jobBuilderFactory;

@Autowired
private StepBuilderFactory stepBuilderFactory;

@Autowired
private DataSource dataSource;

@Bean
public JdbcPagingItemReader<Customer> pagingItemReader(){
JdbcPagingItemReader<Customer> reader = new JdbcPagingItemReader<>();
reader.setDataSource(this.dataSource);
reader.setFetchSize(1000);
reader.setRowMapper(new CustomerRowMapper());

Map<String, Order> sortKeys = new HashMap<>();
sortKeys.put("id", Order.ASCENDING);

MySqlPagingQueryProvider queryProvider = new MySqlPagingQueryProvider();
queryProvider.setSelectClause("select id, firstName, lastName, birthdate");
queryProvider.setFromClause("from customer");
queryProvider.setSortKeys(sortKeys);

reader.setQueryProvider(queryProvider);

return reader;
}

@Bean
public JdbcBatchItemWriter<Customer> customerItemWriter(){
JdbcBatchItemWriter<Customer> writer = new JdbcBatchItemWriter<>();
writer.setDataSource(this.dataSource);
writer.setSql("INSERT INTO NEW_CUSTOMER VALUES (:id, :firstName, :lastName, :birthdate)");
writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>());
writer.afterPropertiesSet();

return writer;
}

@Bean
public Step step1() {
return stepBuilderFactory.get("step1")
.<Customer, Customer>chunk(1000)
.reader(pagingItemReader())
.writer(customerItemWriter())
.taskExecutor(new SimpleAsyncTaskExecutor())
.build();
}

@Bean
public Job job() {
return jobBuilderFactory.get("job")
.start(step1())
.build();
}
}

MultithreadedStepApplication.java

import java.util.Date;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
@EnableBatchProcessing
public class MultithreadedStepApplication implements CommandLineRunner{
@Autowired
private JobLauncher jobLauncher;

@Autowired
private Job job;

public static void main(String[] args) {
SpringApplication.run(MultithreadedStepApplication.class, args);
}

@Override
public void run(String... args) throws Exception {
JobParameters jobParameters = new JobParametersBuilder()
.addDate("date", new Date())
.addLong("time",System.currentTimeMillis()).toJobParameters();

JobExecution execution = jobLauncher.run(job, jobParameters);
System.out.println("STATUS :: "+execution.getStatus());
}
}

application.properties

# MYSQL DB Details
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
spring.datasource.url=jdbc:mysql://localhost:3306/test
spring.datasource.username=root
spring.datasource.password=root
spring.datasource.platform=mysql
spring.datasource.continue-on-error=false

# SPRING BATCH (BatchProperties)
# Database schema initialization mode.
spring.batch.initialize-schema=always

spring.batch.job.enabled=false

#spring.batch.schema=classpath:schema-mysql.sql

#spring.datasource.schema=schema-mysql.sql

#Ref: https://stackoverflow.com/questions/38040572/spring-boot-loading-initial-data
#spring.datasource.data=classpath:data-mysql.sql

--

--

Get the Medium app

A button that says 'Download on the App Store', and if clicked it will lead you to the iOS App store
A button that says 'Get it on, Google Play', and if clicked it will lead you to the Google Play store