Kafka — Consuming with Consumer Groups

Prateek
3 min readMay 9, 2022

--

In this example, we’ll see how the same message can be consumed by different consumer groups (here, cg-dashboard and cg-notification). Even if one consumer processes messages more slowly than the other, the two groups do not block each other, and each one still consumes all of the data.

Producer

First, create the topic:

kafka-topics --bootstrap-server localhost:9092 --create --topic t-commodity --partitions 1 --replication-factor 1 
Created topic t-commodity.

Commodity.java

/**
 * Payload describing a single commodity quote.
 *
 * <p>Instances are serialized to JSON by the producer and parsed back from JSON
 * by the consumers. Accessors, builder and constructors are generated by Lombok.
 * The field declaration order is significant: it defines the parameter order of
 * the all-args constructor.
 */
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class Commodity {

    /** Commodity name, e.g. "gold" or "copper"; also used as the Kafka record key by the producer. */
    private String name;

    /** Quoted price. */
    private double price;

    /** Unit the price refers to, e.g. "ounce" or "tonne". */
    private String measurement;

    /** Quote timestamp (the sample output shows epoch-millisecond values). */
    private long timestamp;
}

CommodityProducer.java

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

import com.course.kafka.entity.Commodity;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;

/**
 * Publishes {@link Commodity} objects to Kafka as JSON strings.
 */
@Service
public class CommodityProducer {

    /** Name of the topic every commodity message is written to. */
    private static final String TOPIC = "t-commodity";

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Autowired
    private ObjectMapper objectMapper;

    /**
     * Serializes the given commodity to JSON and sends it to the commodity topic,
     * using the commodity name as the record key.
     *
     * @param commodity the commodity to publish
     * @throws JsonProcessingException if the commodity cannot be serialized to JSON
     */
    public void sendMessage(Commodity commodity) throws JsonProcessingException {
        final String payload = objectMapper.writeValueAsString(commodity);
        final String key = commodity.getName();
        kafkaTemplate.send(TOPIC, key, payload);
    }
}

CommodityScheduler.java

package com.course.kafka.scheduler;
import java.util.List;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.http.HttpMethod;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import org.springframework.web.client.RestTemplate;

import com.course.kafka.entity.Commodity;
import com.course.kafka.producer.CommodityProducer;
import com.fasterxml.jackson.core.JsonProcessingException;

/**
 * Periodically pulls the current commodity quotes from the commodity REST API
 * and forwards each one to Kafka through {@link CommodityProducer}.
 */
@Service
public class CommodityScheduler {

    /** REST endpoint that returns the full list of commodities. */
    private static final String COMMODITY_API_URL = "http://localhost:8080/api/commodity/v1/all";

    // JDK platform logger: needs no import beyond java.lang.
    private static final System.Logger LOG = System.getLogger(CommodityScheduler.class.getName());

    private final RestTemplate restTemplate = new RestTemplate();

    @Autowired
    private CommodityProducer producer;

    /**
     * Fetches all commodities from the REST API and publishes them one by one.
     *
     * <p>Scheduled at a fixed rate of 5000 ms. A response without a body is
     * skipped instead of failing with a {@link NullPointerException}. A commodity
     * that cannot be serialized is logged and skipped so the remaining
     * commodities are still published.
     */
    @Scheduled(fixedRate = 5000)
    public void fetchCommodities() {
        var commodities = restTemplate.exchange(COMMODITY_API_URL, HttpMethod.GET, null,
                new ParameterizedTypeReference<List<Commodity>>() {
                }).getBody();

        // getBody() may return null (e.g. an empty response); nothing to publish then.
        if (commodities == null) {
            LOG.log(System.Logger.Level.WARNING, "Commodity API returned no body; nothing to publish");
            return;
        }

        for (var commodity : commodities) {
            try {
                producer.sendMessage(commodity);
            } catch (JsonProcessingException e) {
                // Log with the cause instead of printStackTrace(), then continue with the next item.
                LOG.log(System.Logger.Level.ERROR, "Failed to serialize commodity: " + commodity, e);
            }
        }
    }
}

KafkaProducerApplication.java

import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;

/**
 * Entry point of the producer application.
 *
 * <p>{@code @EnableScheduling} turns on processing of {@code @Scheduled}
 * methods, which is what drives the periodic commodity fetch.
 */
@EnableScheduling
@SpringBootApplication
public class KafkaProducerApplication implements CommandLineRunner {

    /**
     * Boots the Spring application.
     *
     * @param args command-line arguments, passed through to Spring Boot
     */
    public static void main(final String[] args) {
        SpringApplication.run(KafkaProducerApplication.class, args);
    }

