Kafka Streams DSL, stateful transformations, reduce()

Prateek
3 min read · Jun 23, 2023

This post demonstrates how to use the stateful `reduce()` operation of the Kafka Streams DSL to compute a running sum of the odd numbers read from a topic.

Create Topics

kafka-topics --create --topic numbers-topic  --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
kafka-topics --create --topic sum-of-odd-numbers-topic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1

Start Consumer

kafka-console-consumer --topic sum-of-odd-numbers-topic --from-beginning --bootstrap-server localhost:9092 --property value.deserializer=org.apache.kafka.common.serialization.IntegerDeserializer

AppConfigs.java

package com.example;

class AppConfigs {
    // application.id doubles as the consumer group id and the prefix for internal topic names
    final static String applicationID = "numbers-topic";
    final static String bootstrapServers = "localhost:9092";
    final static String inputTopic = "numbers-topic";
    final static String outputTopic = "sum-of-odd-numbers-topic";
    final static String clientConfigId = "sum-lambda-example-client";
}

SumLambdaExample.java

package com.example;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;

import java.util.Properties;

public class SumLambdaExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, AppConfigs.applicationID);
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, AppConfigs.bootstrapServers);
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName());
        props.put(StreamsConfig.CLIENT_ID_CONFIG, AppConfigs.clientConfigId);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(StreamsConfig.STATE_DIR_CONFIG, "/tmp/kafka-streams");

        // Flush records every 10 seconds. This is less than the default in order to keep this example interactive.
        props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10 * 1000);

        final StreamsBuilder builder = new StreamsBuilder();
        // Keys on the input topic are null (written with VoidSerializer); the Integer key serde only matters after re-keying.
        final KStream<Void, Integer> input = builder.stream(AppConfigs.inputTopic);

        final KTable<Integer, Integer> sumOfOddNumbers = input
                // We are only interested in odd numbers.
                .filter((k, v) -> v % 2 != 0)
                // We want to compute the total sum across ALL numbers, so we must re-key all records to the
                // same key. This re-keying is required because in Kafka Streams a data record is always a
                // key-value pair, and KStream aggregations such as `reduce` operate on a per-key basis.
                // The actual new key (here: `1`) doesn't matter as long as it is the same across all records.
                .selectKey((k, v) -> 1)
                // No need to specify explicit serdes because the resulting key and value types match our default serde settings.
                .groupByKey()
                // Add the numbers to compute the sum.
                .reduce(Integer::sum);

        sumOfOddNumbers.toStream().to(AppConfigs.outputTopic);

        Topology topology = builder.build();
        final KafkaStreams streams = new KafkaStreams(topology, props);
        streams.cleanUp();
        streams.start();

        // Add a shutdown hook to respond to SIGTERM and gracefully close Kafka Streams.
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}
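
`reduce()` is a stateful transformation: Kafka Streams backs it with a local state store (and a changelog topic) keyed by the grouping key, which is why the records are re-keyed before grouping. If you want that store to have a predictable name, for example to inspect or query it later, the aggregation can be materialized explicitly. Below is a minimal sketch of that variant, not part of the original example; the class name and the store name "sum-of-odd-numbers-store" are arbitrary.

package com.example;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;

public class SumLambdaMaterializedExample {

    // Same topology as SumLambdaExample, but the state store behind reduce() gets an explicit name.
    static Topology buildTopology() {
        final StreamsBuilder builder = new StreamsBuilder();
        final KStream<Void, Integer> input = builder.stream(AppConfigs.inputTopic);

        input
                // Keep only odd numbers.
                .filter((k, v) -> v % 2 != 0)
                // Re-key everything to the same key so a single aggregate is maintained.
                .selectKey((k, v) -> 1)
                .groupByKey()
                // Materialize the aggregate into a named key-value store.
                .reduce(Integer::sum,
                        Materialized.<Integer, Integer, KeyValueStore<Bytes, byte[]>>as("sum-of-odd-numbers-store")
                                .withKeySerde(Serdes.Integer())
                                .withValueSerde(Serdes.Integer()))
                .toStream()
                .to(AppConfigs.outputTopic);

        return builder.build();
    }
}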

Load Some Data

SumLambdaExampleDriver.java

package com.example;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.VoidSerializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import java.util.stream.IntStream;

public class SumLambdaExampleDriver {

    public static void main(String[] args) {
        produceInput(AppConfigs.bootstrapServers);
        consumeOutput(AppConfigs.bootstrapServers);
    }

    // Consumer of the output data: prints every update to the running sum.
    private static void consumeOutput(final String bootstrapServers) {
        final Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "sum-lambda-example-consumer");
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        final KafkaConsumer<Integer, Integer> consumer = new KafkaConsumer<>(properties);
        consumer.subscribe(Collections.singleton(AppConfigs.outputTopic));
        while (true) {
            final ConsumerRecords<Integer, Integer> records = consumer.poll(Duration.ofMillis(Long.MAX_VALUE));

            for (final ConsumerRecord<Integer, Integer> record : records) {
                System.out.println("Current sum of odd numbers is: " + record.value());
            }
        }
    }

    // Producer of the input data: writes the numbers 0..149 to the input topic with null keys.
    private static void produceInput(final String bootstrapServers) {
        final Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, VoidSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);

        final KafkaProducer<Void, Integer> producer = new KafkaProducer<>(props);

        IntStream.range(0, 150)
                .mapToObj(val -> new ProducerRecord<>(AppConfigs.inputTopic, (Void) null, val))
                .forEach(producer::send);

        producer.flush();
    }
}
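
If you'd rather verify the topology without a running broker, the kafka-streams-test-utils artifact provides TopologyTestDriver, which processes records synchronously in-process. The following is a minimal sketch, assuming that extra test dependency is on the classpath; the class name below is arbitrary and the bootstrap.servers value is a placeholder, since the test driver never connects to a cluster.

package com.example;

import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.VoidSerializer;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.KStream;

import java.util.Properties;
import java.util.stream.IntStream;

public class SumLambdaTopologyCheck {

    public static void main(String[] args) {
        // Rebuild the same topology as SumLambdaExample.
        final StreamsBuilder builder = new StreamsBuilder();
        final KStream<Void, Integer> input = builder.stream(AppConfigs.inputTopic);
        input.filter((k, v) -> v % 2 != 0)
                .selectKey((k, v) -> 1)
                .groupByKey()
                .reduce(Integer::sum)
                .toStream()
                .to(AppConfigs.outputTopic);
        final Topology topology = builder.build();

        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "sum-lambda-topology-check");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234"); // placeholder; not contacted by the test driver
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName());

        final TopologyTestDriver driver = new TopologyTestDriver(topology, props);
        try {
            final TestInputTopic<Void, Integer> in =
                    driver.createInputTopic(AppConfigs.inputTopic, new VoidSerializer(), new IntegerSerializer());
            final TestOutputTopic<Integer, Integer> out =
                    driver.createOutputTopic(AppConfigs.outputTopic, new IntegerDeserializer(), new IntegerDeserializer());

            // Pipe in the same numbers the driver application produces.
            IntStream.range(0, 150).boxed().forEach(in::pipeInput);

            // readKeyValuesToMap() keeps only the latest value per key; key 1 holds the running sum.
            System.out.println("Final sum of odd numbers: " + out.readKeyValuesToMap().get(1)); // 5625
        } finally {
            driver.close();
        }
    }
}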

Start both applications; on the console consumer you should eventually see the final sum:

kafka-console-consumer --topic sum-of-odd-numbers-topic --from-beginning --bootstrap-server localhost:9092 --property value.deserializer=org.apache.kafka.common.serialization.IntegerDeserializer
5625
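
That value matches simple arithmetic: the driver produces 0 through 149, and the 75 odd values among them (1, 3, ..., 149) sum to 75 * 75 = 5625. A quick standalone check of that arithmetic (a sketch, not part of the original example):

import java.util.stream.IntStream;

public class ExpectedSumCheck {
    public static void main(String[] args) {
        // Odd numbers among 0..149: 1, 3, ..., 149 -> 75 values, summing to 75 * 75 = 5625.
        int expected = IntStream.range(0, 150).filter(v -> v % 2 != 0).sum();
        System.out.println(expected); // prints 5625
    }
}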
