Apache Kafka — DSL, aggregation, stateful

Prateek
2 min read · Jan 8, 2023


This example uses the high-level KStream DSL to implement WordCount, a stateful aggregation that computes a simple word-occurrence histogram from input text.

This example uses lambda expressions and therefore requires Java 8 or later. The input stream reads from a topic named “streams-plaintext-input”, where each message value is a line of text. The histogram is written to the topic “streams-wordcount-output”, where each record is an updated count for a single word, i.e. word (String) -> currentCount (Long).
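To make the tokenization step concrete, here is a minimal plain-Java sketch (no Kafka required) of how a line of text is turned into words with the same regular expression the application uses. The class and method names (TokenizeSketch, tokenize) are hypothetical and exist only for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical helper class, not part of the streams application.
class TokenizeSketch {
    // Same pattern as the application: one or more non-word characters, Unicode-aware.
    private static final Pattern NON_WORD =
            Pattern.compile("\\W+", Pattern.UNICODE_CHARACTER_CLASS);

    // Lower-cases the line and splits it into words.
    public static List<String> tokenize(String line) {
        return Arrays.asList(NON_WORD.split(line.toLowerCase()));
    }

    public static void main(String[] args) {
        System.out.println(tokenize("Hello Kafka Streams"));       // [hello, kafka, streams]
        System.out.println(tokenize("all streams lead to kafka")); // [all, streams, lead, to, kafka]
        // A line that starts with a non-word character yields a leading empty string.
        System.out.println(tokenize(" hi"));                       // [, hi]
    }
}
```

The last call shows an edge case: input that begins with whitespace or punctuation produces an empty first token, which the word count would treat as a word unless it is filtered out.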

Note: Before running this example you must 1) create the source topic (e.g. via kafka-topics --create …), then 2) start this example, and 3) write some data to the source topic (e.g. via kafka-console-producer). Otherwise you won’t see any data arriving in the output topic.

MainApp.java

package com.example;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;

import java.util.Arrays;
import java.util.Properties;
import java.util.regex.Pattern;

public class MainApp {
    public static void main(String[] args) {
        // Basic Kafka Streams configuration; String is the default serde for keys and values.
        Properties prop = new Properties();
        prop.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-lambda-example");
        prop.put(StreamsConfig.CLIENT_ID_CONFIG, "wordcount-lambda-example-client");
        prop.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        prop.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        prop.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        prop.put(StreamsConfig.STATE_DIR_CONFIG, "/tmp/test");

        // Split lines on runs of non-word characters (Unicode-aware).
        final Pattern pattern = Pattern.compile("\\W+", Pattern.UNICODE_CHARACTER_CLASS);

        StreamsBuilder builder = new StreamsBuilder();
        // Each record value from the input topic is one line of text.
        KStream<String, String> textLines = builder.stream("streams-plaintext-input");
        KTable<String, Long> wordCounts = textLines
                // Lower-case each line and emit one record per word.
                .flatMapValues(value -> Arrays.asList(pattern.split(value.toLowerCase())))
                // Re-key each record by the word itself so equal words are grouped together.
                .groupBy((key, value) -> value)
                // Stateful aggregation: keep a running count per word.
                .count();
        // Write every count update to the output topic as (String word, Long count).
        wordCounts.toStream().to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));

        KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), prop);
        // Register the shutdown hook before starting so the application closes cleanly on exit.
        Runtime.getRuntime().addShutdownHook(new Thread(kafkaStreams::close));
        kafkaStreams.start();
    }
}

Create the input and output topics

kafka-topics --bootstrap-server localhost:9092 --create --topic streams-plaintext-input --partitions 1 --replication-factor 1

kafka-topics --bootstrap-server localhost:9092 --create --topic streams-wordcount-output --partitions 1 --replication-factor 1

Run MainApp, then post some messages with the console producer

kafka-console-producer --bootstrap-server localhost:9092 --topic streams-plaintext-input
>hello kafka streams
>all streams lead to kafka
>join kafka summit
>

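Read the counts with the console consumer. The value deserializer is set to LongDeserializer because the counts are written as Long values.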

kafka-console-consumer --topic streams-wordcount-output --from-beginning --bootstrap-server localhost:9092 --property print.key=true --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
hello 1
kafka 1
streams 1
all 1
streams 2
lead 1
to 1
kafka 2
join 1
kafka 3
summit 1
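
The output above is a stream of updates, not a final table: a word appears again each time its count changes. As a rough illustration, the following plain-Java sketch (no Kafka required) replays the three input lines against an in-memory map that stands in for the state store. The class and method names (WordCountChangelogSketch, updates) are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;

// Hypothetical simulation of the update stream; not how Kafka Streams is implemented.
class WordCountChangelogSketch {
    private static final Pattern NON_WORD =
            Pattern.compile("\\W+", Pattern.UNICODE_CHARACTER_CLASS);

    // Returns one "word count" entry for every count update, in arrival order.
    public static List<String> updates(List<String> lines) {
        Map<String, Long> counts = new HashMap<>(); // stands in for the state store
        List<String> out = new ArrayList<>();
        for (String line : lines) {
            for (String word : NON_WORD.split(line.toLowerCase())) {
                // Increment the running count and emit the new value downstream.
                long updated = counts.merge(word, 1L, Long::sum);
                out.add(word + " " + updated);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList(
                "hello kafka streams",
                "all streams lead to kafka",
                "join kafka summit");
        updates(lines).forEach(System.out::println);
    }
}
```

Running this prints the same eleven lines as the consumer output shown above. In the real application, Kafka Streams record caching may merge consecutive updates for the same word before they are forwarded, so depending on configuration and timing you may see fewer intermediate counts.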
