Sometimes you might have a bad record in a Kafka topic that you want to delete. Kafka does not provide a direct option to delete a specific record; the only way to delete records is to expire them. You can achieve this by setting the data retention to, say, 1 second, which expires all the old messages.
You can follow these steps.
First, check the topic to find out the current value of the retention.ms config parameter for the topic
kafka_2.11-0.11.0.1 spatil$ bin/kafka-configs.sh --zookeeper localhost:2181 --describe --entity-name my-topic --entity-type topics
Configs for topic 'my-topic' are retention.ms=86400000
Change the value of retention.ms to 1, which means all messages older than 1 ms will be expired.
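Using the same kafka-configs.sh tool, that change would look something like this (topic name and ZooKeeper address as in the example above):
bin/kafka-configs.sh --zookeeper localhost:2181 --alter --entity-name my-topic --entity-type topics --add-config retention.ms=1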
Then I executed the alter command on my topic and changed the number of partitions from 1 to 3
spatil$ bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic my-topic --partitions 3
WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected
Adding partitions succeeded!
I then executed the describe command on my topic to verify that it actually has 3 partitions
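That describe command looks something like this:
bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic my-topic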
Shut down the consumer so that you can restart it after resetting the offsets.
Now reset the offsets so that the consumer group goes back to the first message
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group user.kafkaconsumer --reset-offsets --to-earliest --all-topics --execute
Go back and verify that the consumer offsets actually went back by executing the following command
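A quick way to check the group's current offsets is the describe option of the same tool, something like this:
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group user.kafkaconsumer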
I wanted to figure out how to connect to an RDBMS from Spark and extract data, so I followed these steps. You can download this project from GitHub.
First I created an Address table in my local MySQL like this
CREATE TABLE `address` (
`addressid` int(11) NOT NULL AUTO_INCREMENT,
`contactid` int(11) DEFAULT NULL,
`line1` varchar(300) NOT NULL,
`city` varchar(50) NOT NULL,
`state` varchar(50) NOT NULL,
`zip` varchar(50) NOT NULL,
`lastmodified` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (`addressid`),
KEY `contactid` (`contactid`),
CONSTRAINT `address_ibfk_1` FOREIGN KEY (`contactid`) REFERENCES `CONTACT` (`contactid`)
) ENGINE=InnoDB AUTO_INCREMENT=6 DEFAULT CHARSET=utf8;
Then I added 5 sample records to the address table and verified them by querying the table on my local MySQL.
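If you want to reproduce the setup, a few sample rows could be inserted like this (the values here are made up purely for illustration; contactid is left NULL so the foreign key to CONTACT is not a concern):
INSERT INTO address (contactid, line1, city, state, zip) VALUES
  (NULL, '123 Main St', 'San Francisco', 'CA', '94105'),
  (NULL, '456 Oak Ave', 'Seattle', 'WA', '98101'),
  (NULL, '789 Pine Rd', 'Austin', 'TX', '73301'),
  (NULL, '321 Elm St', 'Denver', 'CO', '80202'),
  (NULL, '654 Maple Dr', 'Chicago', 'IL', '60601');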
After that I created a Spark Scala project that has mysql-connector-java as one of the dependencies.
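The dependency section of the build would look roughly like this (a sketch assuming an sbt build; the exact version numbers are placeholders, use whatever matches your Spark and MySQL setup):
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "2.1.0",
  "mysql" % "mysql-connector-java" % "5.1.40"
)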
The last step was to create a simple Spark program.
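A minimal sketch of that program is below (the complete version is in the GitHub project; the JDBC URL, user, and password are placeholders for your own settings):
import java.sql.{Connection, DriverManager, ResultSet}
import org.apache.spark.rdd.JdbcRDD
import org.apache.spark.{SparkConf, SparkContext}

// Same schema as the address table, without the lastmodified field
case class Address(addressId: Int, contactId: Int, line1: String,
                   city: String, state: String, zip: String)

object SparkJdbcRDDSample {

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("SparkJdbcRDDSample").setMaster("local[*]")
    val sparkContext = new SparkContext(sparkConf)

    // Read rows 1 through 5 of the address table into an RDD, using a single partition
    val addressRDD = new JdbcRDD(sparkContext, getConnection,
      "select * from address limit ?,?",
      0, 5, 1, convertToAddress)

    addressRDD.collect().foreach(println)
    sparkContext.stop()
  }

  // Creates a JDBC connection to the local MySQL database (URL and credentials are placeholders)
  def getConnection(): Connection = {
    Class.forName("com.mysql.jdbc.Driver")
    DriverManager.getConnection("jdbc:mysql://localhost:3306/test", "user", "password")
  }

  // Takes the current row of the ResultSet and converts it into an Address object
  def convertToAddress(rs: ResultSet): Address =
    Address(rs.getInt("addressid"), rs.getInt("contactid"), rs.getString("line1"),
      rs.getString("city"), rs.getString("state"), rs.getString("zip"))
}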
First is Address, a case class with the same schema as the Address table, minus the lastmodified field.
Next is the call that creates a JdbcRDD object, which says: query everything from address with addressid between 1 and 5.
new JdbcRDD(sparkContext, getConnection,
"select * from address limit ?,?",
0, 5, 1, convertToAddress)
Then I defined the getConnection() method that creates a JDBC connection to my database and returns it.
Last is the convertToAddress() method that knows how to take a ResultSet and convert it into an Address object.
When I run this program in the IDE, it prints the five Address records that were inserted earlier.
Recently I wanted to implement a simple least recently used (LRU) cache in one of my applications. My use case was simple enough that instead of going for something like Ehcache, I decided to build it on my own using java.util.LinkedHashMap.
The idea is very simple: all you have to do is extend java.util.LinkedHashMap and override its protected removeEldestEntry() method so that it checks whether the size of the map is greater than the size you specified when creating the map; if it is, the eldest entry gets removed.
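Here is a minimal sketch of that idea, written in Scala on top of java.util.LinkedHashMap (the cache size and the demo values are just for illustration):
import java.util

// Extend LinkedHashMap with accessOrder = true so that iteration order
// (and therefore the "eldest" entry) follows access recency.
// Passing (cacheSize, 0.75f, true) here is the equivalent of calling
// super(cacheSize, 0.75f, true) in a Java subclass.
class LRUCache[K, V](cacheSize: Int)
  extends util.LinkedHashMap[K, V](cacheSize, 0.75f, true) {

  // Called by LinkedHashMap after every insert; returning true evicts the eldest entry
  override def removeEldestEntry(eldest: util.Map.Entry[K, V]): Boolean =
    size() > cacheSize
}

// Usage: with a capacity of 3, adding a fourth entry evicts the least recently used one
object LRUCacheDemo extends App {
  val cache = new LRUCache[String, Int](3)
  cache.put("a", 1); cache.put("b", 2); cache.put("c", 3)
  cache.get("a")          // touch "a" so "b" becomes the least recently used entry
  cache.put("d", 4)       // evicts "b"
  println(cache.keySet()) // prints [c, a, d]
}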
Now the question is: when the map is full, which entry will it remove? You have 2 options.
Eldest: If you just want to remove the first entry that was inserted into the map when adding a new entry, then in your constructor you can use super(cacheSize, 0.75f); LinkedHashMap then won't keep track of when a particular entry was accessed.
Least recently used (LRU): But if you want to make sure that the entry that was least recently used gets removed, then call super(cacheSize, 0.75f, true); from the constructor of your LRUCache, so that LinkedHashMap keeps track of when each entry was accessed and removes the least recently used entry.
In the Spark Kafka Streaming Java program Word Count using Kafka 0.10 API blog entry I talked about how to create a simple Java program that uses Spark Streaming's Kafka10 API. This blog entry does the same thing but using Scala. You can download the complete application from GitHub.
You can run this sample by first downloading Kafka 0.10.* from the Apache Kafka website, then creating and starting a test topic and sending messages to it by following the Kafka quick start document.
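A minimal sketch of the Scala application looks like this (the complete version is in the GitHub project; the broker address, group id, and topic name are assumptions matching the quick start setup):
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}

object SparkKafka10ScalaWordCount {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("SparkKafka10ScalaWordCount").setMaster("local[*]")
    // Batch interval of 5 seconds
    val streamingContext = new StreamingContext(sparkConf, Seconds(5))

    // Kafka consumer settings, pointing at the local broker started via the quick start
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "localhost:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "spark-wordcount-group",
      "auto.offset.reset" -> "latest"
    )
    val topics = Array("test")

    // Create a direct stream against the Kafka 0.10 consumer API
    val messages = KafkaUtils.createDirectStream[String, String](
      streamingContext,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
    )

    // Split each message value into words and count them per batch
    val wordCounts = messages
      .map(_.value())
      .flatMap(_.split(" "))
      .map(word => (word, 1))
      .reduceByKey(_ + _)

    wordCounts.print()

    streamingContext.start()
    streamingContext.awaitTermination()
  }
}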
The Kafka API went through a lot of changes starting with Kafka 0.9, and the Spark Kafka Streaming API was also changed to better support it. I wanted to try that out, so I built this simple Word Count application in Java using the Kafka 0.10 API. You can download the complete application from GitHub.
You can run this sample by first downloading Kafka 0.10.* from the Apache Kafka website, then creating and starting a test topic and sending messages to it by following the Kafka quick start document.
The first thing I did was include the Kafka 0.10 API dependencies in the Spark project. As you can see, I am using Spark 2.1.
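The relevant dependencies would look something like this (a sketch assuming a Maven build; the 2.1.0 version matches the Spark 2.1 mentioned above, and the _2.11 suffix assumes Scala 2.11 builds of Spark):
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming_2.11</artifactId>
  <version>2.1.0</version>
</dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
  <version>2.1.0</version>
</dependency>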
Then I created a SparkKafka10.java file; the complete source is in the GitHub project linked above, and the comments inside the code explain what I am doing.
Now if you create the test topic and send messages to it, you should be able to see the word count on the console.