- Download the Kafka binaries from the Kafka download page.
- Unzip the Kafka tar file by executing
tar -xzf kafka_2.9.2-0.8.1.1.tgz
Then go to the Kafka directory by executing
cd kafka_2.9.2-0.8.1.1
- Next, start the ZooKeeper server by executing the following command:
bin/zookeeper-server-start.sh config/zookeeper.properties
- Start the Kafka server by executing the following command:
bin/kafka-server-start.sh config/server.properties
- Now your ZooKeeper and Kafka servers are ready, and you can download the source code for the sample project.
- The Java client that publishes messages to Kafka is described below; execute it a couple of times to publish a couple of messages.
The first thing you have to do while developing a producer is connect to the Kafka server. To do that, set the value of the metadata.broker.list property to the host name and port on which the Kafka server is listening (you can find both values in the server.properties file that you used in step 4). Once you have a Producer object, you can publish messages by creating objects of kafka.producer.KeyedMessage, passing the name of the topic and the message as arguments.
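A minimal sketch of the producer configuration described above, not the code from the sample project. It only builds the Properties object with the standard library, so it runs without the Kafka jar; the calls into the Kafka 0.8 API are shown as comments. The class name ProducerConfigSketch, the broker address localhost:9092 and the topic name javatest are assumptions made for illustration, and serializer.class is set on the assumption that the messages are plain strings.

```java
import java.util.Properties;

class ProducerConfigSketch {

    // Builds the properties that the Kafka 0.8 producer reads when it is created.
    // brokerList is a comma-separated list of host:port pairs.
    public static Properties producerProperties(String brokerList) {
        Properties props = new Properties();
        // Host and port on which the Kafka server is listening (see server.properties).
        props.put("metadata.broker.list", brokerList);
        // Assumption: messages are Strings, so use the String encoder instead of the byte[] default.
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        return props;
    }

    public static void main(String[] args) {
        // Assumption: a broker running locally on the default port.
        Properties props = producerProperties("localhost:9092");

        // With the Kafka 0.8 jar on the classpath, the properties would be used roughly like this:
        //   Producer<String, String> producer =
        //       new Producer<String, String>(new ProducerConfig(props));
        //   producer.send(new KeyedMessage<String, String>("javatest", "Hello from Java"));
        //   producer.close();

        System.out.println("metadata.broker.list = " + props.getProperty("metadata.broker.list"));
    }
}
```

Keeping the configuration in its own method makes it easy to point the same producer at a different broker without touching the publishing code.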
- The Java client that consumes messages from Kafka works as follows: run it and it will start a thread that keeps listening for messages on the topic, and every time there is a message it will print it to the console.
The HelloKafkaConsumer class extends the Thread class. In the constructor of this class I first create a Properties object with the value of the zookeeper.connect property set to the host and port on which the ZooKeeper server is listening. Also in the constructor, I create an object of kafka.javaapi.consumer.ConsumerConnector. Once the ConsumerConnector is ready, in the run() method I pass it the name of the topic on which I want to listen (you can pass multiple topic names here). Every time there is a new message, I read it and print it to the console.
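The consumer setup described above can be sketched in the same way. This is not the HelloKafkaConsumer source; it only prepares the two inputs that the 0.8 high-level consumer takes (the connection properties and the topic map) using the standard library, with the Kafka calls shown as comments. The class name ConsumerConfigSketch, the address localhost:2181, the group name hello-group and the topic javatest are assumptions. The group.id property is not mentioned in the text above, but as far as I know the 0.8 consumer refuses to start without it.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

class ConsumerConfigSketch {

    // Builds the properties that the Kafka 0.8 high-level consumer reads.
    public static Properties consumerProperties(String zookeeper, String groupId) {
        Properties props = new Properties();
        // Host and port on which the ZooKeeper server is listening.
        props.put("zookeeper.connect", zookeeper);
        // Assumption: a consumer group name chosen for this example.
        props.put("group.id", groupId);
        return props;
    }

    // Maps each topic name to the number of streams requested for it (one per topic here).
    public static Map<String, Integer> topicCountMap(String... topics) {
        Map<String, Integer> topicCount = new HashMap<String, Integer>();
        for (String topic : topics) {
            topicCount.put(topic, 1);
        }
        return topicCount;
    }

    public static void main(String[] args) {
        Properties props = consumerProperties("localhost:2181", "hello-group");
        Map<String, Integer> topicCount = topicCountMap("javatest");

        // With the Kafka 0.8 jar on the classpath, these would be used roughly like this:
        //   ConsumerConnector connector =
        //       Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
        //   Map<String, List<KafkaStream<byte[], byte[]>>> streams =
        //       connector.createMessageStreams(topicCount);
        //   ConsumerIterator<byte[], byte[]> it = streams.get("javatest").get(0).iterator();
        //   while (it.hasNext()) {
        //       System.out.println(new String(it.next().message()));
        //   }

        System.out.println("zookeeper.connect = " + props.getProperty("zookeeper.connect"));
        System.out.println("topics = " + topicCount.keySet());
    }
}
```

Passing more than one name to topicCountMap is how the "multiple topic names" case mentioned above would be expressed.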
Java Client for publishing and consuming messages from Apache Kafka
I wanted to learn how to publish and consume messages with Apache Kafka using the Java client, so I followed these steps.
Hi Sunil! Your blog is awesome and very informative. Instead of showing the message on the console, as you have done in the consumer thread's run method, how can we return the message to the caller method? Any idea?
Thanks for your post, it is very nice. How do I get unread messages using the Kafka consumer?
Hi,
In your consumer you have specified the topic name as "pythontest"; it should be "javatest".
How can I read messages from the beginning every time I run this API?
Hi Sunil, I need some help retrieving Kafka topic metadata. I sent you an email about this as well. I could not find any API for it.
Which Scala version did you use?
ReplyDeleteHi Sunil,
You have used the Scala API provided in "kafka_2.10-0.8.2.2.jar", while I can see a similar API in Java provided in "kafka-clients-0.8.2.0.jar".
I am totally confused about what to use and how. Most of the examples on the internet use the former jar. Could you please give me some hints or pointers on implementing a consumer using the Java API (the latter jar)?
Thanks,
Kanishka Chauhan
Hey Sunil,
First of all, thank you so much for such an informative blog on Kafka.
Secondly, I had a doubt regarding how to write the messages received from Kafka Consumer to the file (text file).
I have a Kafka Producer code written in java that writes kafka messages. And a consumer code that receives these messages.
Is it possible to write these received messages by consumer to any text file in java?
Can you please help me out in this?
Thanks and Regards,
Gazal
Hi sunil
How do I read an input file from an S3 location, and how do I specify an output location in Kafka?
Please send any commands to my mail ID: jnp.hadoop@gmail.com
Hi Sunil,
Is there any way that the producer can get an acknowledgement from the consumer on receiving the messages? I tried the callback() method in producer.send(), but it doesn't provide any details on whether the message was consumed by the consumer.
Regards,
Sudhish
Yes, there is surely a way to get an acknowledgement. You can easily find it on the internet; there are several examples available explaining the same.
Hi, I tried to run the code posted in the blog after making the appropriate changes. The producer code runs successfully and publishes the message, but the consumer is not able to consume it. However, if I run the consumer from cmd, it is able to consume the message from the specified topic.
What could be the reason?
THANK YOU!!!!!!
I am getting this error:
log4j:WARN No appenders could be found for logger (kafka.utils.VerifiableProperties).
log4j:WARN Please initialize the log4j system properly.
Process finished with exit code 0
Apache Kafka (0.9.0.0): how can a Kafka client act as both producer and consumer in Java?
Hello Sunil,
I am getting the error below.
ERROR StatusLogger No log4j2 configuration file found. Using default configuration: logging only errors to the console. Set system property 'log4j2.debug' to show Log4j2 internal initialization logging.
Exception in thread "main" org.I0Itec.zkclient.exception.ZkTimeoutException: Unable to connect to zookeeper server 'localhost:2181' with timeout of 6000 ms
at org.I0Itec.zkclient.ZkClient.connect(ZkClient.java:1233)
at org.I0Itec.zkclient.ZkClient.<init>(ZkClient.java:157)
at org.I0Itec.zkclient.ZkClient.<init>(ZkClient.java:131)
at kafka.utils.ZkUtils$.createZkClientAndConnection(ZkUtils.scala:115)
at kafka.utils.ZkUtils$.apply(ZkUtils.scala:97)
at kafka.consumer.ZookeeperConsumerConnector.connectZk(ZookeeperConsumerConnector.scala:194)
at kafka.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:142)
at kafka.javaapi.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:66)
at kafka.javaapi.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:69)
at kafka.consumer.Consumer$.createJavaConsumerConnector(ConsumerConnector.scala:123)
at kafka.consumer.Consumer.createJavaConsumerConnector(ConsumerConnector.scala)
at com.spnotes.kafka.HelloKafkaConsumer.<init>(HelloKafkaConsumer.java:41)
at com.spnotes.kafka.HelloKafkaConsumer.main(HelloKafkaConsumer.java:31)
Can you please tell me why?
Thanks,
Pavan Patil
patilpavan8@gmail.com