For those who like to write map-reduce programs in Python, there are good toolkits available out there, like mrjob and dumbo. Internally, they still use Hadoop streaming to submit map-reduce jobs; what they add is a much simpler job-submission workflow. My own experience with mrjob has been good so far, and installing and using it is easy.
First, ensure that you have installed a newer version of Python than the default that ships with your Linux distribution (2.4.x on older distributions, which yum depends on). Make sure you don't replace the existing Python installation, as that breaks yum.
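A quick sanity check before going further (the paths below are assumptions; adjust for your distribution):

/usr/bin/python26 -V   # should report Python 2.6.x
/usr/bin/python -V     # distro default (2.4.x) stays untouched, so yum keeps working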
Install mrjob on one of the machines in your Hadoop cluster. It is nicer to use virtualenv to create an isolated environment.
wget -O virtualenv.py http://bit.ly/virtualenv
/usr/bin/python26 virtualenv.py pythonenv
pythonenv/bin/easy_install pip
pythonenv/bin/pip install mrjob
The current version available to me is "mrjob==0.3.3.2".
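If you want to confirm which version landed in your virtualenv, one way (assuming mrjob exposes __version__, which it does in the releases I have seen) is:

pythonenv/bin/python -c "import mrjob; print mrjob.__version__"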
There is a small, ugly hack that you need to make in one of the files, pythonenv/lib/python2.6/site-packages/mrjob/hadoop.py, at line number 444. I am not sure if I am doing something wrong, but it throws an exception because "self._start_step_num" is None. Replace the code there with the following lines:
# look for a Python trace-back
cause = None
if self._start_step_num and step_num:
    cause = self._find_probable_cause_of_failure(
        [step_num + self._start_step_num])
You also need to set the HADOOP_HOME variable.
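For example (the path below is an assumption; point it at wherever your distribution installed Hadoop):

export HADOOP_HOME=/usr/lib/hadoop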
That's it, and you should be ready to use mrjob!
Writing a map-reduce program
Now we can run through the familiar word-count example.
from mrjob.job import MRJob
import re

WORD_RE = re.compile(r"[\w']+")


class MRWordFreqCount(MRJob):

    def mapper(self, _, line):
        for word in WORD_RE.findall(line):
            yield (word.lower(), 1)

    def combiner(self, word, counts):
        yield (word, sum(counts))

    def reducer(self, word, counts):
        yield (word, sum(counts))


if __name__ == '__main__':
    MRWordFreqCount.run()

It is as simple as that. You need to create a class derived from MRJob and provide the essential methods: mapper, combiner, and reducer.
Depending on what you want to do, you may need only a mapper, a mapper and a reducer, or all three.
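For instance, a grep-like job needs only a mapper; here is a minimal sketch (the job name and search string are made up for illustration):

from mrjob.job import MRJob


class MRGrep(MRJob):
    """Emit only the lines that contain the word 'error'; no reducer needed."""

    def mapper(self, _, line):
        if 'error' in line:
            yield (None, line)


if __name__ == '__main__':
    MRGrep.run()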
Running map-reduce with mrjob
To run this program, you need to issue the following command:
pythonenv/bin/python wordcount.py hdfs:///path/to/file/inhdfs -r hadoop --python-bin python26 --step-num=1

hdfs:///path/to/file/inhdfs => input dir or file in HDFS
-r hadoop => tells mrjob to run the job on the Hadoop cluster
--python-bin python26 => use the newer version of the Python executable
--step-num=1 => tells which step to execute
You should be able to successfully run the map-reduce using mrjob.
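As an aside, if you drop the -r hadoop flag, mrjob falls back to its local runner, which is handy for testing the script on a small file before touching the cluster (localfile.txt is a placeholder):

pythonenv/bin/python wordcount.py localfile.txt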
The input to a mapper is a line, and its output is a (key, value) pair; in this case, the output is a (keyword, 1) pair. The reducer takes a key together with all the values emitted for that key and reduces them; in the above program, it outputs (keyword, occurrences) pairs.
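To make the data flow concrete, here is a plain-Python sketch of what happens to a single line of input. This is only an illustration of the map -> sort/shuffle -> reduce flow, not how mrjob actually executes the job:

from itertools import groupby
import re

WORD_RE = re.compile(r"[\w']+")

# mapper: one (word, 1) pair per word in the line
pairs = [(w.lower(), 1) for w in WORD_RE.findall("The cat saw the dog")]

# shuffle/sort: Hadoop groups the pairs by key between the map and reduce phases
pairs.sort()

# reducer: sum the counts for each word
for word, group in groupby(pairs, key=lambda kv: kv[0]):
    print "%s\t%d" % (word, sum(count for _, count in group))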
Streaming final output from hdfs:///somepath/tmp/mrjob/test.admin.20120506.133838.502705/output
"a"	2
"about"	1
"adapting"	1
"again"	2
......
You can also provide multiple inputs by specifying them directly when invoking the mrjob command:
hdfs:///path/to/file/inhdfs1 hdfs:///path/to/file/inhdfs2 hdfs:///path/to/file/inhdfs3

You can store the output in HDFS or on a local path with another option to the mrjob command.
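If memory serves, the option in question is --output-dir; a sketch of the full invocation (the paths are placeholders):

pythonenv/bin/python wordcount.py hdfs:///path/to/file/inhdfs -r hadoop --python-bin python26 --output-dir hdfs:///pathto/wordcount/output/2345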
You have to ensure that the parent directory exists in HDFS and that the output directory itself does not, or else the job will error out.
hadoop fs -mkdir hdfs:///pathto/wordcount
hadoop fs -rmdir hdfs:///pathto/wordcount/output/2345
In the word-count example, the reducer phase always produces some output as long as the input file is non-empty. But certain map-reduce programs, such as grep-style regular-expression matching, may not yield any output at all, and Hadoop map-reduce considers that a failure. To avoid the issue, you will have to pass the appropriate option to your map-reduce program.
There are a few more options that allow you to write more elaborate map-reduce programs using mrjob. Check out the documentation for the details.