In Spring Batch, we often need to pass certain data sets (fetched from a database, from static resources, or from lookup data) from one step to another. Spring Batch provides the ExecutionContextPromotionListener API to pass data from one step to another.
I’ve built a small POC that shows how to pass data from one step to another. It relies on two building blocks that work together:
- StepExecutionContext
- ExecutionContextPromotionListener
From a step, you can put data into the StepExecutionContext. Then, with a listener, you can promote data from StepExecutionContext to JobExecutionContext.
This JobExecutionContext is available in all the following steps.
Be careful: the data must be small. These contexts are serialized and saved in the JobRepository, and the length is limited (2500 characters, if I remember correctly).
So these contexts are good to share strings or simple values, but not for sharing collections or huge amounts of data.
Sharing huge amounts of data is not the philosophy of Spring Batch. A Spring Batch job is a set of distinct actions, not one huge business-processing unit.
ReadingObjectItemReader.java
/**
 * Item reader that hands out, one element per call, the list stored under the
 * {@code "items"} key of the job execution context.
 *
 * <p>The list is fetched in {@link #retrieveSharedData(StepExecution)}, which is annotated
 * with {@code @BeforeStep}; the reader therefore has to be registered with the step so that
 * the hook runs before the first {@link #read()} call.
 */
public class ReadingObjectItemReader implements ItemReader<Integer> {

    /** Index of the next element of {@link #items} to return. */
    private int nextIndex = 0;

    /** Shared list taken from the job execution context; {@code null} until the hook has run. */
    private List<Integer> items;

    /**
     * Returns the next shared item.
     *
     * @return the next element of the shared list, or {@code null} once every element has
     *         been returned
     * @throws IllegalStateException if the shared list has not been loaded yet
     */
    @Override
    public Integer read() {
        if (items == null) {
            // Fail with an explicit message instead of an anonymous NullPointerException.
            throw new IllegalStateException(
                    "Shared \"items\" list has not been loaded; retrieveSharedData must run before read()");
        }
        if (nextIndex >= items.size()) {
            return null;
        }
        return items.get(nextIndex++);
    }

    /**
     * Loads the shared list from the job execution context and rewinds the read position.
     *
     * @param stepExecution the execution of the step this reader is attached to
     * @throws IllegalStateException if the job execution context has no {@code "items"} entry
     */
    @SuppressWarnings("unchecked")
    @BeforeStep
    public void retrieveSharedData(StepExecution stepExecution) {
        JobExecution jobExecution = stepExecution.getJobExecution();
        ExecutionContext jobContext = jobExecution.getExecutionContext();
        List<Integer> shared = (List<Integer>) jobContext.get("items");
        if (shared == null) {
            throw new IllegalStateException(
                    "No \"items\" entry found in the job execution context; was it promoted by a previous step?");
        }
        items = shared;
        // Start from the beginning each time the hook runs, so a reused instance does not
        // carry over the position reached during an earlier step execution.
        nextIndex = 0;
    }
}
MyItemWriter.java
/**
 * Item writer that prints every item it receives and appends it to a list kept under the
 * {@code "items"} key of the step execution context.
 */
public class MyItemWriter implements ItemWriter<Integer> {

    /** Step execution captured in {@link #saveStepExecution(StepExecution)}. */
    private StepExecution stepExecution;

    /**
     * Remembers the current step execution and stores a fresh, empty list under the
     * {@code "items"} key of its execution context.
     *
     * @param stepExecution the execution of the step about to start
     */
    @BeforeStep
    public void saveStepExecution(StepExecution stepExecution) {
        this.stepExecution = stepExecution;
        stepExecution.getExecutionContext().put("items", new ArrayList<Integer>());
    }

    /**
     * Prints each item of the chunk and adds it to the list held in the step execution context.
     *
     * @param items the chunk of items to write
     */
    @SuppressWarnings("unchecked")
    @Override
    public void write(List<? extends Integer> items) throws Exception {
        List<Integer> collected = (List<Integer>) stepExecution.getExecutionContext().get("items");
        items.forEach(item -> {
            System.out.println("item = " + item);
            collected.add(item);
        });
    }
}
JobConfiguration.java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.annotation.BeforeStep;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.listener.ExecutionContextPromotionListener;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.support.ListItemReader;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * Batch configuration for a two-step job that hands data from the first step to the second.
 *
 * <p>{@code step1} reads the integers 1 to 4 and its writer collects them in a list stored in
 * the step execution context. The promotion listener attached to {@code step1} is configured
 * with the same key, so that {@code step2} can read the list back from the job execution
 * context through {@link ReadingObjectItemReader}.
 */
@Configuration
public class JobConfiguration {

    /** Execution-context key under which the collected items are shared between the steps. */
    private static final String ITEMS_KEY = "items";

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    /** Reader used by {@code step2}; it serves the items found in the job execution context. */
    @Bean
    public ReadingObjectItemReader readingObjectItemReader() {
        return new ReadingObjectItemReader();
    }

    /** Reader used by {@code step1}; it serves the fixed list 1, 2, 3, 4. */
    @Bean
    public ItemReader<Integer> itemReader() {
        return new ListItemReader<>(Arrays.asList(1, 2, 3, 4));
    }

