There are currently several interesting developments in scalable (up to petabytes), low-latency and affordable data warehouse solutions, e.g.:
- AWS Redshift (cloud-based) [1]
- Cloudera’s Impala (open source) [2,3]
- Apache Drill (open source) [4]
This posting shows how one of them – AWS Redshift – can be combined with Hadoop/Elastic Mapreduce for processing semi-structured and unstructured data.
1. Processing of structured vs unstructured/semistructured data
A good gold mine yields 8-10 grams of gold per ton of gold ore (i.e. roughly 0.0008-0.001%). The ratio of structured data (“gold”) to unstructured data (“gold ore”) is not that dissimilar (probably between 0.01% and 10% in many cases).
In the general case – as opposed to the simplistic JSON data used in this example – Mapreduce algorithms could be used to process any type of unstructured or semi-structured input data (e.g. video, audio, images and text) and, where appropriate, produce structured data that can be imported into Redshift. See my O’Reilly Strata presentation – Mapreduce Algorithms – for more examples of and pointers to Mapreduce’s capabilities [5].
2. Processing input data with Elastic Mapreduce/Hadoop and import results to Redshift
The input data used in this example is part of the del.icio.us bookmarking data set collected (crawled) by Arvind Narayanan (CS professor at Princeton University) [6,7]. Since the main purpose of this posting is to show the integration between Mapreduce and Redshift, the example is rather simple:
- the mapper function processes individual del.icio.us JSON records and produces records containing some basic statistics about the tag lengths used in bookmarks,
- the reducer just writes out the results as tab-separated files on AWS S3,
- finally, the Mapreduce output is imported into AWS Redshift, where further query-based analytics can begin.
3. Example input JSON record
{ "author": "linooliveira", "comments": "http://delicious.com/url/0001c173b0f84ea81d188336223f9d7d", "guidislink": false, "id": "http://delicious.com/url/0001c173b0f84ea81d188336223f9d7d#linooliveira", "link": "http://www.amadeus.net/plnext/meb/HomePageDispatcher.action?SITE=BCEUBCEU&LANGUAGE=GB", "links": [ { "href": "http://www.amadeus.net/plnext/meb/HomePageDispatcher.action?SITE=BCEUBCEU&LANGUAGE=GB", "rel": "alternate", "type": "text/html" } ], "source": {}, "tags": [ { "label": null, "scheme": "http://delicious.com/linooliveira/", "term": "trips" }, { "label": null, "scheme": "http://delicious.com/linooliveira/", "term": "howto" }, { "label": null, "scheme": "http://delicious.com/linooliveira/", "term": "tips" }, { "label": null, "scheme": "http://delicious.com/linooliveira/", "term": "viagens" } ], "title": "Flight Times, Flight Schedules, Best fares, Best rates, Hotel Rooms, Car Rental, Travel Guides, Trip Planning - Amadeus.net", "title_detail": { "base": "http://feeds.delicious.com/v2/rss/recent?min=1&count=100", "language": null, "type": "text/plain", "value": "Flight Times, Flight Schedules, Best fares, Best rates, Hotel Rooms, Car Rental, Travel Guides, Trip Planning - Amadeus.net" }, "updated": "Sun, 06 Sep 2009 11:36:20 +0000", "wfw_commentrss": "http://feeds.delicious.com/v2/rss/url/0001c173b0f84ea81d188336223f9d7d" }4. Example of output TSV record produced by Mapreduce
# fields: id, weekday, month, year, hour, minute, second, num_tags, sum_tag_len, avg_tag_len, num_tags_with_len0, num_tags_with_len1, ..., num_tags_with_len9
http://delicious.com/url/0001c173b0f84ea81d188336223f9d7d#linooliveira Sun Sep 2009 11 36 20 4 21.0 5.25 0 0 0 0 1 2 0 1 0 0
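To make the field layout explicit, here is a small illustrative sketch (not from the original post) that parses one such tab-separated output line back into the named fields from the header above:

# Illustrative only: map one Mapreduce output line back to the field names
# listed in the header comment above.
FIELDS = (["id", "weekday", "month", "year", "hour", "minute", "second",
           "num_tags", "sum_tag_len", "avg_tag_len"] +
          ["num_tags_with_len%d" % i for i in range(10)])

def parse_output_line(line):
    """Parse one tab-separated output record into a dict keyed by field name."""
    return dict(zip(FIELDS, line.rstrip("\n").split("\t")))

example = ("http://delicious.com/url/0001c173b0f84ea81d188336223f9d7d#linooliveira\t"
           "Sun\tSep\t2009\t11\t36\t20\t4\t21.0\t5.25\t0\t0\t0\t0\t1\t2\t0\t1\t0\t0")
print(parse_output_line(example)["avg_tag_len"])  # -> 5.25 (as a string)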
5. Elastic Mapreduce/Hadoop code in Python
Probably one of the easiest ways to use Elastic Mapreduce is to write the mapreduce code in Python using Yelp’s (excellent) mrjob [8]. And there are of course plenty of reasons to choose Python as the programming language, see [9-14].
from mrjob.job import MRJob
from mrjob.protocol import RawProtocol
from mrjob.step import MRStep
import json
import logging

class PreprocessDeliciousJsonMapreduce(MRJob):
    INPUT_PROTOCOL = RawProtocol   # mrjob trick 1: the raw input line arrives as the key
    OUTPUT_PROTOCOL = RawProtocol  # mrjob trick 2: key and value are written as a raw tab-separated line

    def calc_tag_stats(self, jvalue):
        tag_len_freqs = {}
        num_tags = len(jvalue["tags"])
        sum_tag_len = 0.0
        for taginfo in jvalue["tags"]:
            tag_len = len(taginfo["term"])
            if tag_len < 10:  # only keep short tags
                sum_tag_len += tag_len
                tag_len_freqs[tag_len] = tag_len_freqs.get(tag_len, 0) + 1
        for j in range(10):
            if j not in tag_len_freqs:
                tag_len_freqs[j] = 0  # fill in the blanks
        avg_tag_len = sum_tag_len / num_tags
        return avg_tag_len, num_tags, sum_tag_len, tag_len_freqs

    def get_date_parts(self, jvalue):
        # e.g. "Sun, 06 Sep 2009 11:36:20 +0000"
        (weekday, day, month, year, timestamp) = jvalue["updated"].replace(",", "").split(" ")[:5]
        (hour, minute, second) = timestamp.split(":")[:3]
        return hour, minute, month, second, weekday, year

    def mapper(self, key, value):
        try:
            jvalue = json.loads(key)
            if "tags" in jvalue:
                avg_tag_len, num_tags, sum_tag_len, tag_len_freqs = self.calc_tag_stats(jvalue)
                hour, minute, month, second, weekday, year = self.get_date_parts(jvalue)
                out_data = [weekday, month, year, hour, minute, second, num_tags, sum_tag_len, avg_tag_len]
                for tag_len in sorted(tag_len_freqs.keys()):
                    out_data.append(tag_len_freqs[tag_len])
                str_out_data = [str(v) for v in out_data]
                self.increment_counter("mapper", "kept_entries", 1)
                yield jvalue["id"], "\t".join(str_out_data)
        except Exception as e:
            self.increment_counter("mapper", "skipped_entries", 1)
            logging.error(e)

    def reducer(self, key, values):
        # identity reducer: just pass the mapper output through
        for value in values:
            yield key, value

    def steps(self):
        return [MRStep(mapper=self.mapper, reducer=self.reducer)]

if __name__ == "__main__":
    PreprocessDeliciousJsonMapreduce.run()

6. Running the Elastic Mapreduce job
Assuming you’ve uploaded the del.icio.us (or other) data set to S3, you can start the job like this (implicitly using mrjob):
#!/bin/bash
# TODO(READER): set these variables first
export AWS_ACCESS_KEY_ID=
export AWS_SECRET_ACCESS_KEY=
export INPUT_S3="s3://somes3pathhere"
export LOG_S3="s3://another3pathhere"
export OUTPUT_S3="s3://someothers3pathhere"

nohup python mapreduce_delicious.py \
  --ssh-tunnel-to-job-tracker \
  --ssh-tunnel-is-closed \
  --jobconf mapreduce.output.compress=true \
  --ec2-instance-type=m1.small \
  --num-ec2-instances=1 \
  --enable-emr-debugging \
  --ami-version=latest \
  --no-output \
  --s3-log-uri=${LOG_S3} \
  -o ${OUTPUT_S3} \
  -r emr ${INPUT_S3} &

Note: for larger data sets you probably want to use other instance types (e.g. c1.xlarge) and a higher number of instances.
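Before (or in addition to) launching on EMR, the job can also be sanity-checked locally on a small sample of the data using mrjob's built-in inline runner; a minimal example (the sample file name is just a placeholder):

# Quick local sanity check of the job (sample_delicious.json is a hypothetical small sample file)
python mapreduce_delicious.py -r inline sample_delicious.json > local_output.tsv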
7. Connecting, Creating Tables and Importing Mapreduce Data with AWS Redshift
There are several ways of creating and using a Redshift cluster. For this example I used the AWS Console [15], but for an automated approach using the Redshift API would be more appropriate (e.g. with boto [16,17]).
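As a rough sketch of the automated route, the snippet below creates a small cluster using the newer boto3 client (rather than the boto version cited in [16,17]); the cluster name, database name, credentials and node type are placeholder assumptions:

# Minimal sketch: create a Redshift cluster programmatically with boto3
# (cluster name, database, credentials and node type are placeholder assumptions).
import boto3

redshift = boto3.client("redshift", region_name="us-east-1")

response = redshift.create_cluster(
    ClusterIdentifier="delicious-analytics",   # hypothetical cluster name
    NodeType="dw.hs1.xlarge",                  # node type from this post's era; pick a current type today
    ClusterType="single-node",
    DBName="deliciousdb",                      # hypothetical database name
    MasterUsername="admin",
    MasterUserPassword="CHANGE_ME_Secret1",    # set a real password
)
print(response["Cluster"]["ClusterStatus"])

# Wait until the cluster is available before connecting to it:
waiter = redshift.get_waiter("cluster_available")
waiter.wait(ClusterIdentifier="delicious-analytics")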
When you have created the cluster (and given access permissions to the machine you are accessing the Redshift cluster from), you can connect to it e.g. using a PostgreSQL client, as below:
(Screenshot: AWS Redshift Web Console)

psql -d "[your-db-name]" -h "[your-redshift-cluster-host]" -p "[port-number]" -U "[user-name]"

Log in with your password, and you should be connected.
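Since Redshift speaks the PostgreSQL protocol, you can also connect programmatically from Python, e.g. with psycopg2; a minimal sketch where the endpoint, database, user and password are placeholders:

# Minimal sketch: connect to Redshift from Python via psycopg2
# (endpoint, database, user and password below are placeholder assumptions).
import psycopg2

conn = psycopg2.connect(
    host="your-cluster.abc123xyz.us-east-1.redshift.amazonaws.com",  # cluster endpoint
    port=5439,                    # Redshift's default port
    dbname="deliciousdb",
    user="admin",
    password="CHANGE_ME_Secret1",
)

with conn.cursor() as cur:
    cur.execute("SELECT current_database(), version();")
    print(cur.fetchone())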
A table can then be created, e.g. with:
CREATE TABLE deliciousdata (
    id varchar(255) not null distkey,
    weekday varchar(255),
    month varchar(255),
    year varchar(255),
    hour varchar(255),
    minute varchar(255),
    second varchar(255),
    num_tags varchar(255),
    sum_tag_len varchar(255),
    avg_tag_len varchar(255),
    num_tags_with_len0 varchar(255),
    num_tags_with_len1 varchar(255),
    num_tags_with_len2 varchar(255),
    num_tags_with_len3 varchar(255),
    num_tags_with_len4 varchar(255),
    num_tags_with_len5 varchar(255),
    num_tags_with_len6 varchar(255),
    num_tags_with_len7 varchar(255),
    num_tags_with_len8 varchar(255),
    num_tags_with_len9 varchar(255)
);

Data can then be imported by substituting the values used in the export statements earlier in this blog post (i.e. OUTPUT_S3, AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY) into the copy command below.
copy deliciousdata from 'OUTPUT_S3/part-00000' CREDENTIALS 'aws_access_key_id=AWS_ACCESS_KEY_ID;aws_secret_access_key=AWS_SECRET_ACCESS_KEY' delimiter '\t';

8. Analytics with AWS Redshift
If everything went well, you should now be able to run SQL queries on the Mapreduce-produced data now stored in Redshift, e.g.:
select count(*) from deliciousdata;

Since this posting is about integration, I leave this part as an exercise to the reader.
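As one hypothetical illustration (not from the original post) of such further analytics – average tag length and number of bookmarks per weekday – run through the psycopg2 connection sketched earlier:

# Hypothetical example of further analytics: average tag length and number of
# bookmarks per weekday, using the psycopg2 connection from the earlier sketch.
query = """
    SELECT weekday,
           COUNT(*)                        AS num_bookmarks,
           AVG(CAST(avg_tag_len AS float)) AS mean_avg_tag_len
    FROM deliciousdata
    GROUP BY weekday
    ORDER BY num_bookmarks DESC;
"""

with conn.cursor() as cur:
    cur.execute(query)
    for weekday, num_bookmarks, mean_avg_tag_len in cur.fetchall():
        print(weekday, num_bookmarks, round(mean_avg_tag_len, 2))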
9. Conclusion
This posting has given an example of how Elastic Mapreduce/Hadoop can produce structured data that can be imported into the AWS Redshift data warehouse.
Redshift Pricing Example
But since Redshift is a cloud-based solution (i.e. with more transparent pricing than one typically finds in enterprise software), you probably wonder what it costs? If you sign up for a 3-year reserved plan with 16 TB of storage (hs1.8xlarge), the effective annual price per terabyte is $999 [1], but what does this mean? Back in 2009, Joe Cunningham from VISA disclosed [18] that they had 42 terabytes covering 2 years of raw transaction logs. If one assumes they would run this on Redshift on 3 hs1.8xlarge instances on a 3-year reserved plan (with 3*16 = 48 TB of available storage), the effective price would be 48 * $999 ≈ $48K per year. Since most companies probably have smaller amounts of structured data than VISA, this is perhaps an upper bound for most companies.

For examples of other data warehouse prices, check out this blog post (covering HANA, Exadata, Teradata and Greenplum) [19].
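The back-of-the-envelope calculation above as a small sketch (the $999/TB/year figure is the 3-year reserved price cited in [1], the 42 TB figure is from [18]):

# Back-of-the-envelope Redshift pricing from the paragraph above.
import math

price_per_tb_year = 999   # USD, 3-year reserved hs1.8xlarge plan [1]
tb_per_node = 16          # storage per hs1.8xlarge node
data_tb = 42              # VISA's reported raw transaction logs [18]

nodes_needed = math.ceil(data_tb / tb_per_node)       # -> 3 nodes
provisioned_tb = nodes_needed * tb_per_node           # -> 48 TB
annual_cost = provisioned_tb * price_per_tb_year      # -> 47,952 USD, i.e. roughly $48K/year
print(nodes_needed, provisioned_tb, annual_cost)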
Best regards,
Amund Tveit
Atbrox

A. References
[1] http://aws.typepad.com/aws/2012/11/amazon-redshift-the-new-aws-data-warehouse.html
[2] http://blog.cloudera.com/blog/2012/10/cloudera-impala-real-time-queries-in-apache-hadoop-for-real/
[3] https://github.com/cloudera/impala
[4] http://incubator.apache.org/drill/
[5] http://www.slideshare.net/amundtveit/mapreduce-algorithms
[6] http://randomwalker.info/
[7] http://arvindn.livejournal.com/116137.html
[8] https://github.com/Yelp/mrjob
[9] http://instagram-engineering.tumblr.com/post/13649370142/what-powers-instagram-hundreds-of-instances-dozens-of
[10] http://ontwik.com/python/disqus-scaling-the-world%E2%80%99s-largest-django-application/
[11] https://blog.brainsik.net/2009/why-reddit-uses-python
[12] http://www.quora.com/Why-did-Pinterest-founders-use-Python
[13] http://www.quora.com/Quora-Infrastructure/Why-did-Quora-choose-Python-for-its-development
[14] http://www.python.org/about/quotes/
[15] http://docs.aws.amazon.com/redshift/latest/gsg/redshift-gsg.pdf
[16] http://redshiftuser.wordpress.com/2013/01/07/using-boto-to-load-data-into-aws-redshift/
[17] http://docs.pythonboto.org/en/latest/
[18] https://atbrox.com/2009/10/03/hadoop-world-2009-notes-from-application-session/
[19] http://robklopp.wordpress.com/2012/11/15/priceperformance-of-hana-exadata-teradata-and-greenplum/