Java Client for publishing and consuming messages from Apache Kafka

I wanted to learn how to publish and consume messages from Apache Kafka using a Java client, so I followed these steps.
  1. Download the Kafka binaries from the Kafka download page.
  2. Unzip the Kafka tar file by executing tar -xzf kafka_2.9.2-0.8.1.1.tgz, then go to the Kafka directory by executing cd kafka_2.9.2-0.8.1.1
  3. Next, start the ZooKeeper server by executing the following command:
    
    bin/zookeeper-server-start.sh config/zookeeper.properties
    
  4. Start the Kafka server by executing the following command:
    
    bin/kafka-server-start.sh config/server.properties
    
  5. Now your ZooKeeper and Kafka servers are ready, and you can download the source code for the sample project from here.
  6. This is what a Java client that publishes messages to Kafka looks like; execute it a couple of times to publish a couple of messages. The first thing you have to do when developing a producer is connect to the Kafka server. To do that, set the metadata.broker.list property to the host name and port on which the Kafka server is listening (you can find both values in the server.properties file that you used in step 4). Once you have a Producer object, you can use it to publish messages by creating a kafka.producer.KeyedMessage object, passing the name of the topic and the message as arguments.
  7. This is what the Java client that consumes messages from Kafka looks like. Run it and it will start a thread that keeps listening for messages on the topic and prints each message to the console as it arrives. The HelloKafkaConsumer class extends the Thread class. In its constructor, I first create a Properties object with the zookeeper.connect property set to the host and port on which the ZooKeeper server is listening, and then I create a kafka.javaapi.consumer.ConsumerConnector object.
    Once the ConsumerConnector is ready, in the run() method I pass it the name of the topic that I want to listen on (you can pass multiple topic names here). Every time there is a new message, I read it and print it to the console.
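
The configuration described in steps 6 and 7 can be sketched with plain java.util.Properties, as below. This is only an illustration, not the code from the sample project: the class name KafkaClientProps, the group id hello-kafka-group, and the string serializer are assumptions, and the addresses localhost:9092 and localhost:2181 are the usual defaults in the stock server.properties and zookeeper.properties files, so check your own files. The old 0.8.x consumer also expects a group.id property, which the text above does not mention.

```java
import java.util.Properties;

// Hypothetical helper class: it only builds the settings, so it runs without Kafka jars.
public class KafkaClientProps {

    // Settings for the 0.8.x producer (step 6).
    static Properties producerProps(String brokerList) {
        Properties props = new Properties();
        // host:port the Kafka broker listens on, taken from server.properties
        props.setProperty("metadata.broker.list", brokerList);
        // Assumption: messages are plain strings, so use Kafka's string encoder
        props.setProperty("serializer.class", "kafka.serializer.StringEncoder");
        return props;
    }

    // Settings for the 0.8.x ZooKeeper-based consumer (step 7).
    static Properties consumerProps(String zookeeperConnect, String groupId) {
        Properties props = new Properties();
        // host:port the ZooKeeper server listens on
        props.setProperty("zookeeper.connect", zookeeperConnect);
        // Consumers sharing a group id split the topic's messages between them
        props.setProperty("group.id", groupId);
        return props;
    }

    public static void main(String[] args) {
        Properties producer = producerProps("localhost:9092");
        Properties consumer = consumerProps("localhost:2181", "hello-kafka-group");

        // With the Kafka 0.8.x jar on the classpath, these would be wrapped as:
        //   new kafka.javaapi.producer.Producer<String, String>(
        //           new kafka.producer.ProducerConfig(producer));
        //   kafka.consumer.Consumer.createJavaConsumerConnector(
        //           new kafka.consumer.ConsumerConfig(consumer));
        System.out.println("metadata.broker.list = " + producer.getProperty("metadata.broker.list"));
        System.out.println("zookeeper.connect = " + consumer.getProperty("zookeeper.connect"));
    }
}
```

Keeping the settings in small methods like these makes it easy to point the same client at a different broker or ZooKeeper address without touching the publishing or consuming logic.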

20 comments:

  1. Hi Sunil! Your blog is awesome and very informative. Instead of showing the message on the console, as you have done in the consumer thread's run method, how can we return the message to the calling method? Any idea?

  2. Thanks for your post, it is very nice. How do I get unread messages using the Kafka consumer?

  3. Hi,

    In your consumer you have specified the topic name as "pythontest"; it should be "javatest".

  4. How do I read messages from the beginning every time I run this API?

  5. Hi Sunil, I need some help retrieving Kafka topic metadata. I sent you an email about this as well. I could not find any API for this.

  6. Which Scala version did you use?

  7. Hi Sunil,

    You have used the Scala API provided in "kafka_2.10-0.8.2.2.jar", while I can see a similar Java API provided in "kafka-clients-0.8.2.0.jar".
    I am totally confused about what to use and how. Most of the examples on the internet use the former jar. Could you please give me some hints or pointers on implementing a consumer using the Java API (the latter jar)?

    Thanks,
    Kanishka Chauhan

  8. Hey Sunil,

    First of all, thank you so much for such an informative blog on Kafka.

    Secondly, I had a question about how to write the messages received by a Kafka consumer to a text file.
    I have Kafka producer code written in Java that writes Kafka messages, and consumer code that receives these messages.

    Is it possible to write the messages received by the consumer to a text file in Java?
    Can you please help me out with this?

    Thanks and Regards,
    Gazal

  9. Hi Sunil,
    How do I read an input file from an S3 location, and how do I specify the output location in Kafka?
    Please send any commands to my mail ID: jnp.hadoop@gmail.com

  10. Hi Sunil,

    Is there any way for the producer to get an acknowledgement from the consumer when it receives the messages? I tried the callback() method in producer.send(), but it doesn't provide any details about whether the message was consumed by the consumer.

    Regards,
    Sudhish

  11. Yes, there is surely a way to get an acknowledgement. You can easily find it on the internet; there are several examples available that explain it.

  12. Hi, I tried to run the code posted in the blog after making the appropriate changes. The producer code runs successfully and publishes the message, but the consumer is not able to consume it. However, if I run the consumer from cmd, it is able to consume the message from the specified topic.
    What could be the reason?

  13. I am getting this error:

    log4j:WARN No appenders could be found for logger (kafka.utils.VerifiableProperties).
    log4j:WARN Please initialize the log4j system properly.

    Process finished with exit code 0

  14. Apache Kafka (0.9.0.0): How can a Kafka client act as both producer and consumer in Java?

  15. This comment has been removed by the author.

  16. Hello Sunil,

    I am getting the error below.

    ERROR StatusLogger No log4j2 configuration file found. Using default configuration: logging only errors to the console. Set system property 'log4j2.debug' to show Log4j2 internal initialization logging.
    Exception in thread "main" org.I0Itec.zkclient.exception.ZkTimeoutException: Unable to connect to zookeeper server 'localhost:2181' with timeout of 6000 ms
    at org.I0Itec.zkclient.ZkClient.connect(ZkClient.java:1233)
    at org.I0Itec.zkclient.ZkClient.<init>(ZkClient.java:157)
    at org.I0Itec.zkclient.ZkClient.<init>(ZkClient.java:131)
    at kafka.utils.ZkUtils$.createZkClientAndConnection(ZkUtils.scala:115)
    at kafka.utils.ZkUtils$.apply(ZkUtils.scala:97)
    at kafka.consumer.ZookeeperConsumerConnector.connectZk(ZookeeperConsumerConnector.scala:194)
    at kafka.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:142)
    at kafka.javaapi.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:66)
    at kafka.javaapi.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:69)
    at kafka.consumer.Consumer$.createJavaConsumerConnector(ConsumerConnector.scala:123)
    at kafka.consumer.Consumer.createJavaConsumerConnector(ConsumerConnector.scala)
    at com.spnotes.kafka.HelloKafkaConsumer.<init>(HelloKafkaConsumer.java:41)
    at com.spnotes.kafka.HelloKafkaConsumer.main(HelloKafkaConsumer.java:31)

    Can you please tell me why?

    Thanks,
    Pavan Patil
    patilpavan8@gmail.com

  17. Really nice blog post. It provided helpful information. I hope that you will post more updates like this.