    /**
     * Writer used by {@code step1}: prints every item and appends it to the list kept under
     * {@link #ITEMS_KEY} in the step execution context.
     */
    @Bean
    public ItemWriter<Integer> itemWriter() {
        return new ItemWriter<Integer>() {

            private StepExecution stepExecution;

            @SuppressWarnings("unchecked")
            @Override
            public void write(List<? extends Integer> items) {
                List<Integer> itemsList =
                        (List<Integer>) stepExecution.getExecutionContext().get(ITEMS_KEY);
                for (Integer item : items) {
                    System.out.println("item =>> " + item);
                    itemsList.add(item);
                }
            }

            /** Captures the step execution and seeds its context with an empty list. */
            @BeforeStep
            public void saveStepExecution(StepExecution stepExecution) {
                this.stepExecution = stepExecution;
                this.stepExecution.getExecutionContext().put(ITEMS_KEY, new ArrayList<>());
            }
        };
    }

    /** First step: reads 1 to 4 in chunks of two, collects them, and has the list promoted. */
    @Bean
    public Step step1() {
        return stepBuilderFactory.get("step1")
                .<Integer, Integer>chunk(2)
                .reader(itemReader())
                .writer(itemWriter())
                .listener(promotionListener())
                .build();
    }

    /** Second step: reads the shared items in chunks of two and prints each of them. */
    @Bean
    public Step step2() {
        return stepBuilderFactory.get("step2")
                .<Integer, Integer>chunk(2)
                // Use the declared bean rather than a second, unmanaged reader instance.
                .reader(readingObjectItemReader())
                .writer(items -> items.forEach(
                        integer -> System.out.println("integer ==> " + integer)))
                .build();
    }

    /**
     * Listener attached to {@code step1}, configured with {@link #ITEMS_KEY} as its only key
     * and with strict mode switched on.
     */
    @Bean
    public ExecutionContextPromotionListener promotionListener() {
        ExecutionContextPromotionListener listener = new ExecutionContextPromotionListener();
        listener.setKeys(new String[]{ITEMS_KEY});
        listener.setStrict(true);
        return listener;
    }

    /** The job: {@code step1} followed by {@code step2}. */
    @Bean
    public Job job() {
        return jobBuilderFactory.get("job")
                .start(step1())
                .next(step2())
                .build();
    }
}
PassDataFromStepToAnotherStepApplication.java
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
 * Spring Boot entry point of the sample application, with batch processing enabled.
 */
@SpringBootApplication
@EnableBatchProcessing
public class PassDataFromStepToAnotherStepApplication {

    /**
     * Starts the Spring application with this class as its primary source.
     *
     * @param args command-line arguments, forwarded unchanged to Spring Boot
     */
    public static void main(String[] args) {
        Class<?> primarySource = PassDataFromStepToAnotherStepApplication.class;
        SpringApplication.run(primarySource, args);
    }
}
Console Output
2021-01-03 17:26:56.812 INFO 18700 --- [ main] com.zaxxer.hikari.HikariDataSource : HikariPool-1 - Starting...
2021-01-03 17:26:56.998 INFO 18700 --- [ main] com.zaxxer.hikari.HikariDataSource : HikariPool-1 - Start completed.
2021-01-03 17:26:57.110 INFO 18700 --- [ main] o.s.b.c.r.s.JobRepositoryFactoryBean : No database type set, using meta data indicating: H2
2021-01-03 17:26:57.203 INFO 18700 --- [ main] o.s.b.c.l.support.SimpleJobLauncher : No TaskExecutor has been set, defaulting to synchronous executor.
2021-01-03 17:26:57.352 INFO 18700 --- [ main] PassDataFromStepToAnotherStepApplication : Started PassDataFromStepToAnotherStepApplication in 1.681 seconds (JVM running for 2.595)
2021-01-03 17:26:57.354 INFO 18700 --- [ main] o.s.b.a.b.JobLauncherCommandLineRunner : Running default command line with: [--spring.output.ansi.enabled=always]
2021-01-03 17:26:57.405 INFO 18700 --- [ main] o.s.b.c.l.support.SimpleJobLauncher : Job: [SimpleJob: [name=job]] launched with the following parameters: [{-spring.output.ansi.enabled=always}]
2021-01-03 17:26:57.420 INFO 18700 --- [ main] o.s.batch.core.job.SimpleStepHandler : Executing step: [step1]
item =>> 1
item =>> 2
item =>> 3
item =>> 4
2021-01-03 17:26:57.456 INFO 18700 --- [ main] o.s.batch.core.job.SimpleStepHandler : Executing step: [step2]
integer ==> 1
integer ==> 2
integer ==> 3
integer ==> 4
2021-01-03 17:26:57.464 INFO 18700 --- [ main] o.s.b.c.l.support.SimpleJobLauncher : Job: [SimpleJob: [name=job]] completed with the following parameters: [{-spring.output.ansi.enabled=always}] and the following status: [COMPLETED]
2021-01-03 17:26:57.467 INFO 18700 --- [ Thread-2] com.zaxxer.hikari.HikariDataSource : HikariPool-1 - Shutdown initiated...
2021-01-03 17:26:57.468 INFO 18700 --- [ Thread-2] com.zaxxer.hikari.HikariDataSource : HikariPool-1 - Shutdown completed.