Wrote about mapreduce in search in a presentation for next week.
(more up-to-date pdf version of the presentation)
Best regards,
Amund
Atbrox
Our spin-off startup, Gravemaskinen AS, just launched a new media search engine in cooperation with Journalisten.no (a magazine for Norwegian media professionals).
Some of the features include:
(it is deployed on Amazon EC2)
This posting gives an example of how F# and C# can potentially scale to thousands of machines with Mapreduce in order to efficiently process terabyte (TB) and petabyte (PB) amounts of data. It shows a C# (C sharp) mapper function and an F# (F sharp) reducer function, with a description of how to deploy the job on Amazon's Elastic Mapreduce using a bootstrap action (it was tested with an Elastic Mapreduce cluster of 10 machines).
The .net environment used is mono 2.8 and FSharp 2.0. The code described in this posting can be found on https://github.com/atbrox/atbroxexamples/
C# mapper
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text.RegularExpressions;

public class WordCountMapper {
    static public void Main () {
        String line;
        while( (line = Console.ReadLine()) != null) {
            line.Trim().Split(' ')
                .Where(s => !String.IsNullOrEmpty(s.Trim()))
                .ToList()
                .ForEach(term => Console.Write(term.ToLower() + "\t1\n"));
        }
    }
}
Compiling c# code
mcs wordcountmapper.cs
mkbundle -o wordcountmapper wordcountmapper.exe --deps --static
F# reducer
open System.Net
open System
open System.IO

// note: F# has default immutable types (~ Erlang),
// use explicit mutable keyword
let sumReducer (key:string, values:seq<string[]>) : int =
    let mutable count = 0
    for value in values do
        count <- count + Convert.ToInt32(value.[1])
    count

// note: |> below is similar to unix pipes
let reduceInput (tr:TextReader) =
    Seq.initInfinite (fun _ -> tr.ReadLine())
    |> Seq.takeWhile (fun line -> line <> null)
    |> Seq.map (fun line -> line.Split('\t','\n',' '))
    |> Seq.groupBy (fun line -> line.[0].Trim())

let reduceRunner (tr:TextReader) (reducef:string*seq<string[]>->int) =
    for line in reduceInput tr do
        let key,values = line
        let wc = reducef(key,values)
        System.Console.WriteLine(key+"\t"+Convert.ToString(wc))

// run the sumReducer method on input data from stdin
reduceRunner Console.In sumReducer
Compiling fsharp code
cd FSharp-2.0.0.0
mono bin/fsc.exe wordcountreducer.fsx --standalone
mkbundle -o wordcountreducer wordcountreducer.exe --deps --static
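Before deploying, it can help to check the streaming contract that the mapper and reducer rely on: the mapper emits tab-separated "term, 1" pairs, the framework sorts them by key, and the reducer sums the counts per key. The following is only a rough Python sketch of that flow for local experiments; it is not part of the original code, the function names are made up for illustration, and sorted() merely stands in for Hadoop's shuffle/sort phase.

```python
from itertools import groupby


def map_words(lines):
    """Emit (term, 1) pairs, mirroring what the C# mapper writes to stdout."""
    for line in lines:
        for term in line.strip().split(' '):
            if term.strip():  # skip empty tokens caused by repeated spaces
                yield term.lower(), 1


def reduce_counts(pairs):
    """Sum the counts per term, mirroring the F# sumReducer."""
    # sorted() stands in for the sort that streaming performs between
    # the map and reduce phases; groupby needs the keys to be adjacent.
    for term, group in groupby(sorted(pairs), key=lambda kv: kv[0]):
        yield term, sum(count for _, count in group)


def wordcount(lines):
    """Run the map and reduce steps in-process and return a dict of counts."""
    return dict(reduce_counts(map_words(lines)))


if __name__ == "__main__":
    sample = ["the quick brown fox", "The lazy dog  jumps over the fox"]
    for term, count in sorted(wordcount(sample).items()):
        print("%s\t%d" % (term, count))
```

The compiled wordcountmapper and wordcountreducer binaries should produce the same kind of tab-separated output when fed the same lines on stdin, with a sort step between them.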
In order to run mono/.net code on Elastic Mapreduce (Debian) Linux nodes you need to install mono on each node; this can be done with a bootstrap action shell script. There are several options for deploying the mapreduce job itself; the one shown in this posting uses the Boto API for Python. Note: this requires that you upload mono-2.8.2-parallel-environment-amd64.deb to your S3 BUCKET in advance.
Bootstrap action shell script for installing mono
#!/bin/bash
hadoop fs -copyToLocal s3://BUCKET/mono-2.8.2-parallel-environment-amd64.deb .
sudo dpkg -i mono-2.8.2-parallel-environment-amd64.deb
Python script to deploy mapreduce and check status until it is done
import boto
import boto.emr
from boto.emr.step import StreamingStep
from boto.emr.bootstrap_action import BootstrapAction
import time

# set your aws keys and S3 bucket, e.g. from environment or .boto
AWSKEY=
SECRETKEY=
S3_BUCKET=
NUM_INSTANCES = 1
SLAVE_INSTANCE_TYPE = 'm1.small'
MASTER_INSTANCE_TYPE = 'm1.small'

conn = boto.connect_emr(AWSKEY,SECRETKEY)

# bootstrap action that installs mono on every node
bootstrap_step = BootstrapAction("installmono",
                                 "s3://" + S3_BUCKET + "/monobootstrap.sh",
                                 None)

# streaming step with the compiled C# mapper and F# reducer
step = StreamingStep(name='Wordcount',
                     mapper='s3n://' + S3_BUCKET + '/csharpmapper',
                     reducer='s3n://' + S3_BUCKET + '/fsharpreducer',
                     input='s3n://elasticmapreduce/samples/wordcount/input',
                     output='s3n://' + S3_BUCKET + '/output')

jobid = conn.run_jobflow(
    name="emr with fsharp and csharp",
    log_uri="s3://" + S3_BUCKET + "/logs",
    steps = [step],
    bootstrap_actions=[bootstrap_step],
    num_instances=NUM_INSTANCES,
    slave_instance_type=SLAVE_INSTANCE_TYPE,
    master_instance_type=MASTER_INSTANCE_TYPE,
    enable_debugging=True
)

print "finished spawning job (note: starting still takes time)"

# poll the job state every 30 seconds until the job has completed
state = conn.describe_jobflow(jobid).state
print "job state = ", state
print "job id = ", jobid
while state != u'COMPLETED':
    print time.localtime()
    time.sleep(30)
    state = conn.describe_jobflow(jobid).state
    print "job state = ", state
    print "job id = ", jobid

print "final output can be found in s3://" + S3_BUCKET + "/output"
print "try: $ s3cmd sync s3://" + S3_BUCKET + "/output ."
In order to run the Python deploy script you need to install boto, update the constants at the top of the file with your AWS keys and S3 bucket, and finally start it with python deploy.py
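The comment in the deploy script mentions taking the keys from the environment instead of editing the file. A minimal sketch of that idea follows. It assumes the conventional AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY environment variables plus a hypothetical S3_BUCKET variable, and load_settings is an illustrative helper, not something from the repository.

```python
import os


def load_settings(env=os.environ):
    """Read the deploy script's constants from an environment-like mapping."""
    settings = {
        "AWSKEY": env.get("AWS_ACCESS_KEY_ID"),
        "SECRETKEY": env.get("AWS_SECRET_ACCESS_KEY"),
        "S3_BUCKET": env.get("S3_BUCKET"),  # hypothetical variable name
    }
    # Fail early with a readable message instead of a late AWS error.
    missing = sorted(name for name, value in settings.items() if not value)
    if missing:
        raise ValueError("missing settings: " + ", ".join(missing))
    return settings
```

The returned values could then be assigned to AWSKEY, SECRETKEY and S3_BUCKET at the top of deploy.py, so that no credentials need to be stored in the script itself.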
Conclusion
This posting has given an example of a simple wordcount in C# and F#, and of how to deploy the job on Amazon's Elastic Mapreduce. It has been tested on up to 10 m1.large EC2 nodes. The code can be checked out with git clone git@github.com:atbrox/atbroxexamples.git
Best regards,
Amund Tveit (amund
Atbrox
If you are interested in Mapreduce (/Hadoop), I would like to recommend participating in or submitting your paper to the 2nd International Workshop on Mapreduce and its applications (MAPREDUCE'11). The mapreduce workshop is held in conjunction with the 20th International ACM Symposium on High-Performance Parallel and Distributed Computing in San Jose, California (June 2011).
(I just joined the mapreduce workshop as a program committee member)
Best regards,
A while back I wrote about How to combine Elastic Mapreduce/Hadoop with other Amazon Web Services. This posting is a small update to that, showing how to deploy extra packages with Boto for Python. Note that Boto can deploy mappers and reducers written in any language supported by Elastic Mapreduce. The example below can also be found on github (http://github.com/atbrox/atbroxexamples), i.e. check it out with git clone git@github.com:atbrox/atbroxexamples.git
Imports and connection to elastic mapreduce on AWS
#!/usr/bin/env python
import boto
import boto.emr
from boto.emr.step import StreamingStep
from boto.emr.bootstrap_action import BootstrapAction
import time

# set your aws keys and S3 bucket, e.g. from environment or .boto
AWSKEY=
SECRETKEY=
S3_BUCKET=
NUM_INSTANCES = 1

conn = boto.connect_emr(AWSKEY,SECRETKEY)
Bootstrap step being created
In this case the bootstrap action is a shell script fetched from S3. Note that such a script can contain sudo commands in order to do apt-get installs, e.g. to install classic programming language packages like gfortran or open-cobol, or more modern languages like ghc6 (Haskell). It can also run any other code, e.g. check out the latest version of a programming language's interpreter/compiler (e.g. Google Go with hg clone -r release https://go.googlecode.com/hg/ $GOROOT) and compile it before it is used in your mappers or reducers.
bootstrap_step = BootstrapAction("download.tst", "s3://elasticmapreduce/bootstrap-actions/download.sh",None)
Create map and reduce processing step
Using cache_files also makes a Python library available for import. (The other way would be to do sudo easy_install boto in the bootstrap step, which would be easier since the boto module wouldn't have to be unpacked manually in the Python code; see my previous posting for details about unpacking.) Note that the mapper and reducer can be written in any language, as long as you have either compiled them to an executable or installed an interpreter for the language with the bootstrap step.
step = StreamingStep(
    name='Wordcount',
    mapper='s3n://elasticmapreduce/samples/wordcount/wordSplitter.py',
    cache_files = ["s3n://" + S3_BUCKET + "/boto.mod#boto.mod"],
    reducer='aggregate',
    input='s3n://elasticmapreduce/samples/wordcount/input',
    output='s3n://' + S3_BUCKET + '/output/wordcount_output')

jobid = conn.run_jobflow(
    name="testbootstrap",
    log_uri="s3://" + S3_BUCKET + "/logs",
    steps = [step],
    bootstrap_actions=[bootstrap_step],
    num_instances=NUM_INSTANCES)
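Each cache_files entry above has the form source#name. In Hadoop streaming's cache file convention, the part before the # is the location to fetch, and the part after it is the name under which the file appears in the task's working directory. The helper below is only an illustrative sketch of that convention (split_cache_file is a made-up name, not a boto function).

```python
def split_cache_file(uri):
    """Split 'source#link_name' into its source URI and local link name."""
    source, sep, link_name = uri.partition("#")
    if not sep or not source or not link_name:
        raise ValueError("expected 'source#link_name', got %r" % (uri,))
    return source, link_name


if __name__ == "__main__":
    # BUCKET is a placeholder, as elsewhere in this posting
    print(split_cache_file("s3n://BUCKET/boto.mod#boto.mod"))
```

So with the entry used in the step above, the mapper would find the cached file as boto.mod in its current directory.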
Wait for job to start
This polls the Elastic Mapreduce job every 30 seconds and prints its state until the job has completed; one of the states between starting and running is bootstrapping.
state = conn.describe_jobflow(jobid).state
print "job state = ", state
print "job id = ", jobid
while state != u'COMPLETED':
    print time.localtime()
    time.sleep(30)
    state = conn.describe_jobflow(jobid).state
    print "job state = ", state
    print "job id = ", jobid

print "final output can be found in s3://" + S3_BUCKET + "/output/wordcount_output"
print "try: $ s3cmd sync s3://" + S3_BUCKET + "/output/wordcount_output ."
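The polling loop above only stops when the state becomes COMPLETED, so it would keep running if the job flow ends in another final state such as FAILED or TERMINATED. One possible variation is sketched below. wait_for_jobflow, max_polls and the injectable sleep argument are illustrative additions and not part of boto; get_state is any zero-argument callable that returns the current state string.

```python
import time

# Job flow states after which no further progress is expected.
TERMINAL_STATES = ("COMPLETED", "FAILED", "TERMINATED")


def wait_for_jobflow(get_state, poll_seconds=30, max_polls=None, sleep=time.sleep):
    """Poll get_state() until a terminal state is seen; return the last state."""
    polls = 0
    state = get_state()
    while state not in TERMINAL_STATES:
        if max_polls is not None and polls >= max_polls:
            break  # give up waiting, the caller can inspect the returned state
        sleep(poll_seconds)
        polls += 1
        state = get_state()
    return state
```

With the connection from the script above, a call could look like wait_for_jobflow(lambda: conn.describe_jobflow(jobid).state), and the output location would only be printed when the returned state is COMPLETED.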
Validation of what really happened
One way to validate is to check that mappers and reducers written in a language whose compiler or interpreter you installed with the bootstrap action actually run, e.g. the classic mapreduce word count written in classic languages like Cobol or Fortran 95. The other way is to check the S3 logs; the log directory for an Elastic Mapreduce job has the following subdirectories:
daemons
jobs
node
steps
task-attempts
In the node directory, each EC2 instance used in the job has a directory, and underneath each of them there is a bootstrap_actions directory with the master.log and stderr, stdout and controller logs. In the case presented above bootstrap output is shown underneath.
stderr output
--2010-10-01 17:38:38-- http://elasticmapreduce.s3.amazonaws.com/bootstrap-actions/file.tar.gz
Resolving elasticmapreduce.s3.amazonaws.com... 72.21.214.39
Connecting to elasticmapreduce.s3.amazonaws.com|72.21.214.39|:80... connected.
HTTP request sent, awaiting response...
  HTTP/1.1 200 OK
  x-amz-id-2: NezTUU9MIzPwo72lJWPYIMo2wwlbDGi1IpDbV/mO07Nca4VarSV8l7j/2ArmclCB
  x-amz-request-id: 3E71CC3323EC1189
  Date: Fri, 01 Oct 2010 17:38:39 GMT
  Last-Modified: Thu, 03 Jun 2010 01:57:13 GMT
  ETag: "47a007dae0ff192c166764259246388c"
  Content-Type: application/octet-stream
  Content-Length: 153
  Connection: keep-alive
  Server: AmazonS3
Length: 153 [application/octet-stream]
Saving to: `file.tar.gz'
0K 100% 24.3M=0s
2010-10-01 17:38:38 (24.3 MB/s) - `file.tar.gz' saved [153/153]
Controller
2010-10-01T17:38:35.141Z INFO Fetching file 's3://elasticmapreduce/bootstrap-actions/download.sh'
2010-10-01T17:38:38.411Z INFO Working dir /mnt/var/lib/bootstrap-actions/1
2010-10-01T17:38:38.411Z INFO Executing /mnt/var/lib/bootstrap-actions/1/download.sh
2010-10-01T17:38:38.936Z INFO Execution ended with ret val 0
2010-10-01T17:38:38.938Z INFO Execution succeeded
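When a job uses many nodes, reading each controller log by hand gets tedious. The following is a small sketch of how such a log could be checked automatically once it has been downloaded. It assumes the log format is exactly as shown above (an "Execution ended with ret val N" line per bootstrap action), and the function names are made up for illustration.

```python
import re

# Matches the return value line written after a bootstrap script has run.
RET_VAL = re.compile(r"Execution ended with ret val (-?\d+)")


def bootstrap_return_codes(controller_log):
    """Return the exit codes found in a controller log, in order of appearance."""
    return [int(match.group(1)) for match in RET_VAL.finditer(controller_log)]


def bootstrap_succeeded(controller_log):
    """True if at least one return value was logged and all of them are 0."""
    codes = bootstrap_return_codes(controller_log)
    return bool(codes) and all(code == 0 for code in codes)
```

A log without any return value line is treated as a failure here, on the assumption that the bootstrap script never got to run.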
Conclusion
The posting has shown how to programmatically install packages (e.g. programming languages) to EC2 nodes running elastic mapreduce. Since elastic mapreduce in streaming mode supports any programming language this can make it easier to deploy and test mappers and reducers written in your favorite language, and even automate it. (Opens a few doors for parallelization of legacy code)
Best regards,
Amund Tveit, co-founder of Atbrox