In this example, will give an overview about Spring Cloud Stream, followed by one example on how simple it is to write Kafka Processor using Spring Cloud Stream.
We’re using functional programming way to create topic (do not try in prod) and all spring cloud stream auto-configuration features will work.
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.7.3</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.example</groupId>
<artifactId>spring-cloud-stream-process-demo</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>spring-cloud-stream-process-demo</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>11</java.version>
<spring-cloud.version>2021.0.3</spring-cloud.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
<scope>test</scope>
<classifier>test-binder</classifier>
<type>test-jar</type>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>${spring-cloud.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<excludes>
<exclude>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>
</project>
MainApp.java
package com.example.demo;
import lombok.extern.log4j.Log4j2;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import reactor.core.publisher.Flux;
import java.time.Duration;
import java.util.Arrays;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
@Log4j2
@SpringBootApplication
public class SpringCloudStreamProcessDemoApplication {
public static void main(String[] args) {
SpringApplication.run(SpringCloudStreamProcessDemoApplication.class, args);
}
// producer-out-0
@Bean
public Supplier<Flux<Long>> producer() {
return () -> Flux.interval(Duration.ofSeconds(1)).log();
}
// processor-in-0
// processor-out-0
@Bean
public Function<Flux<Long>, Flux<Long>> processor() {
return longFlux -> longFlux.map(number -> number * number).log();
}
// consumer-in-0
@Bean
public Consumer<Long> consumer() {
return number -> log.info("Squared Number : {}", number);
}
}
application.yml
spring:
cloud:
function:
definition: producer;processor;consumer
stream:
bindings:
producer-out-0:
destination: numbers
processor-in-0:
destination: numbers
processor-out-0:
destination: squares
consumer-in-0:
destination: squares