Mrs

Mrs (pronounced "missus") is a simple implementation of MapReduce, Google's well-known parallel programming model. The name "Mrs" is a play on words. I was told that Google engineers often refer to MapReduce programs by the title "mister" (e.g., "Mr. Wordcount"). Naturally, a simple MapReduce implementation could only be called Mrs.

Mrs is not the only open source MapReduce implementation. In fact, Hadoop is a far more mature system. As pointed out on its web page, Hadoop is scalable, economical, efficient, and reliable. However, and without any intended disrespect to Hadoop, I have not found it to be particularly simple. While Hadoop is certainly a more appropriate tool for large-scale data processing, Mrs intends to be more convenient for research and education.

Mrs is currently licensed under the GNU GPL. If you would like to use it in a proprietary program, please contact the Copyright Licensing Office, Brigham Young University, 3760 HBLL, Provo, UT 84602, (801) 422-9339 or 422-3821, e-mail copyright@byu.edu.

Objectives

I created Mrs because I recognized that Hadoop's objectives were not always in line with what I needed in my research environment. Here are a few of the principles that guide the development of Mrs (which may be subconsciously inspired by the Zen of Python):

Keep it Simple
One of the great things about the MapReduce model is that it simplifies parallel computation. Mrs tries to be Pythonic rather than Javariffic.
Be Creative
Although Mrs tries to guarantee the same semantics as presented in Google's MapReduce paper, I have tried to be creative. I think Mrs has an interesting approach to multi-stage MapReduce problems.
Don't Repeat Yourself
The world has many fine job schedulers and filesystems. If Mrs were married to a particular environment, it would not be as flexible. A Mrs program is just a program, not a daemon.

Download

The best way to get Mrs is to check it out with Git:

git clone git://aml.cs.byu.edu/mrs.git

Documentation

The Mrs source code is pretty well documented. I'm still working on proper end-user instructions, and if you are averse to submitting patches, Mrs might still be a little young for you. The best documentation are in the examples directory. Look at wordcount.py and wcverbose.py. They both do the same thing, but the verbose version shows how to write a MapReduce program that is more complicated than just one map function and one reduce function.

WordCount

The original MapReduce paper presented WordCount as a "Hello, world" example. WordCount simply counts the number of occurrences of each word in the input. I encourage the reader to compare and contrast the following WordCount examples from Hadoop and Mrs. I reiterate that Hadoop is a more mature implementation but assert that it often makes problems unnecessarily complicated.

Hadoop WordCount

The following excerpt is from Hadoop's WordCount.java. WordCount.java is licensed under the Apache License, Version 2.0 by Owen O'Malley.

package org.apache.hadoop.examples;

import java.io.*;
import java.util.*;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.MapReduceBase;

public class WordCount {

  /**
   * Counts the words in each line.
   * For each line of input, break the line into words and emit them as
   * (word, 1).
   */
  public static class MapClass extends MapReduceBase implements Mapper {

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(WritableComparable key, Writable value,
        OutputCollector output,
        Reporter reporter) throws IOException {
      String line = ((Text)value).toString();
      StringTokenizer itr = new StringTokenizer(line);
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        output.collect(word, one);
      }
    }
  }

  /**
   * A reducer class that just emits the sum of the input values.
   */
  public static class Reduce extends MapReduceBase implements Reducer {

    public void reduce(WritableComparable key, Iterator values,
        OutputCollector output,
        Reporter reporter) throws IOException {
      int sum = 0;
      while (values.hasNext()) {
        sum += ((IntWritable) values.next()).get();
      }
      output.collect(key, new IntWritable(sum));
    }
  }

  static void printUsage() {
    System.out.println("wordcount [-m <maps>] [-r <reduces>] <input> <output>");
    System.exit(1);
  }

  /**
   * The main driver for word count map/reduce program.
   * Invoke this method to submit the map/reduce job.
   * @throws IOException When there is communication problems with the 
   *                     job tracker.
   */
  public static void main(String[] args) throws IOException {
    JobConf conf = new JobConf(WordCount.class);
    conf.setJobName("wordcount");

    // the keys are words (strings)
    conf.setOutputKeyClass(Text.class);
    // the values are counts (ints)
    conf.setOutputValueClass(IntWritable.class);

    conf.setMapperClass(MapClass.class);
    conf.setCombinerClass(Reduce.class);
    conf.setReducerClass(Reduce.class);

    List other_args = new ArrayList();
    for(int i=0; i < args.length; ++i) {
      try {
        if ("-m".equals(args[i])) {
          conf.setNumMapTasks(Integer.parseInt(args[++i]));
        } else if ("-r".equals(args[i])) {
          conf.setNumReduceTasks(Integer.parseInt(args[++i]));
        } else {
          other_args.add(args[i]);
        }
      } catch (NumberFormatException except) {
        System.out.println("ERROR: Integer expected instead of " + args[i]);
        printUsage();
      } catch (ArrayIndexOutOfBoundsException except) {
        System.out.println("ERROR: Required parameter missing from " +
                           args[i-1]);
        printUsage(); // exits
      }
    }
    // Make sure there are exactly 2 parameters left.
    if (other_args.size() != 2) {
      System.out.println("ERROR: Wrong number of parameters: " +
          other_args.size() + " instead of 2.");
      printUsage();
    }
    conf.setInputPath(new Path((String) other_args.get(0)));
    conf.setOutputPath(new Path((String) other_args.get(1)));

    // Uncomment to run locally in a single process
    // conf.set("mapred.job.tracker", "local");

    JobClient.runJob(conf);
  }

}

Mrs WordCount

The following excerpt is from Mrs.

import mrs

def mapper(key, value):
    for word in value.split():
        yield (word, str(1))

def reducer(key, value_iter):
    yield str(sum(int(x) for x in value_iter))

if __name__ == '__main__':
    mrs.main(mrs.Registry(globals()))