Spring Cloud Stream — Demo2

Prateek
2 min read · Aug 29, 2022

--

In this post, I will give an overview of Spring Cloud Stream, followed by an example showing how simple it is to write a Kafka processor using Spring Cloud Stream.

We’re using the functional programming model to create the topics (do not try this in prod), and all Spring Cloud Stream auto-configuration features will work.

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"
>
<!-- Maven build descriptor for the spring-cloud-stream-process-demo application. -->
<modelVersion>4.0.0</modelVersion>
<!-- Uses the Spring Boot 2.7.3 starter parent as the parent POM. -->
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.7.3</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<!-- Coordinates and descriptive metadata of this project. -->
<groupId>com.example</groupId>
<artifactId>spring-cloud-stream-process-demo</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>spring-cloud-stream-process-demo</name>
<description>Demo project for Spring Boot</description>

<!-- Java level, plus the Spring Cloud release version referenced by the BOM import in dependencyManagement. -->
<properties>
<java.version>11</java.version>
<spring-cloud.version>2021.0.3</spring-cloud.version>
</properties>

<dependencies>
<!-- Spring Cloud Stream core, its Kafka binder, and Spring for Apache Kafka.
     No versions are given here; the Spring Cloud ones are expected to come from the imported BOM below. -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>

<!-- Lombok, declared optional; the application class uses its @Log4j2 annotation. -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<!-- Test-scoped dependencies. -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<!-- Not a duplicate of the spring-cloud-stream entry above: this is the test-jar
     with classifier test-binder, limited to test scope. -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
<scope>test</scope>
<classifier>test-binder</classifier>
<type>test-jar</type>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<!-- Imports the spring-cloud-dependencies BOM at ${spring-cloud.version}. -->
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>${spring-cloud.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>

<build>
<plugins>
<!-- Spring Boot Maven plugin with Lombok listed under its excludes
     (presumably to keep Lombok out of the packaged artifact; confirm against the plugin documentation). -->
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<excludes>
<exclude>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>

</project>

SpringCloudStreamProcessDemoApplication.java

package com.example.demo;

import lombok.extern.log4j.Log4j2;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import reactor.core.publisher.Flux;

import java.time.Duration;
import java.util.Arrays;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

/**
 * Entry point of the demo. It declares three functional beans that together
 * form a small pipeline: a producer of numbers, a processor that squares
 * them, and a consumer that logs the squared values.
 *
 * <p>Each bean is associated with bindings named after the bean method
 * ({@code <name>-in-0} / {@code <name>-out-0}).
 */
@Log4j2
@SpringBootApplication
public class SpringCloudStreamProcessDemoApplication {

    /**
     * Boots the Spring application.
     *
     * @param args command-line arguments handed to {@link SpringApplication#run}
     */
    public static void main(String[] args) {
        SpringApplication.run(SpringCloudStreamProcessDemoApplication.class, args);
    }

    /**
     * Source of the pipeline; binding name: {@code producer-out-0}.
     *
     * @return a supplier whose flux is built from a one-second
     *         {@link Flux#interval(Duration)} with Reactor signal logging enabled
     */
    @Bean
    public Supplier<Flux<Long>> producer() {
        return () -> {
            Flux<Long> ticks = Flux.interval(Duration.ofSeconds(1));
            return ticks.log();
        };
    }

    /**
     * Middle stage of the pipeline; binding names: {@code processor-in-0}
     * and {@code processor-out-0}.
     *
     * @return a function mapping every incoming value to its square, with
     *         Reactor signal logging enabled on the resulting flux
     */
    @Bean
    public Function<Flux<Long>, Flux<Long>> processor() {
        return incoming -> {
            Flux<Long> squares = incoming.map(value -> value * value);
            return squares.log();
        };
    }

    /**
     * Sink of the pipeline; binding name: {@code consumer-in-0}.
     *
     * @return a consumer that writes each received value to the log at INFO level
     */
    @Bean
    public Consumer<Long> consumer() {
        return squaredValue -> log.info("Squared Number : {}", squaredValue);
    }
}

application.yml

# Spring Cloud Function / Stream configuration for the demo pipeline.
# Nesting matters in YAML: every key below must sit under spring.cloud.*
spring:
  cloud:
    function:
      # The three functional beans declared in the application class.
      definition: producer;processor;consumer
    stream:
      bindings:
        # producer writes to "numbers"
        producer-out-0:
          destination: numbers
        # processor reads from "numbers" ...
        processor-in-0:
          destination: numbers
        # ... and writes to "squares"
        processor-out-0:
          destination: squares
        # consumer reads from "squares"
        consumer-in-0:
          destination: squares

--

--

Prateek
Prateek

Written by Prateek

Java Developer and enthusiast

No responses yet