Kafka — Producing messages with key

In this example, we'll produce messages with a key to a topic that has three partitions, and see which partition each key ends up on.

% kafka-topics --bootstrap-server localhost:9092 --create --topic t-multi-partitions  --partitions 3 --replication-factor 1
Created topic t-multi-partitions.
% kafka-topics --bootstrap-server localhost:9092 --describe --topic t-multi-partitions
Topic: t-multi-partitions TopicId: 6XXd3WlPSuqSq9NZOx7MYQ PartitionCount: 3 ReplicationFactor: 1 Configs: segment.bytes=1073741824
Topic: t-multi-partitions Partition: 0 Leader: 1 Replicas: 1 Isr: 1 Offline:
Topic: t-multi-partitions Partition: 1 Leader: 1 Replicas: 1 Isr: 1 Offline:
Topic: t-multi-partitions Partition: 2 Leader: 1 Replicas: 1 Isr: 1 Offline:

kafkaKeyProducer.java

package com.course.kafka.producer;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class kafkaKeyProducer {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    // Sends the value together with an explicit key; the key is used to choose the partition.
    public void send(String key, String value) {
        kafkaTemplate.send("t-multi-partitions", key, value);
    }
}

KafkaProducerApplication.java

package com.course.kafka;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;

import com.course.kafka.producer.kafkaKeyProducer;

@SpringBootApplication
//@EnableScheduling
public class KafkaProducerApplication implements CommandLineRunner {

    @Autowired
    private kafkaKeyProducer kafkaKeyProducer;

    public static void main(String[] args) {
        SpringApplication.run(KafkaProducerApplication.class, args);
    }

    @Override
    public void run(String... args) throws Exception {
        // Send 50 messages, cycling through four keys: key-0 .. key-3.
        for (int i = 0; i < 50; i++) {
            var key = "key-" + (i % 4);
            var data = "data " + i + " with key " + key;
            kafkaKeyProducer.send(key, data);
            Thread.sleep(500); // pause so the messages are easy to follow in the console
        }
    }
}
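To make the loop above concrete, here is a small standalone sketch (no Kafka or Spring needed) that counts how many of the 50 messages get each key. The class and method names KeyDistribution and countPerKey are made up for this illustration.

```java
import java.util.Map;
import java.util.TreeMap;

public class KeyDistribution {

    // Counts how many of `total` messages receive each key
    // when the key is built as "key-" + (i % distinctKeys).
    static Map<String, Integer> countPerKey(int total, int distinctKeys) {
        Map<String, Integer> counts = new TreeMap<>();
        for (int i = 0; i < total; i++) {
            counts.merge("key-" + (i % distinctKeys), 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        // Same numbers as the producer loop: 50 messages, 4 keys.
        System.out.println(countPerKey(50, 4));
        // prints {key-0=13, key-1=13, key-2=12, key-3=12}
    }
}
```

These counts are useful when reading the console output: a partition that holds a single key should show 12 or 13 messages, and a partition that holds two keys should show the sum of their counts.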

Console Output — Partition-0

kafka-console-consumer --bootstrap-server localhost:9092 --topic t-multi-partitions --offset earliest --partition 0
data 1 with key key-1
data 5 with key key-1
data 9 with key key-1
data 13 with key key-1
data 17 with key key-1
data 21 with key key-1
data 25 with key key-1
data 29 with key key-1
data 33 with key key-1
data 37 with key key-1
data 41 with key key-1
data 45 with key key-1
data 49 with key key-1

Partition-1

kafka-console-consumer --bootstrap-server localhost:9092 --topic t-multi-partitions --offset earliest --partition 1
data 0 with key key-0
data 4 with key key-0
data 8 with key key-0
data 12 with key key-0
data 16 with key key-0
data 20 with key key-0
data 24 with key key-0
data 28 with key key-0
data 32 with key key-0
data 36 with key key-0
data 40 with key key-0
data 44 with key key-0
data 48 with key key-0

Partition-2

kafka-console-consumer --bootstrap-server localhost:9092 --topic t-multi-partitions --offset earliest --partition 2
data 2 with key key-2
data 3 with key key-3
data 6 with key key-2
data 7 with key key-3
data 10 with key key-2
data 11 with key key-3
data 14 with key key-2
data 15 with key key-3
data 18 with key key-2
data 19 with key key-3
data 22 with key key-2
data 23 with key key-3
data 26 with key key-2
data 27 with key key-3
data 30 with key key-2
data 31 with key key-3
data 34 with key key-2
data 35 with key key-3
data 38 with key key-2
data 39 with key key-3
data 42 with key key-2
data 43 with key key-3
data 46 with key key-2
data 47 with key key-3
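
Why does every message with the same key land on the same partition? For a record with a non-null key, the Java client's default partitioner hashes the serialized key with murmur2 and takes the positive result modulo the number of partitions. The sketch below is a hand-written port of that hash, for illustration only; the names KeyPartitionSketch and partitionFor are made up here, and in a real application the client does this for you (the kafka-clients library ships its own Utils.murmur2).

```java
import java.nio.charset.StandardCharsets;

public class KeyPartitionSketch {

    // 32-bit murmur2, hand-ported for illustration (assumed to match the Kafka client's variant).
    static int murmur2(byte[] data) {
        int length = data.length;
        final int seed = 0x9747b28c;
        final int m = 0x5bd1e995;
        final int r = 24;

        int h = seed ^ length;
        int blocks = length / 4;

        // Mix four bytes at a time into the hash.
        for (int i = 0; i < blocks; i++) {
            int i4 = i * 4;
            int k = (data[i4] & 0xff)
                    + ((data[i4 + 1] & 0xff) << 8)
                    + ((data[i4 + 2] & 0xff) << 16)
                    + ((data[i4 + 3] & 0xff) << 24);
            k *= m;
            k ^= k >>> r;
            k *= m;
            h *= m;
            h ^= k;
        }

        // Mix in the remaining 1-3 bytes (intentional fall-through).
        int tail = length & ~3;
        switch (length % 4) {
            case 3:
                h ^= (data[tail + 2] & 0xff) << 16;
            case 2:
                h ^= (data[tail + 1] & 0xff) << 8;
            case 1:
                h ^= data[tail] & 0xff;
                h *= m;
        }

        h ^= h >>> 13;
        h *= m;
        h ^= h >>> 15;
        return h;
    }

    // Clears the sign bit, then maps the hash onto 0 .. numPartitions-1.
    static int partitionFor(String key, int numPartitions) {
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8); // assumes a String key serialized as UTF-8
        return (murmur2(keyBytes) & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        for (int k = 0; k < 4; k++) {
            String key = "key-" + k;
            System.out.println(key + " -> partition " + partitionFor(key, 3));
        }
    }
}
```

The mapping depends only on the key bytes and the partition count, so it stays the same for as long as the topic keeps 3 partitions. Since 4 keys are hashed onto 3 partitions, at least two keys must share a partition, which is what we see with key-2 and key-3 on partition 2.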

— — — — — — — — — — — — -

Consume

KafkaKeyConsumer.java

package com.course.kafka.consumer;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class KafkaKeyConsumer {

    private static final Logger log = LoggerFactory.getLogger(KafkaKeyConsumer.class);

    // Logs the key, the partition and the payload of every record on the topic.
    @KafkaListener(topics = "t-multi-partitions")
    public void consume(ConsumerRecord<String, String> message) throws InterruptedException {
        log.info("Key : {}, Partition : {}, Message : {}", message.key(), message.partition(), message.value());
        Thread.sleep(1000); // simulate slow processing
    }
}

Note: With the listener above (no concurrency set), only 1 consumer is created, and it processes the data from all three partitions.

@KafkaListener(topics = "t-multi-partitions", concurrency = "2") - the concurrency attribute sets how many consumers are created, here 2. The three partitions are split between the two consumers.

@KafkaListener(topics = "t-multi-partitions", concurrency = "3") - 3 consumers are created.

Now there are 3 consumers for 3 partitions, so each consumer is assigned exactly one partition.

@KafkaListener(topics = "t-multi-partitions", concurrency = "4") - 4 consumers are created.

Now 3 consumers are working and 1 is idle, because a partition is assigned to at most one consumer in a group. Setting concurrency higher than the number of partitions adds no throughput, so use at most as many consumers as there are partitions.
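
The effect of the different concurrency values can be sketched without a broker. The code below is a simplified model of a range-style split of one topic's partitions across the consumers of a group; it is not the real assignor code, and the names AssignmentSketch and assign are made up for this illustration. Which consumer actually ends up with which partition is decided by the group's assignor at runtime.

```java
import java.util.ArrayList;
import java.util.List;

public class AssignmentSketch {

    // Splits partitions 0 .. numPartitions-1 into contiguous ranges, one per consumer.
    // The first (numPartitions % numConsumers) consumers get one extra partition.
    static List<List<Integer>> assign(int numPartitions, int numConsumers) {
        List<List<Integer>> result = new ArrayList<>();
        int base = numPartitions / numConsumers;
        int extra = numPartitions % numConsumers;
        int next = 0;
        for (int c = 0; c < numConsumers; c++) {
            int count = base + (c < extra ? 1 : 0);
            List<Integer> owned = new ArrayList<>();
            for (int j = 0; j < count; j++) {
                owned.add(next++);
            }
            result.add(owned); // an empty list means the consumer is idle
        }
        return result;
    }

    public static void main(String[] args) {
        // 3 partitions, concurrency 1 to 4, as in the listener variants above.
        for (int concurrency = 1; concurrency <= 4; concurrency++) {
            System.out.println("concurrency=" + concurrency + " -> " + assign(3, concurrency));
        }
        // prints:
        // concurrency=1 -> [[0, 1, 2]]
        // concurrency=2 -> [[0, 1], [2]]
        // concurrency=3 -> [[0], [1], [2]]
        // concurrency=4 -> [[0], [1], [2], []]
    }
}
```

The empty list in the last line is the idle fourth consumer.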
