Kafka — Producing messages with key

In this example, we’ll produce messages with keys. Kafka guarantees that all messages with the same key land in the same partition, which preserves ordering per key.

% kafka-topics --bootstrap-server localhost:9092 --create --topic t-multi-partitions  --partitions 3 --replication-factor 1
Created topic t-multi-partitions.
% kafka-topics --bootstrap-server localhost:9092 --describe --topic t-multi-partitions
Topic: t-multi-partitions TopicId: 6XXd3WlPSuqSq9NZOx7MYQ PartitionCount: 3 ReplicationFactor: 1 Configs: segment.bytes=1073741824
Topic: t-multi-partitions Partition: 0 Leader: 1 Replicas: 1 Isr: 1 Offline:
Topic: t-multi-partitions Partition: 1 Leader: 1 Replicas: 1 Isr: 1 Offline:
Topic: t-multi-partitions Partition: 2 Leader: 1 Replicas: 1 Isr: 1 Offline:


package com.course.kafka.producer;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class kafkaKeyProducer {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void send(String key, String value) {
        // The two-argument overload sets the record key, which drives partitioning
        kafkaTemplate.send("t-multi-partitions", key, value);
    }
}


package com.course.kafka;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

import com.course.kafka.producer.kafkaKeyProducer;

@SpringBootApplication
public class KafkaProducerApplication implements CommandLineRunner {

    @Autowired
    private kafkaKeyProducer kafkaKeyProducer;

    public static void main(String[] args) {
        SpringApplication.run(KafkaProducerApplication.class, args);
    }

    @Override
    public void run(String... args) throws Exception {
        // 50 messages cycling through 4 keys: key-0, key-1, key-2, key-3
        for (int i = 0; i < 50; i++) {
            var key = "key-" + (i % 4);
            var data = "data " + i + " with key " + key;
            kafkaKeyProducer.send(key, data);
        }
    }
}

Console Output — Partition-0

kafka-console-consumer --bootstrap-server localhost:9092 --topic t-multi-partitions --offset earliest --partition 0
data 1 with key key-1
data 5 with key key-1
data 9 with key key-1
data 13 with key key-1
data 17 with key key-1
data 21 with key key-1
data 25 with key key-1
data 29 with key key-1
data 33 with key key-1
data 37 with key key-1
data 41 with key key-1
data 45 with key key-1
data 49 with key key-1


Console Output — Partition-1

kafka-console-consumer --bootstrap-server localhost:9092 --topic t-multi-partitions --offset earliest --partition 1
data 0 with key key-0
data 4 with key key-0
data 8 with key key-0
data 12 with key key-0
data 16 with key key-0
data 20 with key key-0
data 24 with key key-0
data 28 with key key-0
data 32 with key key-0
data 36 with key key-0
data 40 with key key-0
data 44 with key key-0
data 48 with key key-0


Console Output — Partition-2

kafka-console-consumer --bootstrap-server localhost:9092 --topic t-multi-partitions --offset earliest --partition 2
data 2 with key key-2
data 3 with key key-3
data 6 with key key-2
data 7 with key key-3
data 10 with key key-2
data 11 with key key-3
data 14 with key key-2
data 15 with key key-3
data 18 with key key-2
data 19 with key key-3
data 22 with key key-2
data 23 with key key-3
data 26 with key key-2
data 27 with key key-3
data 30 with key key-2
data 31 with key key-3
data 34 with key key-2
data 35 with key key-3
data 38 with key key-2
data 39 with key key-3
data 42 with key key-2
data 43 with key key-3
data 46 with key key-2
data 47 with key key-3
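Why this exact distribution? Kafka’s default partitioner hashes the key bytes with murmur2 and takes the positive result modulo the partition count, so a given key always maps to the same partition. Below is a standalone sketch of that logic; the hash function mirrors the one in the Kafka client, while the class and method names are illustrative, not part of any library.

```java
import java.nio.charset.StandardCharsets;

public class KeyPartitionerSketch {

    // murmur2 hash as used by Kafka's default partitioner (seed 0x9747b28c)
    static int murmur2(final byte[] data) {
        int length = data.length;
        int seed = 0x9747b28c;
        final int m = 0x5bd1e995;
        final int r = 24;
        int h = seed ^ length;
        int length4 = length / 4;
        for (int i = 0; i < length4; i++) {
            final int i4 = i * 4;
            int k = (data[i4] & 0xff) + ((data[i4 + 1] & 0xff) << 8)
                  + ((data[i4 + 2] & 0xff) << 16) + ((data[i4 + 3] & 0xff) << 24);
            k *= m;
            k ^= k >>> r;
            k *= m;
            h *= m;
            h ^= k;
        }
        // handle the last few bytes of the input (intentional switch fall-through)
        switch (length % 4) {
            case 3: h ^= (data[(length & ~3) + 2] & 0xff) << 16;
            case 2: h ^= (data[(length & ~3) + 1] & 0xff) << 8;
            case 1: h ^= data[length & ~3] & 0xff;
                    h *= m;
        }
        h ^= h >>> 13;
        h *= m;
        h ^= h >>> 15;
        return h;
    }

    // positive hash modulo partition count, as the default partitioner does
    static int partitionFor(String key, int numPartitions) {
        byte[] bytes = key.getBytes(StandardCharsets.UTF_8);
        return (murmur2(bytes) & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        for (int i = 0; i < 4; i++) {
            String key = "key-" + i;
            System.out.println(key + " -> partition " + partitionFor(key, 3));
        }
    }
}
```

Because the mapping is a pure function of the key bytes and the partition count, every producer run routes the same keys to the same partitions, which is exactly what the console output above shows.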




package com.course.kafka.consumer;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class KafkaKeyConsumer {

    private static final Logger log = LoggerFactory.getLogger(KafkaKeyConsumer.class);

    @KafkaListener(topics = "t-multi-partitions")
    public void consume(ConsumerRecord<String, String> message) throws InterruptedException {
        log.info("Key : {}, Partition : {}, Message : {}", message.key(), message.partition(), message.value());
    }
}
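A @KafkaListener also needs a consumer group to join. One way to provide it, assuming a Spring Boot application.yml (the group name my-group and the localhost broker address are assumptions for this example):

```yaml
spring:
  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      group-id: my-group          # default group for @KafkaListener methods
      auto-offset-reset: earliest # start from the beginning of each partition
```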

Note — With this default configuration, a single consumer instance is assigned all three partitions and processes every message.

@KafkaListener(topics = "t-multi-partitions", concurrency = "2") — concurrency = "2" creates two consumer threads; one is assigned two partitions and the other one.

@KafkaListener(topics = "t-multi-partitions", concurrency = "3") — with three consumers, each is assigned exactly one partition.

@KafkaListener(topics = "t-multi-partitions", concurrency = "4") — with four consumers, only three do any work and one sits idle, because each partition can be consumed by at most one consumer in a group. The number of partitions is therefore the upper bound on useful concurrency.
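The idle-consumer behaviour can be sketched with a toy assignment function. This is a simplification (real Kafka uses pluggable assignors such as range, round-robin, and sticky, and the names here are illustrative), but the key invariant is the same: one partition is owned by at most one consumer in the group.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class AssignmentSketch {

    // Toy round-robin assignment: partition p goes to consumer (p % numConsumers).
    // Any consumer beyond the partition count ends up with an empty list.
    static Map<String, List<Integer>> assign(int numPartitions, int numConsumers) {
        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        for (int c = 0; c < numConsumers; c++) {
            assignment.put("consumer-" + c, new ArrayList<>());
        }
        for (int p = 0; p < numPartitions; p++) {
            assignment.get("consumer-" + (p % numConsumers)).add(p);
        }
        return assignment;
    }

    public static void main(String[] args) {
        // 3 partitions, concurrency 4: consumer-3 receives no partitions
        assign(3, 4).forEach((consumer, partitions) ->
                System.out.println(consumer + " -> " + partitions));
    }
}
```

Running this with 3 partitions and 4 consumers shows consumer-3 holding an empty assignment, mirroring the idle listener thread observed above.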




Java Developer and enthusiast
