Feb 25

There are currently interesting developments in scalable (up to petabytes), low-latency and affordable data warehouse solutions, e.g.

  1. AWS Redshift (cloud-based) [1]
  2. Cloudera’s Impala (open source) [2,3]
  3. Apache Drill (open source) [4]

This posting shows how one of them – AWS Redshift – can be combined with Hadoop/Elastic mapreduce for processing of semi/unstructured data.

1. Processing of structured vs unstructured/semistructured data



A good gold mine has 8-10 grams of gold per ton of gold ore (i.e. 0.0008-0.001%). The share of structured data (“gold”) in unstructured data (“gold ore”) is similarly small, probably between 0.01% and 10% in many cases.

What is common for the solutions above is that they are primarily targeted towards efficient processing of structured data – as opposed to un/semi-structured data. This posting gives a simple integration example of how Elastic Mapreduce/Hadoop can be used to preprocess data into structured data that can be easily imported into and analyzed with AWS Redshift.

In the general case – and not the simplistic json data used in this example – Mapreduce algorithms could be used to process any type of un/semi-structured input data (e.g. video, audio, images and text) and, where it fits, produce structured data that can be imported into Redshift. See my O’Reilly Strata Presentation – Mapreduce Algorithms – for more examples/pointers about capabilities of Mapreduce [5].

2. Processing input data with Elastic Mapreduce/Hadoop and import results to Redshift

The input data used in this example is part of the del.icio.us bookmarking data set collected (crawled) by Arvind Narayanan (CS Professor at Princeton University) [6,7]. Since the main purpose of this is to show integration between Mapreduce and Redshift, the example is rather simple:

  1. the mapper function processes individual json del.icio.us records and produces records that contain some basic stats about tag lengths used in bookmarks,
  2. the reducer just writes out the results as tab-separated files on AWS S3.
  3. Finally the Mapreduce output is imported into AWS Redshift where further query-based analytics can begin.

3. Example input JSON record

{
    "author": "linooliveira",
    "comments": "http://delicious.com/url/0001c173b0f84ea81d188336223f9d7d",
    "guidislink": false,
    "id": "http://delicious.com/url/0001c173b0f84ea81d188336223f9d7d#linooliveira",
    "link": "http://www.amadeus.net/plnext/meb/HomePageDispatcher.action?SITE=BCEUBCEU&LANGUAGE=GB",
    "links": [
        {
            "href": "http://www.amadeus.net/plnext/meb/HomePageDispatcher.action?SITE=BCEUBCEU&LANGUAGE=GB",
            "rel": "alternate",
            "type": "text/html"
        }
    ],
    "source": {},
    "tags": [
        {
            "label": null,
            "scheme": "http://delicious.com/linooliveira/",
            "term": "trips"
        },
        {
            "label": null,
            "scheme": "http://delicious.com/linooliveira/",
            "term": "howto"
        },
        {
            "label": null,
            "scheme": "http://delicious.com/linooliveira/",
            "term": "tips"
        },
        {
            "label": null,
            "scheme": "http://delicious.com/linooliveira/",
            "term": "viagens"
        }
    ],
    "title": "Flight Times, Flight Schedules, Best fares, Best rates, Hotel Rooms, Car Rental, Travel Guides, Trip Planning - Amadeus.net",
    "title_detail": {
        "base": "http://feeds.delicious.com/v2/rss/recent?min=1&count=100",
        "language": null,
        "type": "text/plain",
        "value": "Flight Times, Flight Schedules, Best fares, Best rates, Hotel Rooms, Car Rental, Travel Guides, Trip Planning - Amadeus.net"
    },
    "updated": "Sun, 06 Sep 2009 11:36:20 +0000",
    "wfw_commentrss": "http://feeds.delicious.com/v2/rss/url/0001c173b0f84ea81d188336223f9d7d"
}

4. Example of output TSV record produced by Mapreduce

# fields: id, weekday, month, year, hour, minute, second, num_tags, sum_tag_len, avg_tag_len, num_tags_with_len0,num_tags_with_len1,.., num_tags_with_len9


http://delicious.com/url/0001c173b0f84ea81d188336223f9d7d#linooliveira Sun Sep 2009 11 36 20 4 21.0 5.25 0 0 0 0 1 2 0 1 0 0
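As a rough sketch of how a downstream consumer could read such a record, the snippet below splits one tab-separated line into named fields. The field names follow the "# fields:" comment above; the helper name parse_record is hypothetical and not part of the original job.

```python
# Field names follow the "# fields:" comment above; the ten tag-length
# buckets (num_tags_with_len0..9) are generated rather than typed out.
FIELDS = (
    ["id", "weekday", "month", "year", "hour", "minute", "second",
     "num_tags", "sum_tag_len", "avg_tag_len"]
    + ["num_tags_with_len%d" % i for i in range(10)]
)


def parse_record(line):
    """Split one tab-separated output line into a dict keyed by field name.

    parse_record is a hypothetical helper used only for illustration.
    """
    values = line.rstrip("\n").split("\t")
    if len(values) != len(FIELDS):
        raise ValueError("expected %d fields, got %d" % (len(FIELDS), len(values)))
    return dict(zip(FIELDS, values))


# The example record from above, joined with real tab characters.
example = "\t".join([
    "http://delicious.com/url/0001c173b0f84ea81d188336223f9d7d#linooliveira",
    "Sun", "Sep", "2009", "11", "36", "20", "4", "21.0", "5.25",
    "0", "0", "0", "0", "1", "2", "0", "1", "0", "0",
])
record = parse_record(example)
print(record["weekday"], record["num_tags"], record["num_tags_with_len5"])
```

All values stay strings here, which matches the varchar columns used for the Redshift table later in the post.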

5. Elastic Mapreduce/Hadoop code in Python

Probably one of the easiest ways to use Elastic Mapreduce is to write the mapreduce code in Python using Yelp’s (excellent) mrjob [8]. And there are of course plenty of reasons to choose Python as the programming language, see [9-14].

from mrjob.job import MRJob
from mrjob.protocol import RawProtocol
import json
import logging

class PreprocessDeliciousJsonMapreduce(MRJob):
    # With RawProtocol the raw JSON line arrives as the mapper's key and the
    # output is written as plain tab-separated text.
    INPUT_PROTOCOL = RawProtocol # mrjob trick 1
    OUTPUT_PROTOCOL = RawProtocol # mrjob trick 2

    def calc_tag_stats(self, jvalue):
        tag_len_freqs = {}
        num_tags = len(jvalue["tags"])
        sum_tag_len = 0.0
        for taginfo in jvalue["tags"]:
            tag_len = len(taginfo["term"])
            if tag_len < 10: # only keep short tags
                sum_tag_len += tag_len
                tag_len_freqs[tag_len] = tag_len_freqs.get(tag_len, 0) + 1
        for j in range(10):
            if j not in tag_len_freqs:
                tag_len_freqs[j] = 0 # fill in the blanks
        # note: tags with 10 or more characters count towards num_tags but not
        # towards sum_tag_len; a record with no tags raises ZeroDivisionError
        # here and is counted as skipped by the mapper
        avg_tag_len = sum_tag_len / num_tags
        return avg_tag_len, num_tags, sum_tag_len, tag_len_freqs

    def get_date_parts(self, jvalue):
        # "updated" looks like "Sun, 06 Sep 2009 11:36:20 +0000"
        (weekday, day, month, year, timestamp) = jvalue["updated"].replace(",", "").split(" ")[:5]
        (hour, minute, second) = timestamp.split(':')[:3]
        return hour, minute, month, second, weekday, year

    def mapper(self, key, value):
        try:
            jvalue = json.loads(key)
            if "tags" in jvalue:
                avg_tag_len, num_tags, sum_tag_len, tag_len_freqs = self.calc_tag_stats(jvalue)
                hour, minute, month, second, weekday, year = self.get_date_parts(jvalue)

                out_data = [weekday, month, year, hour, minute, second, num_tags, sum_tag_len, avg_tag_len]

                for tag_len in sorted(tag_len_freqs.keys()):
                    out_data.append(tag_len_freqs[tag_len])

                str_out_data = [str(v) for v in out_data]

                self.increment_counter("mapper", "kept_entries", 1)

                yield jvalue["id"], "\t".join(str_out_data)
        except Exception as e:
            self.increment_counter("mapper", "skipped_entries", 1)
            logging.error(e)

    def reducer(self, key, values):
        # identity reducer: write every mapper record out unchanged
        for value in values:
            yield key, value

    # No steps() override is needed: with only mapper() and reducer() defined,
    # mrjob runs them as a single step.

if __name__ == '__main__':
    PreprocessDeliciousJsonMapreduce.run()
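The tag statistics can be sanity-checked without a cluster or mrjob. The sketch below re-implements only the counting logic as a standalone function (tag_stats is a hypothetical name, not part of the job) and applies it to the four tags of the example record from section 3.

```python
def tag_stats(terms, max_len=10):
    """Return (num_tags, sum_tag_len, avg_tag_len, buckets).

    buckets[i] is the number of tags with exactly i characters, for
    i in 0..max_len-1. Hypothetical standalone version of the job's logic.
    """
    buckets = [0] * max_len
    sum_len = 0.0
    for term in terms:
        if len(term) < max_len:      # same "short tags only" rule as the job
            sum_len += len(term)
            buckets[len(term)] += 1
    avg = sum_len / len(terms)       # raises ZeroDivisionError for an empty list
    return len(terms), sum_len, avg, buckets


# Tags of the example input record: trips, howto, tips, viagens.
num_tags, sum_len, avg_len, buckets = tag_stats(["trips", "howto", "tips", "viagens"])
line = "\t".join(str(v) for v in [num_tags, sum_len, avg_len] + buckets)
print(line)
```

The printed fields (4, 21.0, 5.25 followed by the ten bucket counts) agree with the tail of the example TSV record in section 4.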

6. Running the Elastic Mapreduce job

Assuming you’ve uploaded the del.icio.us (or other) data set to s3, you can start the job like this (implicitly using mrjob)


#!/bin/bash

# TODO(READER): set these variables first
export AWS_ACCESS_KEY_ID=
export AWS_SECRET_ACCESS_KEY=
export INPUT_S3="s3://somes3pathhere"
export LOG_S3="s3://another3pathhere"
export OUTPUT_S3="s3://someothers3pathhere"

nohup python mapreduce_delicious.py --ssh-tunnel-to-job-tracker --jobconf mapreduce.output.compress=true --ssh-tunnel-is-closed --ec2-instance-type=m1.small --no-output --enable-emr-debugging --ami-version=latest --s3-log-uri=${LOG_S3} -o ${OUTPUT_S3} -r emr ${INPUT_S3} --num-ec2-instances=1 &

Note: for larger data sets you probably want to use other instance types (e.g. c1.xlarge) and a higher number of instances.
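A misspelled or unset variable in the shell script silently becomes an empty argument, so it can help to check the settings before launching. The sketch below is one possible way to do that in Python. The function names are hypothetical, only a subset of the flags from the command above is reproduced, and the option names are copied from this post, so they may differ in newer mrjob releases.

```python
REQUIRED = ("AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY",
            "INPUT_S3", "LOG_S3", "OUTPUT_S3")


def missing_settings(env, names=REQUIRED):
    """Return the names that are unset or empty in the given mapping."""
    return [name for name in names if not env.get(name)]


def build_emr_command(script, input_s3, output_s3, log_s3,
                      num_instances=1, instance_type="m1.small"):
    """Return the argument list for the mrjob invocation shown above.

    Hypothetical helper; flags are a subset of those used in this post.
    """
    return [
        "python", script,
        "-r", "emr",
        "--ec2-instance-type=%s" % instance_type,
        "--num-ec2-instances=%d" % num_instances,
        "--s3-log-uri=%s" % log_s3,
        "--no-output",
        "-o", output_s3,
        input_s3,
    ]


# Placeholder values; in practice these would come from os.environ.
settings = {"INPUT_S3": "s3://somes3pathhere", "OUTPUT_S3": "s3://someothers3pathhere"}
print(missing_settings(settings))
print(build_emr_command("mapreduce_delicious.py", "s3://in", "s3://out", "s3://logs"))
```

The returned list could be handed to subprocess once missing_settings reports nothing missing.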

7. Connecting, Creating Tables and Importing Mapreduce Data with AWS Redshift

There are several ways of creating and using a Redshift cluster. For this example I used the AWS Console [15], but for an automated approach the Redshift API would be more appropriate (e.g. with boto [16,17]).



AWS Redshift Web Console

When you have created the cluster (and given access permissions to the machine you are accessing the Redshift cluster from), you can access the Redshift cluster e.g. using a Postgresql client, as below:

psql -d "[your-db-name]" -h "[your-redshift-cluster-host]" -p "[port-number]" -U "[user-name]"

Log in with your password and you should be connected.

A table can e.g. be created with:

CREATE TABLE deliciousdata (
       id varchar(255) not null distkey,
       weekday varchar(255),
       month varchar(255),
       year varchar(255),
       hour varchar(255),
       minute varchar(255),
       second varchar(255),
       num_tags varchar(255),
       sum_tag_len varchar(255),
       avg_tag_len varchar(255),
       num_tags_with_len0 varchar(255),
       num_tags_with_len1 varchar(255),
       num_tags_with_len2 varchar(255),
       num_tags_with_len3 varchar(255),
       num_tags_with_len4 varchar(255),
       num_tags_with_len5 varchar(255),
       num_tags_with_len6 varchar(255),
       num_tags_with_len7 varchar(255),
       num_tags_with_len8 varchar(255),
       num_tags_with_len9 varchar(255)
);

Data can then be imported by substituting the values used in the export statements earlier in the blog post (i.e. OUTPUT_S3, AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY) into the copy command below.


copy deliciousdata from 'OUTPUT_S3/part-00000' CREDENTIALS 'aws_access_key_id=AWS_ACCESS_KEY_ID;aws_secret_access_key=AWS_SECRET_ACCESS_KEY' delimiter '\t';
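The substitution can also be scripted. The following is a minimal sketch that only builds the statement text from the three values. The function name build_copy_statement and the key values are placeholders, and no escaping is done, so it is meant for trusted inputs only.

```python
def build_copy_statement(table, output_s3, access_key, secret_key,
                         part="part-00000"):
    """Fill the copy command above with concrete values.

    Hypothetical helper: plain string substitution, no escaping.
    """
    credentials = "aws_access_key_id=%s;aws_secret_access_key=%s" % (
        access_key, secret_key)
    # '\\t' keeps the two characters backslash + t in the SQL text,
    # exactly as in the hand-written command.
    return "copy %s from '%s/%s' CREDENTIALS '%s' delimiter '\\t';" % (
        table, output_s3.rstrip("/"), part, credentials)


statement = build_copy_statement(
    "deliciousdata", "s3://someothers3pathhere", "MY_KEY_ID", "MY_SECRET")
print(statement)
```

Since the statement contains the secret key, it is better not to write it to logs or shell history.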

8. Analytics with AWS Redshift

If everything went well, you should now be able to do SQL-queries on the data you produced with mapreduce now stored in Redshift, e.g.

select count(*) from deliciousdata;
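To give an idea of what a slightly richer query could look like, the sketch below groups bookmarks by weekday and averages the tag length. It uses Python's built-in sqlite3 as a local stand-in for Redshift with a cut-down version of the table, and two of the three rows are made-up sample values. The CAST is needed because the columns are varchar. The query is plain SQL and should carry over, but it has not been run against Redshift here.

```python
import sqlite3

# sqlite3 is only a local stand-in for Redshift in this sketch.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE deliciousdata (id varchar(255), weekday varchar(255), "
    "num_tags varchar(255), avg_tag_len varchar(255))")

# First row mirrors the example record; the other two are made-up samples.
rows = [
    ("bookmark-1", "Sun", "4", "5.25"),
    ("bookmark-2", "Sun", "2", "6.75"),
    ("bookmark-3", "Mon", "3", "4.0"),
]
conn.executemany("INSERT INTO deliciousdata VALUES (?, ?, ?, ?)", rows)

query = """
    SELECT weekday,
           COUNT(*) AS bookmarks,
           AVG(CAST(avg_tag_len AS REAL)) AS mean_avg_tag_len
    FROM deliciousdata
    GROUP BY weekday
    ORDER BY weekday
"""
result = conn.execute(query).fetchall()
for weekday, bookmarks, mean_len in result:
    print(weekday, bookmarks, mean_len)
```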

Since this posting is about integration, I leave this part as an exercise for the reader.

9. Conclusion

This posting has given an example of how Elastic Mapreduce/Hadoop can produce structured data that can be imported into the AWS Redshift data warehouse.

Redshift Pricing Example
Since Redshift is a cloud-based solution (i.e. with more transparent pricing than one typically finds in enterprise software), you probably wonder what it costs. If you sign up for a 3-year reserved plan with 16 TB of storage (hs1.8xlarge), the effective annual price per terabyte is $999 [1]. But what does this mean? Back in 2009 Joe Cunningham from VISA disclosed [18] that they had 42 terabytes covering 2 years of raw transaction logs. If one assumes that they would run this on Redshift on 3 hs1.8xlarge instances on a 3-year reserved plan (with 3*16 = 48 TB of available storage), the effective price would be 48*999 = $47,952 (roughly 48K$) per year. Since most companies probably have smaller amounts of structured data than VISA, this amount is perhaps an upper bound for most companies.
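The arithmetic behind this estimate can be written out as a short calculation. The numbers are the ones quoted above ($999 per terabyte per year, 16 TB per hs1.8xlarge node, 42 TB of data). The variable names are only for illustration.

```python
import math

price_per_tb_year = 999   # USD, 3-year reserved hs1.8xlarge, from [1]
tb_per_node = 16          # storage per hs1.8xlarge node
data_tb = 42              # VISA's raw transaction logs, from [18]

# Smallest number of nodes whose combined storage holds the data.
nodes = int(math.ceil(data_tb / float(tb_per_node)))
capacity_tb = nodes * tb_per_node
annual_cost = capacity_tb * price_per_tb_year

print(nodes, capacity_tb, annual_cost)
```

Note that the price is paid for the full 48 TB of capacity, not only for the 42 TB actually used.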

For examples of other data warehouse prices, check out this blog post (covers HANA, Exadata, Teradata and Greenplum) [19].

Best regards,
Amund Tveit
Atbrox


A. References

[1] http://aws.typepad.com/aws/2012/11/amazon-redshift-the-new-aws-data-warehouse.html
[2] http://blog.cloudera.com/blog/2012/10/cloudera-impala-real-time-queries-in-apache-hadoop-for-real/
[3] https://github.com/cloudera/impala
[4] http://incubator.apache.org/drill/
[5] http://www.slideshare.net/amundtveit/mapreduce-algorithms
[6] http://randomwalker.info/
[7] http://arvindn.livejournal.com/116137.html
[8] https://github.com/Yelp/mrjob
[9] http://instagram-engineering.tumblr.com/post/13649370142/what-powers-instagram-hundreds-of-instances-dozens-of
[10] http://ontwik.com/python/disqus-scaling-the-world%E2%80%99s-largest-django-application/
[11] https://blog.brainsik.net/2009/why-reddit-uses-python
[12] http://www.quora.com/Why-did-Pinterest-founders-use-Python
[13] http://www.quora.com/Quora-Infrastructure/Why-did-Quora-choose-Python-for-its-development
[14] http://www.python.org/about/quotes/
[15] http://docs.aws.amazon.com/redshift/latest/gsg/redshift-gsg.pdf
[16] http://redshiftuser.wordpress.com/2013/01/07/using-boto-to-load-data-into-aws-redshift/
[17] http://docs.pythonboto.org/en/latest/
[18] http://atbrox.com/2009/10/03/hadoop-world-2009-notes-from-application-session/
[19] http://robklopp.wordpress.com/2012/11/15/priceperformance-of-hana-exadata-teradata-and-greenplum/
May 16


It’s been a year since I last updated the mapreduce algorithms posting, and it has truly been an excellent year for mapreduce and hadoop. The number of commercial vendors supporting it has multiplied, e.g. with 5 announcements at EMC World last week alone (Greenplum, Mellanox, Datastax, NetApp, and Snaplogic) and today’s Datameer funding announcement, which benefits the mapreduce and hadoop ecosystem as a whole (even for small fish like us here in Atbrox). The work-horse in mapreduce is the algorithm. This update adds 35 new papers compared to the prior posting; new ones are marked with *. I’ve also added 2 new categories since the last update: astronomy and social networking.

Motivation
Learn from academic literature about how the mapreduce parallel model and the hadoop implementation are used to solve algorithmic problems.

Which areas do the papers cover?

Author organizations and companies?
Companies: China Mobile, eBay, Google, Hewlett Packard and Intel, Microsoft, Wikipedia, Yahoo and Yandex.
Government Institutions and Universities: US National Security Agency (NSA), Carnegie Mellon University, TU Dresden, University of Pennsylvania, University of Central Florida, National University of Ireland, University of Missouri, University of Arizona, University of Glasgow, Berkeley University and National Tsing Hua University, University of California, Poznan University, Florida International University, Zhejiang University, Texas A&M University, University of California at Irvine, University of Illinois, Chinese Academy of Sciences, Vrije Universiteit, Engenharia University, State University of New York, Palacky University, University of Texas at Dallas

Atbrox on LinkedIn

Btw: I would like to recommend:

  1. Mapreduce bibliography maintained by (Cloudera co-founder) Jeff Hammerbacher
  2. (the excellent) book – Data-Intensive Text Processing with Mapreduce by (UMD’s/Twitter’s) Jimmy Lin and Christopher Dyer.

Let me know if you have input/corrections/feedback to this posting – amund @\h@ atbrox.com – or @atveit or @atbrox on twitter.

Best regards,
Amund Tveit (Atbrox co-founder)

Apr 09

Wrote about mapreduce in search in a presentation for next week.

(more up-to-date pdf version of the presentation)

Best regards,
Amund
Atbrox

Oct 01

A while back I wrote about How to combine Elastic Mapreduce/Hadoop with other Amazon Web Services. This posting is a small update to that, showing how to deploy extra packages with Boto for Python. Note that Boto can deploy mappers and reducers written in any language supported by Elastic Mapreduce. The example below can also be found on github – http://github.com/atbrox/atbroxexamples, i.e. check out with git clone git@github.com:atbrox/atbroxexamples.git

Imports and connection to elastic mapreduce on AWS

 
#!/usr/bin/env python
import boto
import boto.emr
from boto.emr.step import StreamingStep
from boto.emr.bootstrap_action import BootstrapAction
import time

# set your aws keys and S3 bucket, e.g. from environment or .boto
AWSKEY = "" # TODO(READER): your AWS access key id
SECRETKEY = "" # TODO(READER): your AWS secret access key
S3_BUCKET = "" # TODO(READER): name of your S3 bucket
NUM_INSTANCES = 1

conn = boto.connect_emr(AWSKEY,SECRETKEY)

Bootstrap step being created
In this case it is a shell script from s3. Note that this script could contain sudo commands in order to do apt-get installs, e.g. to install classic programming language packages like gfortran or open-cobol, or more modern languages like ghc6 (haskell). It could also run any other code, e.g. check out the latest version of a programming language interpreter/compiler (e.g. Google Go with hg clone -r release https://go.googlecode.com/hg/ $GOROOT) and compile it before using it in your mappers or reducers.

bootstrap_step = BootstrapAction("download.tst", "s3://elasticmapreduce/bootstrap-actions/download.sh",None)

Create map and reduce processing step
Using cache_files also makes a python library available for import (the other way could be to do sudo easy_install boto in the bootstrap step, which would be easier since the boto module wouldn’t have to be unpacked manually in the python code, see my previous posting for details about unpacking). Note that the mapper and reducer could be in any language as long as you have either compiled them or installed an interpreter for the language with the bootstrap step.

step = StreamingStep(
  name='Wordcount',
  mapper='s3n://elasticmapreduce/samples/wordcount/wordSplitter.py',
  cache_files = ["s3n://" + S3_BUCKET + "/boto.mod#boto.mod"],
  reducer='aggregate',
  input='s3n://elasticmapreduce/samples/wordcount/input',
  output='s3n://' + S3_BUCKET + '/output/wordcount_output')

jobid = conn.run_jobflow(
    name="testbootstrap", 
    log_uri="s3://" + S3_BUCKET + "/logs", 
    steps = [step],
    bootstrap_actions=[bootstrap_step],
    num_instances=NUM_INSTANCES)
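For readers who have not seen a streaming mapper, the sketch below shows roughly what a word-splitting mapper for the step above could look like. It is not the actual wordSplitter.py from the samples bucket. The function names are hypothetical, and the "LongValueSum:" prefix is what Hadoop's built-in aggregate reducer uses to decide how to combine values. A real mapper would read sys.stdin and write to sys.stdout; here an in-memory stream keeps the example self-contained.

```python
import io
import re

WORD_RE = re.compile(r"[a-zA-Z0-9]+")


def map_line(line):
    """Yield one 'LongValueSum:<word>\\t1' record per word in the line."""
    for word in WORD_RE.findall(line):
        yield "LongValueSum:%s\t1" % word.lower()


def run_mapper(stream, out):
    """Read lines from stream and write mapper records to out.

    In a streaming job this would be called as run_mapper(sys.stdin, sys.stdout).
    """
    for line in stream:
        for record in map_line(line):
            out.write(record + "\n")


# Self-contained demonstration with an in-memory input.
buf = io.StringIO()
run_mapper(io.StringIO(u"Hello hadoop hello\n"), buf)
mapper_output = buf.getvalue()
print(mapper_output)
```

With reducer='aggregate', the two "hello" records would be summed into a single count of 2 on the reduce side.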

Wait for job to finish
This polls the Elastic Mapreduce job until it finishes and prints out its status along the way; one of the statuses between starting and running is bootstrapping.

state = conn.describe_jobflow(jobid).state
print("job state = %s" % state)
print("job id = %s" % jobid)
# stop polling on any terminal state, not only on success
while state not in (u'COMPLETED', u'FAILED', u'TERMINATED'):
    print(time.localtime())
    time.sleep(30)
    state = conn.describe_jobflow(jobid).state
    print("job state = %s" % state)
    print("job id = %s" % jobid)

# the output location is the one passed to StreamingStep above
print("final output can be found in s3://" + S3_BUCKET + "/output/wordcount_output")
print("try: $ s3cmd sync s3://" + S3_BUCKET + "/output/wordcount_output .")

Validation of what really happened
One way to validate is to check that your mappers and reducers run in the language whose compiler or interpreter you installed with the bootstrap action, e.g. the classic mapreduce word count written in classic languages like Cobol or Fortran 95. The other way is to check the s3 logs. The log directory for an elastic mapreduce job has the following subdirectories:

daemons  jobs  node  steps  task-attempts

In the node directory, each EC2 instance used in the job has a directory, and underneath each of them there is a bootstrap_actions directory with the master.log and stderr, stdout and controller logs. In the case presented above bootstrap output is shown underneath.
stderr output

--2010-10-01 17:38:38--  http://elasticmapreduce.s3.amazonaws.com/bootstrap-actions/file.tar.gz
Resolving elasticmapreduce.s3.amazonaws.com... 72.21.214.39
Connecting to elasticmapreduce.s3.amazonaws.com|72.21.214.39|:80... connected.
HTTP request sent, awaiting response... 
  HTTP/1.1 200 OK
  x-amz-id-2: NezTUU9MIzPwo72lJWPYIMo2wwlbDGi1IpDbV/mO07Nca4VarSV8l7j/2ArmclCB
  x-amz-request-id: 3E71CC3323EC1189
  Date: Fri, 01 Oct 2010 17:38:39 GMT
  Last-Modified: Thu, 03 Jun 2010 01:57:13 GMT
  ETag: "47a007dae0ff192c166764259246388c"
  Content-Type: application/octet-stream
  Content-Length: 153
  Connection: keep-alive
  Server: AmazonS3
Length: 153 [application/octet-stream]
Saving to: `file.tar.gz'

     0K                                                       100% 24.3M=0s

2010-10-01 17:38:38 (24.3 MB/s) - `file.tar.gz' saved [153/153]

Controller

2010-10-01T17:38:35.141Z INFO Fetching file 's3://elasticmapreduce/bootstrap-actions/download.sh'
2010-10-01T17:38:38.411Z INFO Working dir /mnt/var/lib/bootstrap-actions/1
2010-10-01T17:38:38.411Z INFO Executing /mnt/var/lib/bootstrap-actions/1/download.sh
2010-10-01T17:38:38.936Z INFO Execution ended with ret val 0
2010-10-01T17:38:38.938Z INFO Execution succeeded
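Checking the controller log can be automated once it has been downloaded from s3. The sketch below extracts the return value from the "Execution ended with ret val" line. It assumes the log format shown above stays the same, and the function name is hypothetical.

```python
import re

# Assumes the message format shown in the controller log above.
RET_VAL_RE = re.compile(r"Execution ended with ret val (-?\d+)")


def bootstrap_return_codes(log_text):
    """Return the list of return values reported in a controller log."""
    return [int(match.group(1)) for match in RET_VAL_RE.finditer(log_text)]


controller_log = """\
2010-10-01T17:38:35.141Z INFO Fetching file 's3://elasticmapreduce/bootstrap-actions/download.sh'
2010-10-01T17:38:38.411Z INFO Working dir /mnt/var/lib/bootstrap-actions/1
2010-10-01T17:38:38.411Z INFO Executing /mnt/var/lib/bootstrap-actions/1/download.sh
2010-10-01T17:38:38.936Z INFO Execution ended with ret val 0
2010-10-01T17:38:38.938Z INFO Execution succeeded
"""

codes = bootstrap_return_codes(controller_log)
bootstrap_ok = bool(codes) and all(code == 0 for code in codes)
print(codes, bootstrap_ok)
```

An empty list is treated as a failure here, since it means no bootstrap action reported a result at all.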

Conclusion
The posting has shown how to programmatically install packages (e.g. programming languages) to EC2 nodes running elastic mapreduce. Since elastic mapreduce in streaming mode supports any programming language this can make it easier to deploy and test mappers and reducers written in your favorite language, and even automate it. (Opens a few doors for parallelization of legacy code)

Atbrox on LinkedIn

Best regards,
Amund Tveit, co-founder of Atbrox

Aug 31

If you are interested in Hadoop or Mapreduce, I would like to recommend participating in or submitting your paper to the First International Workshop on Theory and Practice of Mapreduce (MAPRED’2010) (held in conjunction with the 2nd IEEE International Conference on Cloud Computing Technology and Science).

(I just joined the workshop as a program committee member)

Best regards,

Amund Tveit (co-founder of Atbrox)
