In the setup() method of my class I am creating an object of com.maxmind.geoip2.DatabaseReader
and passing GeoLite2-City.mmdb,
the GeoIP database file, to my reader.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.spnotes.hadoop.logs; | |
import com.maxmind.geoip2.DatabaseReader; | |
import com.maxmind.geoip2.exception.GeoIp2Exception; | |
import com.maxmind.geoip2.model.CityResponse; | |
import org.apache.hadoop.io.IntWritable; | |
import org.apache.hadoop.io.LongWritable; | |
import org.apache.hadoop.io.Text; | |
import org.apache.hadoop.mapreduce.Mapper; | |
import org.slf4j.Logger; | |
import org.slf4j.LoggerFactory; | |
import java.io.File; | |
import java.io.IOException; | |
import java.net.InetAddress; | |
import java.net.URI; | |
import java.util.regex.Matcher; | |
import java.util.regex.Pattern; | |
public class ApacheLogMapper extends Mapper<LongWritable,Text, Text, IntWritable> { | |
Logger logger = LoggerFactory.getLogger(ApacheLogReducer.class); | |
Pattern logPattern; | |
DatabaseReader reader; | |
private static final int NUM_FIELDS =9; | |
private final static IntWritable one = new IntWritable(1); | |
private Text clientIp = new Text(); | |
@Override | |
protected void setup(Context context) throws IOException, InterruptedException { | |
System.out.println("Including changes for GeoLite2-city.mmdb"); | |
String logEntryPattern = "^([\\d.]+) (\\S+) (\\S+) \\[([\\w:/]+\\s[+\\-]\\d{4})\\] \"(.+?)\" (\\d{3}) (\\d+) \"([^\"]+)\" \"([^\"]+)\""; | |
logPattern = Pattern.compile(logEntryPattern); | |
URI[] cacheFileURI = context.getCacheFiles(); | |
for(URI uri: cacheFileURI){ | |
System.out.println(uri); | |
} | |
File file = new File("GeoLite2-City.mmdb"); | |
reader = new DatabaseReader.Builder(file).build(); | |
} | |
@Override | |
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { | |
logger.debug("Entering ApacheLogMapper.map()"); | |
Matcher matcher = logPattern.matcher(value.toString()); | |
if (!matcher.matches() || NUM_FIELDS != matcher.groupCount()) { | |
System.out.println("Unable to parse input"); | |
}else{ | |
String ip = matcher.group(1); | |
try { | |
CityResponse response = reader.city(InetAddress.getByName(ip)); | |
StringBuffer city = new StringBuffer(); | |
String cityName = response.getCity().getName(); | |
String countryName = response.getCountry().getName(); | |
if(cityName != null && cityName.length() != 0){ | |
city.append(cityName); | |
city.append(", "); | |
} | |
if(countryName != null && countryName.length() != 0){ | |
city.append(countryName); | |
} | |
clientIp.set(city.toString()); | |
context.write(clientIp, one); | |
}catch(GeoIp2Exception ex){ | |
logger.error("Error in getting address ", ex); | |
} | |
} | |
logger.debug("Exiting ApacheLogMapper.map()"); | |
} | |
} | |
The mapper depends on the com.maxmind.geoip2 library
along with its transitive dependencies, and it also needs access to the GeoLite2-City.mmdb
file at runtime.
Creating single jar with all dependencies
In order for this method to work, add the following plugin to your Maven build file like this; in this case the jar will get created with its main class set to com.spnotes.hadoop.logs.ApacheLogMRDriver
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<!-- Builds a single "fat" jar bundling the job classes with every runtime
     dependency; the manifest main class lets the jar be launched directly
     with "hadoop jar". -->
<plugin>
  <artifactId>maven-assembly-plugin</artifactId>
  <configuration>
    <descriptorRefs>
      <descriptorRef>jar-with-dependencies</descriptorRef>
    </descriptorRefs>
    <archive>
      <manifest>
        <mainClass>com.spnotes.hadoop.logs.ApacheLogMRDriver</mainClass>
      </manifest>
    </archive>
  </configuration>
</plugin>
mvn clean compile assembly:single
command. Copy the GeoLite2-City.mmdb
file to the Hadoop cluster (you can package it inside the jar, but that might not be a good idea). Now you can execute the following command to run this job on the cluster:
hadoop jar ApacheLogsMR-jar-with-dependencies.jar -files GeoLite2-City.mmdb /apache.log /output/logs/apachelogs
Using Hadoop distributed cache
The second option you have is to create a jar containing only your MapReduce program and pass its dependencies to Hadoop using the distributed cache. With this option, you should make sure that your driver class extends Configured
like this
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.spnotes.hadoop.logs; | |
import org.apache.hadoop.conf.Configured; | |
import org.apache.hadoop.fs.Path; | |
import org.apache.hadoop.io.IntWritable; | |
import org.apache.hadoop.io.Text; | |
import org.apache.hadoop.mapreduce.Job; | |
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; | |
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; | |
import org.apache.hadoop.util.Tool; | |
import org.apache.hadoop.util.ToolRunner; | |
public class ApacheLogMRDriver extends Configured implements Tool { | |
@Override | |
public int run(String[] args) throws Exception { | |
if(args.length != 2){ | |
System.err.printf("Usage: %s [generic options] <input> <output> \n", getClass().getSimpleName()); | |
ToolRunner.printGenericCommandUsage(System.err); | |
return -1; | |
} | |
Job job = new Job(getConf(),"ApacheLog Parser"); | |
job.setJarByClass(getClass()); | |
FileInputFormat.addInputPath(job, new Path(args[0])); | |
FileOutputFormat.setOutputPath(job, new Path(args[1])); | |
job.setMapperClass(ApacheLogMapper.class); | |
job.setReducerClass(ApacheLogReducer.class); | |
job.setOutputKeyClass(Text.class); | |
job.setOutputValueClass(IntWritable.class); | |
return job.waitForCompletion(true) ? 0 :1; | |
} | |
public static void main(String[] argv)throws Exception{ | |
int exitCode = ToolRunner.run(new ApacheLogMRDriver(), argv); | |
System.exit(exitCode); | |
} | |
} |
Copy the GeoLite2-City.mmdb
file onto the same machine that hosts the Hadoop cluster. Then execute the following command to pass both the jars and the database file to Hadoop:
hadoop jar ApacheLogsMR.jar -libjars geoip2-0.7.1.jar,maxmind-db-0.3.2.jar,jackson-databind-2.2.3.jar,jackson-core-2.2.3.jar,jackson-annotations-2.2.3.jar -files GeoLite2-City.mmdb /apache.log /output/logs/apachelogs
3 comments:
Where will I get sample logs for this sample ?
Can't thank you enough for this. I was looking for an option to share jar resources across reducers without explicitly copying it on hdfs or using DistributedCache. Didn't know about the "-files" option. Thanks again.
Thanks for info....
Website development in Bangalore
Post a Comment