In this example, we will learn how to use partitioning to read data from one table in parallel and write it into another table.
A partitioned Job runs as a sequence of Step instances, one of which acts as the manager. The workers are all identical instances of a Step and could, in fact, take the place of the manager with the same outcome for the Job. The workers are typically remote services, but they can also be local threads of execution. The messages the manager sends to the workers do not need to be durable or have guaranteed delivery: Spring Batch metadata in the JobRepository ensures that each worker is executed once and only once for each Job execution.
The partitioning SPI in Spring Batch consists of a special implementation of Step (called the PartitionStep) and two strategy interfaces that must be implemented for the specific environment: PartitionHandler and StepExecutionSplitter.
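Before diving into the Spring classes, the division of labor between these strategies can be sketched in plain Java with no Spring dependencies. This is only a conceptual illustration, not the real API: the `split` method plays the StepExecutionSplitter role (derive non-overlapping ranges), and the thread pool plays the PartitionHandler role (run one worker per range in parallel).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class PartitionSketch {

    // Splitter role: derive non-overlapping inclusive ranges {start, end}
    static List<int[]> split(int min, int max, int gridSize) {
        int targetSize = (max - min) / gridSize + 1;
        List<int[]> ranges = new ArrayList<>();
        for (int start = min; start <= max; start += targetSize) {
            ranges.add(new int[] { start, Math.min(start + targetSize - 1, max) });
        }
        return ranges;
    }

    public static void main(String[] args) throws Exception {
        List<int[]> ranges = split(1, 20, 4);

        // Handler role: run one "worker" per range in parallel and gather results
        ExecutorService pool = Executors.newFixedThreadPool(ranges.size());
        List<Future<Integer>> results = new ArrayList<>();
        for (int[] r : ranges) {
            results.add(pool.submit(() -> r[1] - r[0] + 1)); // worker just counts its ids
        }
        int total = 0;
        for (Future<Integer> f : results) {
            total += f.get();
        }
        pool.shutdown();
        System.out.println(ranges.size() + " partitions covering " + total + " ids");
    }
}
```

With 20 ids and a grid size of 4, the splitter produces four inclusive ranges that together cover every id exactly once; the workers run concurrently and the "manager" thread simply aggregates their results.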
LocalPartitioningApplication: run this main class as a Spring Boot application.
package com.example;

import java.util.Date;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
@EnableBatchProcessing
public class LocalPartitioningApplication implements CommandLineRunner {

    @Autowired
    private JobLauncher jobLauncher;

    @Autowired
    private Job job;

    public static void main(String[] args) {
        SpringApplication.run(LocalPartitioningApplication.class, args);
    }

    @Override
    public void run(String... args) throws Exception {
        // Unique parameters so each run creates a new JobInstance
        JobParameters jobParameters = new JobParametersBuilder()
                .addString("JobId", String.valueOf(System.currentTimeMillis()))
                .addDate("date", new Date())
                .addLong("time", System.currentTimeMillis())
                .toJobParameters();
        JobExecution execution = jobLauncher.run(job, jobParameters);
        System.out.println("STATUS :: " + execution.getStatus());
    }
}
Next, we implement Partitioner, the central strategy interface for creating input parameters for a partitioned step in the form of ExecutionContext instances. The usual aim is to create a set of distinct input values, e.g. a set of non-overlapping primary key ranges or a set of unique filenames.
package com.example.configuration;
import java.util.HashMap;
import java.util.Map;
import javax.sql.DataSource;
import org.springframework.batch.core.partition.support.Partitioner;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.jdbc.core.JdbcOperations;
import org.springframework.jdbc.core.JdbcTemplate;
public class ColumnRangePartitioner implements Partitioner {

    private JdbcOperations jdbcTemplate;
    private String table;
    private String column;

    public void setTable(String table) {
        this.table = table;
    }

    public void setColumn(String column) {
        this.column = column;
    }

    public void setDataSource(DataSource dataSource) {
        jdbcTemplate = new JdbcTemplate(dataSource);
    }

    @Override
    public Map<String, ExecutionContext> partition(int gridSize) {
        int min = jdbcTemplate.queryForObject("SELECT MIN(" + column + ") FROM " + table, Integer.class);
        int max = jdbcTemplate.queryForObject("SELECT MAX(" + column + ") FROM " + table, Integer.class);
        int targetSize = (max - min) / gridSize + 1;

        // One ExecutionContext per partition, holding an inclusive [minValue, maxValue] id range
        Map<String, ExecutionContext> result = new HashMap<>();
        int number = 0;
        int start = min;
        int end = start + targetSize - 1;
        while (start <= max) {
            ExecutionContext value = new ExecutionContext();
            result.put("partition" + number, value);
            if (end >= max) {
                end = max;
            }
            value.putInt("minValue", start);
            value.putInt("maxValue", end);
            start += targetSize;
            end += targetSize;
            number++;
        }
        return result;
    }
}
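To see exactly which ranges this partitioner produces, the arithmetic can be run standalone. The following is a plain-Java trace of the loop above, with min/max hard-coded to the sample data's id range (1 to 20) and the grid size of 4 used later in the job configuration:

```java
import java.util.LinkedHashMap;
import java.util.Map;

class RangeDemo {
    public static void main(String[] args) {
        // Hard-coded to match the sample data (ids 1..20) and gridSize(4)
        int min = 1, max = 20, gridSize = 4;
        int targetSize = (max - min) / gridSize + 1; // (20 - 1) / 4 + 1 = 5 ids per partition

        Map<String, int[]> result = new LinkedHashMap<>();
        int number = 0;
        int start = min;
        int end = start + targetSize - 1;
        while (start <= max) {
            if (end >= max) {
                end = max;
            }
            result.put("partition" + number, new int[] { start, end });
            start += targetSize;
            end += targetSize;
            number++;
        }
        result.forEach((name, r) ->
            System.out.println(name + ": minValue=" + r[0] + ", maxValue=" + r[1]));
    }
}
```

This prints four partitions (1-5, 6-10, 11-15, 16-20), matching the "reading x to y" lines in the console output later. Note that both bounds are inclusive, which matters for the reader's where clause.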
JobConfiguration
package com.example.configuration;
import java.util.HashMap;
import java.util.Map;
import javax.sql.DataSource;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.database.JdbcPagingItemReader;
import org.springframework.batch.item.database.Order;
import org.springframework.batch.item.database.support.MySqlPagingQueryProvider;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import com.example.domain.Customer;
import com.example.mapper.CustomerRowMapper;
@Configuration
public class JobConfiguration {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    private DataSource dataSource;

    @Bean
    public ColumnRangePartitioner partitioner() {
        ColumnRangePartitioner columnRangePartitioner = new ColumnRangePartitioner();
        columnRangePartitioner.setColumn("id");
        columnRangePartitioner.setDataSource(dataSource);
        columnRangePartitioner.setTable("customer");
        return columnRangePartitioner;
    }

    @Bean
    @StepScope
    public JdbcPagingItemReader<Customer> pagingItemReader(
            @Value("#{stepExecutionContext['minValue']}") Long minValue,
            @Value("#{stepExecutionContext['maxValue']}") Long maxValue) {
        System.out.println("reading " + minValue + " to " + maxValue);

        Map<String, Order> sortKeys = new HashMap<>();
        sortKeys.put("id", Order.ASCENDING);

        MySqlPagingQueryProvider queryProvider = new MySqlPagingQueryProvider();
        queryProvider.setSelectClause("id, firstName, lastName, birthdate");
        queryProvider.setFromClause("from customer");
        // The partitioner's ranges are inclusive on both ends, so the upper
        // bound must be <=; with < the last id of each partition is skipped.
        queryProvider.setWhereClause("where id >= " + minValue + " and id <= " + maxValue);
        queryProvider.setSortKeys(sortKeys);

        JdbcPagingItemReader<Customer> reader = new JdbcPagingItemReader<>();
        reader.setDataSource(this.dataSource);
        reader.setFetchSize(1000);
        reader.setRowMapper(new CustomerRowMapper());
        reader.setQueryProvider(queryProvider);
        return reader;
    }

    @Bean
    @StepScope
    public JdbcBatchItemWriter<Customer> customerItemWriter() {
        JdbcBatchItemWriter<Customer> itemWriter = new JdbcBatchItemWriter<>();
        itemWriter.setDataSource(dataSource);
        itemWriter.setSql("INSERT INTO NEW_CUSTOMER VALUES (:id, :firstName, :lastName, :birthdate)");
        itemWriter.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>());
        itemWriter.afterPropertiesSet();
        return itemWriter;
    }

    // Manager step: partitions the data and runs the worker step on parallel threads
    @Bean
    public Step step1() {
        return stepBuilderFactory.get("step1")
                .partitioner(slaveStep().getName(), partitioner())
                .step(slaveStep())
                .gridSize(4)
                .taskExecutor(new SimpleAsyncTaskExecutor())
                .build();
    }

    // Worker step: each instance processes one id range
    @Bean
    public Step slaveStep() {
        return stepBuilderFactory.get("slaveStep")
                .<Customer, Customer>chunk(1000)
                .reader(pagingItemReader(null, null))
                .writer(customerItemWriter())
                .build();
    }

    @Bean
    public Job job() {
        return jobBuilderFactory.get("job")
                .start(step1())
                .build();
    }
}
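The writer resolves the `:id`, `:firstName`, `:lastName`, and `:birthdate` placeholders from each Customer's bean properties. A rough standalone illustration of that named-parameter substitution follows; the `bind` method and the map-based lookup are invented for this sketch, and the real JdbcBatchItemWriter binds values through a PreparedStatement rather than inlining them into the SQL string.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class NamedParamDemo {

    // Replace each :name placeholder with a value from the map. The real writer
    // binds values through a PreparedStatement; inlining here is for display only.
    static String bind(String sql, Map<String, Object> params) {
        Matcher m = Pattern.compile(":(\\w+)").matcher(sql);
        StringBuilder out = new StringBuilder();
        while (m.find()) {
            Object v = params.get(m.group(1));
            String rep = (v instanceof Number) ? v.toString() : "'" + v + "'";
            m.appendReplacement(out, Matcher.quoteReplacement(rep));
        }
        m.appendTail(out);
        return out.toString();
    }

    public static void main(String[] args) {
        Map<String, Object> customer = new LinkedHashMap<>();
        customer.put("id", 1L);
        customer.put("firstName", "John");
        customer.put("lastName", "Doe");
        customer.put("birthdate", "10-10-1952 10:10:10");
        System.out.println(bind(
            "INSERT INTO NEW_CUSTOMER VALUES (:id, :firstName, :lastName, :birthdate)",
            customer));
    }
}
```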
Customer domain object
package com.example.domain;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
@AllArgsConstructor
@NoArgsConstructor
@Builder
@Data
public class Customer {
    private Long id;
    private String firstName;
    private String lastName;
    private String birthdate;
}
CustomerRowMapper
package com.example.mapper;
import java.sql.ResultSet;
import java.sql.SQLException;
import org.springframework.jdbc.core.RowMapper;
import com.example.domain.Customer;
public class CustomerRowMapper implements RowMapper<Customer> {

    @Override
    public Customer mapRow(ResultSet rs, int rowNum) throws SQLException {
        return Customer.builder()
                .id(rs.getLong("id"))
                .firstName(rs.getString("firstName"))
                .lastName(rs.getString("lastName"))
                .birthdate(rs.getString("birthdate"))
                .build();
    }
}
application.properties
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
spring.datasource.url=jdbc:mysql://localhost:3306/test
spring.datasource.username=root
spring.datasource.password=root
spring.batch.initialize-schema=always
spring.batch.job.enabled=false
schema.sql
CREATE TABLE `test`.`customer` (
`id` MEDIUMINT(8) UNSIGNED NOT NULL,
`firstName` VARCHAR(255) NULL,
`lastName` VARCHAR(255) NULL,
`birthdate` VARCHAR(255) NULL,
PRIMARY KEY (`id`)
);
CREATE TABLE `test`.`new_customer` (
`id` MEDIUMINT(8) UNSIGNED NOT NULL,
`firstName` VARCHAR(255) NULL,
`lastName` VARCHAR(255) NULL,
`birthdate` VARCHAR(255) NULL,
PRIMARY KEY (`id`)
);
Here is the console output:
2019-12-12 22:29:53.335 INFO 2652 --- [ main] o.s.b.c.l.support.SimpleJobLauncher : Job: [SimpleJob: [name=job]] launched with the following parameters: [{JobId=1576169993223, date=1576169993223, time=1576169993223}]
2019-12-12 22:29:53.427 INFO 2652 --- [ main] o.s.batch.core.job.SimpleStepHandler : Executing step: [step1]
reading 6 to 10
reading 16 to 20
reading 11 to 15
reading 1 to 5
2019-12-12 22:29:53.699 INFO 2652 --- [cTaskExecutor-2] o.s.batch.core.step.AbstractStep : Step: [slaveStep:partition3] executed in 148ms
2019-12-12 22:29:53.703 INFO 2652 --- [cTaskExecutor-1] o.s.batch.core.step.AbstractStep : Step: [slaveStep:partition1] executed in 160ms
2019-12-12 22:29:53.711 INFO 2652 --- [cTaskExecutor-3] o.s.batch.core.step.AbstractStep : Step: [slaveStep:partition0] executed in 168ms
2019-12-12 22:29:53.711 INFO 2652 --- [cTaskExecutor-4] o.s.batch.core.step.AbstractStep : Step: [slaveStep:partition2] executed in 164ms
2019-12-12 22:29:53.727 INFO 2652 --- [ main] o.s.batch.core.step.AbstractStep : Step: [step1] executed in 300ms
2019-12-12 22:29:53.747 INFO 2652 --- [ main] o.s.b.c.l.support.SimpleJobLauncher : Job: [SimpleJob: [name=job]] completed with the following parameters: [{JobId=1576169993223, date=1576169993223, time=1576169993223}] and the following status: [COMPLETED] in 372ms
STATUS :: COMPLETED
2019-12-12 22:29:53.751 INFO 2652 --- [extShutdownHook] com.zaxxer.hikari.HikariDataSource : HikariPool-1 - Shutdown initiated...
2019-12-12 22:29:53.755 INFO 2652 --- [extShutdownHook] com.zaxxer.hikari.HikariDataSource : HikariPool-1 - Shutdown completed.
Verify the results in MySQL:
mysql> show tables;
+------------------------------+
| Tables_in_test |
+------------------------------+
| batch_job_execution |
| batch_job_execution_context |
| batch_job_execution_params |
| batch_job_execution_seq |
| batch_job_instance |
| batch_job_seq |
| batch_step_execution |
| batch_step_execution_context |
| batch_step_execution_seq |
| customer |
| new_customer |
+------------------------------+
11 rows in set (0.00 sec)
mysql> select * from customer;
+----+------------+----------+---------------------+
| id | firstName | lastName | birthdate |
+----+------------+----------+---------------------+
| 1 | John | Doe | 10-10-1952 10:10:10 |
| 2 | Amy | Eugene | 05-07-1985 17:10:00 |
| 3 | Laverne | Mann | 11-12-1988 10:10:10 |
| 4 | Janice | Preston | 19-02-1960 10:10:10 |
| 5 | Pauline | Rios | 29-08-1977 10:10:10 |
| 6 | Perry | Burnside | 10-03-1981 10:10:10 |
| 7 | Todd | Kinsey | 14-12-1998 10:10:10 |
| 8 | Jacqueline | Hyde | 20-03-1983 10:10:10 |
| 9 | Rico | Hale | 10-10-2000 10:10:10 |
| 10 | Samuel | Lamm | 11-11-1999 10:10:10 |
| 11 | Robert | Coster | 10-10-1972 10:10:10 |
| 12 | Tamara | Soler | 02-01-1978 10:10:10 |
| 13 | Justin | Kramer | 19-11-1951 10:10:10 |
| 14 | Andrea | Law | 14-10-1959 10:10:10 |
| 15 | Laura | Porter | 12-12-2010 10:10:10 |
| 16 | Michael | Cantu | 11-04-1999 10:10:10 |
| 17 | Andrew | Thomas | 04-05-1967 10:10:10 |
| 18 | Jose | Hannah | 16-09-1950 10:10:10 |
| 19 | Valerie | Hilbert | 13-06-1966 10:10:10 |
| 20 | Patrick | Durham | 12-10-1978 10:10:10 |
+----+------------+----------+---------------------+
20 rows in set (0.00 sec)
mysql>
mysql> select * from new_customer;
+----+------------+----------+---------------------+
| id | firstName | lastName | birthdate |
+----+------------+----------+---------------------+
| 1 | John | Doe | 10-10-1952 10:10:10 |
| 2 | Amy | Eugene | 05-07-1985 17:10:00 |
| 3 | Laverne | Mann | 11-12-1988 10:10:10 |
| 4 | Janice | Preston | 19-02-1960 10:10:10 |
| 6 | Perry | Burnside | 10-03-1981 10:10:10 |
| 7 | Todd | Kinsey | 14-12-1998 10:10:10 |
| 8 | Jacqueline | Hyde | 20-03-1983 10:10:10 |
| 9 | Rico | Hale | 10-10-2000 10:10:10 |
| 11 | Robert | Coster | 10-10-1972 10:10:10 |
| 12 | Tamara | Soler | 02-01-1978 10:10:10 |
| 13 | Justin | Kramer | 19-11-1951 10:10:10 |
| 14 | Andrea | Law | 14-10-1959 10:10:10 |
| 16 | Michael | Cantu | 11-04-1999 10:10:10 |
| 17 | Andrew | Thomas | 04-05-1967 10:10:10 |
| 18 | Jose | Hannah | 16-09-1950 10:10:10 |
| 19 | Valerie | Hilbert | 13-06-1966 10:10:10 |
+----+------------+----------+---------------------+
16 rows in set (0.00 sec)

Only 16 of the 20 rows were copied: the reader's where clause in this run used an exclusive upper bound (id < maxValue), which skips the last id of each partition (5, 10, 15, and 20). With an inclusive id <= maxValue, matching the partitioner's inclusive ranges, all 20 rows are copied.
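The effect of the upper-bound comparison can be checked with a quick standalone calculation over the four ranges the partitioner produces for ids 1 to 20 (plain Java, no database):

```java
class BoundCheck {
    public static void main(String[] args) {
        // The four inclusive id ranges produced by the partitioner for ids 1..20
        int[][] partitions = { { 1, 5 }, { 6, 10 }, { 11, 15 }, { 16, 20 } };
        int exclusive = 0, inclusive = 0;
        for (int id = 1; id <= 20; id++) {
            for (int[] p : partitions) {
                if (id >= p[0] && id < p[1]) exclusive++;   // where id <  maxValue
                if (id >= p[0] && id <= p[1]) inclusive++;  // where id <= maxValue
            }
        }
        System.out.println("exclusive upper bound reads " + exclusive + " rows");
        System.out.println("inclusive upper bound reads " + inclusive + " rows");
    }
}
```

The exclusive bound reads 16 rows, matching the new_customer table above, while the inclusive bound covers all 20.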