Saving Object In Kafka Using Object Mapper

Posted By : Krishna Verma | 31-May-2018

Hello everyone this is krishna verma , in this blog i will show you how you can store Objects in kafka using Object Mapper and how you can get it back from kafka using kafka consumer in the object form. sending String data in kafka it's very simple but the problem arises when you have to deal with the object and you have to store the object in the kafka.

ObjectMapper:
using objectMapper object you can convert an object into a String and then again you can convert that string to object, so what we basically do here before sending Object to kafka we first convert it into a string using object Mapper and when we consuming it from consumer then we convert it back to the object form.

Kafka-producer-Controller code:

@RestController
@RequestMapping("kafka")
public class UserResource {

    @Autowired
    private KafkaTemplate kafkaTemplate;

    private static final String TOPIC = "Kafka_Example";
    
    @Autowired
    KafkaService kafkaService;
    
    @PostMapping("/publish")
    public String post(@RequestBody final Object obj ) {    
        kafkaTemplate.send(TOPIC,kafkaService.serializedData(obj));
    	System.out.println("inside post method of kafka producer");
    	//kafkaTemplate.send(TOPIC,obj);
    	return "Published successfully";
    }
}
 

In this controller you will pass an object from the requestBody , kafkaTemplate.send() function will add it into kafka , kafkaService.serializedData(Obj) function converts the object into the string form so basically our data will be store in the kafka in the form of string.

kafka-producer-service code:

@Service
public class KafkaService {
	
	ObjectMapper mapper = new ObjectMapper();
	public String serializedData(Object obj) {
		String objAsString = "";
	try {
		objAsString =	mapper.writeValueAsString(obj);
	} catch (JsonProcessingException e) {
		// TODO Auto-generated catch block
		e.printStackTrace();
	}
	return objAsString;
	}
}
 

this is the function used for converting a object to the string

code for kafka-consumer-controller

 
@Service
public class KafkaConsumer {

	@Autowired
	private OrderSaveByFeignClient orderSaveByFeignClient;
	
	public Object deSerializedData(String str) {
		ObjectMapper mapper = new ObjectMapper();
		Object obj = null;
		try {
			obj = mapper.readValue(str, Object.class);
		} catch (JsonParseException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		} catch (JsonMappingException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		} catch (IOException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
		return obj;
	}

	@KafkaListener(topics = "Kafka_Example", groupId = "group_id")
	public void consume(String message) {
		Object obj = deSerializedData(message);
		ObjectMapper oMapper = new ObjectMapper();
		Map map = oMapper.convertValue(obj, Map.class);
		orderSaveByFeignClient.buyOrderSave(map);
	}

}


this is the service of my kafka-consumer it will fetch the data from the kafka in the form of string and after getting that data we will convert it back in the form of object.

About Author

Author Image
Krishna Verma

Krishna is a Software Developer having key skills in java , his hobbies are learning new technologies

Request for Proposal

Name is required

Comment is required

Sending message..