Kafka Implementation With Spring Boot

Posted By : Avnish Pandey | 28-Mar-2018

Kafka :
Kafka is a distributed messaging system that is fast and reliable. It is a good replacement for more traditional message brokers. Compared with other messaging services, Kafka offers better throughput, replication, and fault tolerance, which makes it very reliable.

Kafka integration with Spring Boot :
Spring provides a library, spring-kafka, for integrating Kafka with Spring Boot. Using it we can create a producer for publishing data and a consumer for consuming it. We just need to add the Spring dependency; it automatically pulls in the Kafka client library, and we can then use the Spring abstractions for Kafka.

Here is the dependency we have to add in pom.xml (when the project inherits from the Spring Boot parent POM, the version is managed automatically):

	<dependency>
		<groupId>org.springframework.kafka</groupId>
		<artifactId>spring-kafka</artifactId>
	</dependency>

Now we have to configure Kafka :

package com.kafka;

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;

@EnableKafka
@Configuration
public class KafkaConfiguration {

   private static final String broker = "localhost:9092";
   private static final String groupId = "kafka-group";

   @Bean
   public KafkaTemplate<String, String> kafkaTemplate() {
	Map<String, Object> props = new HashMap<>();
	props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, broker);
	props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
	props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
	return new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(props));
   }

   @Bean
   public ConsumerFactory<String, String> consumerFactory() {
	Map<String, Object> props = new HashMap<>();
	props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, broker);
	props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
	props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
	props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
	return new DefaultKafkaConsumerFactory<>(props);
   }

   @Bean
   public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
	ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
	factory.setConsumerFactory(consumerFactory());
	return factory;
   }
}
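As an alternative to the explicit Java configuration above, Spring Boot can also auto-configure the producer and consumer from its spring.kafka.* properties. A minimal sketch in application.properties, assuming the same broker address and group id as in the class above:

```properties
# Kafka broker address (matches the broker constant above)
spring.kafka.bootstrap-servers=localhost:9092

# Consumer settings
spring.kafka.consumer.group-id=kafka-group
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

# Producer settings
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
```

With these properties in place, Spring Boot creates the KafkaTemplate and listener container factory beans for us, so the Java configuration class is only needed when we want finer control.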

Producing the data by using a KafkaTemplate object :

package com.kafka;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Component
public class KafkaProducer {

   @Autowired
   private KafkaTemplate<String, String> kafkaTemplate;

   public void produce(String data) {
	kafkaTemplate.send("topic", data);
   }
}

Consuming the data by using the @KafkaListener annotation :

package com.kafka;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class KafkaConsumer {

   private static final Logger LOGGER = LoggerFactory.getLogger(KafkaConsumer.class);

   @KafkaListener(topics = "topic")
   public void consume(String content) {
	LOGGER.info("Consumed data :: " + content);
   }
}
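To try the producer and listener end to end, a Kafka broker must be running and the topic must exist. A rough sketch of the commands, assuming a local Kafka installation of this post's era (ZooKeeper-based tooling), run from the Kafka home directory:

```shell
# start ZooKeeper and the Kafka broker (each in its own terminal)
bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties

# create the topic used by the producer and the @KafkaListener
bin/kafka-topics.sh --create --zookeeper localhost:2181 \
    --replication-factor 1 --partitions 1 --topic topic
```

Once the broker is up, calling produce("hello") should cause the listener to log the consumed message.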


