The key dependency is the org.elasticsearch:elasticsearch-hadoop version 5.0.1 jar. I could not find it in the regular Maven repository, so I had to include an additional repository in my pom.xml:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.spnotes.spark</groupId>
    <artifactId>HelloElasticWriter</artifactId>
    <version>1.0-SNAPSHOT</version>

    <repositories>
        <repository>
            <id>sonatype-oss</id>
            <url>http://oss.sonatype.org/content/repositories/snapshots</url>
            <snapshots><enabled>true</enabled></snapshots>
        </repository>
    </repositories>

    <properties>
        <maven.compiler.target>1.7</maven.compiler.target>
        <encoding>UTF-8</encoding>
        <scala.tools.version>2.10</scala.tools.version>
        <scala.version>2.10.4</scala.version>
        <spark.version>1.5.2</spark.version>
        <spark.streams.version>1.6.1</spark.streams.version>
        <elasticsearch.version>2.4.2</elasticsearch.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_${scala.tools.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch-hadoop</artifactId>
            <version>5.0.1</version>
        </dependency>
    </dependencies>

    <build>
        <sourceDirectory>src/main/scala</sourceDirectory>
        <plugins>
            <plugin>
                <groupId>org.scala-tools</groupId>
                <artifactId>maven-scala-plugin</artifactId>
                <version>2.15.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
The key line is wordCountJson.saveToEs(esMap), which actually takes care of writing the data into Elasticsearch:
package org.spnotes.spark

import org.apache.spark.{SparkConf, SparkContext}
import org.elasticsearch.spark._

import scala.util.parsing.json.JSON

/**
 * Created by sunilpatil on 11/22/16.
 */
object SparkElasticSearchOutput {
  def main(argv: Array[String]): Unit = {
    if (argv.length != 4) {
      println("Usage Pattern: SparkElasticSearchOutput <inputfilepath> " +
        "<eshostname> <esport> <esresource>")
      return
    }
    val filePath = argv(0)
    val esHost = argv(1)
    val esPort = argv(2)
    val esResource = argv(3)

    val sparkConf = new SparkConf().setAppName("SparkElasticSearchOutput").setMaster("local[*]")
    val sparkContext = new SparkContext(sparkConf)

    // Count the occurrences of every word in the input file
    val lines = sparkContext.textFile(filePath)
    val wordCount = lines.flatMap(line => line.split(" ")).map(word => (word, 1)).
      reduceByKey((x, y) => x + y)

    // Take out empty words/words with length 0, take out words containing .
    val validWords = wordCount.filter(record => record._1.trim.length != 0).filter(record => !record._1.contains("."))

    // Turn every (word, count) pair into a JSON document and drop anything that does not parse
    val wordCountJson = validWords.map { record =>
      val jsonStr = "{\"" + record._1 + "\":\"" + record._2 + "\"}"
      JSON.parseFull(jsonStr)
    }.filter(record => record != None)
    wordCountJson.foreach(println)

    // val esMap = Map("es.nodes" -> "localhost", "es.port" -> "9200", "es.resource" -> "wordcount/counts")
    // Since we already have JSON objects, save them as they are
    val esMap = Map("es.nodes" -> esHost, "es.port" -> esPort, "es.resource" -> esResource, "es.output.json" -> "true")
    wordCountJson.saveToEs(esMap)
  }
}
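For comparison, the elasticsearch-spark connector can also serialize plain Scala Maps (or case classes) directly, so you do not have to build JSON strings by hand. Below is a minimal sketch of that alternative, assuming a local Elasticsearch on localhost:9200 and the wordcount/counts resource; the object name and the sample data are hypothetical, and the connection settings sit on the SparkConf instead of the Map passed to saveToEs.

package org.spnotes.spark

import org.apache.spark.{SparkConf, SparkContext}
import org.elasticsearch.spark._

object SparkElasticSearchMapOutput {
  def main(argv: Array[String]): Unit = {
    // Connection settings can live on the SparkConf instead of the cfg Map passed to saveToEs
    val sparkConf = new SparkConf()
      .setAppName("SparkElasticSearchMapOutput")
      .setMaster("local[*]")
      .set("es.nodes", "localhost") // assumed Elasticsearch host
      .set("es.port", "9200")       // assumed Elasticsearch REST port
    val sparkContext = new SparkContext(sparkConf)

    // Plain Scala Maps are serialized into JSON documents by the connector itself,
    // so there is no manual JSON string building here
    val counts = Seq(Map("word" -> "hello", "count" -> 3), Map("word" -> "spark", "count" -> 5))
    sparkContext.makeRDD(counts).saveToEs("wordcount/counts")

    sparkContext.stop()
  }
}

Either way, once the job finishes the documents should show up under the configured resource (index/type) in Elasticsearch.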