MapReduce flow diagrams like this one:

(input) <k1, v1> -> map -> <k2, v2> -> combine -> <k2, v2> -> reduce -> <k3, v3> (output)

are simple enough to read and understand, but how do you apply MapReduce to a problem you face in a real-life project? This post tries to give some insight into how to apply MapReduce with Hadoop.
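For instance, the classic word-count job (not part of this post, just the canonical illustration) instantiates the generic flow roughly like this:

(input) <lineOffset, lineText> -> map -> <word, 1> -> combine -> <word, partialCount> -> reduce -> <word, totalCount> (output)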
Consider the problem of finding all anagrams in a large body of text. A straightforward solution would check every pair of words with a function like this:

[java]
public static boolean isAnagram(String first, String second) {
    // Checks that the two inputs are anagrams,
    // by checking that they have all the same characters.
    // Left as an exercise for the reader...
}
[/java]

The application would have to somehow execute this function on all pairs of words in the input. However fast this method is, the overall execution would still take quite some time.
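To make the cost concrete, here is a minimal sketch of what the naive single-machine approach could look like; the findAnagramPairs name and the List<String> input are made up for illustration, and java.util.List and java.util.ArrayList are assumed imported:

[java]
// Hypothetical driver for the pairwise approach, for illustration only;
// it calls the isAnagram method above. For n words this performs roughly
// n * (n - 1) / 2 comparisons, which quickly becomes infeasible.
public static List<String[]> findAnagramPairs(List<String> words) {
    List<String[]> pairs = new ArrayList<String[]>();
    for (int i = 0; i < words.size(); i++) {
        for (int j = i + 1; j < words.size(); j++) {
            if (isAnagram(words.get(i), words.get(j))) {
                pairs.add(new String[] { words.get(i), words.get(j) });
            }
        }
    }
    return pairs;
}
[/java]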
Hadoopifying...

How do you design a MapReduce algorithm that will give the desired answer? The key lies in finding a function that produces the same key for all words that are anagrams. Applying this function in the map phase uses the power of the MapReduce framework to deliver all words that are anagrams to the same reducer. The solution, when found, is deceptively simple as usual:

[java]
public static String sortCharacters(String input) {
    char[] cs = input.toCharArray();
    Arrays.sort(cs);
    return new String(cs);
}
[/java]

By sorting all the characters in all the words in the input, all anagrams will have the same key:
aspired -> adeiprs
despair -> adeiprs

Now the string of characters on the right has no meaning in itself, but all anagrams will produce exactly the same result for this function.

Implementation

Once the algorithm is found, the implementation using Hadoop is quite straightforward and simple (though pretty long...).

[java]
import java.io.IOException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class AnagramFinder extends Configured implements Tool {

    public static class Mapper extends
            org.apache.hadoop.mapreduce.Mapper<LongWritable, Text, Text, Text> {

        private Text sortedText = new Text();
        private Text outputValue = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Tokenize the line, filtering some common punctuation marks.
            StringTokenizer tokenizer = new StringTokenizer(value.toString(),
                    " \t\n\r\f,.:()!?", false);
            while (tokenizer.hasMoreTokens()) {
                String token = tokenizer.nextToken().trim().toLowerCase();
                // The sorted characters form the key; the word itself is the value.
                sortedText.set(sort(token));
                outputValue.set(token);
                context.write(sortedText, outputValue);
            }
        }

        protected String sort(String input) {
            char[] cs = input.toCharArray();
            Arrays.sort(cs);
            return new String(cs);
        }
    }

    public static class Combiner extends
            org.apache.hadoop.mapreduce.Reducer<Text, Text, Text, Text> {

        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            Set<Text> uniques = new HashSet<Text>();
            for (Text value : values) {
                // Copy the value before storing it: Hadoop reuses the same
                // Text instance while iterating, so adding it directly would
                // corrupt the set's contents.
                if (uniques.add(new Text(value))) {
                    context.write(key, value);
                }
            }
        }
    }

    public static class Reducer extends
            org.apache.hadoop.mapreduce.Reducer<Text, Text, IntWritable, Text> {

        private IntWritable count = new IntWritable();
        private Text outputValue = new Text();

        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            Set<Text> uniques = new HashSet<Text>();
            int size = 0;
            StringBuilder builder = new StringBuilder();
            for (Text value : values) {
                // Copy the value for the same reason as in the Combiner.
                if (uniques.add(new Text(value))) {
                    size++;
                    builder.append(value.toString());
                    builder.append(',');
                }
            }
            builder.setLength(builder.length() - 1); // drop the trailing comma
            if (size > 1) {
                count.set(size);
                outputValue.set(builder.toString());
                context.write(count, outputValue);
            }
        }
    }

    public int run(String[] args) throws Exception {
        Path inputPath = new Path(args[0]);
        Path outputPath = new Path(args[1]);
        Job job = new Job(getConf(), "Anagram Finder");
        job.setJarByClass(AnagramFinder.class);
        FileInputFormat.setInputPaths(job, inputPath);
        FileOutputFormat.setOutputPath(job, outputPath);
        job.setMapperClass(Mapper.class);
        job.setCombinerClass(Combiner.class);
        job.setReducerClass(Reducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        // Declare the reducer's output types as well.
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(Text.class);
        return job.waitForCompletion(false) ? 0 : -1;
    }

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new Configuration(), new AnagramFinder(), args));
    }
}
[/java]

The main parts of the implementation are the following:

Mapper - Breaks up the input text into tokens (filtering some common punctuation marks) and applies the character sorting to arrive at the required key.
Combiner (optional) - Removes duplicate values from the input.
Reducer - Collects anagrams and outputs the number of anagrams (key) and all the words concatenated (value).
Main and run - Configure the job to run on the MapReduce framework.
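As a concrete illustration of how these parts cooperate, suppose a small input contains the words "aspired", "despair" and "despair" once more (a made-up example). The intermediate pairs would flow roughly like this, although the exact grouping depends on how map tasks and combiners are scheduled:

map:     <adeiprs, aspired>, <adeiprs, despair>, <adeiprs, despair>
combine: <adeiprs, aspired>, <adeiprs, despair>
reduce:  <2, aspired,despair>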
The Combiner is used to do some preprocessing for the Reducer. The main reason for this is that results from Mappers run on different nodes are processed on the same node and thus have to travel the network. To minimize network load, the Combiner can reduce the number of <key, value> pairs that have to be transferred, as shown here by filtering out duplicates. The process cannot rely on all <key, value> pairs for a key being handled by a single Combiner, however, so the Reducer also has to remove duplicates. It is interesting to see that the concept of an anagram does not materialize anywhere in this code. That the code finds anagrams follows purely from the fact that all anagrams yield the same value from the sort function, and that value is used as the map output key. This can be quite confusing for readers.
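To try the job out, the class would be compiled against the Hadoop libraries, packaged into a jar (the name anagram-finder.jar is made up for illustration) and submitted in the usual way, with the input and output paths ending up as args[0] and args[1] of run:

hadoop jar anagram-finder.jar AnagramFinder <input-dir> <output-dir>

With the default TextOutputFormat, each record is written as the key and the value separated by a tab, so the output for the example above would contain a line like (word order within the line depends on iteration order):

2	aspired,despair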