First, I created an Address table in my local MySQL database like this
CREATE TABLE `address` (
`addressid` int(11) NOT NULL AUTO_INCREMENT,
`contactid` int(11) DEFAULT NULL,
`line1` varchar(300) NOT NULL,
`city` varchar(50) NOT NULL,
`state` varchar(50) NOT NULL,
`zip` varchar(50) NOT NULL,
`lastmodified` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (`addressid`),
KEY `contactid` (`contactid`),
CONSTRAINT `address_ibfk_1` FOREIGN KEY (`contactid`) REFERENCES `CONTACT` (`contactid`)
) ENGINE=InnoDB AUTO_INCREMENT=6 DEFAULT CHARSET=utf8;
Then I added 5 sample records to the address table and verified them with a quick select * from address. After that, I created a Spark Scala project that has mysql-connector-java as one of its dependencies
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.spnotes.spark</groupId>
    <artifactId>JDBCSpark</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.target>1.7</maven.compiler.target>
        <encoding>UTF-8</encoding>
        <scala.tools.version>2.10</scala.tools.version>
        <scala.version>2.10.4</scala.version>
        <spark.version>1.5.2</spark.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_${scala.tools.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_${scala.tools.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.38</version>
        </dependency>
    </dependencies>

    <build>
        <sourceDirectory>src/main/scala</sourceDirectory>
        <plugins>
            <plugin>
                <groupId>org.scala-tools</groupId>
                <artifactId>maven-scala-plugin</artifactId>
                <version>2.15.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
package com.spnotes.spark

import java.sql.{Connection, DriverManager, ResultSet}

import org.apache.spark.rdd.JdbcRDD
import org.apache.spark.{SparkConf, SparkContext}

/**
  * Created by sunilpatil on 4/19/16.
  */
object JDBCRDDClient {

  // Mirrors the address table, minus the lastmodified column
  case class Address(addressId: Int, contactId: Int, line1: String, city: String, state: String, zip: String)

  def main(argv: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("HelloJDBC")
    val sparkContext = new SparkContext(sparkConf)

    // The two ?s are bound to the lower bound (0) and upper bound (5);
    // with 1 partition the query runs once as "select * from address limit 0,5"
    val jdbcRdd = new JdbcRDD(sparkContext, getConnection,
      "select * from address limit ?,?",
      0, 5, 1, convertToAddress)

    jdbcRdd.foreach(println)
    sparkContext.stop()
  }

  // Loads the MySQL driver and opens a JDBC connection (called once per partition)
  def getConnection(): Connection = {
    Class.forName("com.mysql.jdbc.Driver")
    DriverManager.getConnection("jdbc:mysql://localhost/test1?" + "user=test1&password=test1")
  }

  // Maps one row of the ResultSet to an Address object
  def convertToAddress(rs: ResultSet): Address = {
    Address(rs.getInt("addressid"), rs.getInt("contactid"), rs.getString("line1"),
      rs.getString("city"), rs.getString("state"), rs.getString("zip"))
  }
}
A few things are worth calling out in this code:
- First is the Address case class, which has the same schema as the address table, minus the lastmodified field
- Next is the call that creates the JdbcRDD. The three numbers before the row mapper are the lower bound (0), the upper bound (5), and the number of partitions (1); Spark splits the bound range across the partitions and binds each pair into the two ? placeholders, so with one partition this runs as select * from address limit 0,5 and fetches the first 5 rows (see the sketch after this list for the more common key-based form)
new JdbcRDD(sparkContext, getConnection, "select * from address limit ?,?", 0, 5, 1, convertToAddress)
- Then I defined the getConnection() method, which creates a JDBC connection to my database and returns it
- Last is the convertToAddress() method, which knows how to take a ResultSet and convert it into an Address object
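The limit ?,? form works for this small local test, but the two placeholders in a JdbcRDD query are really meant to bound a numeric key column, so Spark can split the range across partitions without any partition reading the same row twice. Here is a minimal sketch of that more common form; the bounds (1 to 5) and the 2 partitions are just illustrative values for my 5 sample rows:

// Spark splits [1, 5] into 2 non-overlapping sub-ranges and binds each
// pair into the two ?s, so every partition reads a distinct slice of rows
val jdbcRddByKey = new JdbcRDD(sparkContext, getConnection,
  "select * from address where addressid >= ? and addressid <= ?",
  1, 5, 2, convertToAddress)
jdbcRddByKey.foreach(println)

One caveat for either version: foreach(println) runs on the executors, so the output only shows up in the console here because the example uses local[2]; on a real cluster you would collect() the (small) result back to the driver before printing it.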