Kafka Implementation With Spring Boot

Posted By : Avnish Pandey | 28-Mar-2018

Kafka :
Kafka is a distributed massaging system. Kafka is a very fast and reliable massaging system. It is the best replacement for more traditional message broker. If we compare the Kafka with another massaging service then we found it has better throughput, replication, and fault-tolerance, which makes Kafka very reliable.

Kafka integration with Spring Boot :
Spring has already given a library for implementing Kafka with Spring Boot. By using this library we can create the producer for producing data and consumer for consuming the data. We just need to add the dependency for spring. It automatically downloads the Kafka library, then we can use the spring library for Kafka.

Here are the dependencies which we have to add in pom.XML :

<dependency>
	<groupId> org.springframework.kafka </groupId>
	<artifactId> spring-kafka </artifactId>
</dependency>

Now we have to configure the Kafka :

package com.kakfa;

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;

@EnableKafka
@Configuration
public class KafkaConfiguration {
   private static final String broker = "localhost:9092";
   private static final String groupId = "kafka-group";
   @Bean
   public KafkaTemplate<String, String> getKafkaTemplate() {
	Map<String, Object> props = new HashMap<>();
	props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, broker);
	props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
	props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
	return new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(props));
   }

   @Bean
   public ConsumerFactory<String, String> consumerFactory() {
	Map<String, Object> props = new HashMap<>();
	props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, broker);
	props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
	props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
	props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
	return new DefaultKafkaConsumerFactory<>(props);
   }

   @Bean
   public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
	ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
	factory.setConsumerFactory(consumerFactory());
	return factory;
   }
}

Producing the data by using KafkaTemplate class object :

package com.kakfa;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;

public class KafkaProducer {
	
   @Autowired
   private KafkaTemplate<String, String> kafkaTemplate;
	
   public void produce(String data) {
	kafkaTemplate.send("topic", data);
   }
}

Consuming the data by using KafkaListner annotation :

package com.kakfa;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class KafkaConsumer {
	
   public static Logger LOGGER = LoggerFactory.getLogger(KafkaConsumer.class);
	
   @KafkaListener(topics = "topic")
   public void consume(String content) {
	LOGGER.info("Consumed data :: "+content);
   }
}

Thanks,

About Author

Author Image
Avnish Pandey

Avnish has a good knowledge in core & advance Java, Spring and Hibernate Framework. He loves to learn new technologies.

Request for Proposal

Name is required

Comment is required

Sending message..