pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.oreilly.integration</groupId>
<artifactId>spring-integration</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<name>spring-integration</name>
<description>Demo project for Spring Boot</description>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.6.1</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
PrintService.java
package com.oreilly.integration;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;
public class PrintService {
public Message<String> print(Message<String> message) {
System.out.println(message.getPayload());
int messageNumber = (int) message.getHeaders().get("messageNumber");
return MessageBuilder.withPayload("Sending a reply for message " + messageNumber).build();
}
}
PrinterGateway.java
package com.oreilly.integration;
import java.util.concurrent.Future;
import org.springframework.messaging.Message;
public interface PrinterGateway {
Future<Message<String>> print(Message<?> message);
}
MainApp.java
package com.oreilly.integration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.ImportResource;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;
@SpringBootApplication
@Configuration
@ImportResource("integration-context.xml")
public class SpringIntegrationApplication implements ApplicationRunner {
@Autowired
private PrinterGateway gateway;
public static void main(String[] args) {
SpringApplication.run(SpringIntegrationApplication.class, args);
}
@Override
public void run(ApplicationArguments arg0) throws InterruptedException, ExecutionException {
List<Future<Message<String>>> futures = new ArrayList<Future<Message<String>>>();
for (int x = 0; x < 10; x++) {
Message<String> message = MessageBuilder
.withPayload("Printing message payload for " + x)
.setHeader("messageNumber", x)
.build();
System.out.println("Sending message " + x);
futures.add(this.gateway.print(message));
}
for (Future<Message<String>> future : futures) {
System.out.println(future.get().getPayload());
}
}
}
integration-context.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:int="http://www.springframework.org/schema/integration"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration-4.2.xsd">
<int:gateway service-interface="com.oreilly.integration.PrinterGateway"
default-request-channel="inputChannel" />
<int:channel id="inputChannel">
<int:queue capacity="10"/>
</int:channel>
<bean class="com.oreilly.integration.PrintService" id="printService" />
<int:service-activator ref="printService" method="print"
input-channel="inputChannel">
<int:poller fixed-rate="5" time-unit="SECONDS" max-messages-per-poll="2"/>
</int:service-activator>
</beans>
Based output — messages are processed Async
2021–12–12 22:33:23.878 INFO 22572 — — [ main] c.o.i.SpringIntegrationApplication : Started SpringIntegrationApplication in 1.395 seconds (JVM running for 2.449)
Sending message 0
Sending message 1
Sending message 2
Sending message 3
Sending message 4
Sending message 5
Sending message 6
Sending message 7
Sending message 8
Sending message 9
2021–12–12 22:33:23.888 INFO 22572 — — [ scheduling-1] o.s.i.h.s.MessagingMethodInvokerHelper : Overriding default instance of MessageHandlerMethodFactory with provided one.
Printing message payload for 1
Printing message payload for 7
Printing message payload for 8
Printing message payload for 0
Sending a reply for message 0
Sending a reply for message 1
Printing message payload for 3
Printing message payload for 6
Printing message payload for 2
Printing message payload for 5
Sending a reply for message 2
Sending a reply for message 3
Printing message payload for 4
Printing message payload for 9
Sending a reply for message 4
Sending a reply for message 5
Sending a reply for message 6
Sending a reply for message 7
Sending a reply for message 8
Sending a reply for message 9