This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.spnotes.hadoop.logs;

import com.maxmind.geoip2.DatabaseReader;
import com.maxmind.geoip2.exception.GeoIp2Exception;
import com.maxmind.geoip2.model.CityResponse;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.IOException;
import java.net.InetAddress;
import java.net.URI;
import java.net.UnknownHostException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
public class ApacheLogMapper extends Mapper<LongWritable,Text, Text, IntWritable> { | |
Logger logger = LoggerFactory.getLogger(ApacheLogReducer.class); | |
enum GEO{ | |
ERROR, | |
US, | |
INDIA, | |
OTHER | |
}; | |
Pattern logPattern; | |
DatabaseReader reader; | |
private static final int NUM_FIELDS =9; | |
private final static IntWritable one = new IntWritable(1); | |
private Text clientIp = new Text(); | |
@Override | |
protected void setup(Context context) throws IOException, InterruptedException { | |
System.out.println("Including changes for GeoLite2-city.mmdb"); | |
String logEntryPattern = "^([\\d.]+) (\\S+) (\\S+) \\[([\\w:/]+\\s[+\\-]\\d{4})\\] \"(.+?)\" (\\d{3}) (\\d+) \"([^\"]+)\" \"([^\"]+)\""; | |
logPattern = Pattern.compile(logEntryPattern); | |
File file = new File("GeoLite2-City.mmdb"); | |
System.out.println("Value of context.getWorkingDirectory() " +context.getWorkingDirectory()); | |
System.out.println("Value of file.getAbsolutePath() " +file.getAbsolutePath()); | |
reader = new DatabaseReader.Builder(file).build(); | |
} | |
@Override | |
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { | |
logger.debug("Entering ApacheLogMapper.map()"); | |
Matcher matcher = logPattern.matcher(value.toString()); | |
if (!matcher.matches() || NUM_FIELDS != matcher.groupCount()) { | |
System.out.println("Unable to parse input"); | |
}else{ | |
String ip = matcher.group(1); | |
try { | |
CityResponse response = reader.city(InetAddress.getByName(ip)); | |
StringBuffer city = new StringBuffer(); | |
String cityName = response.getCity().getName(); | |
String countryName = response.getCountry().getName(); | |
if(cityName != null && cityName.length() != 0){ | |
city.append(cityName); | |
city.append(", "); | |
} | |
if(countryName != null && countryName.length() != 0){ | |
city.append(countryName); | |
if(countryName.equals("United States")) | |
context.getCounter(GEO.US).increment(1); | |
else if(countryName.equals("India")) | |
context.getCounter(GEO.INDIA).increment(1); | |
else | |
context.getCounter(GEO.OTHER).increment(1); | |
} | |
clientIp.set(city.toString()); | |
context.write(clientIp, one); | |
}catch(GeoIp2Exception ex){ | |
logger.error("Error in getting address ", ex); | |
context.getCounter(GEO.ERROR).increment(1); | |
} | |
} | |
logger.debug("Exiting ApacheLogMapper.map()"); | |
} | |
} |
- Declare Enum: On line 24 I created an enum named GEO that declares four counters: one for errors, one for IPs in the US, one for IPs in India, and one for everything else.
- Count countries: On line 67 I check whether the IP's location is in the USA. If it is, I increment the US counter by 1 using
context.getCounter(GEO.US).increment(1)
- Counting errors: Whenever the GeoIP API cannot find the IP in the GeoIP database, it throws a
GeoIp2Exception.
I catch that exception and use it as an opportunity to increment the ERROR counter by 1 using context.getCounter(GEO.ERROR).increment(1);
No comments:
Post a Comment