Saturday, October 03, 2009

Parallelizing Crawling with Hadoop-Streaming

Recently, I needed to get search result counts off our search service for about 1.5 million terms. The first cut of my code simply read a flat file of these terms (one per line), hit the search service, parsed the content of the opensearch:totalResults tag out of the RSS result, and wrote the original term and the count into an output file. Here is the code:

#! /usr/local/bin/python
# Source: result_counter.py
import urllib
import feedparser
import sys

urlTemplate = "http://search.mycompany.com/search?q=${term}"

def main():
  if (len(sys.argv) != 3):
    print("Usage: %s input output" % (sys.argv[0]))
    sys.exit(-1)
  infile = open(sys.argv[1], 'r')
  outfile = open(sys.argv[2], 'w')
  lno = 0
  while (True):
    line = infile.readline()[:-1]
    lno = lno + 1
    if (line == ''):
      break
    url = urlTemplate.replace("${term}", urllib.quote_plus(line))
    feed = feedparser.parse(url)
    # sometimes feedparser is not able to get the namespace for the page, 
    # so it uses the default (none) namespace in the key, so we need to
    # check for both cases.
    if (feed.channel.has_key('opensearch_totalresults')):
      count = int(feed.channel.opensearch_totalresults)
    elif (feed.channel.has_key('totalresults')):
      count = int(feed.channel.totalresults)
    else:
      count = -1
    outfile.write("|".join([line, str(count)]) + "\n")
  outfile.close()
  infile.close()

if __name__ == "__main__":
  main()

In order to get the job done in a reasonable time, I split the input file into 5 parts (using Unix split), then ran 5 copies of the job, one against each split. When the jobs completed, I simply concatenated the output files back together (using Unix cat) to produce the final output.
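
For the record, the manual workflow looked roughly like the sketch below. The chunk prefix and file names here are illustrative, and 1.5 million terms split 5 ways works out to about 300,000 lines per chunk:

[sujit@sirocco ~] split -l 300000 input.txt chunk_
[sujit@sirocco ~] for f in chunk_*; do ./result_counter.py $f $f.out & done
[sujit@sirocco ~] wait
[sujit@sirocco ~] cat chunk_*.out > output.txt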

While I was waiting for the job to complete, I realized that I could have used Hadoop streaming to do the same thing. Even though it's probably not a textbook application of the Map-Reduce framework, what I was doing by hand is pretty much what Hadoop does automatically. In addition, Hadoop provides file reading/writing services and a web-based console to track progress. Using Michael Noll's excellent hadoop-streaming tutorial, I converted the code above to a Mapper and a Reducer, which are shown below:

#!/usr/local/bin/python
# Source: my_result_counter_mapper.py:

import feedparser
import sys
import urllib

urlTemplate = "http://search.mycompany.com/search?q=${term}"

for line in sys.stdin:
  line = line.strip()
  url = urlTemplate.replace("${term}", urllib.quote_plus(line))
  feed = feedparser.parse(url)
  if (feed.channel.has_key('opensearch_totalresults')):
    count = int(feed.channel.opensearch_totalresults)
  elif (feed.channel.has_key('totalresults')):
    count = int(feed.channel.totalresults)
  else:
    count = -1
  print "|".join([line, str(count)]

The reducer in this case is a simple Identity Reducer: it writes out one record for every record it receives from the Mapper.

#!/usr/local/bin/python
# Source: my_result_counter_reducer.py
import sys

for line in sys.stdin:
  # pass the line through unchanged; the line read from stdin already ends
  # with a newline, so using print here would add a second one
  sys.stdout.write(line)

To test this on the command line (outside of Hadoop), we simply use a command like the following:

[sujit@sirocco ~] cat input.txt | \
    /path/to/my_result_counter_mapper.py | \
    sort | \
    /path/to/my_result_counter_reducer.py

To run this inside Hadoop (using Hadoop Streaming), we create an input directory in HDFS and copy our input file into it. Then we run the "bin/hadoop jar ..." command to launch the Map-Reduce job. The output ends up in HDFS under the /myjob-output directory, from which we copy it back to the local filesystem. The sequence of commands to do this is as follows:

[sujit@sirocco ~] cd $HADOOP_HOME
[sujit@sirocco hadoop] bin/hadoop fs -mkdir /myjob
[sujit@sirocco hadoop] bin/hadoop fs -put /local/path/input.txt /myjob
[sujit@sirocco hadoop] bin/hadoop jar contrib/streaming/hadoop-0.20.1-streaming.jar \
    -mapper /path/to/my_result_counter_mapper.py \
    -reducer /path/to/my_result_counter_reducer.py \
    -input /myjob -output /myjob-output  \
    -numReduceTasks 5
...
[sujit@sirocco hadoop] bin/hadoop fs -get /myjob-output/* /path/to/local/output/dir

When I ran this (in pseudo-distributed mode) on my local box, the Hadoop streaming version took approximately twice as long as the manually distributed version. This is because Hadoop does not let the user directly set the number of map tasks. I did try setting the number of map tasks to 10 in mapred-site.xml, but that did not seem to have any effect; Hadoop still allocated only 2 map tasks for the job. I also tried setting mapred.min.split.size and mapred.max.split.size to various values in an attempt to get Hadoop to allocate a larger number of map tasks to the job, but that did not help either.
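
For reference, the split size experiment amounted to adding properties like the following to mapred-site.xml. The values here are just examples of the sort of thing I tried (and as noted, they had no effect on the number of map tasks):

<!-- illustrative values only; these did not change the number of map tasks -->
<property>
  <name>mapred.min.split.size</name>
  <value>65536</value>
</property>
<property>
  <name>mapred.max.split.size</name>
  <value>131072</value>
</property>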

As you can see, my code is pretty trivial, and I could easily run 10 or more map tasks without any problem. In my manual example, the number of mappers and reducers was effectively 5 each, which is probably why it took less time (that, plus some management overhead on Hadoop's side).

Apparently, one way (and probably the recommended way, based on this HADOOP-960 discussion) to influence Hadoop to allocate a desired number of map tasks is to write your own InputFormat implementation, override the getSplits() method, and pass the class in via the -inputFormat parameter to hadoop-streaming. I plan on doing this, because a lot of my work involves running one-off jobs like this that would go much faster if distributed, and because hadoop-streaming offers a much nicer alternative than the manual method.
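
For what it's worth, here is a minimal sketch of what such an InputFormat might look like, written against the old mapred API that the 0.20 streaming jar uses. I have not run this, and the class name and the mycompany.desired.splits property are made up for illustration; the idea is simply to pass a larger numSplits hint down to TextInputFormat, whose split computation uses that hint as a goal. The jar containing the class would also need to be available on the streaming job's classpath.

// Hypothetical sketch: force more splits by inflating the numSplits hint
// that FileInputFormat uses when computing split sizes. The class name and
// the mycompany.desired.splits property are made up for illustration.
package com.mycompany.hadoop;

import java.io.IOException;

import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;

public class ManySplitTextInputFormat extends TextInputFormat {

  @Override
  public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
    // read the desired split count from a job property, falling back to
    // whatever hint the framework passed in
    int desired = job.getInt("mycompany.desired.splits", numSplits);
    return super.getSplits(job, Math.max(desired, numSplits));
  }
}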

So time to go read (and write) some Java code, I guess. I will report back if I get this thing to work.

Update 2009-10-06: A combination of Googling and poking through the Hadoop source provided me with the solution to the problem. As this Hadoop Wiki Page states, you can increase the number of maps beyond what Hadoop decides is the right number, but you cannot decrease it (unless you control the number of splits yourself via a custom InputFormat). Since I was working in pseudo-distributed mode (i.e. a cluster of 1 machine), I had to raise the maximum number of map tasks per node to 10 (and upped the maximum number of reducers while I was at it), and set the number of map tasks for the job to 10, all in my mapred-site.xml file. The relevant portion of the file is shown below.

<configuration>
  ...
  <!-- allow 10 mappers for hadoop streaming -->
  <property>
    <name>mapred.tasktracker.map.tasks.maximum</name>
    <value>10</value>
  </property>
  <property>
    <name>mapred.tasktracker.reduce.tasks.maximum</name>
    <value>10</value>
  </property>
  <property>
    <name>mapred.map.tasks</name>
    <value>10</value>
  </property>
  <!-- /allow 10 mappers for hadoop streaming -->
</configuration>

The custom InputFormat that I thought I had to build earlier was not needed.

2 comments (moderated to prevent spam):

Matrix said...

Hi,
Very interesting post and I enjoyed reading it. One question though: what was the timing difference when you upped the max value for the map jobs? Did it come in well under the time your manual process took?

Sujit Pal said...

Hi Matrix, thanks, and yes, my Hadoop job with 10 map tasks ran in slightly more than half the time it took for my manual setup of 5 jobs. My manual job took approximately 10 hours to complete, and the Hadoop job took about 5 hrs 20 mins.