This posting gives an example of how F# and C# can potentially scale to thousands of machines with Mapreduce in order to efficiently process terabyte (TB) and petabyte (PB) amounts of data. It shows a C# (c sharp) mapper function and an F# (f sharp) reducer function, with a description of how to deploy the job on Amazon's Elastic Mapreduce using a bootstrap action (it was tested on an Elastic Mapreduce cluster of 10 machines).
The .NET environment used is Mono 2.8 with F# 2.0. The code described in this posting can be found at https://github.com/atbrox/atbroxexamples/
Mapreduce Code
C# mapper
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;

// Hadoop streaming mapper: reads text lines from stdin and writes
// one "term<TAB>1" record per space-separated word to stdout
public class WordCountMapper {
    static public void Main () {
        String line;
        while ((line = Console.ReadLine()) != null) {
            line.Trim().Split(' ')
                // drop the empty tokens left by repeated spaces
                .Where(s => !String.IsNullOrEmpty(s.Trim()))
                .ToList()
                .ForEach(term => Console.Write(term.ToLower() + "\t1\n"));
        }
    }
}
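Because Hadoop streaming only exchanges text over stdin/stdout, the mapper's contract can be checked without Mono. Below is a minimal Python sketch that roughly mirrors the C# logic above (trim, split on spaces, drop empty tokens, lowercase, emit a tab-separated "term, 1" record). The names map_line and run_mapper are hypothetical and are not part of the atbroxexamples repository.

```python
from typing import Iterable, Iterator


def map_line(line: str) -> Iterator[str]:
    """Emit one tab-separated (term, 1) record per space-separated token."""
    for term in line.strip().split(" "):
        # skip the empty tokens that repeated spaces leave behind
        if term.strip():
            yield term.lower() + "\t1"


def run_mapper(lines: Iterable[str]) -> Iterator[str]:
    # lines would normally be sys.stdin; strip() removes the trailing newline
    for line in lines:
        yield from map_line(line)
```

For example, list(map_line("Hello  world")) gives ["hello\t1", "world\t1"]; printing every record from run_mapper(sys.stdin) would reproduce what the bundled mapper writes to stdout.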
Compiling C# code
mcs wordcountmapper.cs
mkbundle -o wordcountmapper wordcountmapper.exe --deps --static
F# reducer
open System
open System.IO

// note: F# values are immutable by default (as in Erlang),
// so the counter below needs the explicit mutable keyword
let sumReducer (key:string, values:seq<string[]>) : int =
    let mutable count = 0
    for value in values do
        count <- count + Convert.ToInt32(value.[1])
    count

// note: |> below is similar to unix pipes
let reduceInput (tr:TextReader) =
    Seq.initInfinite (fun _ -> tr.ReadLine())
    |> Seq.takeWhile (fun line -> line <> null)
    |> Seq.map (fun line -> line.Split('\t','\n',' '))
    |> Seq.groupBy (fun line -> line.[0].Trim())

let reduceRunner (tr:TextReader) (reducef:string*seq<string[]>->int) =
    for line in reduceInput tr do
        let key,values = line
        let wc = reducef(key,values)
        System.Console.WriteLine(key+"\t"+Convert.ToString(wc))

// run the sumReducer method on input data from stdin
reduceRunner Console.In sumReducer
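Hadoop streaming sorts the mapper output by key before it reaches a reducer, so a reducer can sum consecutive records with the same key instead of grouping the whole input in memory, which is what Seq.groupBy does once it is enumerated. A Python sketch of that alternative approach, assuming tab-separated "term, count" records; sum_reducer and simulate_job are hypothetical names, and simulate_job only imitates the map, sort and reduce phases locally.

```python
from itertools import groupby
from typing import Callable, Iterable, Iterator, List, Tuple


def parse(record: str) -> Tuple[str, int]:
    # "term\t1" -> ("term", 1); the count is the second field, like value.[1] in F#
    key, value = record.rstrip("\n").split("\t", 1)
    return key.strip(), int(value)


def sum_reducer(records: Iterable[str]) -> Iterator[str]:
    """Sum counts per key, assuming records arrive sorted by key."""
    pairs = (parse(r) for r in records if r.strip())
    # groupby only merges *adjacent* equal keys, hence the sorted-input requirement
    for key, group in groupby(pairs, key=lambda kv: kv[0]):
        yield "%s\t%d" % (key, sum(count for _, count in group))


def simulate_job(lines: Iterable[str],
                 mapper: Callable[[str], Iterable[str]]) -> List[str]:
    # local stand-in for map -> shuffle/sort by key -> reduce
    mapped = [record for line in lines for record in mapper(line)]
    mapped.sort(key=lambda r: r.split("\t", 1)[0])
    return list(sum_reducer(mapped))
```

The trade-off: this version holds only one key's records in memory at a time, but it is only correct because Hadoop delivers each reducer's input sorted by key; the F# version works on unsorted input at the cost of buffering it all.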
Compiling F# code
cd FSharp-2.0.0.0
mono bin/fsc.exe wordcountreducer.fsx --standalone
mkbundle -o wordcountreducer wordcountreducer.exe --deps --static
Deployment on Amazon’s Elastic Mapreduce
To run Mono/.NET code on the Elastic Mapreduce (Debian) Linux nodes, Mono must be installed on each node; this can be done with a bootstrap action shell script. There are several options for deploying the mapreduce job itself; the one shown in this posting uses the Boto API for Python. Note: this requires that you fetch the mono-2.8.2-parallel-environment-amd64.deb package and upload it to your S3 BUCKET in advance.
Bootstrap action shell script for installing mono
#!/bin/bash
hadoop fs -copyToLocal s3://BUCKET/mono-2.8.2-parallel-environment-amd64.deb .
sudo dpkg -i mono-2.8.2-parallel-environment-amd64.deb
Python script to deploy mapreduce and check status until it is done
from __future__ import print_function

import time

import boto
import boto.emr
from boto.emr.step import StreamingStep
from boto.emr.bootstrap_action import BootstrapAction

# set your AWS keys and S3 bucket; leaving the keys as None makes boto
# read them from the environment or ~/.boto instead
AWSKEY = None
SECRETKEY = None
S3_BUCKET = 'BUCKET'  # replace with the bucket holding the .deb, bootstrap script and executables
NUM_INSTANCES = 1
SLAVE_INSTANCE_TYPE = 'm1.small'
MASTER_INSTANCE_TYPE = 'm1.small'

conn = boto.connect_emr(AWSKEY, SECRETKEY)

# runs monobootstrap.sh (the bootstrap script above) on every node before the job starts
bootstrap_step = BootstrapAction("installmono", "s3://" + S3_BUCKET + "/monobootstrap.sh", None)

# mapper and reducer point at the bundled executables uploaded to S3;
# the object names must match the names you uploaded them under
step = StreamingStep(name='Wordcount',
                     mapper='s3n://' + S3_BUCKET + '/csharpmapper',
                     reducer='s3n://' + S3_BUCKET + '/fsharpreducer',
                     input='s3n://elasticmapreduce/samples/wordcount/input',
                     output='s3n://' + S3_BUCKET + '/output')

jobid = conn.run_jobflow(
    name="emr with fsharp and csharp",
    log_uri="s3://" + S3_BUCKET + "/logs",
    steps=[step],
    bootstrap_actions=[bootstrap_step],
    num_instances=NUM_INSTANCES,
    slave_instance_type=SLAVE_INSTANCE_TYPE,
    master_instance_type=MASTER_INSTANCE_TYPE,
    enable_debugging=True
)

print("finished spawning job (note: starting still takes time)")
state = conn.describe_jobflow(jobid).state
print("job state =", state)
print("job id =", jobid)

# poll every 30 seconds; also stop when the job fails, not only when it completes
while state not in (u'COMPLETED', u'FAILED', u'TERMINATED'):
    print(time.ctime())
    time.sleep(30)
    state = conn.describe_jobflow(jobid).state
    print("job state =", state)
    print("job id =", jobid)

print("final output can be found in s3://" + S3_BUCKET + "/output")
print("try: $ s3cmd sync s3://" + S3_BUCKET + "/output .")
To run the Python deploy script, install boto, update the constants at the top of the file with your AWS keys and S3 bucket, and start it with python deploy.py
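The polling loop in the deploy script can also be factored into a small function that stops on any terminal state and gives up after a deadline, which makes it testable without AWS. A Python 3 sketch: wait_for_jobflow and its parameters are hypothetical, and the terminal-state set (COMPLETED, FAILED, TERMINATED) is an assumption about the job-flow states returned by describe_jobflow.

```python
import time
from typing import Callable

# Assumed terminal job-flow states; any other state means "keep waiting".
TERMINAL_STATES = frozenset({"COMPLETED", "FAILED", "TERMINATED"})


def wait_for_jobflow(get_state: Callable[[], str],
                     poll_seconds: float = 30,
                     timeout_seconds: float = 3600,
                     sleep: Callable[[float], None] = time.sleep,
                     clock: Callable[[], float] = time.monotonic) -> str:
    """Poll get_state() until it returns a terminal state, then return that state.

    Raises TimeoutError if the job flow is still not finished after timeout_seconds.
    sleep and clock are injectable so the loop can be tested without AWS.
    """
    deadline = clock() + timeout_seconds
    state = get_state()
    while state not in TERMINAL_STATES:
        if clock() >= deadline:
            raise TimeoutError("job flow still %s after %s seconds"
                               % (state, timeout_seconds))
        sleep(poll_seconds)
        state = get_state()
    return state
```

With boto it would be called as wait_for_jobflow(lambda: conn.describe_jobflow(jobid).state), with conn and jobid taken from the deploy script.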
Conclusion
This posting has given an example of a simple wordcount in C# and F#, and of how to deploy the job on Amazon's Elastic Mapreduce. It has been tested on up to 10 m1.large EC2 nodes. The code can be checked out with git clone git@github.com:atbrox/atbroxexamples.git.
Best regards,
Amund Tveit (amund
Atbrox
February 9th, 2011 at 14:01
Your “sumReducer” can be rewritten as a fold, e.g. “Seq.fold (fun acc (v:string[]) -> acc + int v.[1]) 0 values” (that way you don’t need mutable variables). This also means you could inline it directly in your “reduceRunner” function.
In addition, you should use a Seq.iter instead of the “for” construct: “reduceInput tr |> Seq.iter (fun (key,values) -> … )”.
Finally, your write line can be: “printfn "%s\t%s" key (string wc)”.
June 29th, 2011 at 01:47
Got an error from Mono when running this command line on your sample code:
mkbundle -o wordcountmapper wordcountmapper.exe --deps --static
The option `--static' is not supported on this platform.
OS: Windows Server 2008 32bit
Could you please specify your environment?
Thank you in advance.
August 3rd, 2011 at 23:14
You create two executables:
wordcountmapper and wordcountreducer
Your python script references this for the mapper and the reducer:
mapper='s3n://' + S3_BUCKET + '/csharpmapper',
reducer='s3n://' + S3_BUCKET + '/fsharpreducer'
What is csharpmapper and fsharpreducer and how does it relate to wordcountmapper and wordcountreducer?
Thanks
July 3rd, 2012 at 19:56
Can you use AOT compiling to remove the requirement for installing Mono on the local machine?