May 02

atbr

atbr (a large-scale, low-latency in-memory key-value store) now supports Apache Thrift for easier integration with other Hadoop services.

Thrift Example

Check out and install atbr

$ git clone git@github.com:atbrox/atbr.git
$ cd atbr
$ sudo ./INSTALL.sh

Prerequisite: install/compile Apache Thrift – http://thrift.apache.org/

Compile the atbr Thrift server and connect to it with the Python client

$ cd atbrthrift
$ make
$ ./atbr_thrift_server # c++ server
$ python test_atbr_thrift_client.py 

Python Thrift API example

from atbr_thrift_client import connect_to_atbr_thrift_service
service = connect_to_atbr_thrift_service("localhost", "9090")
service.load("keyvaluedata.tsv")
value = service.get("key1")
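
The load() call above expects a tab-separated key-value file. Here is a minimal sketch for generating such a file (assuming the format is one key-TAB-value pair per line; the file name and data are illustrative):

# generate a small tab-separated key-value file for atbr to load
# (assumes the format is one "key<TAB>value" pair per line)
pairs = [("key1", "value1"), ("key2", "value2")]
with open("keyvaluedata.tsv", "w") as f:
    for key, value in pairs:
        f.write(key + "\t" + value + "\n")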

Stay tuned for other updates on atbr.

Rough roadmap

  • Increased concurrency and threadsafety support
  • Increased reliability in sharded deployments (with Apache Zookeeper)
  • Simplified and automated sharded deployment on AWS and clusters
  • Benchmarks
  • Comparison with other storage alternatives (e.g. HBase, Redis, MongoDB, CouchDB and Cassandra)
  • End-to-end examples (from hadoop/mapreduce jobs to serving)
  • (in-memory) map(reduce) support with Lua or C++
  • Avro support
  • Large-scale graph processing example (ref: NetworkX)
  • Case studies
  • Add support for the Judy data structure
  • Thrift-support (done)
  • Sharded websocket support (done) [blog post]
  • Memory-efficient key-value store (done) [blog post]

Documentation
atbr.atbrox.com

Best regards,

Amund Tveit (@atveit)
Atbrox

Feb 07

This posting gives an example of how F# and C# can potentially scale to thousands of machines with Mapreduce in order to efficiently process terabyte (TB) and petabyte (PB) amounts of data. It shows a C# mapper function and an F# reducer function, together with a description of how to deploy the job on Amazon’s Elastic Mapreduce using a bootstrap action (it was tested with an Elastic Mapreduce cluster of 10 machines).

The .NET environment used is Mono 2.8 and F# 2.0. The code described in this posting can be found at https://github.com/atbrox/atbroxexamples/

Mapreduce Code

C# mapper

using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text.RegularExpressions;
 
public class WordCountMapper {
  static public void Main () {
    String line;
    while( (line = Console.ReadLine()) != null) {
      line.Trim().Split(' ')
       .Where(s => !String.IsNullOrEmpty(s.Trim()))
       .ToList()
       .ForEach(term => Console.Write(term.ToLower() + "\t1\n"));
    }
  }
}

Compiling the C# code

mcs wordcountmapper.cs
mkbundle -o wordcountmapper wordcountmapper.exe --deps --static

F# reducer

open System.Net
open System
open System.IO

// note: F# has default immutable types (~ Erlang), 
// use explicit mutable keyword
let sumReducer (key:string, values:seq<string[]>) : int =
    let mutable count = 0 
    for value in values do
        count <- count  + Convert.ToInt32(value.[1])
    count

// note: |> below is similar to unix pipes 
let reduceInput (tr:TextReader) =
    Seq.initInfinite (fun _ -> tr.ReadLine())
    |> Seq.takeWhile (fun line -> line <> null) 
    |> Seq.map (fun line -> line.Split('\t','\n',' '))
    |> Seq.groupBy (fun line -> line.[0].Trim())

let reduceRunner (tr:TextReader) (reducef:string*seq<string[]>->int) = 
    for line in reduceInput tr do
        let key,values = line
        let wc = reducef(key,values)
        System.Console.WriteLine(key+"\t"+Convert.ToString(wc))

// run the sumReducer method on input data from stdin
reduceRunner Console.In sumReducer

Compiling the F# code

cd FSharp-2.0.0.0
mono bin/fsc.exe wordcountreducer.fsx --standalone 
mkbundle -o wordcountreducer wordcountreducer.exe --deps --static
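
Before deploying, it can be useful to sanity-check the two binaries locally. The following Python sketch mimics Hadoop streaming’s map, sort, reduce flow (it assumes the wordcountmapper and wordcountreducer binaries built above are in the current directory):

# local sanity check (Python 2, as in the deploy script below):
# pipe sample input through the mapper, sort it, then pipe it through
# the reducer - this mimics what Hadoop streaming does between map and reduce
import subprocess

data = "the quick brown fox jumps over the lazy dog the\n"
mapper = subprocess.Popen(["./wordcountmapper"],
                          stdin=subprocess.PIPE, stdout=subprocess.PIPE)
mapped, _ = mapper.communicate(data)
shuffled = "\n".join(sorted(mapped.splitlines())) + "\n"
reducer = subprocess.Popen(["./wordcountreducer"],
                           stdin=subprocess.PIPE, stdout=subprocess.PIPE)
reduced, _ = reducer.communicate(shuffled)
print reduced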

Deployment on Amazon’s Elastic Mapreduce

In order to run Mono/.NET code on Elastic Mapreduce (Debian Linux) nodes you need to install Mono on each node; this can be done with a bootstrap action shell script. For deploying the mapreduce job itself there are several options; the one shown in this posting uses the Boto API for Python. Note: this requires that you upload mono-2.8.2-parallel-environment-amd64.deb to your S3 BUCKET in advance.

Bootstrap action shell script for installing mono

#!/bin/bash
hadoop fs -copyToLocal s3://BUCKET/mono-2.8.2-parallel-environment-amd64.deb .
sudo dpkg -i mono-2.8.2-parallel-environment-amd64.deb 
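
The bootstrap script, the two binaries and the Mono package all need to be in S3 before the job starts; s3cmd or the AWS console works, but it can also be scripted with Boto. A sketch (bucket and file names as used in this posting; adjust to your setup):

import boto

AWSKEY = "<YOUR_AWS_KEY>"
SECRETKEY = "<YOUR_AWS_SECRET_KEY>"
S3_BUCKET = "<YOUR_S3_BUCKET>"

# upload bootstrap script, mapper/reducer binaries and the mono package
conn = boto.connect_s3(AWSKEY, SECRETKEY)
bucket = conn.get_bucket(S3_BUCKET)
for filename in ["monobootstrap.sh", "csharpmapper", "fsharpreducer",
                 "mono-2.8.2-parallel-environment-amd64.deb"]:
    key = bucket.new_key(filename)
    key.set_contents_from_filename(filename)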

Python script to deploy the mapreduce job and poll its status until it is done

import boto
import boto.emr
from boto.emr.step import StreamingStep
from boto.emr.bootstrap_action import BootstrapAction
import time

# set your aws keys and S3 bucket, e.g. from environment or .boto
AWSKEY = "<YOUR_AWS_KEY>"
SECRETKEY = "<YOUR_AWS_SECRET_KEY>"
S3_BUCKET = "<YOUR_S3_BUCKET>"
NUM_INSTANCES = 1
SLAVE_INSTANCE_TYPE = 'm1.small'
MASTER_INSTANCE_TYPE = 'm1.small'

conn = boto.connect_emr(AWSKEY,SECRETKEY)

bootstrap_step = BootstrapAction("installmono", "s3://" + S3_BUCKET + "/monobootstrap.sh",None)

step = StreamingStep(name='Wordcount',
                     mapper='s3n://' + S3_BUCKET + '/csharpmapper',
                     reducer='s3n://' + S3_BUCKET + '/fsharpreducer',
                     input='s3n://elasticmapreduce/samples/wordcount/input',
                     output='s3n://' + S3_BUCKET + '/output')

jobid = conn.run_jobflow(
    name="emr with fsharp and csharp", 
    log_uri="s3://" + S3_BUCKET + "/logs", 
    steps = [step],
    bootstrap_actions=[bootstrap_step],
    num_instances=NUM_INSTANCES,
    slave_instance_type=SLAVE_INSTANCE_TYPE,
    master_instance_type=MASTER_INSTANCE_TYPE,
    enable_debugging=True
)

print "finished spawning job (note: starting still takes time)"

state = conn.describe_jobflow(jobid).state
print "job state = ", state
print "job id = ", jobid
while state != u'COMPLETED':
    print time.localtime()
    time.sleep(30)
    state = conn.describe_jobflow(jobid).state
    print "job state = ", state
    print "job id = ", jobid

print "final output can be found in s3://" + S3_BUCKET + "/output" 
print "try: $ s3cmd sync s3://" + S3_BUCKET + "/output" + TIMESTAMP + " ."

In order to run the Python deploy script you need to install Boto, update the constants at the top of the file (AWS keys and S3 bucket), and finally start it with python deploy.py

Conclusion
This posting has given an example of a simple mapreduce job – wordcount – with the mapper written in C# and the reducer in F#, and shown how to deploy the job on Amazon’s Elastic Mapreduce. It has been tested on up to 10 m1.large EC2 nodes. The code can be checked out with git clone git@github.com:atbrox/atbroxexamples.git


Best regards,
Amund Tveit (amund atbrox.com)
Atbrox

Oct 07

Sometimes it can be useful to compile Python code for Amazon’s Elastic Mapreduce into C++ and then into a binary. The motivation for that could be to integrate with (existing) C or C++ code, or to increase performance for CPU-intensive mapper or reducer methods. Here follows a description of how to do that:

  1. Start a small EC2 node with an AMI similar to the one Elastic Mapreduce uses (Debian Lenny Linux)
  2. Skim quickly through the Shedskin tutorial
  3. Log into the EC2 node and install the Shedskin Python compiler
  4. Write your Python mapper or reducer program and compile it into C++ with Shedskin
    • E.g. the command python ss.py mapper.py would generate the C++ files mapper.hpp and mapper.cpp, a Makefile and an annotated Python file mapper.ss.py (see the mapper sketch after this list).
  5. Optionally update the C++ code generated by Shedskin to use other C or C++ libraries
    • note: with Fortran-to-C you can probably integrate your Python code with existing Fortran code (e.g. numerical/high-performance computing libraries); similarly for Cobol (e.g. in the financial industry) with OpenCobol (which compiles Cobol into C). Please let us know if you try it or need help with that.
  6. Add -static as the first CCFLAGS parameter in the generated Makefile to make it a static executable
  7. Compile the C++ code into a binary with make, and check with ldd that the result is a static executable (not a dynamic one)
  8. Run strip on the binary to make it smaller
  9. Upload your (ready) binary to a chosen location in Amazon S3
  10. Read Elastic Mapreduce Documentation on how to use the binary to run Elastic Mapreduce jobs.
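
As referenced in step 4, here is a minimal sketch of a wordcount streaming mapper written in the restricted Python subset that Shedskin accepts (the file name mapper.py and the whitespace tokenization are illustrative):

# mapper.py - wordcount mapper in Shedskin-compilable Python (sketch)
# reads lines from stdin and emits one "term<TAB>1" line per term
import sys

def main():
    for line in sys.stdin:
        for term in line.strip().split(' '):
            if term:
                print term.lower() + '\t1'

if __name__ == '__main__':
    main()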

Note: if you skip the Shedskin-related steps, this approach also works if you are looking for how to use C or C++ mappers or reducers with Elastic Mapreduce.

Note: this approach should probably also work with Cloudera’s distribution for Hadoop.


Do you need help with Hadoop/Mapreduce?
A good start could be to read this book, or contact Atbrox if you need help with development or parallelization of algorithms for Hadoop/Mapreduce – info@atbrox.com. See our posting for an example of parallelizing and implementing a machine learning algorithm for Hadoop/Mapreduce.