Sending and Receiving JSON messages in Kafka

Sometime back i wrote couple of articles for Java World about Kafka Big data messaging with Kafka, Part 1 and Big data messaging with Kafka, Part 2, you can find basic Producer and Consumer for Kafka along with some basic samples.
I wanted to figure out how do i pass JSON message using Kafka. It looks like Kafak Connect provides a simple JSON Serializer org.apache.kafka.connect.json.JsonSerializer and Desrializer org.apache.kafka.connect.json.JsonDeserializer that uses Jackson JSON parser. I wanted to figure out how to use it, so i built following sample
  • First i did create a Contact Object, which is a simple Pojo that has 3 fields contactId, firstName and lastName. Take a look at main() method, in which i create simple object of Contact and then convert it to JSON and write to console.
  • Next i created Producer.java, which reads values in CSV format like 1,Sunil,Patil from command line and parse it to Contact object first. Then i convert Contact object into JSONNode and pass it as value to Kafka, The JSONSerializer converts the JsonNode into byte[]
    The producer code is mostly same as one required for passing String, with difference that on line 35, i am creating object of com.fasterxml.jackson.databind.ObjectMapper and then on line 41 i am converting Contact object into JSONNode by calling objectMapper.valueToTree(contact)
  • Since i am using org.apache.kafka.connect.json.JsonSerializer on the producer i have to use org.apache.kafka.connect.json.JsonDeserializer on the Consumer, Then while creating KafkaConsumer object i declare that i will get String key and JSONNode as value. Then once i get messages from Kafka i am calling mapper.treeToValue(jsonNode,Contact.class) to read the message and convert it back to Contact object.
Now you can run the producer and consumer with same topic name and it should work

17 comments:

farhoud mojahedzadeh said...

Thanks man very useful example

venkatreddy katla said...

While consuming the messages in java, the json object is not getting to java console

venkatreddy katla said...

ConsumerRecords records = kafkaConsumer.poll(100);

here records size coming as Zero

venkatreddy katla said...

Thanks It is working....

Anonymous said...

Thanks man really helpful !!!

Anonymous said...

Team, can you please help on this issue?

After closing KafkaConsumer
Exception in thread "main" org.apache.kafka.common.errors.SerializationException: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'Hello': was expecting ('true', 'false' or 'null')
at [Source: [B@4313f5bc; line: 1, column: 7]
Caused by: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'Hello': was expecting ('true', 'false' or 'null')
at [Source: [B@4313f5bc; line: 1, column: 7]
at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1586)
at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:521)
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._reportInvalidToken(UTF8StreamJsonParser.java:3466)

Anonymous said...

This was a live saver. Thank you!

Mounika Karoshi said...

Good
python training in bangalore
python online training

KATE TECHNOLOGIES said...

hi your blog Article is very nice & thanks for sharing the information.
seo services in warangal
web design services in warangal
android services in warangal

Amar G said...

Thank you for sharing this type of interview questions
Iot Online Training
Itil Interview Questions
Salesforce Interview Questions
Msbi Interview questions
Salesforce Interview Questions
C Interview Questions

Anonymous said...

Nice work!!! but this doesn't support for the nested classes within the JSON message. please give an example having nested classes in the JSON message.

regina razz said...



with your entire process to setup office product online. Have you Just bought Microsoft Office product ? If yes then you can complete your Office Setup online with your product key code. You just need open office.com/setup , Install Office , Install Microsoft Office or setup.office.com into your web browser.

office setup
www.office.com/setup
office.com/setup

regina razz said...


In case any message saying ‘Stop’ pops up while installing Microsoft Office 365 due to a compatibility issue, contact our live chat support.Our online Experts through live chat will guide you through the entire process of Office setup, covering all steps and issues.Keep your 25 characters long product key with you.



Office Com Setup
office com/setup
www.office.com/setup

sowmiya gopal said...


Thanks for helping me to understand concepts. As a beginner in Hadoop your post help me a lot.
Hadoop Training in Velachery | Hadoop Training

gkr ragini said...

Existing without the answers to the difficulties you’ve sorted out through this guide is a critical case, as well as the kind which could have badly affected my entire career if I had not discovered your website. Best AWS Training in Bangalore

preethi Sharma said...

Hi There,

Hot! That was HOT! Glued to the "Sending and Receiving JSON messages in Kafka" your proficiency and style!

i'm trying to update a record using "onrowaction " button in lightning datatable , when i'm passing the parameters to server side controller getting this error "Cannot read property 'updaterecord' of undefined".

Please help me to find solutions for below querrys
1. How to capture row id and pass to server side controller .
2. How to get the component value ( getting error 'Cannot read property 'updaterecord' of undefined')
2. Is it possible to call helper method using "onrowaction " . Below my sample code

Thank you very much and will look for more postings from you.

Many Thanks,
Preethi.

preethi Sharma said...

Hi There,

Brilliant article, glad I slogged through the "Sending and Receiving JSON messages in Kafka" it seems that a whole lot of the details really come back to from my past project.

I have a custom field on my Account object, say 'refered_by__c'. This field contains the ID of another Account object.

I want to write a SOQL something like this (psedu query):

select id, name, refered_by__c, from account

Please advise how can I write this query using SOQL.

By the way do you have any YouTube videos, would love to watch it. I would like to connect you on LinkedIn, great to have experts like you in my connection (In case, if you don’t have any issues).


Kind Regards,
Preethi.