Map Reductions in Hadoop

This tutorial covers the basic concepts and code of Hadoop. The Hadoop system makes it easy to parallelize a particular form of computation across a large data set, just like Google's MapReduce. It automates the process of splitting up and coordinating a computation and its data to run on thousands of machines at once. Although the implementation of Hadoop is complex, writing code that uses Hadoop is actually not very hard (hence, a successful abstraction). This tutorial walks through a few code examples to illustrate the key features. Both examples make use of literature text sources -- Shakespeare's complete works and Mark Twain's Huckleberry Finn. These and many other works are available at Project Gutenberg. However, we also encourage you to find and use alternate text sources that are interesting to you, even if they're not available at Project Gutenberg.

Hadoop Concepts

To use Hadoop, you write two classes -- a Mapper and a Reducer. The Mapper class contains a map function, which is called once for each input and outputs any number of intermediate <key, value> pairs. What code you put in the map function depends on the problem you are trying to solve. Let's start with a short example.

Suppose the goal is to create an "index" of a body of text -- we are given a text file, and we want to output a list of words annotated with the line-number at which each word appears. For that problem, an appropriate Map strategy is: for each word in the input, output the pair <word, line-number>. For example, suppose we have this five-line high school football coach quote as our input data set:
We are not what
we want to be,
but at least
we are not what
we used to be.

Running the Map code that, for each word, outputs a pair <word, line-number> yields this set of pairs...
<we, 1>
<are, 1>
<not, 1>
<what, 1>
<we, 2>
<want, 2>
<to, 2>
<be, 2>
<but, 3>
<at, 3>
<least, 3>
<we, 4>
<are, 4>
<not, 4>
<what, 4>
<we, 5>
<used, 5>
<to, 5>
<be, 5>

For now we can think of the <key, value> pairs as a nice linear list, but in reality, the Hadoop process runs in parallel on many machines. Each process has a little part of the overall Map input (called a map shard), and maintains its own local cache of the Map output. (For a description of how it really works, see HadoopMapReduce or the Google White Paper linked to at the end of this document.)

After the Map phase produces the intermediate <key, value> pairs, they are efficiently and automatically grouped by key by the Hadoop system in preparation for the Reduce phase (this grouping is known as the Shuffle phase of a map-reduce). For the above example, that means all the "we" pairs are grouped together, all the "are" pairs are grouped together, and so on, showing each group as a line...
<we, 1> <we, 2> <we, 4> <we, 5>
<are, 1> <are, 4>
<not, 1> <not, 4>
<what, 1> <what, 4>
<want, 2>
<to, 2> <to, 5>
<be, 2> <be, 5>
<but, 3>
<at, 3>
<least, 3>
<used, 5>

The Reducer class contains a reduce function, which is then called once for each key -- one reduce call for "we", one for "are", and so on. Each reduce looks at all the values for that key and outputs a "summary" value for that key in the final output. So in the above example, reduce is called once for the "we" key and passed the values the Mapper output: 1, 4, 2, and 5 (the values going into reduce are not in any particular order). Suppose reduce computes a summary value string made of the line numbers sorted into increasing order; then the output of the Reduce phase on the above pairs will produce the pairs shown below. The Reduce phase also sorts the output <key, value> pairs into increasing order by key:
<are, 1 4>
<at, 3>
<be, 2 5>
<but, 3>
<least, 3>
<not, 1 4>
<to, 2 5>
<used, 5>
<want, 2>
<we, 1 2 4 5>
<what, 1 4>

Like Map, Reduce is also run in parallel on a group of machines. Each machine is assigned a subset of the keys to work on (known as a reduce shard), and outputs its results into a separate file. By default these files are named "part-#####".

Line Indexer Example

As a first Hadoop code example we will look at a simple "line indexer". All of the example code here is available to be checked out, built, and run at (TODO:specify this path). Given an input text, the line indexer uses Hadoop to produce an index of all the words in the text. For each word, the index has a list of all the locations where the word appears and a text excerpt of each line where the word appears. Running the line indexer on the complete works of Shakespeare yields the following entry for the word "cipher":
38624 To cipher what is writ in learned books,
12046 To cipher me how fondly I did dote;
34739 Mine were the very cipher of a function,
16844 MOTH To prove you a cipher.
66001 ORLANDO Which I take to be either a fool or a cipher.

The Hadoop code below for the line indexer is actually pretty short. The Map code extracts one word at a time from the input, and the Reduce code combines all the data for one word.

Line Indexer Map

A Java Mapper class is defined in terms of its input and intermediate <key, value> pairs. To declare one, simply subclass from MapReduceBase and implement the Mapper interface. The Mapper interface provides a single method: public void map(WritableComparable key, Writable value, OutputCollector output, Reporter reporter). Note: these inner classes probably need to be declared "static". If you get an error saying ClassName.<init>() is not defined, try declaring your class static. The map function takes four parameters, which in this example correspond to:

key -- the byte-offset of the current line within the input file
value -- the line of text itself
output -- the OutputCollector that gathers the intermediate <key, value> pairs emitted by map()
reporter -- allows the map code to report progress and status back to the Hadoop system
The Hadoop system divides the (large) input data set into logical "records" and then calls map() once for each record. How much data constitutes a record depends on the input data type; for text files, a record is a single line of text. The main method is responsible for setting the output key and value types. Since in this example we want to output <word, offset> pairs, the types will both be Text (a basic string wrapper, with UTF8 support). It is necessary to wrap the more basic types because all input and output types for Hadoop must implement WritableComparable, which handles the writing and reading from disk.

Line Indexer Map Code

For the line indexer problem, the map code takes in a line of text and, for each word in the line, outputs a string key/value pair <word, offset:line>. The Map code below accomplishes that by tokenizing the line and emitting one <word, offset:line> pair per word.
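A minimal sketch of such a Mapper, assuming the early, pre-generics org.apache.hadoop.mapred API whose map() signature is quoted above (the class name LineIndexMapper is illustrative, not the course's official solution; later Hadoop versions add generic type parameters to Mapper):

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

public class LineIndexMapper extends MapReduceBase implements Mapper {

  private final Text word = new Text();
  private final Text summary = new Text();

  public void map(WritableComparable key, Writable value,
                  OutputCollector output, Reporter reporter) throws IOException {
    // For text input the key is the byte offset of this line and the value is the line itself.
    String line = value.toString();
    summary.set(key.toString() + ":" + line);

    // Emit one <word, offset:line> pair for every word on the line.
    StringTokenizer itr = new StringTokenizer(line);
    while (itr.hasMoreTokens()) {
      word.set(itr.nextToken());
      output.collect(word, summary);
    }
  }
}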
When run on many machines, each mapper gets part of the input -- so for example with 100 Gigabytes of data on 200 mappers, each mapper would get roughly its own 500 Megabytes of data to go through. On a single mapper, map() is called going through the data in its natural order, from start to finish.

The Map phase outputs <key, value> pairs, but what data makes up the key and value is totally up to the Mapper code. In this case, the Mapper uses each word as a key, so the reduction below ends up with pairs grouped by word. We could instead have chosen to use the line-length as the key, in which case the data in the reduce phase would have been grouped by line length. In fact, the map() code is not required to call output.collect() at all. It may have its own logic to prune out data simply by omitting collect. Pruning things in the Mapper is efficient, since it is highly parallel, and already has the data in memory. By shrinking its output, we shrink the expense of organizing and moving the data in preparation for the Reduce phase.

Line Indexer Reduce

Defining a Reducer is just as easy. Simply subclass MapReduceBase and implement the Reducer interface: public void reduce(WritableComparable key, Iterator values, OutputCollector output, Reporter reporter). The reduce() method is called once for each key; the values parameter contains all of the values for that key. The Reduce code looks at all the values and then outputs a single "summary" value.

Given all the values for the key, the Reduce code typically iterates over all the values and either concatenates the values together in some way to make a large summary object, or combines and reduces the values in some way to yield a short summary value. The reduce() method produces its final value in the same manner as map() did, by calling output.collect(key, summary). In this way, the Reduce specifies the final output value for the (possibly new) key.

It is important to note that when running over text files, the input key is the byte-offset within the file. If the key is propagated to the output, even for an identity map/reduce, the file will be filled with the offset values. Not only does this use up a lot of space, but successive operations on this file will have to eliminate them. For text files, make sure you don't output the key unless you need it (be careful with the IdentityMapper and IdentityReducer).

Line Indexer Reduce Code

The line indexer Reducer takes in all the <word, offset> key/value pairs output by the Mapper for a single word. For example, for the word "cipher", the pairs look like: <cipher, 38624:To cipher what is writ in learned books>, <cipher, 12046:To cipher me how fondly I did dote;>, <cipher, ... >. Given all those <key, value> pairs, the reduce outputs a single value string. For the line indexer problem, the strategy is simply to concatenate all the values together to make a single large string, using "^" to separate the values. The choice of "^" is arbitrary -- later code can split on the "^" to recover the separate values. So for the key "cipher" the output value string will look like "38624:To cipher what is writ in learned books,^12046:To cipher me how fondly I did dote;^34739:Mine were the very cipher of a function,^ ...". To do this, the Reducer code simply iterates over the values to get all the value strings, and concatenates them together into our output String.
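A minimal sketch of such a Reducer, under the same early-API assumptions as the Mapper above (the class name LineIndexReducer is illustrative):

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

public class LineIndexReducer extends MapReduceBase implements Reducer {

  public void reduce(WritableComparable key, Iterator values,
                     OutputCollector output, Reporter reporter) throws IOException {
    // Concatenate every "offset:line" value for this word, separated by "^".
    StringBuilder summary = new StringBuilder();
    boolean first = true;
    while (values.hasNext()) {
      if (!first) {
        summary.append("^");
      }
      first = false;
      summary.append(values.next().toString());
    }
    output.collect(key, new Text(summary.toString()));
  }
}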
Line Indexer Main Program

Given the Mapper and Reducer code, the short main() below starts the Map-Reduction running. The Hadoop system picks up a bunch of values from the command line on its own, and then the main() also specifies a few key parameters of the problem in the JobConf object, such as what Map and Reduce classes to use and the format of the input and output files. Other parameters, e.g. the number of machines to use, are optional and the system will determine good values for them if not specified.
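A sketch of what that main() might look like under the same early-API assumptions; the class name LineIndexer and the dfsInput/dfsOutput paths are illustrative, and later versions of this API move the path setters to FileInputFormat.setInputPaths() and FileOutputFormat.setOutputPath():

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;

public class LineIndexer {

  public static void main(String[] args) throws IOException {
    JobConf conf = new JobConf(LineIndexer.class);
    conf.setJobName("LineIndexer");

    // Both the intermediate and final keys/values are plain strings, wrapped in Text.
    conf.setOutputKeyClass(Text.class);
    conf.setOutputValueClass(Text.class);

    // Read from dfsInput and write to dfsOutput on the distributed file system.
    conf.setInputPath(new Path("dfsInput"));
    conf.setOutputPath(new Path("dfsOutput"));

    conf.setMapperClass(LineIndexMapper.class);
    conf.setReducerClass(LineIndexReducer.class);

    // Submit the job and wait for it to finish.
    JobClient.runJob(conf);
  }
}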
Running A Map-Reduction

To run a Hadoop job, simply ssh into any of the JobTracker nodes on the cluster. To run the job, it is first necessary to copy the input data files onto the distributed file system. If the data files are in the localInput/ directory, this is accomplished by executing:
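With a stock Hadoop installation (the bin/hadoop script path is an assumption about how the cluster is set up), the copy looks roughly like:

bin/hadoop dfs -copyFromLocal localInput dfsInput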
The files will then be copied onto the dfs into the directory dfsInput. It is important to copy files into a well-named directory that is unique. These files can be viewed with
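a command along the lines of (again assuming the stock bin/hadoop script):

bin/hadoop dfs -ls dir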
where dir is the name of the directory to be viewed.
You can also use
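something like (same assumption):

bin/hadoop dfs -lsr dir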
to recursively view the directories. Note that all "relative" paths
given will be put in the /users/$USER/[dir] directory.
Make sure that the dfsOutput directory does not already exist; otherwise you will be presented with an error and your job will not run (this prevents the accidental overwriting of data, but the check can be overridden). Now that the data is available to all of the worker machines, the job can be executed from a local jar file:
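Roughly, assuming the job was packaged into a jar named lineIndexer.jar with LineIndexer as its main class (both names are illustrative):

bin/hadoop jar lineIndexer.jar LineIndexer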
The job will then run across the worker machines, copying input and intermediate data as needed. The output of the reduce stage will be left in the dfsOutput directory. To copy these files to your local machine in the directory localOutput, execute:
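Again assuming the stock bin/hadoop script:

bin/hadoop dfs -copyToLocal dfsOutput localOutput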
Running A Map-Reduction Locally

During testing, you may want to run your Map-Reduces locally so as not to adversely affect the compute clusters. This is easily accomplished by adding a line to the main method:
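One common way to do this with the old API is to point the job tracker at "local" in the JobConf before the job is submitted (an assumption about this cluster's setup; depending on the Hadoop version, fs.default.name may also need to be set to "local" so that paths refer to the local file system):

// Run the map-reduce in a single local process instead of on the cluster.
conf.set("mapred.job.tracker", "local");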
Seeing Job Progress

When you submit your job to run, a line will be printed saying:
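(the exact wording varies by Hadoop version; it is roughly)

Running job: job_12345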
where 'job_12345' will correspond to whatever name your job has been given.
Further status information will be printed in that terminal as the job
progresses. However, it is also possible to monitor a job given its name from
any node in the cluster. This is done by the command:
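(roughly, assuming the stock hadoop job subcommand is available on the cluster)

bin/hadoop job -status job_12345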
A small amount of status information will be displayed, along with a link to a tracking URL (e.g., http://jobtrackermachinename:50050/). This page will be a job-specific status page, and provides links to the main status pages for other jobs and the Hadoop cluster itself.

Line Indexer Client

After the Hadoop job finishes and the output is copied to a local machine, it can be analyzed. The line indexer client reads in the index files and presents a simple command line interface -- type a word to pull up that entry from the index, or leave the line blank to quit. To run the client, execute the program line_indexer.par:
The index file 'part-00000' will then be loaded and the following prompt
displayed:
***********************

Enter a word and the index listings generated by the map reduce will be printed:
***********************

After playing around with the client, enter 'exit' to shut down the program.

Ngram Mimic Example

A funny and more complex example of a Map-Reduction is the Ngram Mimic algorithm. But first, a little terminology. A "bigram" is just a pair of adjacent words in a body of text, so the text "We hold these truths" contains the bigrams "We hold", "hold these", and "these truths". An "ngram" just generalizes the idea to more than two words, so the text contains the 3-grams "We hold these" and "hold these truths". The Ngram Mimic algorithm reads in a body of text and reduces it down to an index of what ngrams appear in the text. The ngram index can then drive the random generation of "mimic" paragraphs that bizarrely resemble the style and content of the original text. Here is an ngram mimic paragraph based on all the 3-grams in the complete works of Shakespeare...
Farewell! The entreaties of your kingdoms, my poor services, i'
the middle; on his shoulder, and his; her face o' fire with labour and the
feeders digest it with stamped coin, not stabbing steel; therefore they do
not call me rogue for being so horrible, so bloody, must lead on to some
place where chance may nurse or end it. Leontes [aside] too hot, too hot!
To me and so leaves me to the oracle: apollo be my friend, and comfort of
your brave father, whom, though bearing misery, i desire my life before
this ancient sir, who, it should seem, hath sometime loved! I am sorry,
most sorry, you have not redeem'd; indeed, paid down more penitence than
done trespass: at the queen's be't: 'good' should be hooted at like an old
sheep-whistling rogue a ram-tender, to offer to have had thee than it,--so
thou shalt hear that i, knowing by paulina that she is living, were it true
too, think you have done. Yet, if my lord! What a fool. Camillo's flight,
added to their own perdition. Polixenes for leontes: o thou tyrant! Do not
receive affliction at my request he would not so, you pity not the proof so
nigh.
The mimic paragraphs can appear to make sense if you don't look too closely. In reality, the algorithm is driven only by the statistics of how words tended to appear in the text, and it knows nothing about grammar or semantics (what the words actually mean).

Ngram Mimic Map

The ngram mimic problem has two parts -- the reading of the original text to create the ngram index, and the generation of random paragraphs based on that index. This section outlines the creation of the ngram index, which is a perfect problem for the map-reduce programming model. The first goal is to figure out the set of <prefix, follow-word> pairs, recording, for each occurrence of a prefix in the text, what word followed it. For now, suppose that the prefix is a single word, so we are in effect recording all the bigrams in the text. The Mapper reads through the words in the text, and for each pair of adjacent words, outputs a <prefix-word, follow-word> pair. So with the text
sugar and spice and everything
nice and everything.

we get the pairs...
<sugar, and>
<and, spice>
<spice, and>
<and, everything>
<everything, nice>
<nice, and>
<and, everything>
<everything, .>

For the mimic algorithm, it works best to count the final period, ".", at the end of each sentence as a separate "word" of its own, as in the final <everything, .> pair. Also, the code will need to take care to get cross-line pairs like <everything, nice>, where part of the data is on one line and part is on the next line. To generalize the scheme to 3-grams or beyond, just allow the prefix to be made of multiple words. For 3-grams, for each three-word sequence "word2 word1 word0" in the text, record that the prefix "word2 word1" was followed by "word0" by outputting the pair <word2 word1, word0>. The 3-grams for the above text are...
<sugar and, spice>
<and spice, and>
<spice and, everything>
<and everything, nice>
...

Ngram Mimic Map Code

The Hadoop system creates the Mapper object and then calls map() with line after line of the input text. To map out the ngram index, read through the words on each line from left to right. The StringTokenizer class provides a simple iterator interface for the parsing. For a sequence of three words "word2 word1 word0", output pairs <word1, word0> and <word2 word1, word0>, and then loop around to read the next word0. By declaring the word0/word1/word2 strings as instance variables, they persist across calls to map() so the words can be used from one line to the next. The Hadoop system calls map() to process the lines in their natural order (line-1, then line-2, then line-3).
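A minimal sketch of such a Mapper, under the same early-API assumptions as the line indexer code above (the class name MimicMapper is illustrative, and the special handling of the sentence-final "." as its own word is omitted for brevity):

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

public class MimicMapper extends MapReduceBase implements Mapper {

  // Instance variables persist across calls to map(), so pairs that straddle
  // a line break (like <everything, nice> above) are still captured.
  private String word1 = null;
  private String word2 = null;

  public void map(WritableComparable key, Writable value,
                  OutputCollector output, Reporter reporter) throws IOException {
    StringTokenizer itr = new StringTokenizer(value.toString());
    while (itr.hasMoreTokens()) {
      String word0 = itr.nextToken();
      if (word1 != null) {
        // Single-word prefix: word1 was followed by word0.
        output.collect(new Text(word1), new Text(word0));
      }
      if (word2 != null) {
        // Two-word prefix: "word2 word1" was followed by word0.
        output.collect(new Text(word2 + " " + word1), new Text(word0));
      }
      // Slide the window forward for the next word.
      word2 = word1;
      word1 = word0;
    }
  }
}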
To be fair, there is one flaw with the above Hadoop "word1 word0" approach to measuring bigrams: we do not correctly measure the bigrams that cross shards. So if one mapper gets a shard of data that ends "they lived happily", and some other mapper gets the next shard of data that begins "everafter, the end." -- well, since each mapper only sees its own shard of the data, we won't count that "happily" precedes the word "everafter". But really, what's 10 pairs of words out of the collected works of Shakespeare? If one really wants to be a stickler, the input class (which does the sharding) can be rewritten to take this into account, but doing so would involve either some redundancy or a knowledge of the internal workings of map() to be successful.

Ngram Mimic Reduce

To make the final mimic index, crunch the data down for each prefix to summarize all the "follow" words that might come after that prefix. In the example text above the prefix "and" has 3 follow words: the word "everything" follows 2 times, and the word "spice" follows 1 time. That information can be summarized in a single key/value pair <and, 3^everything^2^spice^1>. The prefix is the key, and the value contains the total count first, followed by the words and their counts, all separated with "^". Each prefix is either a single word, like "He", or multiple words separated by spaces, like "He never". For example, here's an excerpt from the ngram index built from Mark Twain's Huckleberry Finn:
He
never::1^seemed::1^wouldn't::1^can::1^lined::1^ain't::1^as::1^jumped::1^says::7^got::1^went::1^
oughter::1^cussed::1^was::12^know::1^took::1^told::4^hadn't::1^nearly::1^set::1^came::1^had::6^ said::9::1^never::2^might::1^slaps::1^looked::2^always::1^turns::1^dragged::1^wouldn't::1^dressed::1^ scolded::1^most::1^hasn't::1^come::1^just::1^left::1^says:::3^used::1^give::1^collared::1^often::1^ hadn't::1^has::1^run::1^had::3^stopped,::1^knowed::1^said::2^is::1^says,::1^never::1^hopped::1^ listened::1^kept::2^looked::2^studied::1^couldn't::1^said,::1^bounced::1^unlocked::1^abused::1^ ^went,::1^jumped::1^says:::8^got::3^lives::1^used::2^went::1^said:::1^leaned::1^drank::1^ flung::1^was::5^took::2^catched::1^told::2^set::1^had::6^dropped::1^didn't::1^said::26^chased::1^ ...
He had
suspicions::1^got::1^houses::1::1^the::1^one::1^a::2^heard::1^been::1^some::1^set::1^
an::2^got::1^his::1^forgot::1^his::2

The first block shows the counts for each word that followed the unigram "He". The second block shows the counts for each word that followed the bigram "He had". It can be seen that "never" followed "He" once, while "says" followed "He" 7 times, and that "suspicions" followed "He had" once and "a" followed "He had" twice.

Ngram Mimic Reduce Code

To crunch all the follow words down to a single summary string, the reducer first puts all the follow words in a hash map to count how many times each appears. Then the final summary string, e.g. "the::3^it's::2^until::4^any::1", is built from iterating over the keys in the hash map.
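A minimal sketch of such a Reducer, again under the same early-API assumptions (the class name MimicReducer is illustrative, and the summary string follows the word::count^... form of the example string above):

import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

public class MimicReducer extends MapReduceBase implements Reducer {

  public void reduce(WritableComparable key, Iterator values,
                     OutputCollector output, Reporter reporter) throws IOException {
    // Count how many times each follow-word appears for this prefix.
    Map<String, Integer> counts = new HashMap<String, Integer>();
    while (values.hasNext()) {
      String follow = values.next().toString();
      Integer old = counts.get(follow);
      counts.put(follow, old == null ? 1 : old.intValue() + 1);
    }

    // Build a summary string like "the::3^it's::2^until::4^any::1".
    StringBuilder summary = new StringBuilder();
    for (Map.Entry<String, Integer> entry : counts.entrySet()) {
      if (summary.length() > 0) {
        summary.append("^");
      }
      summary.append(entry.getKey()).append("::").append(entry.getValue());
    }
    output.collect(key, new Text(summary.toString()));
  }
}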
Run The Mimic

You can run the mimic indexer in the same manner as the line indexer
and then play with the client, which lets you generate random paragraphs that
read similarly to the original body. Just hit enter to let it seed itself, or
you can give it a phrase to start with.
**********************

Other Resources

MapReduce: Simplified Data Processing on Large Clusters, Jeffrey Dean and Sanjay Ghemawat, OSDI 2004 -- the Google white paper on map-reduce referenced earlier in this document.
HadoopMapReduce on the Apache Hadoop wiki -- a description of how the Hadoop map-reduce actually runs.
Copyright 2006 Google Inc. All Rights Reserved.