Kakfa — Producer and Consumer

Prateek
2 min readMay 8, 2022

--

In this tutorial, we’ll use show you how to produce & consume message from kafka topics. I am using docker to setup the kakfa and zookeeper!

docker-compose -f docker-compose-core.yml -p core up -d
WARNING: The Docker Engine you're using is running in swarm mode.
Compose does not use swarm mode to deploy services to multiple nodes in a swarm. All containers will be scheduled on the current node.
To deploy your application across the swarm, use `docker stack deploy`.
Creating network "kafka-net" with driver "bridge"
Pulling zookeeper (docker.io/bitnami/zookeeper:3.8)…
c9aaae6acc00: Pull complete
a83a4db86b15: Pull complete
1c37183b826c: Pull complete
ae9b375cbdbb: Pull complete
633f17d03616: Pull complete
0d6d03cdbf5a: Pull complete
2540d64131c0: Pull complete
ce125f085ed5: Pull complete
ec36d0eb5197: Pull complete
9f163bb61c08: Pull complete
0995340252b2: Pull complete
dfacc4188b8a: Pull complete
8d285f959664: Pull complete
Digest: sha256:57b1cb759efcf2be1ba5511dfa23813e2fb8281d2fb2b78fa6063127b722cd50
Status: Downloaded newer image for bitnami/zookeeper:3.8
Pulling kafka (docker.io/bitnami/kafka:3)…
3: Pulling from bitnami/kafka
a2b89359fa38: Pull complete
3161c9bbaf5b: Pull complete
4721591cc441: Pull complete
79c6a37e590d: Pull complete
9dd9505cdfaa: Pull complete
7f1fc09b5e51: Pull complete
7e10b2f60ae7: Pull complete
232ce759c205: Pull complete
38c139fd8f95: Pull complete
413e8b01df4a: Pull complete
a6d3894576a1: Pull complete
32597ec12555: Pull complete
0ddc812d25a7: Pull complete
b65c477fda80: Pull complete
Digest: sha256:b5a0bd7d3f50d93726bb41ebbbfee6852d50611f0f448e8420b0f63d9a79dbf4
Status: Downloaded newer image for bitnami/kafka:3
Creating zookeeper … done
Creating kafka … done

Steps to create topic

% kafka-topics --bootstrap-server localhost:9092 --create --topic t-hello --partitions 1 --replication-factor 1
Created topic t-hello.
% kafka-topics --bootstrap-server localhost:9092 --list
t-hello
% kafka-topics --bootstrap-server localhost:9092 --describe --topic t-hello
Topic: t-hello TopicId: Iq72hKS3R2SJd7_T2eIfQA PartitionCount: 1 ReplicationFactor: 1 Configs: segment.bytes=1073741824
Topic: t-hello Partition: 0 Leader: 1 Replicas: 1 Isr: 1 Offline:

Producer:

package com.course.kafka.producer;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class HelloKafkaProducer {

@Autowired
private KafkaTemplate<String, String> kafkaTemplate;

public void sendHello(String name) {
kafkaTemplate.send("t-hello", "Hello-"+name);
}
}

MainApp.java

package com.course.kafka;

import java.util.concurrent.ThreadLocalRandom;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

import com.course.kafka.producer.HelloKafkaProducer;

@SpringBootApplication
public class KafkaProducerApplication implements CommandLineRunner{

@Autowired
private HelloKafkaProducer helloKafkaProducer;

public static void main(String[] args) {
SpringApplication.run(KafkaProducerApplication.class, args);
}

@Override
public void run(String... args) throws Exception {
helloKafkaProducer.sendHello("Prateek "+ ThreadLocalRandom.current().nextInt());
}

}

Consumer: HelloKafkaConsumer.java

package com.course.kafka.consumer;

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class HelloKafkaConsumer {

@KafkaListener(topics = "t-hello")
public void consume(String message) {
System.out.println(message);
}
}

MainApp.java

package com.course.kafka;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class KafkaConsumerApplication {

public static void main(String[] args) {
SpringApplication.run(KafkaConsumerApplication.class, args);
}

}

application.yml

logging:
pattern:
console: "[Kafka Core Consumer] %clr(%d{HH:mm:ss.SSS}){faint} %clr(${LOG_LEVEL_PATTERN:%5p}) %clr(---){faint} %clr(%-40.40logger{39}){cyan} %clr(:){faint} %m%n${LOG_EXCEPTION_CONVERSION_WORD:%wEx}"

spring
:
kafka:
consumer:
group-id: default-spring-consumer

Run the consumer first and then start the producer:

--

--