Feb 07

This posting gives an example of how F# and C# can potentially scale to thousands of machines with Mapreduce in order to efficiently process terabytes (TB) and petabytes (PB) of data. It shows a C# mapper function and an F# reducer function, and describes how to deploy the job on Amazon's Elastic Mapreduce using a bootstrap action (it was tested with an Elastic Mapreduce cluster of 10 machines).

The .NET environment used is Mono 2.8 with F# 2.0. The code described in this posting can be found at https://github.com/atbrox/atbroxexamples/

Mapreduce Code

C# mapper

using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text.RegularExpressions;
 
public class WordCountMapper {
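  static public void Main () {
    String line;
    // Hadoop streaming passes the input split on stdin, one record per line
    while( (line = Console.ReadLine()) != null) {
      line.Trim().Split(' ')
       // skip empty tokens caused by repeated spaces
       .Where(s => !String.IsNullOrEmpty(s.Trim()))
       .ToList()
       // emit "term<TAB>1" on stdout for every word (key = lower-cased word)
       .ForEach(term => Console.Write(term.ToLower() + "\t1\n"));
    }
  }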
}
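For comparison, the tokenization done by the C# mapper can be sketched in a few lines of Python. This is not part of the original job; map_line and run_mapper are hypothetical helper names, and the sketch only mirrors the C# logic (split on single spaces, drop empty tokens, lower-case, emit a count of 1) so it can be checked locally:

```python
def map_line(line):
    """Mirror the C# mapper: split on single spaces, drop empty or
    whitespace-only tokens, emit "term<TAB>1" in lower case."""
    return [term.lower() + "\t1"
            for term in line.strip().split(" ")
            if term.strip()]


def run_mapper(instream, outstream):
    # Hadoop streaming feeds input lines on stdin and reads
    # tab-separated key/value records from stdout.
    for line in instream:
        for record in map_line(line):
            outstream.write(record + "\n")
```

Used as a streaming mapper script, it would end with run_mapper(sys.stdin, sys.stdout).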

Compiling C# code

mcs wordcountmapper.cs
mkbundle -o wordcountmapper wordcountmapper.exe --deps --static

F# reducer

open System
open System.IO

// note: F# values are immutable by default (similar to Erlang),
// so the counter needs the explicit mutable keyword
let sumReducer (key:string, values:seq<string[]>) : int =
    let mutable count = 0
    for value in values do
        count <- count + Convert.ToInt32(value.[1])
    count

// note: |> below is similar to unix pipes 
let reduceInput (tr:TextReader) =
    Seq.initInfinite (fun _ -> tr.ReadLine())
    |> Seq.takeWhile (fun line -> line <> null) 
    |> Seq.map (fun line -> line.Split('\t','\n',' '))
    |> Seq.groupBy (fun line -> line.[0].Trim())

let reduceRunner (tr:TextReader) (reducef:string*seq<string[]>->int) = 
    for line in reduceInput tr do
        let key,values = line
        let wc = reducef(key,values)
        System.Console.WriteLine(key+"\t"+Convert.ToString(wc))

// run the sumReducer method on input data from stdin
reduceRunner Console.In sumReducer
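Note that Seq.groupBy in reduceInput materializes all of its input before it yields any group. Since Hadoop streaming sorts the mapper output by key before it reaches the reducer, a reducer can instead sum consecutive lines that share a key and keep only one group in memory at a time. A minimal sketch of that alternative in Python using itertools.groupby (parse, reduce_sorted and run_reducer are hypothetical names; it assumes the key<TAB>count format emitted by the mapper):

```python
from itertools import groupby


def parse(line):
    # Each record is "key<TAB>count", as written by the mapper.
    key, _, value = line.rstrip("\n").partition("\t")
    return key.strip(), int(value)


def reduce_sorted(lines):
    """Sum counts per key, assuming lines arrive sorted by key
    (Hadoop streaming sorts map output before the reduce phase)."""
    records = (parse(line) for line in lines if line.strip())
    for key, group in groupby(records, key=lambda kv: kv[0]):
        yield key, sum(count for _, count in group)


def run_reducer(instream, outstream):
    for key, total in reduce_sorted(instream):
        outstream.write("%s\t%d\n" % (key, total))
```

The design relies entirely on the sort guarantee: on unsorted input, groupby would emit the same key more than once.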

Compiling F# code

cd FSharp-2.0.0.0
mono bin/fsc.exe wordcountreducer.fsx --standalone 
mkbundle -o wordcountreducer wordcountreducer.exe --deps --static

Deployment on Amazon’s Elastic Mapreduce

To run Mono/.NET code on Elastic Mapreduce (Debian Linux) nodes, you need to install Mono on each node; this can be done with a bootstrap action shell script. There are several options for deploying the mapreduce job itself; the one shown in this posting uses the Boto API for Python. Note: this requires that you download the mono-2.8.2-parallel-environment-amd64.deb package and upload it to your S3 BUCKET in advance.

Bootstrap action shell script for installing mono

#!/bin/bash
hadoop fs -copyToLocal s3://BUCKET/mono-2.8.2-parallel-environment-amd64.deb .
sudo dpkg -i mono-2.8.2-parallel-environment-amd64.deb 

Python script to deploy mapreduce and check status until it is done

import boto
import boto.emr
from boto.emr.step import StreamingStep
from boto.emr.bootstrap_action import BootstrapAction
import time

# set your aws keys and S3 bucket (placeholders below), e.g. from environment or .boto
AWSKEY = 'YOUR_AWS_ACCESS_KEY'
SECRETKEY = 'YOUR_AWS_SECRET_KEY'
S3_BUCKET = 'YOUR_S3_BUCKET'
NUM_INSTANCES = 1
SLAVE_INSTANCE_TYPE = 'm1.small'
MASTER_INSTANCE_TYPE = 'm1.small'

conn = boto.connect_emr(AWSKEY,SECRETKEY)

bootstrap_step = BootstrapAction("installmono", "s3://" + S3_BUCKET + "/monobootstrap.sh",None)

step = StreamingStep(name='Wordcount',
                     mapper='s3n://' + S3_BUCKET + '/csharpmapper',
                     reducer='s3n://' + S3_BUCKET + '/fsharpreducer',
                     input='s3n://elasticmapreduce/samples/wordcount/input',
                     output='s3n://' + S3_BUCKET + '/output')

jobid = conn.run_jobflow(
    name="emr with fsharp and csharp", 
    log_uri="s3://" + S3_BUCKET + "/logs", 
    steps = [step],
    bootstrap_actions=[bootstrap_step],
    num_instances=NUM_INSTANCES,
    slave_instance_type=SLAVE_INSTANCE_TYPE,
    master_instance_type=MASTER_INSTANCE_TYPE,
    enable_debugging=True
)

print "finished spawning job (note: starting still takes time)"

state = conn.describe_jobflow(jobid).state
print "job state = ", state
print "job id = ", jobid
while state not in (u'COMPLETED', u'FAILED', u'TERMINATED'):
    print time.localtime()
    time.sleep(30)
    state = conn.describe_jobflow(jobid).state
    print "job state = ", state
    print "job id = ", jobid

print "final output can be found in s3://" + S3_BUCKET + "/output"
print "try: $ s3cmd sync s3://" + S3_BUCKET + "/output ."

To run the Python deploy script, install boto, update the constants at the top of the file (AWS keys and S3 bucket), and start it with python deploy.py
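A polling loop like the one in the deploy script is easier to reuse and test when the boto call is passed in as a function and a timeout is added. A minimal sketch under those assumptions: wait_for_jobflow is a hypothetical helper, and the set of terminal states is an assumption about which job flow states mean the job will not progress further:

```python
import time

# Assumption: polling stops at any of these job flow states.
TERMINAL_STATES = ("COMPLETED", "FAILED", "TERMINATED")


def wait_for_jobflow(get_state, poll_seconds=30, timeout_seconds=None,
                     sleep=time.sleep, log=print):
    """Poll get_state() until it returns a terminal state.

    get_state is any zero-argument callable, e.g.
    lambda: conn.describe_jobflow(jobid).state with boto.
    Returns the final state, or raises TimeoutError.
    """
    waited = 0
    state = get_state()
    log("job state = %s" % state)
    while state not in TERMINAL_STATES:
        if timeout_seconds is not None and waited >= timeout_seconds:
            raise TimeoutError("job flow still %s after %ds" % (state, waited))
        sleep(poll_seconds)
        waited += poll_seconds
        state = get_state()
        log("job state = %s" % state)
    return state
```

With boto this might be called as wait_for_jobflow(lambda: conn.describe_jobflow(jobid).state, timeout_seconds=3600); injecting sleep and log keeps the loop testable without AWS.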

Conclusion
This posting has given an example of a simple wordcount in C# and F#, and shown how to deploy the job on Amazon's Elastic Mapreduce. It has been tested on up to 10 m1.large EC2 nodes. The code can be checked out with git clone git@github.com:atbrox/atbroxexamples.git.


Best regards,
Amund Tveit (amund atbrox.com)
Atbrox

Oct 01

A while back I wrote about How to combine Elastic Mapreduce/Hadoop with other Amazon Web Services. This posting is a small update to that, showing how to deploy extra packages with Boto for Python. Note that Boto can deploy mappers and reducers written in any language supported by Elastic Mapreduce. The example below can also be found on GitHub at http://github.com/atbrox/atbroxexamples (check it out with git clone git@github.com:atbrox/atbroxexamples.git).

Imports and connection to elastic mapreduce on AWS

 
#!/usr/bin/env python
import boto
import boto.emr
from boto.emr.step import StreamingStep
from boto.emr.bootstrap_action import BootstrapAction
import time

# set your aws keys and S3 bucket (placeholders below), e.g. from environment or .boto
AWSKEY = 'YOUR_AWS_ACCESS_KEY'
SECRETKEY = 'YOUR_AWS_SECRET_KEY'
S3_BUCKET = 'YOUR_S3_BUCKET'
NUM_INSTANCES = 1

conn = boto.connect_emr(AWSKEY,SECRETKEY)

Bootstrap step being created
In this case the bootstrap action is a shell script stored in S3. Such a script can contain sudo commands to do apt-get installs, e.g. of classic programming language packages like gfortran or open-cobol, or more modern languages like ghc6 (Haskell). It can also run arbitrary code, e.g. check out the latest version of a programming language interpreter/compiler (such as Google Go with hg clone -r release https://go.googlecode.com/hg/ $GOROOT) and compile it before it is used in your mappers or reducers.

bootstrap_step = BootstrapAction("download.tst", "s3://elasticmapreduce/bootstrap-actions/download.sh",None)

Create map and reduce processing step
Using cache_files also makes a Python library available for import (the alternative would be to run sudo easy_install boto in the bootstrap step, which is simpler since the boto module then would not have to be unpacked manually in the Python code; see my previous posting for details about unpacking). Note that the mapper and reducer could be written in any language, as long as they are either compiled or have an interpreter installed by the bootstrap step.

step = StreamingStep(
  name='Wordcount',
  mapper='s3n://elasticmapreduce/samples/wordcount/wordSplitter.py',
  cache_files = ["s3n://" + S3_BUCKET + "/boto.mod#boto.mod"],
  reducer='aggregate',
  input='s3n://elasticmapreduce/samples/wordcount/input',
  output='s3n://' + S3_BUCKET + '/output/wordcount_output')

jobid = conn.run_jobflow(
    name="testbootstrap", 
    log_uri="s3://" + S3_BUCKET + "/logs", 
    steps = [step],
    bootstrap_actions=[bootstrap_step],
    num_instances=NUM_INSTANCES)

Wait for job to complete
This polls the Elastic Mapreduce job every 30 seconds and prints its status until it has finished; BOOTSTRAPPING is one of the states it passes through between STARTING and RUNNING.

state = conn.describe_jobflow(jobid).state
print "job state = ", state
print "job id = ", jobid
while state not in (u'COMPLETED', u'FAILED', u'TERMINATED'):
    print time.localtime()
    time.sleep(30)
    state = conn.describe_jobflow(jobid).state
    print "job state = ", state
    print "job id = ", jobid

print "final output can be found in s3://" + S3_BUCKET + "/output/wordcount_output"
print "try: $ s3cmd sync s3://" + S3_BUCKET + "/output/wordcount_output ."

Validation of what really happened
One way to validate is to check that mappers and reducers written in a language whose compiler or interpreter you installed with the bootstrap action actually run, e.g. the classic mapreduce word count written in classic languages like Cobol or Fortran 95. The other way is to check the S3 logs; the log directory for an Elastic Mapreduce job has the following subdirectories:

daemons  jobs  node  steps  task-attempts

In the node directory each EC2 instance used in the job has its own directory, and underneath each of them there is a bootstrap_actions directory with master.log and the stderr, stdout and controller logs. For the job presented above, the bootstrap output is shown below.
stderr output

--2010-10-01 17:38:38--  http://elasticmapreduce.s3.amazonaws.com/bootstrap-actions/file.tar.gz
Resolving elasticmapreduce.s3.amazonaws.com... 72.21.214.39
Connecting to elasticmapreduce.s3.amazonaws.com|72.21.214.39|:80... connected.
HTTP request sent, awaiting response... 
  HTTP/1.1 200 OK
  x-amz-id-2: NezTUU9MIzPwo72lJWPYIMo2wwlbDGi1IpDbV/mO07Nca4VarSV8l7j/2ArmclCB
  x-amz-request-id: 3E71CC3323EC1189
  Date: Fri, 01 Oct 2010 17:38:39 GMT
  Last-Modified: Thu, 03 Jun 2010 01:57:13 GMT
  ETag: "47a007dae0ff192c166764259246388c"
  Content-Type: application/octet-stream
  Content-Length: 153
  Connection: keep-alive
  Server: AmazonS3
Length: 153 [application/octet-stream]
Saving to: `file.tar.gz'

     0K                                                       100% 24.3M=0s

2010-10-01 17:38:38 (24.3 MB/s) - `file.tar.gz' saved [153/153]

Controller

2010-10-01T17:38:35.141Z INFO Fetching file 's3://elasticmapreduce/bootstrap-actions/download.sh'
2010-10-01T17:38:38.411Z INFO Working dir /mnt/var/lib/bootstrap-actions/1
2010-10-01T17:38:38.411Z INFO Executing /mnt/var/lib/bootstrap-actions/1/download.sh
2010-10-01T17:38:38.936Z INFO Execution ended with ret val 0
2010-10-01T17:38:38.938Z INFO Execution succeeded
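When a job flow has many nodes, checking each controller log by hand gets tedious. A minimal sketch that inspects the text of one controller log; it is based only on the message format shown above ("Execution ended with ret val N" and "Execution succeeded"), and bootstrap_result is a hypothetical helper. Fetching the logs from S3 is left out:

```python
import re

# Matches controller lines such as
# "2010-10-01T17:38:38.936Z INFO Execution ended with ret val 0"
RET_VAL = re.compile(r"Execution ended with ret val (-?\d+)")


def bootstrap_result(controller_log):
    """Return (succeeded, ret_val) for one bootstrap action's controller log.

    ret_val is None if no "ret val" line is present.
    """
    ret_val = None
    succeeded = False
    for line in controller_log.splitlines():
        match = RET_VAL.search(line)
        if match:
            ret_val = int(match.group(1))
        if "Execution succeeded" in line:
            succeeded = True
    return succeeded, ret_val
```

A missing "Execution succeeded" line, or a non-zero return value, is a signal to look at that node's stderr log.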

Conclusion
This posting has shown how to programmatically install packages (e.g. programming languages) on EC2 nodes running Elastic Mapreduce. Since Elastic Mapreduce in streaming mode supports any programming language, this can make it easier to deploy and test mappers and reducers written in your favorite language, and even to automate it. (It also opens a few doors for parallelizing legacy code.)


Best regards,
Amund Tveit, co-founder of Atbrox

May 24

Atbrox is a startup company providing technology and services for Search and Mapreduce/Hadoop. Our background is from Google, IBM and research.

Update 2010-Nov-15: Amazon cluster compute instances enter 231st place on the Top 500 supercomputing list.

Update 2010-Jul-13: We can remove "towards" from the title of this posting today: Amazon just launched cluster compute instances with 10 Gbit/s network bandwidth between nodes, and presented a run that enters the Top 500 list at 146th place (I estimate the run to cost ~$20k).

The Top 500 list is for supercomputers what the Fortune 500 is for companies. About 80% of the systems on the list are supercomputers built by either Hewlett Packard or IBM; other major supercomputing vendors on the list include Dell, Sun (Oracle), Cray and SGI. The parallel LINPACK benchmark result is used to rank the systems (a derived list, the Green500, also includes power efficiency in the ranking).

Trends towards Cloud Supercomputing
To our knowledge, the entire Top 500 list is currently based on physical supercomputer installations and no cloud computing configurations (i.e. virtual configurations lasting long enough to calculate the LINPACK benchmark), but that will probably change within a few years. There are, however, already trends towards cloud-based supercomputing (in particular within consumer internet services and pharmaceutical computations); here are some concrete examples:

  1. Zynga (online casual games, e.g. Farmville and Mafia Wars)
    Zynga uses 12000 Amazon EC2 nodes (ref: Manager of Cloud Operations at Zynga)

  2. Animoto (online video production service)
    Animoto scaled from 40 to 4000 EC2 nodes in 3 days (ref: CTO, Animoto)

  3. Myspace (social network)
    Myspace simulated 1 million simultaneous users using 800 large EC2 nodes (3200 cores) (ref: highscalability.com)

  4. New York Times
    New York Times used hundreds of EC2 nodes to process their archives in 36 hours (ref: The New York Times Archives + Amazon Web Services = TimesMachine)

  5. Reddit (news service)
    Reddit uses 218 EC2 nodes (ref: I run reddit’s servers)

Examples with (rough) estimates

  1. Justin.tv (video service)
    In October 2009 Justin.tv users watched 50 million hours of video, and their cost (as reported earlier) was about 1 penny per user-video-hour. A very rough estimate of monthly costs is therefore 50M*0.01 = 500k$, i.e. 12*500k$ = 6M$ annually. Assuming that half of their costs are computational, this would be about 3M$/(24*365*0.085) ~ 4029 EC2 nodes running 24×7 through the year; but since they are a video site, bandwidth is probably a significant fraction of the cost, so cutting the rough estimate in half gives around 2000 EC2 nodes.
    (ref: Watching TV Together, Miles Apart and Justin.tv wins funding, opens platform)

  2. Newsweek
    Newsweek saves up to $500,000 per year by moving to the cloud. Assuming they cut their spending in half by using the cloud, that would correspond to $500,000/(24h/day*365d/y*0.085$/h) ~ 670 EC2 nodes running 24×7 through the year (probably a little less due to storage and bandwidth costs).
    (ref: Newsweek.com Explores Amazon Cloud Computing)

  3. Recovery.gov
    Recovery.gov saves up to $420,000 per year by moving to the cloud. Assuming they cut their spending in half by using the cloud, that would correspond to $420,000/(24h/day*365d/y*0.085$/h) ~ 560 EC2 nodes running 24×7 through the year (probably a little less due to storage and bandwidth costs). (ref: Feds embrace cloud computing; move Recovery.gov to Amazon EC2)
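The arithmetic behind these rough estimates can be reproduced in a few lines. This sketch uses only the figures given in the text ($0.085 per node-hour, the reported usage and savings) and the same halving assumptions stated above; nodes_24x7 is a hypothetical helper name:

```python
HOURS_PER_YEAR = 24 * 365
EC2_PRICE_PER_HOUR = 0.085  # $/node-hour, the rate used in the estimates above


def nodes_24x7(yearly_dollars):
    """Number of EC2 nodes a yearly budget keeps running 24x7."""
    return yearly_dollars / (HOURS_PER_YEAR * EC2_PRICE_PER_HOUR)


# Justin.tv: 50M user-video-hours per month at about $0.01 each
monthly = 50e6 * 0.01                  # ~ $500k per month
yearly = 12 * monthly                  # ~ $6M per year
justin_nodes = nodes_24x7(yearly / 2)  # half assumed computational: ~4029
justin_estimate = justin_nodes / 2     # halved again for bandwidth: ~2000

# Savings assumed equal to the remaining cloud spend (spending cut in half)
newsweek_nodes = nodes_24x7(500000)    # ~671.5, i.e. ~670
recovery_nodes = nodes_24x7(420000)    # ~564, i.e. ~560
```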

Other examples of Cloud Supercomputing

  1. Pharmaceutical companies Eli Lilly, Johnson & Johnson and Genentech
    Offloading computations to the cloud (ref: Biotech HPC in the Cloud and The new computing pioneers)

  2. Pathwork Diagnostics
    Using EC2 for cancer diagnostics (ref: Of Unknown Origin: Diagnosing Cancer in the Cloud)


Best regards,

Amund Tveit, co-founder of Atbrox

Nov 14

We developed a tool for scalable language processing for our customer Lingit using Amazon’s Elastic Mapreduce.

More details: http://aws.amazon.com/solutions/case-studies/atbrox/

Contact us if you need help with Hadoop/Elastic Mapreduce.

Nov 11

Elastic Mapreduce's default behavior is to read from and write to S3. When you need to access other AWS services, e.g. SQS queues or the database services SimpleDB and RDS (MySQL), the best approach from Python is to use Boto. To get Boto to work with Elastic Mapreduce you need to dynamically load boto in each mapper and reducer. Cloudera's Jeff Hammerbacher outlined how to do that using the Hadoop Distributed Cache, and Peter Skomoroch suggested how to load Boto to access Elastic Block Store (EBS). This posting is based on those ideas and gives a detailed description of how to do it.

How to combine Elastic Mapreduce with other AWS Services

This posting shows how to load boto in an Elastic Mapreduce mapper and gives a simple example of how to use SimpleDB from the same mapper. For accessing other AWS services from Elastic Mapreduce, e.g. SQS, check out the Boto documentation (it is quite easy once the boto + EMR integration is in place).

Other tools used (prerequisites):

Step 1 – getting and preparing the Boto library

wget http://boto.googlecode.com/files/boto-1.8d.tar.gz
# note: using virtualenv can be useful if you want to
# keep your local Python installation clean
tar -zxvf boto-1.8d.tar.gz ; cd boto-1.8d ; python setup.py install
cd /usr/local/lib/python2.6/dist-packages/boto-1.8d-py2.6.egg
zip -r boto.mod boto

Step 2 – mapper that loads boto.mod and uses it to access SimpleDB

# this was tested by adding the code below to the mapper
# s3://elasticmapreduce/samples/wordcount/wordSplitter.py
import sys
import re

# get boto library (boto.mod is symlinked into the task's working
# directory by the -cacheFile option in step 3)
sys.path.append(".")
import zipimport
importer = zipimport.zipimporter('boto.mod')
boto = importer.load_module('boto')

# access simpledb
sdb = boto.connect_sdb("YourAWSKey", "YourSecretAWSKey")
sdb_domain = sdb.create_domain("mymapreducedomain") # or sdb.get_domain()
# ..
# write words to simpledb (word pattern as in wordSplitter.py)
pattern = re.compile("[a-zA-Z][a-zA-Z0-9]*")
for line in sys.stdin:
    for word in pattern.findall(line):
        item = sdb_domain.create_item(word)
        item["reversedword"] = word[::-1]
        item.save()
        # ...
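Steps 1 and 2 rely on Python being able to import a package directly from a zip archive, whatever its file extension. A minimal local sketch of the same idea, useful for checking an archive before uploading it; it uses a hypothetical stand-in package (demo_pkg) instead of boto, and puts the archive on sys.path instead of calling zipimporter.load_module, which is deprecated since Python 3.10:

```python
import importlib
import os
import sys
import tempfile
import zipfile


def build_module_archive(archive_path, package_name, files):
    """Write a zip archive holding package_name/<file> entries,
    like `zip -r boto.mod boto` does for the boto package."""
    with zipfile.ZipFile(archive_path, "w") as archive:
        for name, source in files.items():
            archive.writestr(package_name + "/" + name, source)


def import_from_archive(archive_path, package_name):
    """Import a package straight from a zip archive placed on sys.path."""
    if archive_path not in sys.path:
        sys.path.insert(0, archive_path)
    importlib.invalidate_caches()
    return importlib.import_module(package_name)


# Hypothetical stand-in for boto: a one-file package built locally.
workdir = tempfile.mkdtemp()
archive = os.path.join(workdir, "demo_pkg.mod")
build_module_archive(archive, "demo_pkg",
                     {"__init__.py": "def greet():\n    return 'hello from zip'\n"})
demo_pkg = import_from_archive(archive, "demo_pkg")
print(demo_pkg.greet())
```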

Step 3 – json config file – bototest.json – for Elastic Mapreduce Ruby Client

[
  {
    "Name": "Step 1: testing boto with elastic mapreduce",
    "ActionOnFailure": "<action_on_failure>",
    "HadoopJarStep": {
      "Jar": "/home/hadoop/contrib/streaming/hadoop-0.18-streaming.jar",
      "Args": [
        "-input", "s3n://elasticmapreduce/samples/wordcount/input",
        "-output", "s3n://yours3bucket/result",
        "-mapper", "s3://yours3bucket/botoWordSplitter.py",
        "-cacheFile", "s3n://yours3bucket/boto.mod#boto.mod"
      ]
    }
  }
]
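Hand-written JSON is easy to break (a trailing comma after the last Args element, for example, makes it invalid). A sketch that generates the same step list with Python's json module, so the output is always well-formed; streaming_step is a hypothetical helper and yours3bucket is the same placeholder as above:

```python
import json

BUCKET = "yours3bucket"  # placeholder, as in the config above


def streaming_step(name, input_uri, output_uri, mapper_uri, cache_files=()):
    """Build one step entry in the format of bototest.json."""
    args = ["-input", input_uri, "-output", output_uri, "-mapper", mapper_uri]
    for uri in cache_files:
        args += ["-cacheFile", uri]
    return {
        "Name": name,
        "ActionOnFailure": "<action_on_failure>",
        "HadoopJarStep": {
            "Jar": "/home/hadoop/contrib/streaming/hadoop-0.18-streaming.jar",
            "Args": args,
        },
    }


steps = [streaming_step(
    "Step 1: testing boto with elastic mapreduce",
    "s3n://elasticmapreduce/samples/wordcount/input",
    "s3n://%s/result" % BUCKET,
    "s3://%s/botoWordSplitter.py" % BUCKET,
    cache_files=["s3n://%s/boto.mod#boto.mod" % BUCKET],
)]

config_text = json.dumps(steps, indent=2)
print(config_text)  # save this as bototest.json
```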

Step 4 – Copy necessary files to s3

s3cmd put boto.mod s3://yours3bucket
s3cmd put botoWordSplitter.py s3://yours3bucket

Step 5 – And run your Elastic Mapreduce job

 elastic-mapreduce --create \
                   --stream \
                   --json bototest.json \
                   --param "<action_on_failure>=TERMINATE_JOB_FLOW"

Conclusion
This showed how to dynamically load boto and use it to access one other AWS service (SimpleDB) from Elastic Mapreduce. Boto supports most AWS services, so the same integration approach should also work for other AWS services, e.g. SQS (Queuing Service), RDS (MySQL Service) and EC2; check out the Boto API documentation or Controlling the Cloud with Python for details.

Note: a very similar integration approach should work for most Python libraries, including those that use/wrap C/C++ code (e.g. machine learning libraries such as PyML), but then step 1 may need to be done on a Debian AMI similar to the one Elastic Mapreduce is using; check out a previous posting for more info about such AMIs.


Do you need help with Hadoop/Mapreduce?
A good start could be to read this book, or contact Atbrox if you need help with development or parallelization of algorithms for Hadoop/Mapreduce: info@atbrox.com. See our posting for an example of parallelizing and implementing a machine learning algorithm for Hadoop/Mapreduce.