    /**
     * Start-up hook; intentionally does nothing.
     *
     * @param args command-line arguments (unused)
     */
    @Override
    public void run(final String... args) throws Exception {
        // Nothing to do at start-up.
    }
}
kafka-console-consumer --bootstrap-server localhost:9092 --offset earliest --partition 0 --topic t-commodity
{"name":"gold","price":1970.8789217218837,"measurement":"ounce","timestamp":1652068332760}
{"name":"copper","price":6124.165554788417,"measurement":"tonne","timestamp":1652068332760}
{"name":"gold","price":1944.5713002477069,"measurement":"ounce","timestamp":1652068337677}
{"name":"copper","price":5820.69940231883,"measurement":"tonne","timestamp":1652068337677}
{"name":"gold","price":1942.1452025710933,"measurement":"ounce","timestamp":1652068342670}
{"name":"copper","price":5830.626614870821,"measurement":"tonne","timestamp":1652068342670}
{"name":"gold","price":1930.2488113784473,"measurement":"ounce","timestamp":1652068347666}
{"name":"copper","price":5985.099199429845,"measurement":"tonne","timestamp":1652068347666}
{"name":"gold","price":1965.8799049496001,"measurement":"ounce","timestamp":1652068352668}
{"name":"copper","price":5858.765925722519,"measurement":"tonne","timestamp":1652068352668}
{"name":"gold","price":1915.1508110899467,"measurement":"ounce","timestamp":1652068357667}
{"name":"copper","price":6021.334955916558,"measurement":"tonne","timestamp":1652068357667}
{"name":"gold","price":2005.3076455304756,"measurement":"ounce","timestamp":1652068362667}
{"name":"copper","price":6172.8086777391545,"measurement":"tonne","timestamp":1652068362667}
{"name":"gold","price":2001.1089797382185,"measurement":"ounce","timestamp":1652068367664}
{"name":"copper","price":5890.863049442963,"measurement":"tonne","timestamp":1652068367665}
{"name":"gold","price":2033.466758211492,"measurement":"ounce","timestamp":1652068372665}
{"name":"copper","price":5831.853386164544,"measurement":"tonne","timestamp":1652068372665}
{"name":"gold","price":2133.249910055277,"measurement":"ounce","timestamp":1652068377667}
{"name":"copper","price":5959.785908788123,"measurement":"tonne","timestamp":1652068377667}
{"name":"gold","price":2191.9634905365674,"measurement":"ounce","timestamp":1652068382665}
{"name":"copper","price":6029.403962620173,"measurement":"tonne","timestamp":1652068382665}

Consumer

application.yml

# HTTP port of the consumer application.
server:
  port: 8081

spring:
  kafka:
    consumer:
      # Default consumer group; a listener that sets its own groupId overrides it.
      group-id: default-spring-consumer
      # Start from the earliest offset when the group has no committed offset yet.
      auto-offset-reset: earliest
  jackson:
    date-format: yyyy-MMM-dd

CommodityDashboardConsumer.java

import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

import com.course.kafka.entity.Commodity;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.ObjectMapper;

/**
 * Consumes commodity messages on behalf of the {@code cg-dashboard} consumer group.
 *
 * <p>Each message is processed with an artificial random delay to simulate a slow consumer.
 */
@Service
public class CommodityDashboardConsumer {

    private static final Logger LOG = LoggerFactory.getLogger(CommodityDashboardConsumer.class);

    @Autowired
    private ObjectMapper objectMapper;

    /**
     * Parses one JSON message from the {@code t-commodity} topic, waits a random
     * amount of time, and logs the parsed commodity.
     *
     * @param message the raw JSON payload of the Kafka record
     * @throws JsonMappingException    if the JSON cannot be mapped onto {@link Commodity}
     * @throws JsonProcessingException if the payload is not valid JSON
     * @throws InterruptedException    if the thread is interrupted during the artificial delay
     */
    @KafkaListener(topics = "t-commodity", groupId = "cg-dashboard")
    public void consume(String message) throws JsonMappingException, JsonProcessingException, InterruptedException {
        final Commodity parsed = objectMapper.readValue(message, Commodity.class);
        pauseRandomly();
        LOG.info("Dashboard logic for : {}", parsed);
    }

    /** Sleeps for a random duration of at least 500 ms and less than 2000 ms. */
    private static void pauseRandomly() throws InterruptedException {
        final long delayMillis = ThreadLocalRandom.current().nextLong(500, 2000);
        TimeUnit.MILLISECONDS.sleep(delayMillis);
    }
}

CommodityNotificationConsumer.java

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

import com.course.kafka.entity.Commodity;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.ObjectMapper;

/**
 * Consumes commodity messages on behalf of the {@code cg-notification} consumer group.
 */
@Service
public class CommodityNotificationConsumer {

    private static final Logger LOG = LoggerFactory.getLogger(CommodityNotificationConsumer.class);

    @Autowired
    private ObjectMapper objectMapper;

    /**
     * Parses one JSON message from the {@code t-commodity} topic and logs the parsed commodity.
     *
     * @param message the raw JSON payload of the Kafka record
     * @throws JsonMappingException    if the JSON cannot be mapped onto {@link Commodity}
     * @throws JsonProcessingException if the payload is not valid JSON
     */
    @KafkaListener(topics = "t-commodity", groupId = "cg-notification")
    public void consume(String message) throws JsonMappingException, JsonProcessingException {
        final Commodity parsed = objectMapper.readValue(message, Commodity.class);
        LOG.info("Notification logic for : {}", parsed);
    }
}

The Commodity.java class remains the same as in the producer. Note — even if we add a delay inside a consumer, e.g. Thread.sleep(ThreadLocalRandom.current().nextLong(500, 1000));, the two consumers still do not block each other, and both work fine.

kafka-consumer-groups --bootstrap-server localhost:9092 --group cg-dashboard --describe
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
cg-dashboard t-commodity 0 42 42 0 consumer-cg-dashboard-1-9f6a1ffe-a974-4afa-9c56-1d7f3b0dc873 /172.21.0.1 consumer-cg-dashboard-1

kafka-consumer-groups --bootstrap-server localhost:9092 --group cg-notification --describe
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
cg-notification t-commodity 0 42 42 0 consumer-cg-notification-2-f7a2d487-bec8-4ba5-be96-a15c301fccc6 /172.21.0.1 consumer-cg-notification-2

--

--