Kafka — Producing and Consuming JSON
In this example, we’ll produce and consume JSON messages using Apache Kafka.
- Create the topic:
kafka-topics --bootstrap-server localhost:9092 --create --topic t-employee --partitions 1 --replication-factor 1
Created topic t-employee.
Producer Code —
application.yml
spring:
  jackson:
    # NOTE(review): the sample console-consumer output prints birth_date as 2022-05-08,
    # so this pattern does not appear to be applied to LocalDate fields — confirm it is needed.
    date-format: yyyy-MMM-dd
JsonConfig.java — This class holds a custom ObjectMapper bean.
package com.course.kafka.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
/**
 * Spring configuration that supplies the {@link ObjectMapper} bean used for JSON conversion.
 */
@Configuration
public class JsonConfig {

    /**
     * Creates an {@link ObjectMapper}, registers every Jackson module it can discover, and
     * turns off {@link SerializationFeature#WRITE_DATES_AS_TIMESTAMPS}.
     *
     * @return the configured mapper
     */
    @Bean
    public ObjectMapper objectMapper() {
        return new ObjectMapper()
                .findAndRegisterModules()
                .disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
    }
}
Employee.java
package com.course.kafka.entity;
import java.time.LocalDate;
import com.fasterxml.jackson.databind.PropertyNamingStrategies;
import com.fasterxml.jackson.databind.annotation.JsonNaming;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
 * Employee payload that is converted to and from JSON.
 *
 * <p>Property names are mapped with the snake-case naming strategy, so {@code employeeId}
 * becomes {@code employee_id} and {@code birthDate} becomes {@code birth_date}.
 * Accessors, builder and constructors are generated by Lombok.
 */
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
@JsonNaming(PropertyNamingStrategies.SnakeCaseStrategy.class)
public class Employee {

    /** Identifier of the employee. */
    private String employeeId;

    /** Display name of the employee. */
    private String name;

    /** Date of birth of the employee. */
    private LocalDate birthDate;
}
EmployeeJsonProducer.java
package com.course.kafka.producer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
import com.course.kafka.entity.Employee;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
/**
 * Publishes {@link Employee} objects to Kafka as JSON strings.
 */
@Service
public class EmployeeJsonProducer {

    /** Topic that receives the serialized employees. */
    private static final String TOPIC = "t-employee";

    @Autowired
    private ObjectMapper objectMapper;

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    /**
     * Serializes the given employee with the injected {@link ObjectMapper} and sends the
     * resulting JSON string to the topic. The value returned by {@code send} is not inspected.
     *
     * @param emp the employee to publish
     * @throws JsonProcessingException if the employee cannot be written as JSON
     */
    public void sendMessage(Employee emp) throws JsonProcessingException {
        String payload = objectMapper.writeValueAsString(emp);
        kafkaTemplate.send(TOPIC, payload);
    }
}
KafkaProducerApplication.java
package com.course.kafka;
import java.time.LocalDate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import com.course.kafka.entity.Employee;
import com.course.kafka.producer.EmployeeJsonProducer;
/**
 * Spring Boot entry point of the producer application.
 *
 * <p>After start-up, {@link #run(String...)} publishes a fixed number of sample employees
 * through {@link EmployeeJsonProducer}.
 */
@SpringBootApplication
public class KafkaProducerApplication implements CommandLineRunner {

    /** Number of sample employees sent on start-up. */
    private static final int SAMPLE_COUNT = 5;

    @Autowired
    private EmployeeJsonProducer producer;

    /**
     * Boots the Spring application.
     *
     * @param args command-line arguments passed through to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplication.run(KafkaProducerApplication.class, args);
    }

    /**
     * Sends the sample employees, numbered from 0, one after another.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if a message cannot be serialized or sent
     */
    @Override
    public void run(String... args) throws Exception {
        int index = 0;
        while (index < SAMPLE_COUNT) {
            producer.sendMessage(sampleEmployee(index));
            index++;
        }
    }

    /** Builds the sample employee for the given index, using today's date as birth date. */
    private static Employee sampleEmployee(int index) {
        return Employee.builder()
                .employeeId("emp-" + index)
                .name("Employee " + index)
                .birthDate(LocalDate.now())
                .build();
    }
}
Consumer Code —
JsonConfig.java and Employee.java remain the same as in the producer.
application.yml
server:
  port: 8081
spring:
  kafka:
    consumer:
      group-id: default-spring-consumer
      auto-offset-reset: earliest
  jackson:
    date-format: yyyy-MMM-dd
EmployeeJsonConsumer.java
package com.course.kafka.consumer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
import com.course.kafka.entity.Employee;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
/**
 * Kafka listener that reads JSON messages from the {@code t-employee} topic and logs them
 * as {@link Employee} objects.
 */
@Slf4j
@Service
public class EmployeeJsonConsumer {

    @Autowired
    private ObjectMapper objectMapper;

    /**
     * Parses the incoming message as an {@link Employee} with the injected
     * {@link ObjectMapper} and logs the result at INFO level.
     *
     * @param message the raw message value received from the topic
     * @throws JsonMappingException if the JSON cannot be mapped onto {@link Employee}
     * @throws JsonProcessingException if the message is not valid JSON
     */
    @KafkaListener(topics = "t-employee")
    public void listen(String message) throws JsonMappingException, JsonProcessingException {
        final var parsed = objectMapper.readValue(message, Employee.class);
        log.info("Employee is :{}", parsed);
    }
}
kafka-console-consumer --bootstrap-server localhost:9092 --offset earliest --partition 0 --topic t-employee
{"employee_id":"emp-0","name":"Employee 0","birth_date":"2022-05-08"}
{"employee_id":"emp-1","name":"Employee 1","birth_date":"2022-05-08"}
{"employee_id":"emp-2","name":"Employee 2","birth_date":"2022-05-08"}
{"employee_id":"emp-3","name":"Employee 3","birth_date":"2022-05-08"}
{"employee_id":"emp-4","name":"Employee 4","birth_date":"2022-05-08"}