Feb 25

There are currently several interesting developments in scalable (up to petabytes), low-latency and affordable data warehouse solutions, e.g.:

  1. AWS Redshift (cloud-based) [1]
  2. Cloudera’s Impala (open source) [2,3]
  3. Apache Drill (open source) [4]

This posting shows how one of them – AWS Redshift – can be combined with Hadoop/Elastic Mapreduce for processing of semi-structured and unstructured data.

1. Processing of structured vs unstructured/semistructured data



A good gold mine yields 8-10 grams of gold per ton of ore (i.e. roughly 0.0008-0.001%); the ratio of structured data (“gold”) to unstructured data (“gold ore”) is not that dissimilar (probably between 0.01-10% in many cases).

What the solutions above have in common is that they are primarily targeted towards efficient processing of structured data – as opposed to unstructured or semi-structured data. This posting gives a simple integration example of how Elastic Mapreduce/Hadoop can be used to preprocess data into structured data that can easily be imported into and analyzed with AWS Redshift.

In the general case – and not the simplistic JSON data used in this example – Mapreduce algorithms can be used to process any type of unstructured or semi-structured input data (e.g. video, audio, images and text) and, where appropriate, produce structured data that can be imported into Redshift. See my O’Reilly Strata presentation – Mapreduce Algorithms – for more examples and pointers about the capabilities of Mapreduce [5].

2. Processing input data with Elastic Mapreduce/Hadoop and import results to Redshift

The input data used in this example is part of the del.icio.us bookmarking data set collected (crawled) by Arvind Narayanan (CS professor at Princeton University) [6,7]. Since the main purpose of this posting is to show integration between Mapreduce and Redshift, the example is rather simple:

  1. the mapper function processes individual del.icio.us JSON records and produces records containing some basic statistics about the tag lengths used in bookmarks,
  2. the reducer just writes out the results as tab-separated files on AWS S3,
  3. finally, the Mapreduce output is imported into AWS Redshift where further query-based analytics can begin.

3. Example input JSON record

{
    "author": "linooliveira",
    "comments": "http://delicious.com/url/0001c173b0f84ea81d188336223f9d7d",
    "guidislink": false,
    "id": "http://delicious.com/url/0001c173b0f84ea81d188336223f9d7d#linooliveira",
    "link": "http://www.amadeus.net/plnext/meb/HomePageDispatcher.action?SITE=BCEUBCEU&LANGUAGE=GB",
    "links": [
        {
            "href": "http://www.amadeus.net/plnext/meb/HomePageDispatcher.action?SITE=BCEUBCEU&LANGUAGE=GB",
            "rel": "alternate",
            "type": "text/html"
        }
    ],
    "source": {},
    "tags": [
        {
            "label": null,
            "scheme": "http://delicious.com/linooliveira/",
            "term": "trips"
        },
        {
            "label": null,
            "scheme": "http://delicious.com/linooliveira/",
            "term": "howto"
        },
        {
            "label": null,
            "scheme": "http://delicious.com/linooliveira/",
            "term": "tips"
        },
        {
            "label": null,
            "scheme": "http://delicious.com/linooliveira/",
            "term": "viagens"
        }
    ],
    "title": "Flight Times, Flight Schedules, Best fares, Best rates, Hotel Rooms, Car Rental, Travel Guides, Trip Planning - Amadeus.net",
    "title_detail": {
        "base": "http://feeds.delicious.com/v2/rss/recent?min=1&count=100",
        "language": null,
        "type": "text/plain",
        "value": "Flight Times, Flight Schedules, Best fares, Best rates, Hotel Rooms, Car Rental, Travel Guides, Trip Planning - Amadeus.net"
    },
    "updated": "Sun, 06 Sep 2009 11:36:20 +0000",
    "wfw_commentrss": "http://feeds.delicious.com/v2/rss/url/0001c173b0f84ea81d188336223f9d7d"
}

4. Example of output TSV record produced by Mapreduce

# fields: id, weekday, month, year, hour, minute, second, num_tags, sum_tag_len, avg_tag_len, num_tags_with_len0,num_tags_with_len1,.., num_tags_with_len9


http://delicious.com/url/0001c173b0f84ea81d188336223f9d7d#linooliveira Sun Sep 2009 11 36 20 4 21.0 5.25 0 0 0 0 1 2 0 1 0 0
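
For clarity, here is a small, hypothetical helper (the field names mirror the comment above) that splits such a TSV record back into named fields; it is just a convenience sketch and not part of the Mapreduce job:

# hypothetical helper: split a TSV output record into named fields
FIELDS = (["id", "weekday", "month", "year", "hour", "minute", "second",
           "num_tags", "sum_tag_len", "avg_tag_len"] +
          ["num_tags_with_len%d" % i for i in range(10)])

def parse_record(line):
    # the mapper emits the id as key and the remaining stats tab-separated as value,
    # so the whole line is one 20-column TSV record
    return dict(zip(FIELDS, line.rstrip("\n").split("\t")))

record = parse_record("http://delicious.com/url/0001c173b0f84ea81d188336223f9d7d#linooliveira\tSun\tSep\t2009\t11\t36\t20\t4\t21.0\t5.25\t0\t0\t0\t0\t1\t2\t0\t1\t0\t0")
print record["avg_tag_len"]  # prints 5.25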

5. Elastic Mapreduce/Hadoop code in Python

Probably one of the easiest ways to use Elastic Mapreduce is to write the mapreduce code in Python using Yelp’s (excellent) mrjob [8]. And there are of course plenty of reasons to choose Python as the programming language, see [9-14].

from mrjob.job import MRJob
from mrjob.protocol import RawProtocol
import json
import sys
import logging

class PreprocessDeliciousJsonMapreduce(MRJob):
    INPUT_PROTOCOL = RawProtocol # mrjob trick 1: read input lines as raw strings (no JSON decoding by mrjob)
    OUTPUT_PROTOCOL = RawProtocol # mrjob trick 2: write key and value as raw tab-separated output

    def calc_tag_stats(self, jvalue):
        tag_len_freqs = {}
        num_tags = len(jvalue["tags"])
        sum_tag_len = 0.0
        for taginfo in jvalue["tags"]:
            tag_len = len(taginfo["term"])
            if tag_len < 10: # only keep short tags
                sum_tag_len += tag_len
                tag_len_freqs[tag_len] = tag_len_freqs.get(tag_len, 0) + 1
        for j in range(10):
            if j not in tag_len_freqs:
                tag_len_freqs[j] = 0 # fill in the blanks
        avg_tag_len = sum_tag_len / num_tags
        return avg_tag_len, num_tags, sum_tag_len, tag_len_freqs

    def get_date_parts(self, jvalue):
        (weekday, day, month, year, timestamp) = jvalue["updated"].replace(",", "").split(" ")[:5]
        (hour, minute, second) = timestamp.split(':')[:3]
        return hour, minute, month, second, weekday, year

    def mapper(self, key, value):
        try:
            jvalue = json.loads(key) # with RawProtocol the raw input line arrives as the key
            if "tags" in jvalue:
                avg_tag_len, num_tags, sum_tag_len, tag_len_freqs = self.calc_tag_stats(jvalue)
                hour, minute, month, second, weekday, year = self.get_date_parts(jvalue)

                out_data = [weekday, month, year, hour,minute,second, num_tags, sum_tag_len, avg_tag_len]

                for tag_len in sorted(tag_len_freqs.keys()):
                    out_data.append(tag_len_freqs[tag_len])

                str_out_data = [str(v) for v in out_data]

                self.increment_counter("mapper", "kept_entries", 1)

                yield jvalue["id"], "\t".join(str_out_data)
        except Exception as e:
            self.increment_counter("mapper", "skipped_entries", 1)
            logging.error(e)

    def reducer(self, key, values):
        for value in values:
            yield key, value

    def steps(self):
        return [self.mr(mapper=self.mapper,
                        reducer=self.reducer),]

if __name__ == '__main__':
    PreprocessDeliciousJsonMapreduce.run()
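
Before spending money on an EMR cluster, the job can be smoke-tested locally with mrjob's inline runner. A minimal sketch (the sample.json input file is an assumption; mapreduce_delicious.py is the file name used in the launch script below):

# local smoke test with mrjob's inline runner (no Hadoop/EMR needed)
from mapreduce_delicious import PreprocessDeliciousJsonMapreduce

job = PreprocessDeliciousJsonMapreduce(args=['-r', 'inline', 'sample.json'])
with job.make_runner() as runner:
    runner.run()
    for line in runner.stream_output():
        print line.rstrip()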

6. Running the Elastic Mapreduce job

Assuming you have uploaded the del.icio.us (or other) data set to S3, you can start the job like this (implicitly using mrjob):


#!/bin/bash

# TODO(READER): set these variables first
export AWS_ACCESS_KEY_ID=
export AWS_SECRET_ACCESS_KEY=
export INPUT_S3="s3://somes3pathhere"
export LOG_S3="s3://another3pathhere"
export OUTPUT_S3="s3://someothers3pathhere"

nohup python mapreduce_delicious.py --ssh-tunnel-to-job-tracker --jobconf mapreduce.output.compress=true --ssh-tunnel-is-closed --ec2-instance-type=m1.small --no-output --enable-emr-debugging --ami-version=latest --s3-log-uri=${LOG_S3} -o ${OUTPUT_S3} -r emr ${INPUT_S3} --num-ec2-instances=1 &

Note: for larger data sets you probably want to use other instance types (e.g. c1.xlarge) and a higher number of instances.

7. Connecting, Creating Tables and Importing Mapreduce Data with AWS Redshift

There are several ways of creating and using a Redshift cluster. For this example I used the AWS Console [15], but for an automated approach using the Redshift API would be more appropriate (e.g. with boto [16,17]).



AWS Redshift Web Console

When you have created the cluster (and given access permissions to the machine you are accessing the Redshift cluster from), you can access the Redshift cluster, e.g. using a PostgreSQL client – as below:

psql -d "[your-db-name]" -h "[your-redshift-cluster-host]" -p "[port-number]" -U "[user-name]"

Log in with your password and you should be connected.

A table can then be created, e.g. with:

CREATE TABLE deliciousdata (
       id varchar(255) not null distkey,
       weekday varchar(255),
       month varchar(255),
       year varchar(255),
       hour varchar(255),
       minute varchar(255),
       second varchar(255),
       num_tags varchar(255),
       sum_tag_len varchar(255),
       avg_tag_len varchar(255),
       num_tags_with_len0 varchar(255),
       num_tags_with_len1 varchar(255),
       num_tags_with_len2 varchar(255),
       num_tags_with_len3 varchar(255),
       num_tags_with_len4 varchar(255),
       num_tags_with_len5 varchar(255),
       num_tags_with_len6 varchar(255),
       num_tags_with_len7 varchar(255),
       num_tags_with_len8 varchar(255),
       num_tags_with_len9 varchar(255)
);

Data can then be imported by substituting the values used in the export statements earlier in the blog post (i.e. OUTPUT_S3, AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY) into the copy command below.


copy deliciousdata from 'OUTPUT_S3/part-00000' CREDENTIALS 'aws_access_key_id=AWS_ACCESS_KEY_ID;aws_secret_access_key=AWS_SECRET_ACCESS_KEY' delimiter '\t';
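
To avoid copy/paste mistakes, a small, hypothetical helper can build the copy statement from the same environment variables used when launching the EMR job (it just prints the SQL for pasting into psql):

# hypothetical helper: build the Redshift COPY statement from environment variables
import os

copy_sql = (
    "copy deliciousdata from '%s/part-00000' "
    "CREDENTIALS 'aws_access_key_id=%s;aws_secret_access_key=%s' "
    "delimiter '\\t';"
) % (os.environ["OUTPUT_S3"],
     os.environ["AWS_ACCESS_KEY_ID"],
     os.environ["AWS_SECRET_ACCESS_KEY"])

print copy_sql  # paste the output into psql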

8. Analytics with AWS Redshift

If everything went well, you should now be able to run SQL queries in Redshift on the data you produced with Mapreduce, e.g.

select count(*) from deliciousdata;

Since this posting is about integration I leave this part as an exercise for the reader, but a small starting point is sketched below.
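
Here is a minimal sketch using psycopg2 on the client side (Redshift speaks the PostgreSQL protocol; the host, database and credentials below are placeholders):

# minimal analytics sketch against Redshift via psycopg2 (placeholder connection details)
import psycopg2

conn = psycopg2.connect(host="your-redshift-cluster-host", port=5439,
                        dbname="your-db-name", user="your-user-name",
                        password="your-password")
cur = conn.cursor()
# average tag length per weekday (columns were loaded as varchar, so cast first)
cur.execute("""
    select weekday, avg(cast(avg_tag_len as float)) as avg_tag_len
    from deliciousdata
    group by weekday
    order by weekday
""")
for weekday, avg_len in cur.fetchall():
    print weekday, avg_len
cur.close()
conn.close()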

9. Conclusion

This posting has given an example of how Elastic Mapreduce/Hadoop can produce structured data that can be imported into the AWS Redshift data warehouse.

Redshift Pricing Example
But since Redshift is a cloud-based solution (i.e. with more transparent pricing than one typically finds in enterprise software) you probably wonder what it costs? If you sign up for a 3-year reserved plan with 16 TB of storage (hs1.8xlarge), the effective annual price per terabyte is $999 [1], but what does this mean? Back in 2009 Joe Cunningham from VISA disclosed [18] that they had 42 terabytes covering 2 years of raw transaction logs. If one assumes that they would run this on Redshift on 3 hs1.8xlarge instances on a 3-year reserved plan (with 3*16 = 48 TB of available storage), the effective price would be 48 * $999 ≈ $48K per year. Since most companies probably have smaller amounts of structured data than VISA, this amount is perhaps an upper bound for most companies?
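
A quick back-of-the-envelope check of the arithmetic above (the inputs are the figures quoted in this section):

# back-of-the-envelope check of the Redshift pricing example above
price_per_tb_year = 999.0   # USD per TB per year, 3-year reserved hs1.8xlarge [1]
storage_needed_tb = 42.0    # VISA's reported 2 years of raw transaction logs [18]
tb_per_node = 16.0          # storage per hs1.8xlarge node
nodes = 3                   # smallest whole number of nodes covering 42 TB
provisioned_tb = nodes * tb_per_node
annual_cost = provisioned_tb * price_per_tb_year

print "provisioned storage:", provisioned_tb, "TB"    # 48.0 TB
print "approximate annual cost: $%.0f" % annual_cost  # ~$47952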

For examples of other data warehouse prices, check out this blog post (covers HANA, Exadata, Teradata and Greenplum) [19].

Best regards,
Amund Tveit
Atbrox


A. References

[1] http://aws.typepad.com/aws/2012/11/amazon-redshift-the-new-aws-data-warehouse.html
[2] http://blog.cloudera.com/blog/2012/10/cloudera-impala-real-time-queries-in-apache-hadoop-for-real/
[3] https://github.com/cloudera/impala
[4] http://incubator.apache.org/drill/
[5] http://www.slideshare.net/amundtveit/mapreduce-algorithms
[6] http://randomwalker.info/
[7] http://arvindn.livejournal.com/116137.html
[8] https://github.com/Yelp/mrjob
[9] http://instagram-engineering.tumblr.com/post/13649370142/what-powers-instagram-hundreds-of-instances-dozens-of
[10] http://ontwik.com/python/disqus-scaling-the-world%E2%80%99s-largest-django-application/
[11] https://blog.brainsik.net/2009/why-reddit-uses-python
[12] http://www.quora.com/Why-did-Pinterest-founders-use-Python
[13] http://www.quora.com/Quora-Infrastructure/Why-did-Quora-choose-Python-for-its-development
[14] http://www.python.org/about/quotes/
[15] http://docs.aws.amazon.com/redshift/latest/gsg/redshift-gsg.pdf
[16] http://redshiftuser.wordpress.com/2013/01/07/using-boto-to-load-data-into-aws-redshift/
[17] http://docs.pythonboto.org/en/latest/
[18] http://atbrox.com/2009/10/03/hadoop-world-2009-notes-from-application-session/
[19] http://robklopp.wordpress.com/2012/11/15/priceperformance-of-hana-exadata-teradata-and-greenplum/
May 01

atbr (a large-scale, low-latency in-memory key-value store) now supports websocket-based sharding for parallel deployments.

Websocket Sharding Example

Checkout and install atbr

$ git clone git@github.com:atbrox/atbr.git
$ cd atbr
$ sudo ./INSTALL.sh

Start 3 servers loaded with data

$ cd atbrserver
$ python atbr_server.py 8585 shard_data_1.tsv
$ python atbr_server.py 8686 shard_data_2.tsv
$ python atbr_server.py 8787 shard_data_3.tsv

Start shard server talking to shards

  
$ python atbr_shard_server.py localhost:8585 \
          localhost:8686 localhost:8787

Connect to shard server and lookup key=key1

$ python atbr_websocket_cmdline_client.py key1
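
For illustration only, here is a generic sketch of hash-based key-to-shard routing, i.e. the general idea behind sharded lookups; this is a common pattern and not necessarily how atbr's shard server routes requests internally:

# generic hash-based shard routing sketch (illustrative, not atbr's actual implementation)
import zlib

SHARDS = ["localhost:8585", "localhost:8686", "localhost:8787"]

def shard_for(key):
    # stable hash, so the same key always maps to the same shard
    return SHARDS[(zlib.crc32(key) & 0xffffffff) % len(SHARDS)]

print shard_for("key1")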

Stay tuned for other updates on atbr; here is a rough roadmap:

  • Increased concurrency and threadsafety support
  • Increased reliability in sharded deployments (with Apache Zookeeper)
  • Simplified and automated sharded deployment on AWS and clusters
  • Benchmarks
  • Comparison with other storage alternatives (e.g. HBase, Redis, MongoDB, CouchDB and Cassandra)
  • End-to-end examples (from hadoop/mapreduce jobs to serving)
  • (in-memory) map(reduce) support with Lua or C++
  • Thrift support
  • Avro support
  • large-scale graph processing example (ref: NetworkX)
  • Case studies

Documentation
atbr.atbrox.com

Best regards,

Amund Tveit (@atveit)
Atbrox

Nov 30

If you are interested in Mapreduce or Hadoop I recommend submitting to or attending the following workshop.

The Third International Workshop on MapReduce and its Applications (MAPREDUCE’12)
June 18-19, 2012 HPDC’2012, Delft, the Netherlands.
http://graal.ens-lyon.fr/mapreduce/

SCOPE
=====

Since its introduction in 2004 by Google, MapReduce has become the programming
model of choice for processing large data sets. MapReduce borrows from functional
programming, where a programmer can define both a Map task that maps a data set
into another data set, and a Reduce task that combines intermediate outputs into
a final result. Although MapReduce was originally developed for use by web
enterprises in large data-centers, this technique has gained a lot of attention
from the scientific community for its applicability in large parallel data
analysis (including geographic, high energy physics, genomics, etc.).

The purpose of the workshop is to provide a forum for discussing recent advances,
identifying open issues, introducing developments and tools, and presenting
applications and enhancements for MapReduce (or very similar) systems. We
therefore cordially invite contributions that investigate these issues, introduce
new execution environments, apply performance evaluations and show the
applicability to science and enterprise applications.

TOPICS OF INTEREST
==================

– MapReduce implementation issues and improvements
– Implementation optimization for GPU and multi-core systems
– Extensions to the programming model
– Large-scale MapReduce (Grid and Desktop Grid)
– Use of CDN and P2P techniques
– Heterogeneity and fault-tolerance
– Scientific data-sets analysis
– Data and compute-intensive applications
– Tools and environments for MapReduce
– Algorithms using the MapReduce paradigm

PAPER SUBMISSIONS
=================

Authors are invited to submit full papers of at most 8 pages, including all
figures and references. Papers should be formatted in the ACM proceedings style
(e.g., http://www.acm.org/sigs/publications/proceedings-templates). Submitted
papers must be original work that has not appeared in and is not under
consideration for another conference or a journal. Accepted papers will be
published by ACM in the conference workshops proceedings.

Papers should be submitted here: TO BE ANNOUNCED ON WEBSITE

IMPORTANT DATES
===============

– Manuscript submission deadline : February 25, 2012
– Acceptance notification : March 26, 2012
– Camera-ready paper deadline : April 16, 2012
– Workshop dates : June 18-19, 2012

ORGANIZATION COMMITTEE
======================

General Chairs
==============

– Gilles Fedak, INRIA/LIP (contact: gilles.fedak@inria.fr)
– Geoffrey Fox, Indiana University (contact: gcf@cs.indiana.edu)

Program Chair
=============

Simon Delamare, INRIA/LIP (contact: simon.delamare@inria.fr)

Publicity chair
===============

Haiwu He, INRIA/LIP (contact: haiwu.he@inria.fr)

PROGRAM COMMITTEE
=================

– Alexandre de Assis Bento Lima, Federal University of Rio de Janeiro
– Amund Tveit, Atbrox
– Carlo Mastroianni, ICAR-CNR
– Christian Engelmann, Oak Ridge National Laboratory
– Francisco V. Brasileiro, Federal University of Campina Grande
– Frédéric Suter, IN2P3/CNRS
– Gabriel Antoniu, INRIA
– Heithem Abbes, Faculty of Sciences of Tunis
– Heshan Lin, Virginia Polytechnic Institute and State University
– Hidemoto Nakada, AIST
– Jerry Zhao, Google (tech.lead for Google’s Mapreduce team, sorting PetaBytes)
– José A.B. Fortes, University of Florida
– Judy Qiu, Indiana University
– Michael C. Schatz, Cold Spring Harbor Laboratory
– Oleg Lodygensky, CNRS
– Shantenu Jha, Louisiana State University
– Xuanhuan Shi, Huazhong University of Science and Technology
– Yang Yang, Netflix

Best regards,
Amund Tveit, Atbrox

Feb 07

This posting gives an example of how F# and C# can potentially scale to thousands of machines with Mapreduce in order to efficiently process terabyte (TB) and petabyte (PB) data amounts. It shows a C# (c sharp) mapper function and an F# (f sharp) reducer function, with a description of how to deploy the job on Amazon’s Elastic Mapreduce using a bootstrap action (it was tested with an Elastic Mapreduce cluster of 10 machines).

The .net environment used is mono 2.8 and FSharp 2.0. The code described in this posting can be found on https://github.com/atbrox/atbroxexamples/

Mapreduce Code

C# mapper

using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text.RegularExpressions;
 
public class WordCountMapper {
  static public void Main () {
    String line;
    while( (line = Console.ReadLine()) != null) {
      line.Trim().Split(' ')
       .Where(s => !String.IsNullOrEmpty(s.Trim()))
       .ToList()
       .ForEach(term => Console.Write(term.ToLower() + "\t1\n"));
    }
  }
}

Compiling c# code

mcs wordcountmapper.cs
mkbundle -o wordcountmapper wordcountmapper.exe --deps --static

F# reducer

open System.Net
open System
open System.IO

// note: F# has default immutable types (~ Erlang), 
// use explicit mutable keyword
let sumReducer (key:string, values:seq<string[]>) : int =
    let mutable count = 0 
    for value in values do
        count <- count  + Convert.ToInt32(value.[1])
    count

// note: |> below is similar to unix pipes 
let reduceInput (tr:TextReader) =
    Seq.initInfinite (fun _ -> tr.ReadLine())
    |> Seq.takeWhile (fun line -> line <> null) 
    |> Seq.map (fun line -> line.Split('\t','\n',' '))
    |> Seq.groupBy (fun line -> line.[0].Trim())

let reduceRunner (tr:TextReader) (reducef:string*seq<string[]>->int) = 
    for line in reduceInput tr do
        let key,values = line
        let wc = reducef(key,values)
        System.Console.WriteLine(key+"\t"+Convert.ToString(wc))

// run the sumReducer method on input data from stdin
reduceRunner Console.In sumReducer

Compiling fsharp code

cd FSharp-2.0.0.0
mono bin/fsc.exe wordcountreducer.fsx --standalone 
mkbundle -o wordcountreducer wordcountreducer.exe --deps --static
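
Before deploying to EMR, the streaming contract (the mapper emits "term<tab>1" lines, the reducer sums per term) can be smoke-tested locally by piping sample text through the compiled binaries with a sort in between. A hedged sketch, assuming the wordcountmapper and wordcountreducer binaries built above are in the current directory:

# local smoke test of the Hadoop-streaming contract: mapper | sort | reducer
import subprocess

pipeline = ("echo 'the quick brown fox jumps over the lazy dog the fox' "
            "| ./wordcountmapper | sort | ./wordcountreducer")
print subprocess.check_output(pipeline, shell=True)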

Deployment on Amazon’s Elastic Mapreduce

In order to run Mono/.NET code on Elastic Mapreduce (Debian) Linux nodes you need to install Mono on each node; this can be done with a bootstrap action shell script. For deploying the mapreduce job itself there are several options; the one shown in this posting uses the Boto API for Python. Note: this requires that you upload mono-2.8.2-parallel-environment-amd64.deb to your S3 BUCKET in advance.

Bootstrap action shell script for installing mono

#!/bin/bash
hadoop fs -copyToLocal s3://BUCKET/mono-2.8.2-parallel-environment-amd64.deb .
sudo dpkg -i mono-2.8.2-parallel-environment-amd64.deb 

Python script to deploy mapreduce and check status until it is done

import boto
import boto.emr
from boto.emr.step import StreamingStep
from boto.emr.bootstrap_action import BootstrapAction
import time

# set your aws keys and S3 bucket, e.g. from environment or .boto
AWSKEY=
SECRETKEY=
S3_BUCKET=
NUM_INSTANCES = 1
SLAVE_INSTANCE_TYPE = 'm1.small'
MASTER_INSTANCE_TYPE = 'm1.small'

conn = boto.connect_emr(AWSKEY,SECRETKEY)

bootstrap_step = BootstrapAction("installmono", "s3://" + S3_BUCKET + "/monobootstrap.sh",None)

step = StreamingStep(name='Wordcount',
                     mapper='s3n://' + S3_BUCKET + '/csharpmapper',
                     reducer='s3n://' + S3_BUCKET + '/fsharpreducer',
                     input='s3n://elasticmapreduce/samples/wordcount/input',
                     output='s3n://' + S3_BUCKET + '/output')

jobid = conn.run_jobflow(
    name="emr with fsharp and csharp", 
    log_uri="s3://" + S3_BUCKET + "/logs", 
    steps = [step],
    bootstrap_actions=[bootstrap_step],
    num_instances=NUM_INSTANCES,
    slave_instance_type=SLAVE_INSTANCE_TYPE,
    master_instance_type=MASTER_INSTANCE_TYPE,
    enable_debugging=True
)

print "finished spawning job (note: starting still takes time)"

state = conn.describe_jobflow(jobid).state
print "job state = ", state
print "job id = ", jobid
while state != u'COMPLETED':
    print time.localtime()
    time.sleep(30)
    state = conn.describe_jobflow(jobid).state
    print "job state = ", state
    print "job id = ", jobid

print "final output can be found in s3://" + S3_BUCKET + "/output" 
print "try: $ s3cmd sync s3://" + S3_BUCKET + "/output" + TIMESTAMP + " ."

In order to run the Python deploy job you need to install boto, update the constants at the top of the file (AWS keys and S3 bucket) and finally start it with python deploy.py

Conclusion
This posting has given an example of a simple wordcount in C# and F#, and shown how to deploy the job on Amazon’s Elastic Mapreduce. It has been tested on up to 10 m1.large EC2 nodes. The code can be checked out with git clone git@github.com:atbrox/atbroxexamples.git.


Best regards,
Amund Tveit (amund atbrox.com)
Atbrox

Jan 06

If you are interested in Mapreduce (/Hadoop), I would like to recommend participating in or submitting your paper to the 2nd International Workshop on Mapreduce and its Applications (MAPREDUCE’11). The mapreduce workshop is held in conjunction with the 20th International ACM Symposium on High-Performance Parallel and Distributed Computing in San Jose, California (June 2011).

(I just joined the mapreduce workshop as a program committee member)

Best regards,

Amund Tveit
