Writing data from Spark to Elasticsearch

The Elasticsearch for Apache Hadoop project provides a way to write to Elasticsearch directly from Spark, without going through an Elasticsearch OutputFormat. I wanted to try that out, so I built a simple application that saves the output of a word count into Elasticsearch; you can download this project from GitHub. The first thing I had to do was build a Maven pom.xml that includes the org.elasticsearch:elasticsearch-hadoop 5.0.1 jar. I could not find it in the regular Maven repository, so I had to add an additional repository to my pom.xml.
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.spnotes.spark</groupId>
    <artifactId>HelloElasticWriter</artifactId>
    <version>1.0-SNAPSHOT</version>

    <repositories>
        <repository>
            <id>sonatype-oss</id>
            <url>http://oss.sonatype.org/content/repositories/snapshots</url>
            <snapshots><enabled>true</enabled></snapshots>
        </repository>
    </repositories>

    <properties>
        <maven.compiler.target>1.7</maven.compiler.target>
        <encoding>UTF-8</encoding>
        <scala.tools.version>2.10</scala.tools.version>
        <scala.version>2.10.4</scala.version>
        <spark.version>1.5.2</spark.version>
        <spark.streams.version>1.6.1</spark.streams.version>
        <elasticsearch.version>5.0.1</elasticsearch.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_${scala.tools.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch-hadoop</artifactId>
            <version>${elasticsearch.version}</version>
        </dependency>
    </dependencies>

    <build>
        <sourceDirectory>src/main/scala</sourceDirectory>
        <plugins>
            <plugin>
                <groupId>org.scala-tools</groupId>
                <artifactId>maven-scala-plugin</artifactId>
                <version>2.15.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
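With this pom in place the project can be packaged into a jar with the usual Maven command; the jar name follows from the artifactId and version declared above:

mvn clean package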
This is what my Spark program looks like. The main part is where I create a Map of all the properties needed for saving this RDD into Elasticsearch, followed by the call to wordCountJson.saveToEs(esMap), which actually takes care of writing the data into Elasticsearch.
package org.spnotes.spark

import org.apache.spark.{SparkConf, SparkContext}
import org.elasticsearch.spark._

import scala.util.parsing.json.JSON

/**
  * Created by sunilpatil on 11/22/16.
  */
object SparkElasticSearchOutput {
  def main(argv: Array[String]): Unit = {
    if (argv.length != 4) {
      println("Usage Pattern: SparkElasticSearchOutput <inputfilepath> " +
        "<eshostname> <esport> <esresource>")
      return
    }
    val filePath = argv(0)
    val esHost = argv(1)
    val esPort = argv(2)
    val esResource = argv(3)

    val sparkConf = new SparkConf().setAppName("SparkElasticSearchOutput").setMaster("local[*]")
    val sparkContext = new SparkContext(sparkConf)

    val lines = sparkContext.textFile(filePath)
    val wordCount = lines.flatMap(line => line.split(" ")).map(word => (word, 1)).
      reduceByKey((x, y) => x + y)
    // Take out empty words/words with length 0, take out words containing .
    val validWords = wordCount.filter(record => record._1.trim.length != 0).filter(record => !record._1.contains("."))
    val wordCountJson = validWords.map { record =>
      val jsonStr = "{\"" + record._1 + "\":\"" + record._2 + "\"}"
      JSON.parseFull(jsonStr)
    }.filter(record => record != None)
    wordCountJson.foreach(println)

    // val esMap = Map("es.nodes" -> "localhost", "es.port" -> "9200", "es.resource" -> "wordcount/counts")
    // Since we already have a JSON object, save it as is
    val esMap = Map("es.nodes" -> esHost, "es.port" -> esPort, "es.resource" -> esResource, "es.output.json" -> "true")
    wordCountJson.saveToEs(esMap)
  }
}
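If you want to try it out locally, a spark-submit invocation along these lines should work. This is just a sketch, not from the original post: the input path is a placeholder, wordcount/counts is the sample es.resource from the commented-out line above, and --packages pulls in elasticsearch-hadoop at runtime because the pom above does not build a fat jar.

spark-submit --class org.spnotes.spark.SparkElasticSearchOutput --packages org.elasticsearch:elasticsearch-hadoop:5.0.1 target/HelloElasticWriter-1.0-SNAPSHOT.jar /path/to/input.txt localhost 9200 wordcount/counts

Once the job finishes you can check what landed in the index with a plain query, for example curl "http://localhost:9200/wordcount/_search?pretty".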

How to use KafkaLog4jAppender for sending Log4j logs to Kafka

Apache Kafka has a KafkaLog4jAppender that you can use for redirecting your Log4j logs to a Kafka topic. I wanted to try it out, so I used the following steps; you can download the sample project from here. First I created a simple standalone Java program that uses Log4j, shown below. As you can see, this is like any other normal Java program that uses Log4j.
package com.spnotes.kafka;

import org.apache.log4j.LogManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Created by sunilpatil on 11/7/16.
 */
public class HelloKafkaLogger {
    private static final Logger logger = LoggerFactory.getLogger(HelloKafkaLogger.class);

    public static void main(String[] argv) {
        logger.debug("Debug message from HelloKafkaLogger.main");
        logger.info("Info message from HelloKafkaLogger.main");
        logger.warn("Warn message from HelloKafkaLogger.main");
        logger.error("Error message from HelloKafkaLogger.main");
        LogManager.shutdown();
    }
}
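For this to compile and run, the project needs the kafka-log4j-appender artifact on the classpath, plus Log4j and an SLF4J binding for it, since the program logs through the SLF4J API. A minimal dependency snippet might look like the one below; the versions are my assumption, so match them to the Kafka and SLF4J releases you actually use.

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-log4j-appender</artifactId>
    <version>0.10.1.0</version>
</dependency>
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-log4j12</artifactId>
    <version>1.7.21</version>
</dependency>

The slf4j-log4j12 binding pulls in log4j transitively, so the KafkaLog4jAppender configured in log4j.properties below can pick up the messages logged through SLF4J.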
Then in the log4j.properties file I added the kafka appender configuration and registered it on the root logger: the brokerList property points to the Kafka broker and the topic property names the Kafka topic to which the logs should go.
# Root logger option
log4j.rootLogger=DEBUG, stdout, kafka
log4j.logger.kafka=WARN
log4j.logger.org.apache.kafka=WARN
# Redirect log messages to console
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n
log4j.appender.kafka=org.apache.kafka.log4jappender.KafkaLog4jAppender
log4j.appender.kafka.brokerList=localhost:9092
log4j.appender.kafka.topic=kafkalogger
log4j.appender.kafka.layout=org.apache.log4j.PatternLayout
log4j.appender.kafka.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n
log4j.appender.kafka.level=INFO
Now before running this program, make sure that you actually have a topic named kafkalogger; if you don't, you can create it using this command

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic kafkalogger
You can verify that the kafkalogger topic exists by executing the following command

bin/kafka-topics.sh --list --zookeeper localhost:2181
You can also run the Kafka console consumer, which reads messages from Kafka and prints them to the console, using the following command

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic kafkalogger
Now when you run your Java program you should see messages on the console like this