Saving Object In Kafka Using Object Mapper
Posted By : Krishna Verma | 31-May-2018
Hello everyone, this is Krishna Verma. In this blog I will show you how to store objects in Kafka using ObjectMapper, and how to get them back in object form using a Kafka consumer. Sending String data to Kafka is very simple, but the problem arises when you have to deal with an object and store that object in Kafka.
ObjectMapper:
Using Jackson's ObjectMapper you can convert an object into a JSON String and then convert that String back into an object. So before sending an object to Kafka we first convert it into a String using ObjectMapper, and when we consume it in the consumer we convert it back to object form.
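To make the round trip concrete, here is a minimal sketch that uses only Jackson, with no Kafka involved. The class name RoundTripDemo and the sample order map are made up for illustration, and it assumes jackson-databind is on the classpath.

```java
import com.fasterxml.jackson.databind.ObjectMapper;

import java.io.IOException;
import java.util.LinkedHashMap;
import java.util.Map;

public class RoundTripDemo {
    // ObjectMapper is thread-safe once configured, so one shared instance is enough.
    private static final ObjectMapper MAPPER = new ObjectMapper();

    /** Serializes an object to a JSON string. */
    static String toJson(Object obj) throws IOException {
        return MAPPER.writeValueAsString(obj);
    }

    /** Parses a JSON string back into a generic Java object. */
    static Object fromJson(String json) throws IOException {
        return MAPPER.readValue(json, Object.class);
    }

    public static void main(String[] args) throws IOException {
        // Hypothetical payload, standing in for the request body
        Map<String, Object> order = new LinkedHashMap<>();
        order.put("id", 1);
        order.put("item", "book");

        String json = toJson(order);
        System.out.println(json); // prints {"id":1,"item":"book"}

        Object back = fromJson(json);
        System.out.println(back.getClass().getSimpleName()); // prints LinkedHashMap
    }
}
```

Note that when the target type is Object.class, Jackson returns a JSON object as a LinkedHashMap, not as an instance of the original class.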
Kafka-producer-Controller code:
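import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.*;

@RestController
@RequestMapping("kafka")
public class UserResource {
    private static final String TOPIC = "Kafka_Example";

    // Key and value are both sent as Strings (generic types assumed)
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Autowired
    KafkaService kafkaService;

    @PostMapping("/publish")
    public String post(@RequestBody final Object obj) {
        // Serialize the request body to a JSON String, then publish it
        kafkaTemplate.send(TOPIC, kafkaService.serializedData(obj));
        System.out.println("inside post method of kafka producer");
        return "Published successfully";
    }
}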
In this controller you pass an object in the request body. The kafkaService.serializedData(obj) function converts the object into String form, and the kafkaTemplate.send() function publishes it to the Kafka topic, so our data is stored in Kafka as a String.
kafka-producer-service code:
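import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.stereotype.Service;

@Service
public class KafkaService {
    ObjectMapper mapper = new ObjectMapper();

    public String serializedData(Object obj) {
        String objAsString = "";
        try {
            objAsString = mapper.writeValueAsString(obj);
        } catch (JsonProcessingException e) {
            // On failure an empty string is returned
            e.printStackTrace();
        }
        return objAsString;
    }
}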
This is the function used for converting an object to a String.
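One thing to watch in the service above: if serialization fails, it returns an empty string, and the controller would still publish that empty string to Kafka. As a possible alternative (not what the original service does), here is a small sketch that throws instead. The class name StrictSerializer is hypothetical, and it assumes jackson-databind is on the classpath.

```java
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;

public class StrictSerializer {
    private static final ObjectMapper MAPPER = new ObjectMapper();

    /** Serializes an object to JSON, failing loudly instead of returning "". */
    static String serialize(Object obj) {
        try {
            return MAPPER.writeValueAsString(obj);
        } catch (JsonProcessingException e) {
            // Wrap in an unchecked exception so the caller never sends a blank message
            throw new IllegalStateException("Could not serialize object to JSON", e);
        }
    }
}
```

With this version the caller sees the failure right away and can decide whether to return an error response instead of "Published successfully".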
Kafka-consumer-service code:
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

import java.io.IOException;
import java.util.Map;

@Service
public class KafkaConsumer {
    private final ObjectMapper mapper = new ObjectMapper();

    @Autowired
    private OrderSaveByFeignClient orderSaveByFeignClient;

    // Convert the JSON String read from Kafka back into an object
    public Object deSerializedData(String str) {
        Object obj = null;
        try {
            obj = mapper.readValue(str, Object.class);
        } catch (IOException e) {
            // Also covers JsonParseException and JsonMappingException; obj stays null
            e.printStackTrace();
        }
        return obj;
    }

    @KafkaListener(topics = "Kafka_Example", groupId = "group_id")
    public void consume(String message) {
        Object obj = deSerializedData(message);
        // Generic types of the map are assumed to be <String, Object>
        Map<String, Object> map = mapper.convertValue(obj, Map.class);
        orderSaveByFeignClient.buyOrderSave(map);
    }
}
This is the service of my Kafka consumer. It fetches the data from Kafka in String form, and after getting that data we convert it back into an object.
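The consumer above goes from String to Object and then to Map in two steps. As a sketch of a shorter route, Jackson can read the message straight into a typed Map using a TypeReference. The class name MessageParser and the sample message are made up for illustration, and it assumes jackson-databind is on the classpath.

```java
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.io.IOException;
import java.util.Map;

public class MessageParser {
    private static final ObjectMapper MAPPER = new ObjectMapper();

    /** Reads a JSON object message directly into a Map in one step. */
    static Map<String, Object> toMap(String message) throws IOException {
        return MAPPER.readValue(message, new TypeReference<Map<String, Object>>() {});
    }

    public static void main(String[] args) throws IOException {
        // Hypothetical message, as it might arrive from the Kafka topic
        Map<String, Object> map = toMap("{\"id\":7,\"item\":\"pen\"}");
        System.out.println(map.get("item")); // prints pen
    }
}
```

If the messages always have the same shape, the same readValue call can target your own class instead of a Map, which gives you typed fields on the consumer side.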
About Author
Krishna Verma
Krishna is a software developer with key skills in Java. His hobbies include learning new technologies.