Using counters in a MapReduce program

While developing MapReduce jobs you might want to keep counters for certain conditions you encounter. For example, in a MapReduce job that uses GeoIP to count the number of requests from each city, I wanted to check how many requests came from the US and India vs. other countries. There are also cases where you try to find the location of an IP address, the IP is not in the GeoIP database, and the lookup throws an error. I wanted to see how many IPs were not found in the database. To do that I changed ApacheLogMapper.java like this:
    package com.spnotes.hadoop.logs;

    import com.maxmind.geoip2.DatabaseReader;
    import com.maxmind.geoip2.exception.GeoIp2Exception;
    import com.maxmind.geoip2.model.CityResponse;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    import java.io.File;
    import java.io.IOException;
    import java.net.InetAddress;
    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    public class ApacheLogMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        // Logger for this mapper class
        private static final Logger logger = LoggerFactory.getLogger(ApacheLogMapper.class);

        // Each constant becomes a counter; Hadoop uses the enum's class name as the counter group
        enum GEO {
            ERROR,  // IP address could not be located in the GeoIP database
            US,     // request came from the United States
            INDIA,  // request came from India
            OTHER   // request came from any other country
        }

        private static final int NUM_FIELDS = 9;
        private static final IntWritable one = new IntWritable(1);

        private Pattern logPattern;
        private DatabaseReader reader;
        private final Text clientIp = new Text();

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            logger.info("Including changes for GeoLite2-City.mmdb");
            // Apache combined log format: ip, identity, user, timestamp, request,
            // status, bytes, referer, user agent (9 capture groups)
            String logEntryPattern = "^([\\d.]+) (\\S+) (\\S+) \\[([\\w:/]+\\s[+\\-]\\d{4})\\] \"(.+?)\" (\\d{3}) (\\d+) \"([^\"]+)\" \"([^\"]+)\"";
            logPattern = Pattern.compile(logEntryPattern);
            // Relative path: resolved against the task's working directory
            File file = new File("GeoLite2-City.mmdb");
            logger.info("Value of context.getWorkingDirectory() {}", context.getWorkingDirectory());
            logger.info("Value of file.getAbsolutePath() {}", file.getAbsolutePath());
            reader = new DatabaseReader.Builder(file).build();
        }

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            logger.debug("Entering ApacheLogMapper.map()");
            Matcher matcher = logPattern.matcher(value.toString());
            if (!matcher.matches() || NUM_FIELDS != matcher.groupCount()) {
                logger.warn("Unable to parse input: {}", value);
            } else {
                String ip = matcher.group(1);
                try {
                    CityResponse response = reader.city(InetAddress.getByName(ip));
                    StringBuilder city = new StringBuilder();
                    String cityName = response.getCity().getName();
                    String countryName = response.getCountry().getName();
                    if (cityName != null && !cityName.isEmpty()) {
                        city.append(cityName).append(", ");
                    }
                    if (countryName != null && !countryName.isEmpty()) {
                        city.append(countryName);
                        // Increment exactly one country counter per located request
                        if (countryName.equals("United States")) {
                            context.getCounter(GEO.US).increment(1);
                        } else if (countryName.equals("India")) {
                            context.getCounter(GEO.INDIA).increment(1);
                        } else {
                            context.getCounter(GEO.OTHER).increment(1);
                        }
                    }
                    clientIp.set(city.toString());
                    context.write(clientIp, one);
                } catch (GeoIp2Exception ex) {
                    // Thrown, for example, when the IP address is not in the GeoIP database
                    logger.error("Error in getting address ", ex);
                    context.getCounter(GEO.ERROR).increment(1);
                }
            }
            logger.debug("Exiting ApacheLogMapper.map()");
        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            // Close the GeoIP database reader once the task has processed all its records
            reader.close();
        }
    }
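To look at the counting logic on its own, here is a Hadoop-free sketch in plain Java. It assumes a hypothetical in-memory map (fakeGeoDb) standing in for the GeoLite2 database, treats a missing key like the GeoIp2Exception case, and uses an EnumMap in place of Hadoop's counters. GeoCounterSketch and countRequests are illustrative names only, not part of Hadoop or the GeoIP2 API, and the IP addresses are documentation-range placeholders.

```java
import java.util.Arrays;
import java.util.EnumMap;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, Hadoop-free illustration of the enum-counter pattern used in the mapper.
class GeoCounterSketch {
    // Same constants as the GEO enum in ApacheLogMapper
    enum GEO { ERROR, US, INDIA, OTHER }

    // fakeGeoDb maps IP -> country name; a missing IP plays the role of GeoIp2Exception.
    static Map<GEO, Long> countRequests(List<String> ips, Map<String, String> fakeGeoDb) {
        Map<GEO, Long> counters = new EnumMap<>(GEO.class); // stands in for Hadoop counters
        for (String ip : ips) {
            if (!fakeGeoDb.containsKey(ip)) {
                counters.merge(GEO.ERROR, 1L, Long::sum); // lookup "failed"
                continue;
            }
            String country = fakeGeoDb.get(ip);
            if (country == null || country.isEmpty()) {
                continue; // like the mapper, no GEO counter is incremented here
            }
            GEO bucket = "United States".equals(country) ? GEO.US
                    : "India".equals(country) ? GEO.INDIA
                    : GEO.OTHER;
            counters.merge(bucket, 1L, Long::sum);
        }
        return counters;
    }

    public static void main(String[] args) {
        Map<String, String> db = new HashMap<>();
        db.put("203.0.113.1", "United States");
        db.put("203.0.113.2", "India");
        db.put("203.0.113.3", "Germany");
        List<String> ips = Arrays.asList(
                "203.0.113.1", "203.0.113.1", "203.0.113.2", "203.0.113.3", "198.51.100.7");
        // prints {ERROR=1, US=2, INDIA=1, OTHER=1}
        System.out.println(countRequests(ips, db));
    }
}
```

An EnumMap keeps the counters in declaration order, which mirrors how each enum constant maps to exactly one named counter.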
I had to make the following changes in ApacheLogMapper.java:
  1. Declare an enum: I created an enum called GEO that declares four counters: one for errors, one for IPs in the US, one for IPs in India and one for everything else.
  2. Count countries: after looking up the location of the IP, I check whether the country is the United States and, if so, increment the US counter by 1 using context.getCounter(GEO.US).increment(1). India and all other countries are counted the same way with GEO.INDIA and GEO.OTHER.
  3. Count errors: whenever the GeoIP API cannot find the IP in the GeoIP database it throws a GeoIp2Exception. I catch that exception and use it as an opportunity to increment the ERROR counter by 1 using context.getCounter(GEO.ERROR).increment(1).
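Each map task increments its own copy of these counters, and Hadoop sums the values from all tasks into the job-level totals reported when the job finishes. The plain-Java sketch below only illustrates that summing step; CounterAggregationSketch and sumTaskCounters are hypothetical names, the per-task values are made up, and this is not how Hadoop implements aggregation internally.

```java
import java.util.Arrays;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration: job totals are the per-counter sums of every task's counters.
class CounterAggregationSketch {
    enum GEO { ERROR, US, INDIA, OTHER }

    static Map<GEO, Long> sumTaskCounters(List<Map<GEO, Long>> perTask) {
        Map<GEO, Long> total = new EnumMap<>(GEO.class);
        for (Map<GEO, Long> task : perTask) {
            // Add each task's value to the running total for that counter
            task.forEach((counter, value) -> total.merge(counter, value, Long::sum));
        }
        return total;
    }

    public static void main(String[] args) {
        Map<GEO, Long> task1 = new EnumMap<>(GEO.class); // made-up counts from one map task
        task1.put(GEO.US, 3L);
        task1.put(GEO.ERROR, 1L);
        Map<GEO, Long> task2 = new EnumMap<>(GEO.class); // made-up counts from another map task
        task2.put(GEO.US, 2L);
        task2.put(GEO.INDIA, 4L);
        // prints {ERROR=1, US=5, INDIA=4}
        System.out.println(sumTaskCounters(Arrays.asList(task1, task2)));
    }
}
```

A counter that no task ever incremented (OTHER here) simply does not appear in the totals.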
After executing the MapReduce program, I could see the different counters calculated by Hadoop in the console output, like this:
