Spring Batch — Scaling Demos

Prateek
6 min read · Jun 21, 2021

In this tutorial, we’ll see how to scale a Spring Batch application within a single JVM. There are three ways of doing it:

  1. Async Processor Job
  2. Multithreaded Job
  3. Parallel Steps Job

We’ll walk through the code for each of these scaling demos one by one.

  1. Async Processor Job

pom.xml — the main dependencies needed for the application to run.

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-batch</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-jdbc</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.batch</groupId>
        <artifactId>spring-batch-integration</artifactId>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <scope>runtime</scope>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-oxm</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
        <exclusions>
            <exclusion>
                <groupId>org.junit.vintage</groupId>
                <artifactId>junit-vintage-engine</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.springframework.batch</groupId>
        <artifactId>spring-batch-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>

SingleJvmDemosApplication.java

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class SingleJvmDemosApplication {

    public static void main(String[] args) {
        SpringApplication.run(SingleJvmDemosApplication.class, args);
    }
}

AsyncProcessorJobApplication.java

AsyncItemProcessor — An ItemProcessor that delegates to a nested processor, running it in the background. To allow for background processing, the return value from the processor is a Future, which needs to be unwrapped before the item can be used by a client. Because the Future is typically unwrapped in the ItemWriter, there are lifecycle and statistics limitations (since the framework doesn’t know what the result of the processor is). While not an exhaustive list: StepExecution.filterCount will not reflect the number of filtered items, and ItemProcessListener.onProcessError(Object, Exception) will not be called.
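Conceptually, the hand-off looks like this (a minimal sketch of the idea, not the actual framework source; field wiring omitted):

import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;

import org.springframework.batch.item.ItemProcessor;
import org.springframework.core.task.TaskExecutor;

// Sketch: submit the delegate call to a TaskExecutor and return the Future
// immediately, so the chunk carries Futures instead of processed items.
public class SketchAsyncItemProcessor<I, O> implements ItemProcessor<I, Future<O>> {

    private ItemProcessor<I, O> delegate; // the real business processor
    private TaskExecutor taskExecutor;    // runs the delegate in the background

    @Override
    public Future<O> process(I item) {
        FutureTask<O> task = new FutureTask<>(() -> delegate.process(item));
        taskExecutor.execute(task);
        return task;
    }
}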

AsyncItemWriter — When processing the Futures passed to it, nulls are not passed on to the delegate, since they are considered filtered out by the AsyncItemProcessor’s delegate ItemProcessor. If unwrapping a Future results in an ExecutionException, that exception is unwrapped and its cause is thrown.
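In other words, the unwrap step behaves roughly like this sketch (assuming a delegate ItemWriter; error handling simplified):

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

import org.springframework.batch.item.ItemWriter;

// Sketch: block on each Future, drop nulls (items filtered out by the
// delegate processor), and hand the rest to the delegate writer.
public class SketchAsyncItemWriter<T> implements ItemWriter<Future<T>> {

    private ItemWriter<T> delegate; // the real writer, e.g. a JdbcBatchItemWriter

    @Override
    public void write(List<? extends Future<T>> items) throws Exception {
        List<T> unwrapped = new ArrayList<>();
        for (Future<T> future : items) {
            try {
                T item = future.get();
                if (item != null) {
                    unwrapped.add(item);
                }
            } catch (ExecutionException e) {
                // Unwrap and rethrow the cause, as described above
                // (the cast is a simplification; the cause could be an Error).
                throw (Exception) e.getCause();
            }
        }
        delegate.write(unwrapped);
    }
}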

import javax.sql.DataSource;

import io.spring.batch.scalingdemos.domain.Transaction;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.integration.async.AsyncItemProcessor;
import org.springframework.batch.integration.async.AsyncItemWriter;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.database.builder.JdbcBatchItemWriterBuilder;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.builder.FlatFileItemReaderBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.core.io.Resource;
import org.springframework.core.task.SimpleAsyncTaskExecutor;

@EnableBatchProcessing
@SpringBootApplication
public class AsyncProcessorJobApplication {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Bean
    @StepScope
    public FlatFileItemReader<Transaction> fileTransactionReader(
            @Value("#{jobParameters['inputFlatFile']}") Resource resource) {

        return new FlatFileItemReaderBuilder<Transaction>()
                .saveState(false)
                .resource(resource)
                .delimited()
                .names(new String[] {"account", "amount", "timestamp"})
                .fieldSetMapper(fieldSet -> {
                    Transaction transaction = new Transaction();

                    transaction.setAccount(fieldSet.readString("account"));
                    transaction.setAmount(fieldSet.readBigDecimal("amount"));
                    transaction.setTimestamp(fieldSet.readDate("timestamp", "yyyy-MM-dd HH:mm:ss"));

                    return transaction;
                })
                .build();
    }

    @Bean
    @StepScope
    public JdbcBatchItemWriter<Transaction> writer(DataSource dataSource) {
        return new JdbcBatchItemWriterBuilder<Transaction>()
                .dataSource(dataSource)
                .beanMapped()
                .sql("INSERT INTO TRANSACTION (ACCOUNT, AMOUNT, TIMESTAMP) VALUES (:account, :amount, :timestamp)")
                .build();
    }

    @Bean
    public AsyncItemProcessor<Transaction, Transaction> asyncItemProcessor() {
        AsyncItemProcessor<Transaction, Transaction> processor = new AsyncItemProcessor<>();
        processor.setDelegate(processor());
        processor.setTaskExecutor(new SimpleAsyncTaskExecutor());
        return processor;
    }

    @Bean
    public AsyncItemWriter<Transaction> asyncItemWriter() {
        AsyncItemWriter<Transaction> writer = new AsyncItemWriter<>();
        writer.setDelegate(writer(null));
        return writer;
    }

    @Bean
    public ItemProcessor<Transaction, Transaction> processor() {
        // Simulates a processing cost of 5 ms per item.
        return (transaction) -> {
            Thread.sleep(5);
            return transaction;
        };
    }

    @Bean
    public Job asyncJob() {
        return this.jobBuilderFactory.get("asyncJob")
                .start(step1async())
                .build();
    }

    // Sequential variant, kept for comparison:
    // @Bean
    // public Job job1() {
    //     return this.jobBuilderFactory.get("job1")
    //             .start(step1())
    //             .build();
    // }

    @SuppressWarnings({ "unchecked", "rawtypes" })
    @Bean
    public Step step1async() {
        // Raw cast needed: the step is declared <Transaction, Transaction>,
        // but the async processor actually produces Future<Transaction>.
        return this.stepBuilderFactory.get("step1async")
                .<Transaction, Transaction>chunk(100)
                .reader(fileTransactionReader(null))
                .processor((ItemProcessor) asyncItemProcessor())
                .writer(asyncItemWriter())
                .build();
    }

    // @Bean
    // public Step step1() {
    //     return this.stepBuilderFactory.get("step1")
    //             .<Transaction, Transaction>chunk(100)
    //             .reader(fileTransactionReader(null))
    //             .processor(processor())
    //             .writer(writer(null))
    //             .build();
    // }

    public static void main(String[] args) {
        String[] newArgs = new String[] {"inputFlatFile=/data/csv/bigtransactions.csv"};

        SpringApplication.run(AsyncProcessorJobApplication.class, newArgs);
    }
}

Transaction.java

import java.math.BigDecimal;
import java.util.Date;

import javax.xml.bind.annotation.XmlRootElement;
import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter;

@XmlRootElement(name = "transaction")
public class Transaction {

    private String account;

    private Date timestamp;

    private BigDecimal amount;

    @XmlJavaTypeAdapter(JaxbDateSerializer.class)
    public Date getTimestamp() {
        return timestamp;
    }

    public void setTimestamp(Date timestamp) {
        this.timestamp = timestamp;
    }

    public BigDecimal getAmount() {
        return amount;
    }

    public void setAmount(BigDecimal amount) {
        this.amount = amount;
    }

    public String getAccount() {
        return account;
    }

    public void setAccount(String account) {
        this.account = account;
    }
}

JaxbDateSerializer.java

import java.text.SimpleDateFormat;
import java.util.Date;

import javax.xml.bind.annotation.adapters.XmlAdapter;

public class JaxbDateSerializer extends XmlAdapter<String, Date> {

    // Note: SimpleDateFormat is not thread-safe, so an adapter instance
    // should not be shared across threads.
    private SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

    @Override
    public String marshal(Date date) throws Exception {
        return dateFormat.format(date);
    }

    @Override
    public Date unmarshal(String date) throws Exception {
        return dateFormat.parse(date);
    }
}

— — — — — — — — — — — — — — — — — — — — — — — — — — — — -

2. MultithreadedJobApplication

ThreadPoolTaskExecutor — JavaBean that allows for configuring a ThreadPoolExecutor in bean style (through its “corePoolSize”, “maxPoolSize”, “keepAliveSeconds”, “queueCapacity” properties) and exposing it as a Spring org.springframework.core.task.TaskExecutor. This class is also well suited for management and monitoring (e.g. through JMX), providing several useful attributes: “corePoolSize”, “maxPoolSize”, “keepAliveSeconds” (all supporting updates at runtime); “poolSize”, “activeCount” (for introspection only).
The default configuration is a core pool size of 1, with unlimited max pool size and unlimited queue capacity. This is roughly equivalent to java.util.concurrent.Executors.newSingleThreadExecutor(), sharing a single thread for all tasks. Setting “queueCapacity” to 0 mimics java.util.concurrent.Executors.newCachedThreadPool(), with immediate scaling of threads in the pool to a potentially very high number. Consider also setting a “maxPoolSize” at that point, as well as possibly a higher “corePoolSize” (see also the “allowCoreThreadTimeOut” mode of scaling).
NOTE: This class implements Spring’s org.springframework.core.task.TaskExecutor interface as well as the Executor interface, with the former being the primary interface, the other just serving as secondary convenience. For this reason, the exception handling follows the TaskExecutor contract rather than the Executor contract, in particular regarding the TaskRejectedException.
For an alternative, you may set up a ThreadPoolExecutor instance directly using constructor injection, or use a factory method definition that points to the java.util.concurrent.Executors class. To expose such a raw Executor as a Spring org.springframework.core.task.TaskExecutor, simply wrap it with a ConcurrentTaskExecutor adapter.
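As a quick illustration, a bean-style configuration might look like this (the pool sizes here are illustrative, not recommendations):

@Bean
public ThreadPoolTaskExecutor batchTaskExecutor() {
    ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
    taskExecutor.setCorePoolSize(4);      // threads kept alive even when idle
    taskExecutor.setMaxPoolSize(8);       // pool grows to this bound only once the queue is full
    taskExecutor.setQueueCapacity(25);    // tasks buffered before the pool grows
    taskExecutor.setKeepAliveSeconds(60); // idle timeout for threads above the core size
    taskExecutor.setThreadNamePrefix("batch-");
    return taskExecutor; // Spring calls afterPropertiesSet() on @Bean instances
}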

import javax.sql.DataSource;

import io.spring.batch.scalingdemos.domain.Transaction;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.database.builder.JdbcBatchItemWriterBuilder;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.builder.FlatFileItemReaderBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.core.io.Resource;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

@EnableBatchProcessing
@SpringBootApplication
public class MultithreadedJobApplication {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Bean
    @StepScope
    public FlatFileItemReader<Transaction> fileTransactionReader(
            @Value("#{jobParameters['inputFlatFile']}") Resource resource) {

        return new FlatFileItemReaderBuilder<Transaction>()
                // saveState(false): restart state can't be tracked reliably
                // when multiple threads share the reader.
                .saveState(false)
                .resource(resource)
                .delimited()
                .names(new String[] {"account", "amount", "timestamp"})
                .fieldSetMapper(fieldSet -> {
                    Transaction transaction = new Transaction();

                    transaction.setAccount(fieldSet.readString("account"));
                    transaction.setAmount(fieldSet.readBigDecimal("amount"));
                    transaction.setTimestamp(fieldSet.readDate("timestamp", "yyyy-MM-dd HH:mm:ss"));

                    return transaction;
                })
                .build();
    }

    @Bean
    @StepScope
    public JdbcBatchItemWriter<Transaction> writer(DataSource dataSource) {
        return new JdbcBatchItemWriterBuilder<Transaction>()
                .dataSource(dataSource)
                .beanMapped()
                .sql("INSERT INTO TRANSACTION (ACCOUNT, AMOUNT, TIMESTAMP) VALUES (:account, :amount, :timestamp)")
                .build();
    }

    @Bean
    public Job multithreadedJob() {
        return this.jobBuilderFactory.get("multithreadedJob")
                .start(step1())
                .build();
    }

    @Bean
    public Step step1() {
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        taskExecutor.setCorePoolSize(4);
        taskExecutor.setMaxPoolSize(4);
        taskExecutor.afterPropertiesSet();

        // Each chunk of 100 items is processed on one of the pool's threads.
        return this.stepBuilderFactory.get("step1")
                .<Transaction, Transaction>chunk(100)
                .reader(fileTransactionReader(null))
                .writer(writer(null))
                .taskExecutor(taskExecutor)
                .build();
    }

    public static void main(String[] args) {
        String[] newArgs = new String[] {"inputFlatFile=/data/csv/bigtransactions.csv"};

        SpringApplication.run(MultithreadedJobApplication.class, newArgs);
    }
}

— — — — — — — — — — — — — — — — — — — — — — — — — — — — — —

3. ParallelStepsJobApplication

FlowBuilder — A builder for a flow of steps that can be executed as a job or as part of a job. Steps can be linked together with conditional transitions that depend on the exit status of the previous step.
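For example, a failure path can be wired off the previous step’s exit status (a sketch; recoveryStep() is a hypothetical step, not part of this demo):

Flow conditionalFlow = new FlowBuilder<Flow>("conditionalFlow")
        .start(step1())
        .on("FAILED").to(recoveryStep())   // taken when step1 exits with FAILED
        .from(step1()).on("*").to(step2()) // taken for any other exit status
        .end();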

StaxEventItemReader — An item reader for reading XML input based on StAX. It extracts fragments from the input XML document which correspond to records for processing. The fragments are wrapped with StartDocument and EndDocument events so that they can be further processed like standalone XML documents. The implementation is not thread-safe.
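For this demo, the XML input is assumed to look like the following, with each <transaction> fragment unmarshalled into one Transaction (the root element name is illustrative):

<transactions>
    <transaction>
        <account>1234567890</account>
        <amount>100.00</amount>
        <timestamp>2021-06-21 10:15:30</timestamp>
    </transaction>
    <transaction>
        <account>0987654321</account>
        <amount>250.50</amount>
        <timestamp>2021-06-21 10:16:02</timestamp>
    </transaction>
</transactions>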

JdbcBatchItemWriter — ItemWriter that uses the batching features from NamedParameterJdbcTemplate to execute a batch of statements for all items provided.
The user must provide an SQL query and a special callback for either of ItemPreparedStatementSetter or ItemSqlParameterSourceProvider. You can use either named parameters or the traditional ‘?’ placeholders. If you use the named parameter support then you should provide a ItemSqlParameterSourceProvider, otherwise you should provide a ItemPreparedStatementSetter. This callback would be responsible for mapping the item to the parameters needed to execute the SQL statement. It is expected that write(List) is called inside a transaction. The writer is thread-safe after its properties are set (normal singleton behavior), so it can be used to write in multiple concurrent transactions.
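The writer beans below use beanMapped() with named parameters; the ‘?’ placeholder alternative would look roughly like this sketch (positionalWriter is a hypothetical bean, not used by the demo):

@Bean
public JdbcBatchItemWriter<Transaction> positionalWriter(DataSource dataSource) {
    return new JdbcBatchItemWriterBuilder<Transaction>()
            .dataSource(dataSource)
            .sql("INSERT INTO TRANSACTION (ACCOUNT, AMOUNT, TIMESTAMP) VALUES (?, ?, ?)")
            .itemPreparedStatementSetter((transaction, ps) -> {
                // Map each Transaction field to its positional placeholder.
                ps.setString(1, transaction.getAccount());
                ps.setBigDecimal(2, transaction.getAmount());
                ps.setTimestamp(3, new java.sql.Timestamp(transaction.getTimestamp().getTime()));
            })
            .build();
}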

import javax.sql.DataSource;

import io.spring.batch.scalingdemos.domain.Transaction;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.job.builder.FlowBuilder;
import org.springframework.batch.core.job.flow.Flow;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.database.builder.JdbcBatchItemWriterBuilder;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.builder.FlatFileItemReaderBuilder;
import org.springframework.batch.item.xml.StaxEventItemReader;
import org.springframework.batch.item.xml.builder.StaxEventItemReaderBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.core.io.Resource;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.oxm.jaxb.Jaxb2Marshaller;

@EnableBatchProcessing
@SpringBootApplication
public class ParallelStepsJobApplication {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Bean
    public Job parallelStepsJob() {
        Flow secondFlow = new FlowBuilder<Flow>("secondFlow")
                .start(step2())
                .build();

        // split() runs step1 and secondFlow concurrently, each on its own thread.
        Flow parallelFlow = new FlowBuilder<Flow>("parallelFlow")
                .start(step1())
                .split(new SimpleAsyncTaskExecutor())
                .add(secondFlow)
                .build();

        return this.jobBuilderFactory.get("parallelStepsJob")
                .start(parallelFlow)
                .end()
                .build();
    }

    // Sequential variant, kept for comparison:
    // @Bean
    // public Job sequentialStepsJob() {
    //     return this.jobBuilderFactory.get("sequentialStepsJob")
    //             .start(step1())
    //             .next(step2())
    //             .build();
    // }

    @Bean
    @StepScope
    public FlatFileItemReader<Transaction> fileTransactionReader(
            @Value("#{jobParameters['inputFlatFile']}") Resource resource) {

        return new FlatFileItemReaderBuilder<Transaction>()
                .name("flatFileTransactionReader")
                .resource(resource)
                .delimited()
                .names(new String[] {"account", "amount", "timestamp"})
                .fieldSetMapper(fieldSet -> {
                    Transaction transaction = new Transaction();

                    transaction.setAccount(fieldSet.readString("account"));
                    transaction.setAmount(fieldSet.readBigDecimal("amount"));
                    transaction.setTimestamp(fieldSet.readDate("timestamp", "yyyy-MM-dd HH:mm:ss"));

                    return transaction;
                })
                .build();
    }

    @Bean
    @StepScope
    public StaxEventItemReader<Transaction> xmlTransactionReader(
            @Value("#{jobParameters['inputXmlFile']}") Resource resource) {
        Jaxb2Marshaller unmarshaller = new Jaxb2Marshaller();
        unmarshaller.setClassesToBeBound(Transaction.class);

        return new StaxEventItemReaderBuilder<Transaction>()
                .name("xmlFileTransactionReader")
                .resource(resource)
                .addFragmentRootElements("transaction")
                .unmarshaller(unmarshaller)
                .build();
    }

    @Bean
    @StepScope
    public JdbcBatchItemWriter<Transaction> writer(DataSource dataSource) {
        return new JdbcBatchItemWriterBuilder<Transaction>()
                .dataSource(dataSource)
                .beanMapped()
                .sql("INSERT INTO TRANSACTION (ACCOUNT, AMOUNT, TIMESTAMP) VALUES (:account, :amount, :timestamp)")
                .build();
    }

    @Bean
    public Step step1() {
        return this.stepBuilderFactory.get("step1")
                .<Transaction, Transaction>chunk(100)
                .reader(xmlTransactionReader(null))
                .writer(writer(null))
                .build();
    }

    @Bean
    public Step step2() {
        return this.stepBuilderFactory.get("step2")
                .<Transaction, Transaction>chunk(100)
                .reader(fileTransactionReader(null))
                .writer(writer(null))
                .build();
    }

    public static void main(String[] args) {
        String[] newArgs = new String[] {"inputFlatFile=/data/csv/bigtransactions.csv",
                "inputXmlFile=/data/xml/bigtransactions.xml"};

        SpringApplication.run(ParallelStepsJobApplication.class, newArgs);
    }
}
