How to drain/delete/expire existing messages in Kafka

Sometimes you might have a bad record in a Kafka topic that you want to delete. Kafka does not provide a direct option to delete a specific record; the only way to delete records is to expire them. You can achieve this by temporarily setting the topic's retention to a very low value (1 ms in the example below), which expires all the old messages. You can follow these steps (an alternative using kafka-delete-records.sh is shown after the steps)
  1. First check the topic to find out the current value of the retention.ms config parameter for the topic
    kafka_2.11-0.11.0.1 spatil$ bin/kafka-configs.sh --zookeeper localhost:2181 --describe --entity-name my-topic --entity-type topics
    Configs for topic 'my-topic' are retention.ms=86400000
    
  2. Change the value of retention.ms to 1, which means all messages older than 1 ms will be expired
    
    kafka_2.11-0.11.0.1 spatil$ bin/kafka-configs.sh --zookeeper localhost:2181 --alter --entity-name my-topic --entity-type topics --add-config retention.ms=1
    Completed Updating config for entity: topic 'my-topic'
    
  3. Wait for a few seconds and monitor the Kafka server logs to make sure that the messages from the topic are deleted
  4. Now change the value of retention.ms back to its original value, which was 86400000 (1 day)
    
    kafka_2.11-0.11.0.1 spatil$ bin/kafka-configs.sh --zookeeper localhost:2181 --alter --entity-name my-topic --entity-type topics --add-config retention.ms=86400000
    Completed Updating config for entity: topic 'my-topic'.
    
  5. You can verify that your changes are saved by running this command
    
    kafka_2.11-0.11.0.1 spatil$ bin/kafka-configs.sh --zookeeper localhost:2181 --describe --entity-name my-topic --entity-type topics
    Configs for topic 'my-topic' are retention.ms=86400000
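If you are on Kafka 0.11 or later, another option (not part of the steps above) is the kafka-delete-records.sh tool, which truncates a partition up to a given offset without touching the retention settings. This is a minimal sketch assuming a single-partition topic named my-topic; an offset of -1 means delete everything up to the current end of the partition

cat > delete-records.json <<EOF
{"partitions": [{"topic": "my-topic", "partition": 0, "offset": -1}], "version": 1}
EOF
bin/kafka-delete-records.sh --bootstrap-server localhost:9092 --offset-json-file delete-records.json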
    

Kafka: how to change the number of partitions in a topic

I wanted to figure out how to change the number of partitions in one of my Kafka topics, and I followed these steps
  1. I created a sample topic called my-topic with a single partition
    
    spatil$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --topic my-topic --replication-factor 1 --partitions 1
    Created topic "my-topic".
    
  2. I used the describe command to verify that my topic actually has a single partition
    
    spatil$ bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic my-topic
    Topic:my-topic PartitionCount:1 ReplicationFactor:1 Configs:
     Topic: my-topic Partition: 0 Leader: 0 Replicas: 0 Isr: 0
    
  3. Then I executed the alter command on my topic and changed the number of partitions from 1 to 3. Note the WARNING about keyed messages in the output; the sketch after these steps shows why it appears
    
    spatil$ bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic my-topic --partitions 3
    WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected
    Adding partitions succeeded!
    
  4. I executed the describe command on my topic to verify that it actually has 3 partitions
    
    spatil$ bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic my-topic
    Topic:my-topic PartitionCount:3 ReplicationFactor:1 Configs:
     Topic: my-topic Partition: 0 Leader: 0 Replicas: 0 Isr: 0
     Topic: my-topic Partition: 1 Leader: 0 Replicas: 0 Isr: 0
     Topic: my-topic Partition: 2 Leader: 0 Replicas: 0 Isr: 0
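The WARNING in step 3 is there because, for keyed messages, the Kafka Java client's default partitioner picks a partition based on a hash of the key modulo the number of partitions, so the same key can land on a different partition once the partition count changes. This is a minimal sketch of that routing (the class and method names are just for illustration; it assumes kafka-clients is on the classpath and that the default partitioner is in use)

import org.apache.kafka.common.utils.Utils;

import java.nio.charset.StandardCharsets;

public class PartitionForKey {
    // Mirrors the default partitioner's behavior for records that have a key:
    // murmur2 hash of the key bytes, made positive, modulo the partition count
    static int partitionFor(String key, int numPartitions) {
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }

    public static void main(String[] argv) {
        String key = "some-key";
        // With 1 partition every key maps to partition 0; with 3 partitions the
        // same key may map elsewhere, which breaks per-key ordering guarantees
        System.out.println("with 1 partition : " + partitionFor(key, 1));
        System.out.println("with 3 partitions: " + partitionFor(key, 3));
    }
}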
    

How to reset consumer group offset

First run describe on the consumer group to check what the current LAG is (it is zero in this case)

bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group user.kafkaconsumer
Now run the following command to preview what the new offset will be if you reset; without --execute this is only a dry run

bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group user.kafkaconsumer --reset-offsets --to-earliest --all-topics
Shut down the consumer so that you can restart it later. Now reset the offset so that the consumer group goes back to the first message

bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group user.kafkaconsumer --reset-offsets --to-earliest --all-topics --execute
Go back and verify that the consumer offset actually went back by executing the following command

bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group user.kafkaconsumer
You should be able to see the offset set back to 22000, which is the offset of the earliest message still in the topic.
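Besides --to-earliest, the --reset-offsets option supports a few other strategies (the topic name my-topic below is just an example); remember that nothing changes until you add --execute

bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group user.kafkaconsumer --reset-offsets --to-latest --topic my-topic --execute
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group user.kafkaconsumer --reset-offsets --shift-by -10 --topic my-topic --execute
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group user.kafkaconsumer --reset-offsets --to-datetime 2019-01-01T00:00:00.000 --topic my-topic --execute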

Spark program to read data from RDBMS

I wanted to figure out how to connect to an RDBMS from Spark and extract data, so I followed these steps. You can download this project from GitHub
First I created an Address table in my local MySQL like this

CREATE TABLE `address` (
  `addressid` int(11) NOT NULL AUTO_INCREMENT,
  `contactid` int(11) DEFAULT NULL,
  `line1` varchar(300) NOT NULL,
  `city` varchar(50) NOT NULL,
  `state` varchar(50) NOT NULL,
  `zip` varchar(50) NOT NULL,
  `lastmodified` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  PRIMARY KEY (`addressid`),
  KEY `contactid` (`contactid`),
  CONSTRAINT `address_ibfk_1` FOREIGN KEY (`contactid`) REFERENCES `CONTACT` (`contactid`)
) ENGINE=InnoDB AUTO_INCREMENT=6 DEFAULT CHARSET=utf8;
Then I added 5 sample records to the address table (illustrative INSERT statements are shown below). When I query the address table locally, I get those 5 rows back.
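For reference, rows along these lines can be used to populate the table; the values are made up for illustration, and contactid is left NULL so the foreign key to the CONTACT table does not get in the way

INSERT INTO address (contactid, line1, city, state, zip) VALUES (NULL, '1 Main St', 'San Jose', 'CA', '95110');
INSERT INTO address (contactid, line1, city, state, zip) VALUES (NULL, '2 Market St', 'San Francisco', 'CA', '94105');
INSERT INTO address (contactid, line1, city, state, zip) VALUES (NULL, '3 Oak Ave', 'Fremont', 'CA', '94536');
INSERT INTO address (contactid, line1, city, state, zip) VALUES (NULL, '4 Pine Rd', 'Oakland', 'CA', '94601');
INSERT INTO address (contactid, line1, city, state, zip) VALUES (NULL, '5 Elm Ct', 'Berkeley', 'CA', '94702');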
After that I created a Spark Scala project that has mysql-connector-java as one of the dependencies
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.spnotes.spark</groupId>
    <artifactId>JDBCSpark</artifactId>
    <version>1.0-SNAPSHOT</version>
    <properties>
        <maven.compiler.target>1.7</maven.compiler.target>
        <encoding>UTF-8</encoding>
        <scala.tools.version>2.10</scala.tools.version>
        <scala.version>2.10.4</scala.version>
        <spark.version>1.5.2</spark.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_${scala.tools.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_${scala.tools.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.38</version>
        </dependency>
    </dependencies>
    <build>
        <sourceDirectory>src/main/scala</sourceDirectory>
        <plugins>
            <plugin>
                <groupId>org.scala-tools</groupId>
                <artifactId>maven-scala-plugin</artifactId>
                <version>2.15.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
The last step was to create a simple Spark program like this,
package com.spnotes.spark

import java.sql.{Connection, DriverManager, ResultSet}

import org.apache.spark.rdd.JdbcRDD
import org.apache.spark.{SparkConf, SparkContext}

/**
 * Created by sunilpatil on 4/19/16.
 */
object JDBCRDDClient {

  case class Address(addressId: Int, contactId: Int, line1: String, city: String, state: String, zip: String)

  def main(argv: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("HelloJDBC")
    val sparkContext = new SparkContext(sparkConf)
    val jdbcRdd = new JdbcRDD(sparkContext, getConnection,
      "select * from address limit ?,?",
      0, 5, 1, convertToAddress)
    jdbcRdd.foreach(println)
  }

  def getConnection(): Connection = {
    Class.forName("com.mysql.jdbc.Driver")
    DriverManager.getConnection("jdbc:mysql://localhost/test1?" + "user=test1&password=test1")
  }

  def convertToAddress(rs: ResultSet): Address = {
    new Address(rs.getInt("addressid"), rs.getInt("contactid"), rs.getString("line1"),
      rs.getString("city"), rs.getString("state"), rs.getString("zip"))
  }
}
My program has 4 main sections
  1. First is the Address case class, which has the same schema as the Address table but without the lastmodified field
  2. Next is the call that creates the JdbcRDD: new JdbcRDD(sparkContext, getConnection, "select * from address limit ?,?", 0, 5, 1, convertToAddress). JdbcRDD substitutes the lower and upper bounds (0 and 5) into the two ? placeholders, so with a single partition the query that actually runs is select * from address limit 0,5, i.e. the first 5 rows
  3. Then I defined the getConnection() method that creates a JDBC connection to my database and returns it
  4. Last is the convertToAddress() method that knows how to take a ResultSet and convert it into an Address object
When I run this program in the IDE, it prints the 5 Address objects to the console.

How to implement a cache (LRU cache) using LinkedHashMap in Java

Recently I wanted to implement a simple least recently used (LRU) cache in one of my applications. My use case was simple enough that instead of going for something like Ehcache, I decided to build it on my own using java.util.LinkedHashMap
As you can see from the code, it is very simple. All you have to do is extend java.util.LinkedHashMap and override its protected removeEldestEntry() method so that it returns true when the size of the map exceeds the size you specified while creating it; LinkedHashMap then removes the eldest entry
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

public class LRUCache<Key,Value> extends LinkedHashMap<Key,Value> {
    private int cacheSize;

    public LRUCache(int cacheSize) {
        super(cacheSize, 0.75f, true);
        this.cacheSize = cacheSize;
    }

    @Override
    protected boolean removeEldestEntry(Map.Entry<Key, Value> eldest) {
        return size() > cacheSize;
    }

    public static void main(String[] argv) {
        LRUCache<String, String> cache = new LRUCache<String, String>(3);
        Map<String, String> map = Collections.synchronizedMap(cache);
        cache.put("First", "1");
        cache.put("Second", "2");
        cache.put("Third", "3");
        System.out.println(cache);
        cache.get("First");
        cache.get("First");
        cache.put("Fourth", "4");
        System.out.println(cache);
        cache.put("Fifth", "5");
        System.out.println(cache);
    }
}
Now the question is: when the map is full, which entry will it remove? You have 2 options
  1. Eldest (insertion order): If you just want to remove the oldest inserted entry when adding a new one, call super(cacheSize, 0.75f); in your constructor, so LinkedHashMap won't keep track of when a particular entry was accessed (see the sketch after this list).
  2. Least recently used (LRU): If you want the entry that was least recently used to be removed, call super(cacheSize, 0.75f, true); from the constructor of your LRUCache, so that LinkedHashMap keeps track of when each entry was accessed and removes the least recently used one
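Here is a minimal sketch of option 1 for comparison (the class name FIFOCache is just for illustration). It is identical to the LRUCache above except for the two-argument super() call, so LinkedHashMap keeps entries in insertion order

import java.util.LinkedHashMap;
import java.util.Map;

public class FIFOCache<Key, Value> extends LinkedHashMap<Key, Value> {
    private int cacheSize;

    public FIFOCache(int cacheSize) {
        // No access-order flag, so eviction follows insertion order
        super(cacheSize, 0.75f);
        this.cacheSize = cacheSize;
    }

    @Override
    protected boolean removeEldestEntry(Map.Entry<Key, Value> eldest) {
        return size() > cacheSize;
    }
}

With the same sequence as the main() method above, adding "Fourth" would evict "First" from a FIFOCache even though it was just accessed, whereas the LRUCache evicts "Second".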

Spark Streaming Kafka 0.10 API Word Count application in Scala

In the Spark Kafka Streaming Java program Word Count using Kafka 0.10 API blog entry I talked about how to create a simple Java program that uses Spark Streaming's Kafka 0.10 API. This blog entry does the same thing but using Scala. You can download the complete application from GitHub
You can run this sample by first downloading Kafka 0.10.* from the Apache Kafka website, then creating a test topic and sending messages to it by following the Kafka quick start document (a sample producer command is shown after the code below)
package com.spnotes.spark

import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Durations, StreamingContext}

import scala.collection.mutable

/**
 * Created by sunilpatil on 1/11/17.
 */
object Kafka10 {
  def main(argv: Array[String]): Unit = {
    // Configure Spark to connect to Kafka running on local machine
    val kafkaParam = new mutable.HashMap[String, String]()
    kafkaParam.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
    kafkaParam.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringDeserializer")
    kafkaParam.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringDeserializer")
    kafkaParam.put(ConsumerConfig.GROUP_ID_CONFIG, "group1")
    kafkaParam.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")
    kafkaParam.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")
    val conf = new SparkConf().setMaster("local[2]").setAppName("Kafka10")
    // Read messages in batches of 30 seconds
    val sparkStreamingContext = new StreamingContext(conf, Durations.seconds(30))
    // Configure Spark to listen to messages in topic test
    val topicList = List("test")
    // Read value of each message from Kafka and return it
    val messageStream = KafkaUtils.createDirectStream(sparkStreamingContext,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](topicList, kafkaParam))
    val lines = messageStream.map(consumerRecord => consumerRecord.value().asInstanceOf[String])
    // Break every message into words and return list of words
    val words = lines.flatMap(_.split(" "))
    // Take every word and return Tuple with (word,1)
    val wordMap = words.map(word => (word, 1))
    // Count occurrence of each word
    val wordCount = wordMap.reduceByKey((first, second) => first + second)
    // Print the word count
    wordCount.print()
    sparkStreamingContext.start()
    sparkStreamingContext.awaitTermination()
  }
}
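To test it, you can point a console producer at the test topic and type a few lines; the word counts should show up in the Spark console every 30 seconds. The command below is the standard Kafka 0.10 quick start producer, assuming Kafka is running locally on port 9092

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test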

Spark Kafka Streaming Java program Word Count using Kafka 0.10 API

The Kafka API went through a lot of changes starting with Kafka 0.9, and the Spark Kafka streaming API was also changed to better support it. I wanted to try that out, so I built this simple Word Count application using the Kafka 0.10 API in Java. You can download the complete application from GitHub
You can run this sample by first downloading Kafka 0.10.* from the Apache Kafka website, then creating a test topic and sending messages to it by following the Kafka quick start document. The first thing I did was include the Kafka 0.10 API dependencies in the Spark project. As you can see, I am using Spark version 2.1
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.1.0</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.11</artifactId>
    <version>2.1.0</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
    <version>2.1.0</version>
</dependency>
Then I created a SparkKafka10.java file that looks like this. Please take a look at the comments inside the code for what I am doing. If you create the test topic and send messages to it, you should be able to see the word count on the console (commands for creating the topic and sending messages are shown after the code)
package com.test;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.*;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;
import scala.Tuple2;

import java.util.*;

/**
 * Created by sunilpatil on 1/11/17.
 */
public class SparkKafka10 {
    public static void main(String[] argv) throws Exception {
        // Configure Spark to connect to Kafka running on local machine
        Map<String, Object> kafkaParams = new HashMap<>();
        kafkaParams.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        kafkaParams.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        kafkaParams.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        kafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
        kafkaParams.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        kafkaParams.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        // Configure Spark to listen to messages in topic test
        Collection<String> topics = Arrays.asList("test");
        SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("SparkKafka10WordCount");
        // Read messages in batches of 30 seconds
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(30));
        // Start reading messages from Kafka and get DStream
        final JavaInputDStream<ConsumerRecord<String, String>> stream =
                KafkaUtils.createDirectStream(jssc, LocationStrategies.PreferConsistent(),
                        ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams));
        // Read value of each message from Kafka and return it
        JavaDStream<String> lines = stream.map(new Function<ConsumerRecord<String, String>, String>() {
            @Override
            public String call(ConsumerRecord<String, String> kafkaRecord) throws Exception {
                return kafkaRecord.value();
            }
        });
        // Break every message into words and return list of words
        JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterator<String> call(String line) throws Exception {
                return Arrays.asList(line.split(" ")).iterator();
            }
        });
        // Take every word and return Tuple with (word,1)
        JavaPairDStream<String, Integer> wordMap = words.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String word) throws Exception {
                return new Tuple2<>(word, 1);
            }
        });
        // Count occurrence of each word
        JavaPairDStream<String, Integer> wordCount = wordMap.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer first, Integer second) throws Exception {
                return first + second;
            }
        });
        // Print the word count
        wordCount.print();
        jssc.start();
        jssc.awaitTermination();
    }
}
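If you have not created the test topic yet, the Kafka 0.10 quick start commands below create it and start a console producer to feed it (assuming a local single-broker setup with ZooKeeper on port 2181 and Kafka on port 9092)

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test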

How to use ElasticSearch as storage from Hive in Cloudera

In the Using ElasticSearch as external data store with apache hive entry I talked about how you can create a table in Hive so that the actual data is stored in ElasticSearch. The problem with that approach was that I had to pass the full path to elasticsearch-hadoop-hive-<eshadoopversion>.jar as a parameter every time.

hive -hiveconf hive.aux.jars.path=/opt/elastic/elasticsearch-hadoop-2.4.3/dist/elasticsearch-hadoop-hive-2.4.3.jar;
The other option for doing the same thing is to open a Hive session and then call the following command as the first thing

ADD JAR /opt/elastic/elasticsearch-hadoop-2.4.3/dist/elasticsearch-hadoop-hive-2.4.3.jar;
The problem with both of these approaches is that you have to keep telling Hive the full path to the ElasticSearch jars every single time. Instead, you can take care of this by copying elasticsearch-hadoop-hive-<eshadoopversion>.jar into the same directory on every node. In my case I copied it to the /usr/lib/hive/lib directory by executing the following command

sudo cp /opt/elastic/elasticsearch-hadoop-2.4.3/dist/elasticsearch-hadoop-hive-2.4.3.jar /usr/lib/hive/lib/.
Then, in Cloudera Manager, set the Hive Auxiliary JARs Directory (hive.aux.jars.path) property to the /usr/lib/hive/lib directory.
Then restart the Hive service, and you should be able to access any ElasticSearch-backed table without adding the elasticsearch-hadoop jar explicitly
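To verify the setup, you can create an ElasticSearch-backed table and query it. This is a minimal sketch assuming an ElasticSearch index/type named contact/contact running on localhost; the storage handler class comes from the elasticsearch-hadoop-hive jar configured above

CREATE EXTERNAL TABLE contact_es (contactid INT, firstname STRING, lastname STRING)
STORED BY 'org.elasticsearch.hadoop.hive.EsStorageHandler'
TBLPROPERTIES ('es.resource' = 'contact/contact', 'es.nodes' = 'localhost');

SELECT * FROM contact_es LIMIT 5;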

Installing ElasticSearch on an existing Docker container

I was using a Cloudera Quickstart Docker image for an experiment and wanted to install ElasticSearch on it, but I had trouble accessing it from my host. I found a workaround by following these steps
  • First I installed ElasticSearch by downloading version 2.4.3 and unzipping it in the /opt/elastic folder
  • Then I started ElasticSearch by executing bin/elasticsearch, and it started OK. When I ran curl -XGET "http://localhost:9200/" from inside the Docker container I was able to access ElasticSearch, but when I tried to access it from my host machine I could not, even though I was able to access other services running on my Docker image. So first I ran netstat in the container to see why I could access the other services but not ElasticSearch (and to make sure ElasticSearch was actually listening on port 9200)
  • Looking at the port bindings I could see that the other services were bound to 0.0.0.0 but ElasticSearch was only bound to 127.0.0.1, so I opened <ELASTICSEARCH_HOME>/config/elasticsearch.yml and added two more lines to it like this
    
    http.host: 0.0.0.0
    transport.host: 127.0.0.1
    
  • Then I restarted the ElasticSearch server, checked netstat again, and could see it bound to 0.0.0.0:9200; when I tried accessing ElasticSearch from the host machine it worked (see the verification command after this list)
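To verify from the host, the same curl request used earlier works, assuming port 9200 of the container was published to the host when it was started (for example with -p 9200:9200 on docker run)

curl -XGET "http://localhost:9200/"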

Sending and Receiving JSON messages in Kafka

Some time back I wrote a couple of articles for JavaWorld about Kafka, Big data messaging with Kafka, Part 1 and Big data messaging with Kafka, Part 2, where you can find a basic Producer and Consumer for Kafka along with some basic samples.
I wanted to figure out how to pass JSON messages using Kafka. It turns out Kafka Connect provides a simple JSON serializer, org.apache.kafka.connect.json.JsonSerializer, and deserializer, org.apache.kafka.connect.json.JsonDeserializer, that use the Jackson JSON parser. I wanted to figure out how to use them, so I built the following sample
  • First I created a Contact object, which is a simple POJO that has 3 fields: contactId, firstName and lastName. Take a look at the main() method, in which I create a simple Contact object, convert it to JSON and write it to the console.
    package com.mapr.kafka.serializer.json;

    import com.fasterxml.jackson.databind.ObjectMapper;

    import java.util.StringTokenizer;

    /**
     * Created by sunilpatil on 12/25/16.
     */
    public class Contact {
        private int contactId;
        private String firstName;
        private String lastName;

        public Contact() {
        }

        public Contact(int contactId, String firstName, String lastName) {
            this.contactId = contactId;
            this.firstName = firstName;
            this.lastName = lastName;
        }

        public void parseString(String csvStr) {
            StringTokenizer st = new StringTokenizer(csvStr, ",");
            contactId = Integer.parseInt(st.nextToken());
            firstName = st.nextToken();
            lastName = st.nextToken();
        }

        public int getContactId() {
            return contactId;
        }

        public void setContactId(int contactId) {
            this.contactId = contactId;
        }

        public String getFirstName() {
            return firstName;
        }

        public void setFirstName(String firstName) {
            this.firstName = firstName;
        }

        public String getLastName() {
            return lastName;
        }

        public void setLastName(String lastName) {
            this.lastName = lastName;
        }

        @Override
        public String toString() {
            return "Contact{" +
                    "contactId=" + contactId +
                    ", firstName='" + firstName + '\'' +
                    ", lastName='" + lastName + '\'' +
                    '}';
        }

        public static void main(String[] argv) throws Exception {
            ObjectMapper mapper = new ObjectMapper();
            Contact contact = new Contact();
            contact.setContactId(1);
            contact.setFirstName("Sachin");
            contact.setLastName("Tendulkar");
            System.out.println(mapper.writeValueAsString(contact));
            contact.parseString("1,Rahul,Dravid");
            System.out.println(mapper.writeValueAsString(contact));
        }
    }
  • Next I created Producer.java, which reads values in CSV format like 1,Sunil,Patil from the command line and first parses them into a Contact object. Then I convert the Contact object into a JsonNode and pass it as the value to Kafka; the JsonSerializer converts the JsonNode into a byte[]
    The producer code is mostly the same as the one required for passing Strings, with the difference that I create an object of com.fasterxml.jackson.databind.ObjectMapper and then convert the Contact object into a JsonNode by calling objectMapper.valueToTree(contact)
    package com.mapr.kafka.serializer.json;

    import com.fasterxml.jackson.databind.JsonNode;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.connect.json.JsonSerializer;

    import java.util.Properties;
    import java.util.Scanner;

    /**
     * Created by sunilpatil on 12/25/16.
     */
    public class Producer {
        private static Scanner in;

        public static void main(String[] argv) throws Exception {
            if (argv.length != 1) {
                System.err.println("Please specify 1 parameters ");
                System.exit(-1);
            }
            String topicName = argv[0];
            in = new Scanner(System.in);
            System.out.println("Enter message(type exit to quit)");

            //Configure the Producer
            Properties configProperties = new Properties();
            configProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            configProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
            configProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonSerializer");
            org.apache.kafka.clients.producer.Producer producer = new KafkaProducer(configProperties);
            ObjectMapper objectMapper = new ObjectMapper();

            String line = in.nextLine();
            while (!line.equals("exit")) {
                Contact contact = new Contact();
                contact.parseString(line);
                JsonNode jsonNode = objectMapper.valueToTree(contact);
                ProducerRecord<String, JsonNode> rec = new ProducerRecord<String, JsonNode>(topicName, jsonNode);
                producer.send(rec);
                line = in.nextLine();
            }
            in.close();
            producer.close();
        }
    }
  • Since I am using org.apache.kafka.connect.json.JsonSerializer on the producer, I have to use org.apache.kafka.connect.json.JsonDeserializer on the consumer. While creating the KafkaConsumer object I declare that I will get a String key and a JsonNode as the value. Then, once I get messages from Kafka, I call mapper.treeToValue(jsonNode, Contact.class) to read each message and convert it back into a Contact object.
    package com.mapr.kafka.serializer.json;

    import com.fasterxml.jackson.core.JsonProcessingException;
    import com.fasterxml.jackson.databind.JsonNode;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.errors.WakeupException;

    import java.util.Arrays;
    import java.util.Properties;
    import java.util.Scanner;

    /**
     * Created by sunilpatil on 12/25/16.
     */
    public class Consumer {
        private static Scanner in;

        public static void main(String[] argv) throws Exception {
            if (argv.length != 2) {
                System.err.printf("Usage: %s <topicName> <groupId>\n",
                        Consumer.class.getSimpleName());
                System.exit(-1);
            }
            in = new Scanner(System.in);
            String topicName = argv[0];
            String groupId = argv[1];

            ConsumerThread consumerRunnable = new ConsumerThread(topicName, groupId);
            consumerRunnable.start();
            String line = "";
            while (!line.equals("exit")) {
                line = in.next();
            }
            consumerRunnable.getKafkaConsumer().wakeup();
            System.out.println("Stopping consumer .....");
            consumerRunnable.join();
        }

        private static class ConsumerThread extends Thread {
            private String topicName;
            private String groupId;
            private KafkaConsumer<String, JsonNode> kafkaConsumer;

            public ConsumerThread(String topicName, String groupId) {
                this.topicName = topicName;
                this.groupId = groupId;
            }

            public void run() {
                Properties configProperties = new Properties();
                configProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
                configProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
                configProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonDeserializer");
                configProperties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
                configProperties.put(ConsumerConfig.CLIENT_ID_CONFIG, "simple");

                //Figure out where to start processing messages from
                kafkaConsumer = new KafkaConsumer<String, JsonNode>(configProperties);
                kafkaConsumer.subscribe(Arrays.asList(topicName));
                ObjectMapper mapper = new ObjectMapper();
                //Start processing messages
                try {
                    while (true) {
                        ConsumerRecords<String, JsonNode> records = kafkaConsumer.poll(100);
                        for (ConsumerRecord<String, JsonNode> record : records) {
                            JsonNode jsonNode = record.value();
                            System.out.println(mapper.treeToValue(jsonNode, Contact.class));
                        }
                    }
                } catch (WakeupException ex) {
                    System.out.println("Exception caught " + ex.getMessage());
                } catch (JsonProcessingException e) {
                    e.printStackTrace();
                } finally {
                    kafkaConsumer.close();
                    System.out.println("After closing KafkaConsumer");
                }
            }

            public KafkaConsumer<String, JsonNode> getKafkaConsumer() {
                return this.kafkaConsumer;
            }
        }
    }
Now you can run the producer and consumer with the same topic name and it should work.
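For example, a sketch of a typical run (the classpath placeholder depends on how you build the project)

java -cp <your-classpath> com.mapr.kafka.serializer.json.Consumer test group1
java -cp <your-classpath> com.mapr.kafka.serializer.json.Producer test

Typing 1,Sunil,Patil into the producer should make the consumer print Contact{contactId=1, firstName='Sunil', lastName='Patil'}.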