First I created the address table in my local MySQL database like this:
CREATE TABLE `address` (
`addressid` int(11) NOT NULL AUTO_INCREMENT,
`contactid` int(11) DEFAULT NULL,
`line1` varchar(300) NOT NULL,
`city` varchar(50) NOT NULL,
`state` varchar(50) NOT NULL,
`zip` varchar(50) NOT NULL,
`lastmodified` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (`addressid`),
KEY `contactid` (`contactid`),
CONSTRAINT `address_ibfk_1` FOREIGN KEY (`contactid`) REFERENCES `CONTACT` (`contactid`)
) ENGINE=InnoDB AUTO_INCREMENT=6 DEFAULT CHARSET=utf8;
Then I added 5 sample records to the address table and queried the table on my local machine to check them.
After that I created a Spark Scala project that has mysql-connector-java as one of its dependencies.
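If sbt is the build tool (an assumption, since any build tool that can pull in the two libraries works), the dependency section could be sketched as below. The project name and every version number are placeholders of mine, not the ones from my original setup; choose versions that match your Spark installation.

```scala
// build.sbt (sketch; project name and all versions are assumptions)
name := "jdbc-rdd-example"
scalaVersion := "2.12.18"

libraryDependencies ++= Seq(
  // Spark core provides SparkContext and JdbcRDD
  "org.apache.spark" %% "spark-core" % "3.5.1",
  // MySQL JDBC driver used by getConnection()
  "mysql" % "mysql-connector-java" % "8.0.33"
)
```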
The last step was to create a simple Spark program. It has 4 main sections:
- First is the Address case class, which has the same schema as the address table but without the lastmodified field
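- Next is the call that creates the JdbcRDD object. The query uses MySQL's LIMIT offset,count form. With lower bound 0, upper bound 5 and a single partition, the two ? placeholders are bound to 0 and 5, so the query fetches the first 5 rows of address. Since the table holds only the 5 sample records, that returns every row.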
new JdbcRDD(sparkContext, getConnection, "select * from address limit ?,?", 0, 5, 1, convertToAddress)
- Then I defined the getConnection() method, which creates a JDBC connection to my database and returns it
- Last is the convertToAddress() method, which takes a ResultSet and converts the current row into an Address object
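Putting the four sections together, a minimal sketch of such a program could look like the following. The object name, the Scala field names, the JDBC URL, the user name, the password, and the local[*] master are all assumptions for illustration; only the table columns and the JdbcRDD arguments come from the steps above.

```scala
import java.sql.{Connection, DriverManager, ResultSet}

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.JdbcRDD

// Mirrors the address table, minus the lastmodified column.
case class Address(addressId: Int, contactId: Int, line1: String,
                   city: String, state: String, zip: String)

object JdbcRddExample {

  // Assumed connection settings; replace with your own.
  val url = "jdbc:mysql://localhost:3306/test"
  val user = "user"
  val password = "password"

  // Opens a new JDBC connection; JdbcRDD calls this once per partition.
  def getConnection(): Connection =
    DriverManager.getConnection(url, user, password)

  // Maps the current row of the ResultSet to an Address.
  // Note: contactid is nullable, and getInt returns 0 for SQL NULL.
  def convertToAddress(rs: ResultSet): Address =
    Address(
      rs.getInt("addressid"),
      rs.getInt("contactid"),
      rs.getString("line1"),
      rs.getString("city"),
      rs.getString("state"),
      rs.getString("zip"))

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("JdbcRddExample").setMaster("local[*]")
    val sc = new SparkContext(conf)

    // lowerBound = 0, upperBound = 5, numPartitions = 1:
    // the two ? placeholders are bound to 0 and 5.
    val addresses = new JdbcRDD[Address](
      sc,
      () => getConnection(),
      "select * from address limit ?,?",
      0L, 5L, 1,
      (rs: ResultSet) => convertToAddress(rs))

    addresses.collect().foreach(println)
    sc.stop()
  }
}
```

Running it needs a reachable MySQL instance with the address table populated. The LIMIT query is only safe with a single partition: with more partitions, JdbcRDD binds a different sub-range to the placeholders for each partition, which suits a query of the form `where ? <= addressid and addressid <= ?` rather than an offset and count.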