Friday, 18 September 2009

Trying out mapreduce - on the farm

Received an email this week from the Sanger helpdesk saying they had installed a two-node test hadoop system on the farm. Thanks, guys! The first thing to do, obviously, was to repeat the streaming mapreduce exercise I did on my own machine (see my previous post). The only difference with my local setup is that this time I had to handle HDFS.

As a recap from my previous post: I'll be running an equivalent of the following:
cat snps.txt | ruby snp_mapper.rb | sort | ruby snp_reducer.rb

Setting up
First thing the Sanger wiki told me was to format my HDFS space:
hadoop namenode -format
This apparently only affects my own space... After that I could start playing with hadoop.

Where am I?
It looks like hadoop maintains its own complete filesystem: even though my home directory on the server is /home/users/a/aerts, a
hadoop fs -lsr /
shows that in the HDFS system I'm at /user/aerts.
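Relative paths in the fs commands seem to be resolved against that HDFS home directory (an assumption on my part, based on how HDFS normally behaves), so for example:
hadoop fs -mkdir test   # actually creates /user/aerts/test
hadoop fs -rmr test     # and remove it again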

Preparing the run
First off: create a directory to work in. Let's call it "locustree", with two subdirectories called "input" and "output".
hadoop fs -mkdir locustree
hadoop fs -mkdir locustree/input
hadoop fs -mkdir locustree/output
And copy your datafile to the input directory:
hadoop fs -put snps.txt locustree/input/
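A quick listing should confirm that the file made it into HDFS:
hadoop fs -ls locustree/input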
Running the run
The Hadoop documentation mentions that shebang lines in the mapper and reducer scripts are likely not to work, so you have to call ruby explicitly. Provide both scripts as "-file" arguments to hadoop. Finally, from the local directory containing the scripts, run the following command to start the mapreduce job:
hadoop jar $HADOOP_HOME/contrib/streaming/hadoop-0.20.1-streaming.jar \
-input locustree/input/snps.txt \
-mapper "/usr/local/bin/ruby snp_mapper.rb" \
-reducer "/usr/local/bin/ruby snp_reducer.rb" \
-output locustree/output/snp_index \
-file snp_mapper.rb \
-file snp_reducer.rb

Et voila: a new directory snp_index now contains the file spit out by the snp_reducer.rb script.
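That directory lives in HDFS rather than on the normal filesystem, so to peek at the result or copy it back, something like this should do the trick (assuming the usual part-00000 naming that streaming jobs produce):
hadoop fs -cat locustree/output/snp_index/part-00000 | head
hadoop fs -get locustree/output/snp_index snp_index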

Wednesday, 2 September 2009

Trying out mapreduce

learning to map/reduce
Photo by niv available from Flickr

I have long been interested in trying out mapreduce in my data pipelines. The Wellcome Trust Sanger Institute has several huge compute farms that I normally use, but they don't support mapreduce jobs. That's quite understandable from IT's and the institute's point of view, because it's a mammoth task to keep those things running. But it also means that I can't set foot on the mapreduce path.

There are options to run mapreduce on your own, however. One is to install hadoop locally with the restriction that you can only use one node: your own machine. This walkthrough will get you started with that. Another approach is to use Amazon Elastic MapReduce (AWS EMR).

First off, I must admit that I'm far from familiar with mapreduce at this point in time. As far as I understand, it's ideal for aggregating data when the algorithm can be run on a subset of the data as well as on the complete dataset. The mapping step of the framework creates different subsets of that data, runs the required program on each of them and returns partial results. These results should be in the form of key/value pairs and serve as the input for a reduce step that combines all intermediate results into one.
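To make that a bit more concrete before diving into the real thing, here's a toy illustration in plain Ruby (just a sketch of the idea, nothing hadoop-specific):
# Map emits key/value pairs, those pairs get grouped by key, and the
# reduce step combines each group into one result.
lines = ["a rose is a rose", "a daisy"]

pairs = []                                          # map: emit [word, 1] pairs
lines.each { |l| l.split.each { |word| pairs << [word, 1] } }

grouped = Hash.new { |h, k| h[k] = [] }             # group by key (hadoop does this by sorting)
pairs.each { |word, one| grouped[word] << one }

grouped.each do |word, ones|                        # reduce: combine each group
  puts [word, ones.inject(0) { |sum, n| sum + n }].join("\t")
end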

What's held me back from using AWS EMR is the cost. It's practically nothing, but anything I run on their servers at this moment is paid for with my own money. So today I installed hadoop locally and started coding.

The pilot
One of the projects I'm working on is LocusTree: a way of indexing genomic features that allows for quick visualization at different resolutions. In this data structure the genome is first divided into bins of 4,000 bp (called level 0). The next step sees the creation of new "parent" bins that each hold 2 of the original ones (so each covering 8,000 bp; called level 1). You keep on grouping 2 bins into a parent bin until you end up with a single bin for a complete chromosome. For chromosome 1 that means 17 levels, I think.
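A quick back-of-the-envelope check in Ruby (just a sketch, using the chromosome 1 length that also appears in the mapper script below):
# How many levels until a single bin covers all of chromosome 1?
chr1_length = 247_249_719
level = 0
level += 1 while 4000 * (2**level) < chr1_length
puts level + 1   # => 17: levels 0 (4,000 bp bins) up to 16 (one bin for the whole chromosome)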

Each of the features - for example a list of 15 million SNPs or log2ratios for 42 million probes - is put in the smallest node in which it fits entirely. A feature ranging from position 10 to 20 will fit in the first bin on level 0; but a feature from position 3,990 to 4,050 will have to go in the first bin on level 1. Subsequently a count and if applicable some aggregate value like sum, min or max is stored in that node as well as in every parent node.
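A quick sanity check of that example (a sketch only, using the same (start-1).div(resolution) bin arithmetic as the mapper script below, with level 0 meaning the 4,000 bp bins):
# Does the feature fit in a single bin at this level?
def fits_in_one_bin?(start, stop, level)
  resolution = 4000 * (2**level)
  (start - 1).div(resolution) == (stop - 1).div(resolution)
end

puts fits_in_one_bin?(10, 20, 0)      # true:  the first bin on level 0
puts fits_in_one_bin?(3990, 4050, 0)  # false: straddles two level-0 bins
puts fits_in_one_bin?(3990, 4050, 1)  # true:  the first bin on level 1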


So for each SNP/probe the indexing script has to find the smallest enclosing node and update the aggregate values for each parent up to the top level.

The setup
I'm not good at Java, so I used the hadoop streaming approach for this indexing. The input file contains about 15 million SNPs and looks like this:
snp6216929   3   177718011   177718021
snp6216930 3 177718267 177718277
snp6216931 3 177718268 177718278
snp6216932 3 177718294 177718304
snp6216933 3 177718612 177718622
snp6216934 3 177718629 177718639
snp6216935 3 177718956 177718966
snp6216936 3 177719529 177719539
snp6216937 3 177719623 177719633
I want to get a list of nodes with the count of SNPs within that node and all child-nodes. Each node should also list the SNP names for which that node is the smallest enclosing node. So the output looks like:
3:0:44429   9   snp6216929,snp6216930,snp6216931,snp6216932,snp6216933,snp6216934,snp6216935,snp6216936,snp6216937
3:0:44430 1 snp6216938
3:1:22214 9
3:1:22215 1
3:2:11107 10
3:3:5553 10
3:4:2776 10
3:5:1388 10
3:6:694 10
3:7:347 10
3:8:173 10
3:9:86 10
3:10:43 10
3:11:21 10
3:12:10 10
3:13:5 10
3:14:2 10
3:15:1 10
3:16:0 10
3:17:0 10
The first column is the node (chromosome:level:bin), the second column is the number of SNPs covered by that node, and the third column lists the SNP names for which that node is the smallest enclosing node.

The scripts
Code for the mapper:
#!/usr/bin/ruby
# Chromosome lengths in bp ('23' and '24' stand for X and Y)
CHROMOSOME_LENGTHS = {'1' => 247249719,
                      '2' => 242951149,
                      '3' => 199501827,
                      '4' => 191273063,
                      '5' => 180857866,
                      '6' => 170899992,
                      '7' => 158821424,
                      '8' => 146274826,
                      '9' => 140273252,
                      '10' => 135374737,
                      '11' => 134452384,
                      '12' => 132349534,
                      '13' => 114142980,
                      '14' => 106368585,
                      '15' => 100338915,
                      '16' => 88827254,
                      '17' => 78774742,
                      '18' => 76117153,
                      '19' => 63811651,
                      '20' => 62435964,
                      '21' => 46944323,
                      '22' => 49691432,
                      '23' => 154913754,
                      '24' => 57772954
                     }

# Returns the smallest node that completely contains the feature, followed
# by all of its ancestor nodes (each as a "chr:level:bin" string).
def enclosing_node(chr,start,stop)
  start = start.to_i
  stop = stop.to_i
  # Start at the top level (one bin for the whole chromosome) and keep
  # descending as long as the feature still fits in a single bin.
  level_number = ((Math.log(CHROMOSOME_LENGTHS[chr]) - Math.log(4000)).to_f/Math.log(2)).floor + 1
  resolution_at_level = 4000*(2**level_number)
  previous_start_bin = 0
  previous_level_number = level_number
  start_bin = (start-1).div(resolution_at_level)
  stop_bin = (stop-1).div(resolution_at_level)

  ancestors = Array.new
  while start_bin == stop_bin and level_number >= 0
    ancestors.push([chr, level_number + 1, previous_start_bin].join(":"))
    previous_start_bin = start_bin
    previous_level_number = level_number
    level_number -= 1
    resolution_at_level = 4000*(2**level_number)
    start_bin = (start-1).div(resolution_at_level)
    stop_bin = (stop-1).div(resolution_at_level)
  end
  return [[chr, level_number + 1, previous_start_bin].join(":"), ancestors.reverse].flatten
end

STDIN.each do |line|
  name, chr, start, stop = line.chomp.split(/\t/)

  # Emit one line with the SNP name for its smallest enclosing node, and
  # one count-only line for every ancestor of that node.
  encl_node, *ancestors = enclosing_node(chr,start,stop)
  STDOUT.puts [encl_node, 1, name].join("\t")
  ancestors.each do |node|
    STDOUT.puts [node, 1].join("\t")
  end
end
Don't worry about the details. It basically takes the input from standard input and writes this to standard output:
3:0:44429   1   snp6216929
3:1:22214 1
3:2:11107 1
3:3:5553 1
3:4:2776 1
3:5:1388 1
3:6:694 1
3:7:347 1
3:8:173 1
3:9:86 1
3:10:43 1
3:11:21 1
3:12:10 1
3:13:5 1
3:14:2 1
3:15:1 1
3:16:0 1
3:17:0 1
3:0:44429 1 snp6216930
3:1:22214 1
3:2:11107 1
3:3:5553 1
...
The reducer script takes all this and summarizes it all:
#!/usr/bin/ruby
# Sum the counts per node and collect the SNP names for which that node
# is the smallest enclosing node.
nodes = Hash.new
STDIN.each do |line|
  node, count, ids = line.chomp.split(/\t/)
  parsed_ids = ids.split(/,/) unless ids.nil?
  if nodes.has_key?(node)
    nodes[node][:count] += count.to_i
    nodes[node][:accessions].push(parsed_ids) unless ids.nil?
  else
    nodes[node] = Hash.new
    nodes[node][:count] = count.to_i
    nodes[node][:accessions] = Array.new
    nodes[node][:accessions].push(parsed_ids) unless ids.nil?
  end
end
nodes.keys.each do |locus|
  STDOUT.puts [locus, nodes[locus][:count], nodes[locus][:accessions].flatten.join(',')].join("\t")
end
Before running this under hadoop it's useful to just pipe these together manually on the unix command line:
cat snps.txt | ruby snp_mapper.rb | ruby snp_reducer.rb | sort

Since this gave the output I expected, I could run the pipeline with hadoop:
$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/contrib/streaming/hadoop-0.18.3-streaming.jar \
-input snps.txt \
-mapper snp_mapper.rb \
-reducer snp_reducer.rb \
-output ./snp_index/ \
-file snp_mapper.rb \
-file snp_reducer.rb


...and lo and behold: the snp_index directory contains the result I want.

Parting thoughts
This is the very first time I've used mapreduce, so the code is far from optimized. Note that the code above is not a reflection of my programming skills :-) I just wanted to focus on the hadoop pipeline rather than the code. Actually, running this thing on my own laptop took almost 4 hours and nearly brought it to its knees. It certainly beat it up badly: that hash in the reducer script is just way too big... This little mapreduce experiment is finished, but I might have another go later and optimize the mapper and reducer scripts (getting rid of that huge hash), or create some intermediate reducer step.
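One obvious candidate: because hadoop sorts the mapper output by key before it reaches the reducer, the reducer doesn't really need a hash at all; it could process one node at a time. A rough, untested sketch of what that could look like (when testing locally you'd need a sort between the mapper and the reducer):
#!/usr/bin/ruby
# Sketch of a hash-free reducer: input arrives sorted by node, so only the
# current node has to be kept in memory.
current_node = nil
count = 0
accessions = []

STDIN.each do |line|
  node, c, ids = line.chomp.split(/\t/)
  if node != current_node
    puts [current_node, count, accessions.join(',')].join("\t") unless current_node.nil?
    current_node = node
    count = 0
    accessions = []
  end
  count += c.to_i
  accessions.concat(ids.split(/,/)) unless ids.nil?
end
puts [current_node, count, accessions.join(',')].join("\t") unless current_node.nil?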

Unfortunately it only used one compute node (my own laptop), so I couldn't check how fast it could be. Running a small subset on Amazon Elastic MapReduce showed that that approach works as well.

I only kept track of the count as an aggregate in the nodes. If I wanted to calculate sum, min and max as well, I'd probably use a JSON-formatted string as the value in the key/value pairs:
{"count": 4, "sum": 17, "min": 2, "max": 12}


I will definitely use mapreduce in the future, provided that either work installs it on a local cluster or we get an institute account on AWS.