I wanted to figure out how do i pass JSON message using Kafka. It looks like Kafak Connect provides a simple JSON Serializer
org.apache.kafka.connect.json.JsonDeserializerthat uses Jackson JSON parser. I wanted to figure out how to use it, so i built following sample
- First i did create a Contact Object, which is a simple Pojo that has 3 fields
contactId, firstName and lastName. Take a look at main() method, in which i create simple object of Contact and then convert it to JSON and write to console.
Next i created Producer.java, which reads values in CSV format like
1,Sunil,Patilfrom command line and parse it to Contact object first. Then i convert Contact object into
JSONNodeand pass it as value to Kafka, The
The producer code is mostly same as one required for passing String, with difference that on line 35, i am creating object of
com.fasterxml.jackson.databind.ObjectMapperand then on line 41 i am converting Contact object into JSONNode by calling
Since i am using
org.apache.kafka.connect.json.JsonSerializeron the producer i have to use
org.apache.kafka.connect.json.JsonDeserializeron the Consumer, Then while creating
KafkaConsumerobject i declare that i will get String key and JSONNode as value. Then once i get messages from Kafka i am calling
mapper.treeToValue(jsonNode,Contact.class)to read the message and convert it back to Contact object.