Prateek
2 min readMay 8, 2022

Kafka — Producing and Consuming Json

In this example, we’ll produce and consume the Json using Apache Kafka!

  • To Create the Topic
kafka-topics --bootstrap-server localhost:9092 --create --topic t-employee  --partitions 1  --replication-factor 1
Created topic t-employee.

Producer Code —

application.yml

spring:
jackson:
date-format: yyyy-MMM-dd

JsonConfig.java — This class holds custom ObjectMapper bean.

package com.course.kafka.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;

@Configuration
public class JsonConfig {

@Bean
public ObjectMapper objectMapper() {
var objectMapper = new ObjectMapper();
objectMapper.findAndRegisterModules();
objectMapper.configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false);
return objectMapper;
}
}

Employee.java

package com.course.kafka.entity;

import java.time.LocalDate;

import com.fasterxml.jackson.databind.PropertyNamingStrategies;
import com.fasterxml.jackson.databind.annotation.JsonNaming;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

@AllArgsConstructor
@NoArgsConstructor
@Data
@Builder
@JsonNaming(value = PropertyNamingStrategies.SnakeCaseStrategy.class)
public class Employee {
private String employeeId;
private String name;
private LocalDate birthDate;
}

EmployeeJsonProducer.java

package com.course.kafka.producer;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

import com.course.kafka.entity.Employee;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;

@Service
public class EmployeeJsonProducer {

@Autowired
private KafkaTemplate<String, String> kafkaTemplate;

@Autowired
private ObjectMapper objectMapper;

public void sendMessage(Employee emp) throws JsonProcessingException {
var json = objectMapper.writeValueAsString(emp);
kafkaTemplate.send("t-employee", json);
}
}

MainApp.java

package com.course.kafka;

import java.time.LocalDate;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

import com.course.kafka.entity.Employee;
import com.course.kafka.producer.EmployeeJsonProducer;

@SpringBootApplication
//@EnableScheduling
public class KafkaProducerApplication implements CommandLineRunner {

@Autowired
private EmployeeJsonProducer producer;

public static void main(String[] args) {
SpringApplication.run(KafkaProducerApplication.class, args);
}

@Override
public void run(String... args) throws Exception {
for (int i = 0; i < 5; i++) {
var employee = Employee.builder().employeeId("emp-" + i).name("Employee " + i).birthDate(LocalDate.now()).build();
producer.sendMessage(employee);
}
}
}

Consumer Code —

JsonConfig.java, Employee.java will remain same.

application.yml

server:
port: 8081

spring:
kafka:
consumer:
group-id: default-spring-consumer
auto-offset-reset: earliest
jackson:
date-format: yyyy-MMM-dd

EmployeeJsonConsumer.java

package com.course.kafka.consumer;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

import com.course.kafka.entity.Employee;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.ObjectMapper;

import lombok.extern.slf4j.Slf4j;

@Service
@Slf4j
public class EmployeeJsonConsumer {

@Autowired
private ObjectMapper objectMapper;

@KafkaListener(topics = "t-employee")
public void listen(String message) throws JsonMappingException, JsonProcessingException {
Employee employee = objectMapper.readValue(message, Employee.class);
log.info("Employee is :{}", employee);
}
}
kafka-console-consumer --bootstrap-server localhost:9092 --offset earliest --partition 0 --topic t-employee
{"employee_id":"emp-0","name":"Employee 0","birth_date":"2022-05-08"}
{"employee_id":"emp-1","name":"Employee 1","birth_date":"2022-05-08"}
{"employee_id":"emp-2","name":"Employee 2","birth_date":"2022-05-08"}
{"employee_id":"emp-3","name":"Employee 3","birth_date":"2022-05-08"}
{"employee_id":"emp-4","name":"Employee 4","birth_date":"2022-05-08"}
Prateek
Prateek

Written by Prateek

Java Developer and enthusiast

No responses yet