Sending and Receiving JSON messages in Kafka

Sometime back i wrote couple of articles for Java World about Kafka Big data messaging with Kafka, Part 1 and Big data messaging with Kafka, Part 2, you can find basic Producer and Consumer for Kafka along with some basic samples.
I wanted to figure out how do i pass JSON message using Kafka. It looks like Kafak Connect provides a simple JSON Serializer org.apache.kafka.connect.json.JsonSerializer and Desrializer org.apache.kafka.connect.json.JsonDeserializer that uses Jackson JSON parser. I wanted to figure out how to use it, so i built following sample
  • First i did create a Contact Object, which is a simple Pojo that has 3 fields contactId, firstName and lastName. Take a look at main() method, in which i create simple object of Contact and then convert it to JSON and write to console.
  • Next i created Producer.java, which reads values in CSV format like 1,Sunil,Patil from command line and parse it to Contact object first. Then i convert Contact object into JSONNode and pass it as value to Kafka, The JSONSerializer converts the JsonNode into byte[]
    The producer code is mostly same as one required for passing String, with difference that on line 35, i am creating object of com.fasterxml.jackson.databind.ObjectMapper and then on line 41 i am converting Contact object into JSONNode by calling objectMapper.valueToTree(contact)
  • Since i am using org.apache.kafka.connect.json.JsonSerializer on the producer i have to use org.apache.kafka.connect.json.JsonDeserializer on the Consumer, Then while creating KafkaConsumer object i declare that i will get String key and JSONNode as value. Then once i get messages from Kafka i am calling mapper.treeToValue(jsonNode,Contact.class) to read the message and convert it back to Contact object.
Now you can run the producer and consumer with same topic name and it should work

7 comments:

farhoud mojahedzadeh said...

Thanks man very useful example

venkatreddy katla said...

While consuming the messages in java, the json object is not getting to java console

venkatreddy katla said...

ConsumerRecords records = kafkaConsumer.poll(100);

here records size coming as Zero

venkatreddy katla said...

Thanks It is working....

Anonymous said...

Thanks man really helpful !!!

Anonymous said...

Team, can you please help on this issue?

After closing KafkaConsumer
Exception in thread "main" org.apache.kafka.common.errors.SerializationException: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'Hello': was expecting ('true', 'false' or 'null')
at [Source: [B@4313f5bc; line: 1, column: 7]
Caused by: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'Hello': was expecting ('true', 'false' or 'null')
at [Source: [B@4313f5bc; line: 1, column: 7]
at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1586)
at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:521)
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._reportInvalidToken(UTF8StreamJsonParser.java:3466)

Anonymous said...

This was a live saver. Thank you!