How to combine Elastic Mapreduce/Hadoop with other Amazon Web Services

Elastic Mapreduce's default behavior is to read from and store to S3. When you need to access other AWS services, e.g. SQS queues or the database services SimpleDB and RDS (MySQL), the best approach from Python is to use Boto. To get Boto to work with Elastic Mapreduce you need to dynamically load it on each mapper and reducer. Cloudera's Jeff Hammerbacher outlined how to do that using Hadoop Distributed Cache, and Peter Skomoroch suggested how to load Boto to access Elastic Block Store (EBS); this posting is based on those ideas and gives a detailed description of how to do it.

How to combine Elastic Mapreduce with other AWS Services

This posting shows how to load boto in an Elastic Mapreduce mapper and gives a simple example of how to use SimpleDB from the same mapper. For accessing other AWS services, e.g. SQS, from Elastic Mapreduce, check out the Boto documentation (it is quite easy once the boto + emr integration is in place).

Other tools used (prerequisites):

Step 1 – getting and preparing the Boto library
[code]
wget http://boto.googlecode.com/files/boto-1.8d.tar.gz
# note: using virtualenv can be useful if you want to
# keep your local Python installation clean
tar -zxvf boto-1.8d.tar.gz ; cd boto-1.8d ; python setup.py install
cd /usr/local/lib/python2.6/dist-packages/boto-1.8d-py2.6.egg
zip -r boto.mod boto
[/code]

Step 2 – mapper that loads boto.mod and uses it to access SimpleDB
[python]
# this was tested by adding the code underneath to the mapper
# s3://elasticmapreduce/samples/wordcount/wordSplitter.py

# load the boto library (shipped via the distributed cache)
import sys
import zipimport
sys.path.append(".")
importer = zipimport.zipimporter('boto.mod')
boto = importer.load_module('boto')

# access simpledb
sdb = boto.connect_sdb("YourAWSKey", "YourSecretAWSKey")
sdb_domain = sdb.create_domain("mymapreducedomain") # or get_domain()
# ..
# write words to simpledb
for word in pattern.findall(line):
    item = sdb_domain.new_item(word)
    item["reversedword"] = word[::-1]
    item.save()
# ...
[/python]

Step 3 – JSON config file – bototest.json – for the Elastic Mapreduce Ruby Client
[code]
[
  {
    "Name": "Step 1: testing boto with elastic mapreduce",
    "ActionOnFailure": "<action_on_failure>",
    "HadoopJarStep": {
      "Jar": "/home/hadoop/contrib/streaming/hadoop-0.18-streaming.jar",
      "Args": [
        "-input", "s3n://elasticmapreduce/samples/wordcount/input",
        "-output", "s3n://yours3bucket/result",
        "-mapper", "s3://yours3bucket/botoWordSplitter.py",
        "-cacheFile", "s3n://yours3bucket/boto.mod#boto.mod"
      ]
    }
  }
]
[/code]

Step 4 – Copy necessary files to s3
[code]
s3cmd put boto.mod s3://yours3bucket
s3cmd put botoWordSplitter.py s3://yours3bucket
[/code]

Step 5 – And run your Elastic Mapreduce job
[code]
elastic-mapreduce --create \
  --stream \
  --json bototest.json \
  --param "<action_on_failure>=TERMINATE_JOB_FLOW"
[/code]

Conclusion
This showed how to dynamically load boto and use it to access another AWS service – SimpleDB – from Elastic Mapreduce. Boto supports most AWS services, so the same integration approach should also work for other AWS services, e.g. SQS (Queuing Service), RDS (MySQL Service) and EC2; check out the Boto API documentation or Controlling the Cloud with Python for details.

Note: a very similar integration approach should work for most Python libraries, including those that use/wrap C/C++ code (e.g. machine learning libraries such as PyML and others), but then you might need to do step 1 on a Debian AMI similar to the one Elastic Mapreduce uses; check out a previous posting for more info about such AMIs.


Do you need help with Hadoop/Mapreduce?
A good start could be to read this book, or contact Atbrox if you need help with development or parallelization of algorithms for Hadoop/Mapreduce – info@atbrox.com. See our posting for an example of parallelizing and implementing a machine learning algorithm for Hadoop/Mapreduce.


Preliminary Experiences Crawling with 80legs

80legs is a company specializing in the crawling and preprocessing part of search: you upload your seed URLs (where to start crawling), configure your crawl job (depth, domain restrictions etc.), and can also run existing or custom analysis code (uploaded Java jar files) on the fetched pages. When you upload seed files, 80legs does some filtering before starting to crawl (e.g. dropping seed URLs that are not well-formed), and also handles domain throttling and robots.txt (and perhaps other things).

Computational model: Since you can run custom code per page, it can be seen as the mapper part of a MapReduce (Hadoop) job (one map() call per page); for reduce-type processing (over several pages) you need to move your data elsewhere (e.g. to EC2 in the cloud). Side note: another domain with “reduce-less” mapreduce is quantum computing, check out Michael Nielsen’s Quantum Computing for Everyone.

Testing 80legs

Note: We have only tried with the built-in functionality and no custom code so far.

1) URL extraction

Job description: We used a seed of approximately 1,000 URLs and crawled and analyzed ~2.8 million pages within those domains. The regexp configuration was used (we only provided the URL matching regexp).

Result: Approximately 1 billion URLs were found, and the results came in 106 zip files (each ~14MB packed and ~100MB unpacked), in addition to zip files of the URLs that were crawled.
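As a rough illustration of what the regexp configuration does, here is a hedged sketch of regexp-based URL extraction in Python – the pattern is our assumption, not 80legs' actual regexp:

```python
import re

# naive absolute-URL pattern -- an assumption, not 80legs' actual regexp
URL_RE = re.compile(r'https?://[^\s"\'<>]+')

def extract_urls(html):
    """Return the absolute URLs found in a fetched page, in document order."""
    return URL_RE.findall(html)
```

A real crawl configuration would typically restrict the pattern further, e.g. to the domains in the seed list.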

Note: Based on a few smaller similar jobs, it looks like the parallelism of 80legs is somewhat dependent on the number of domains in the crawl, and perhaps also on their ordering. In case you have a set of URLs where each domain has more than one URL, it can be useful to randomize your seed URL file before uploading and running the crawl job, e.g. by using rl or coreutils' shuf.
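If neither rl nor shuf is at hand, the same shuffling takes a few lines of Python (a sketch; file names are placeholders):

```python
import random

def shuffle_seed_file(src, dst, seed=None):
    """Randomize the line order of a seed URL file so that URLs from the
    same domain are spread throughout the file."""
    with open(src) as f:
        lines = f.readlines()
    random.Random(seed).shuffle(lines)  # seed only for reproducible runs
    with open(dst, "w") as f:
        f.writelines(lines)
```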

2) Fetching pages

Job description: We built a set of ~80k URLs that we wanted to fetch as HTML (using their sample application 80App Get Raw HTML) for further processing. The URLs were split into 4 jobs of ~20k URLs each.

Result: Each job took roughly one hour (they all ran in parallel so the total time spent was 1 hour). We ended up with 5 zip files per job, each zip file having ~25MB of data (100MB unpacked), i.e. ~4*5*100MB = 2GB raw html when unpacked for all jobs.

Conclusion

80legs is an interesting service that has already proved useful for us, and we will continue to use it in combination with AWS and EC2. Custom code needs to be built (e.g. related to ajax crawling).

(May 2000 – A few thoughts about the future of Internet Information Retrieval)

Atbrox on LinkedIn

Best regards,
Amund Tveit, co-founder of Atbrox


Unstructured Search for Amazon’s SimpleDB

SimpleDB is a service primarily for storing and querying structured data (it can e.g. be used for a product catalog with descriptive features per product, or an academic event service with extracted features such as event dates, locations, organizers and topics). (If one wants “heavier data” in SimpleDB, e.g. video or images, a good approach would be to store paths to Hadoop DFS or S3 objects in the attributes instead of storing the data directly.)

Unstructured Search for SimpleDB

This posting presents an approach for adding (flexible) unstructured search support to SimpleDB (with some preliminary query latency numbers below – and very preliminary Python code). The motivation is:
  1. Support unstructured search with very low maintenance
  2. Combine structured and unstructured search
  3. Figure out the feasibility of unstructured search on top of SimpleDB

The Structure of SimpleDB

SimpleDB is roughly a persistent hashtable of hashtables, where each row (a named item in the outer hashtable) has another hashtable with up to 256 key-value pairs (called attributes). Each attribute value can be 1024 bytes, so 256 kilobytes in total of values per row (note: twice that amount if you also store data as part of the keys, plus 1024 bytes in the item name). Check out Wikipedia for detailed SimpleDB storage characteristics.

Inverted files

Inverted files are a common way of representing indices for unstructured search. In their basic form they (logically) contain a word together with a list of the pages or files the word occurs in. When a query arrives, one looks it up in the inverted file and finds the pages or files where the query words occur. (Note: if you are curious about inverted file representations, check out the survey Inverted files for text search engines.)
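The basic idea can be sketched in a few lines of Python (an in-memory toy index, just to illustrate the data structure):

```python
import re
from collections import defaultdict

def build_inverted_index(pages):
    """pages: dict mapping url -> text. Returns term -> set of urls."""
    index = defaultdict(set)
    for url, text in pages.items():
        for term in re.findall(r"\w+", text.lower()):
            index[term].add(url)
    return index

def query(index, terms):
    """AND-query: the urls that contain all the query terms."""
    sets = [index.get(t.lower(), set()) for t in terms]
    return set.intersection(*sets) if sets else set()
```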

One way of representing inverted files on SimpleDB is to map the inverted file onto the attributes, i.e. have one item per word (term) in a SimpleDB domain, and let the attributes store the list of URLs containing that word. Since each URL contains many words, it can be useful to have a separate SimpleDB domain containing a mapping from hash of URL to URL, and use the URL hash in the inverted file (this keeps the inverted file smaller). In the draft code we created 250 key-value attributes where each key was a string from “0” to “249” and each corresponding value contained hashes of URLs (and positions of the term) joined with two different string separators. If there is too little space per item – e.g. for stop words – one could “wrap” the inverted file entry by adding the same term with an incremental postfix (note: if that also gave too little space, one could wrap across SimpleDB domains).
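The packing of a posting list into numbered attributes can be sketched as follows (a simplified sketch: one separator instead of two, and the function names, separator and limits are our assumptions, not the original draft code):

```python
def pack_postings(url_hashes, max_attrs=250, max_len=1024, sep=";"):
    """Join URL hashes into one string and split it into numbered chunks,
    returning {"0": ..., "1": ...} suitable as SimpleDB item attributes."""
    joined = sep.join(url_hashes)
    chunks = [joined[i:i + max_len] for i in range(0, len(joined), max_len)]
    if len(chunks) > max_attrs:
        raise ValueError("entry too large: wrap with a postfixed term instead")
    return dict((str(i), c) for i, c in enumerate(chunks))

def unpack_postings(attrs, sep=";"):
    """Reassemble the chunks in numeric key order and split out the hashes."""
    joined = "".join(attrs[str(i)] for i in range(len(attrs)))
    return joined.split(sep) if joined else []
```

Note that a hash may be split across two attribute values; that is fine because unpacking concatenates the chunks before splitting on the separator.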

Preliminary query latency results

Warning: the data set used was NLTK's inaugural collection, so far from the biggest.

Inverted File Entry Fetch latency Distribution (in seconds)

Conclusion: the results from 1000 fetches of inverted file entries are relatively stably clustered around 0.020s (20 milliseconds), which is promising enough to pursue further (though it is still too early to decide, given only tests on small data sets so far). Balancing with e.g. memcached could also be explored, in order to get the average fetch time even lower.

Preliminary Python code including timing results (this was run on a Fedora large EC2 node in a US east coast data center).
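The timing itself needs nothing more than wall-clock measurements around each fetch; a minimal helper along these lines (a sketch, independent of boto – `fetch` would be the SimpleDB get in the real code):

```python
import time

def time_fetches(fetch, keys):
    """Call fetch(key) for each key and return per-call latencies in seconds."""
    latencies = []
    for key in keys:
        t0 = time.time()
        fetch(key)
        latencies.append(time.time() - t0)
    return latencies
```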


How to use C++ Compiled Python for Amazon’s Elastic Mapreduce (Hadoop)

Sometimes it can be useful to compile Python code for Amazon's Elastic Mapreduce into C++ and then into a binary. The motivation could be to integrate with (existing) C or C++ code, or to increase performance for CPU-intensive mapper or reducer methods. Here follows a description of how to do that:

  1. Start a small EC2 node with AMI similar to the one Elastic Mapreduce is using (Debian Lenny Linux)
  2. Skim quickly through the Shedskin tutorial
  3. Log into the EC2 node and install the Shedskin Python compiler
  4. Write your Python mapper or reducer program and compile it into C++ with Shedskin
    • E.g. the command python ss.py mapper.py would generate C++ files mapper.hpp and mapper.cpp, a Makefile and an annotated Python file mapper.ss.py.
  5. Optionally update the C++ code generated by Shedskin to use other C or C++ libraries
    • note: with Fortran-to-C you can probably integrate your Python code with existing Fortran code (e.g. numerical/high performance computing libraries). Similarly for Cobol (e.g. in the financial industry) with OpenCobol (which compiles Cobol into C). Please let us know if you try it or need help with that.
  6. Add -static as the first CCFLAGS parameter in the generated Makefile to make it a static executable
  7. Compile the C++ code into a binary with make and check that you don’t get a dynamic executable with ldd (you want a static executable)
  8. Run strip on the binary to make it smaller
  9. Upload your (ready) binary to a chosen location in Amazon S3
  10. Read Elastic Mapreduce Documentation on how to use the binary to run Elastic Mapreduce jobs.
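As a concrete example for step 4, a minimal word-count mapper that stays inside the restricted, statically typed Python subset Shedskin can translate might look like the following (a sketch – whether a given construct compiles depends on the Shedskin version; the LongValueSum prefix is the Hadoop streaming aggregate format used by wordSplitter.py):

```python
import re

# one record per word, in the Hadoop streaming "aggregate" reducer format;
# kept simple so the Shedskin compiler can infer all the types
WORD = re.compile(r"[a-zA-Z]+")

def map_line(line):
    """Return the streaming output records for one input line."""
    return ["LongValueSum:" + w.lower() + "\t1" for w in WORD.findall(line)]

# in the real mapper, feed sys.stdin through map_line and print each record
```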

Note: if you skip the shedskin-related steps this approach would also work if you are looking for how to use C or C++ mappers or reducers with Elastic Mapreduce.

Note: this approach should probably work also with Cloudera’s distribution for Hadoop.



Hadoop World 2009 – some notes from application session

Other recommended writeups:

Location: Roosevelt Hotel, NYC

1235 Joe Cunningham – Visa – Large scale transaction analysis
– responsible for Visa Technology Strategy and Innovation
been playing with Hadoop for 9 months
probably many in audience learning and starting out with Hadoop

Agenda:
1) VisaNet overview
2) Value-added information products
3) Hadoop@Visa – research results

About Visa:
– 60 Billion market cap
– well-known card products, and also behind the scene information products
– Visa brand has high trust
– For a card-holder a Visa-card means global acceptance
– For a shop owner, if you get a Visa payment approval you will be paid

VisaNet
VisaNet is the largest, most advanced payment network in the world
characteristics:
28M locations,
130M authorizations/day,
1500 endpoints,
Processes transactions faster than 1s
1.4M ATMs,
Processes in 175 currencies,
Less than 2s unavailability per year (!)
– according to my calculations seven 9s (0.9999999366)
16300 financial institutions

Visa Processing Architecture
Security/Access Services -> Message|File|Web
VisaNet Services Integration -> Authorization|Clearing&Settlement
Dispute handling, Risk, Information
Scoring every transaction (used for issuer to approve/decline transaction)

Value added Info products
– Info services
Client: Portfolio Analysis, Visa Incentive Network
Accountholder: transaction alerts, account updater, tailored rewards
– Risk management services
Account monitoring
Authentication
Encryption

Hadoop@Visa
Run a pipeline of prototypes in lab facility in SF
Any technology taken into Visa needs to match scalability and reliability requirements

Research Lab Setup
– VM System:
Custom Analytic Stacks
Encryption Processing
Relational Database
– Hadoop Systems
Management Stack
Hadoop #1 ~40TB / 42 nodes (2 years of raw transaction data)
Hadoop #2 ~300TB / 28 nodes

Risk Product Use Case
Create critical data model elements, such as keys and transaction statistics, which feed our real-time risk-scoring systems
Input: Transactions – Merchant Category, Country/Zip
Output: Key & Statistics – MCCZIP Key – stats related to account, trans. type, approval, fraud, IP address etc.
Research Sample: 500M distinct accounts, 100M transactions per day, 200 bytes per transaction, 2 years – 73B transactions (36TB)
Processing time from 1 month to 13 minutes! (note: ~3000 times faster)
(Generate synthetic transactions used to test the model)

Financial Enterprise Fit
– key questions under research:
– what will the Hadoop Solution Stack(s) look like?
– File system, Transaction Sample System, Relational Back-end (integration path), Analytics Processing
– Internal vs external cloud
– How do I get data into a cloud in a secure way?
– How does HSM and security integration work in Hadoop?
– What are the missing pieces?

Why Hadoop@Visa?
– analyze volumes of data with response that are not possible today
– requirement: need to fit with existing solutions

Cross Data Center Log Processing – Stu Hood, Rackspace

(Email and apps division, work on search team)

Agenda
Use Case Background
– “Rackapps” – Hybrid Mail Hosting, 40% use a mix of exchange and rackspace mail

Use Case: Log Types

Use Case: Querying
– was the mail delivered?
– spam – why was it (not) marked as spam
– access – who checked/failed to check mail?
more advanced questions:
– which delivery routes have the highest latency?
– which are the spammiest IP?
– Where in the world do customers log in from
Elsewhere:
– billing

Previous Solutions
– 1999-2006 – go to where log files are generated, querying with grep
– 2006-2007 / bulk load to MySQL – worked for a year

Hadoop Solution
– V3 – lucene indexes in Hadoop
– 2007- present
– store 7 days uncompressed
– queries take seconds
– long term queries with mapreduce (6M avail for MR queries)
– all 3 datacenters

Alternatives considered:
– Splunk – good for realtime, but not great for archiving
– Data warehouse package – not realtime, but fantastic for longterm analysis
– Partitioned MySQL – half-baked solution
=> Hadoop hit the sweet spot

Hadoop Implementation
SW
– collect data using syslog-ng (considering Scribe)
– storage: deposits into Hadoop (scribe will remove that)
HW
– 2-4 collector machines per datacenters
– hundreds of source machines
20 solr nodes

Implementation: Indexing/Querying
– indexing – unique processing code for schema
– querying
– “realtime”
– sharded lucene/solr instances merge-index chunk from Hadoop
– using Solr-API
– raw logs
– using Hadoop Streaming and unix grep
– Mapreduce

Implementation: Timeframe
– development – 1.5 people in 3 months
– deployment – using Cloudera's distribution
– roadblocks – bumped into job-size limits

Have run close to 1 million jobs on our cluster, and it has not gone down (except for other reasons such as maintenance)

Advantages – storage
– all storage in one place
Raw logs: 3 days, in HDFS
Indexes: 7 days
Archived Indexes: 6 months

Advantages – analysis
– Java Mapreduce API
– Apache Pig
– ideal for one-off queries
– Hadoop Streaming

Pig Example – whitehouse.gov mail spoofing

Advantages – Scalability, Cost, Community
– scalability – easy to add nodes
– cost – only hardware
– community – cloudera has been a benefit, deployment is trivial

Data Processing for Financial Services – Peter Krey and Sin Lee, JP Morgan Chase

Innovation & Shared Services, Firmwide Engineering & Architecture

note: certain constraints what can be shared due to regulations

JPMorgan Chase + Open Source
– Qpid (AMQP) – top level Apache project
– Tyger – Apache + Tomcat + Spring

Hadoop in the Enterprise – Economics Driven
– attractive: economics
– Many big lessons from Web 2.0 community
– Potential for Large Capex and Opex “Dislocation”
– reduce consumption of enterprise premium resources
– grid computing economics brought to data intensive computing
– stagnant data innovation
– Enabling & potentially disruptive platform
– many historical similarities
– java, linux, tomcat, web/internet
– minis to client/server, client/server to web, solaris to linux, ..
– Key question: what can be built on top of Hadoop?
Back to economics driven – very cost-effective

Hadoop in the Enterprise – Choice Driven
– Overuse of relational database containers
– institutional “Muscle memory” – not too much else to choose from
– increasingly large percentage of static data stored in proprietary transactional DBs
– Over-Normalized Schemas: still Makes sense with cheap compute&storage?

– Enterprise Storage “Prisoners”
– Captive to the economics & technology of “a few” vendors
– Developers need more choice
– Too much proprietary, single-source data infrastructure
– increasing need for minimal/no systems + storage admins

Hadoop in the Enterprise – Other Drivers
– Growing developer interest in “Reduced RDBMS” Data technologies
– open source, distributed, non-relational databases
– growing influence of web 2.0 technologies & thinking of enterprise
– hadoop, cassandra, hbase, hive, couchdb, hadoopDB, .. , others
– memcached for caching

FSI Industry Drivers
– Increased regulatory oversight + reporting = more data needed over a longer period of time
– triple data amounts from 2007 to 2009
– growing need for less expensive data repository/store
– increased need to support “one off” analysis on large data

Active POC Pipeline
– Growing stream of real projects to gauge hadoop “goodness of fit”
– broad spectrum of use cases
– driven by need to impact/dislocate OPEX+CAPEX
– looking for orders of magnitude
– evaluated on metric based performance, functional and economic measures
– avoid the “data falling on the floor phenomena”
– tools are really really important, keep tools and programming models simple

Hadoop Positioning
– Latency x Storage amount curve,

Cost comparisons
– SAN vs Hadoop HDFS cost comparison (GB/month)
– Hadoop much cheaper

Hadoop Additions and Must Haves:
– Improves SQL Front-End Tool Interoperability
– Improved Security & ACL enforcement – Kerberos Integration
– Grow Developer Programming Model Skill Sets
– Improve Relational Container Integration & Interop for Data Archival
– Management & Monitoring Tools
– Improved Developer & Debugging Tools
– Reduce Latency via integration with open source data caching
– memcached – others
– Invitation to FSI or Enterprise roundtable

Protein Alignment – Paul Brown, Booz Allen

Biological information
– Body – Cells – Chromosomes – Gene – DNA/RNA

Bioinformatics – The Pain
– too much data

So What? Querying a database of sequences for similar sequences
– one-to-many comparison
– 58000 proteins in PDB
– Protein alignment frequently used in the development of medicines
– Looking for a certain sequence across species, helps indicate function
Implementation in Hadoop
– distribute database sequences across each node
– send query seq. inside Mapreduce (or dist.cache)
– scales well
– existing algorithms port easily

So What? Comparing sequences in bulk
– many-to-many
– DNA hybridization (reconstruction)
Ran on AWS
Hadoop:
– if whole dataset fit into one computer
– Used distributed cache, assign each node a piece of the list
– But if the data does not fit on one computer…
– pre-join all possible pairs with one MapReduce

So What? Analyzing really big sequences
– one big sequence to many small sequences
– scanning dna for structure
– population genetics
– hadoop implementation

Demonstration Implementation: Smith-Waterman Alignment
– one of the more computationally intensive matching and alignment techniques
– big matrix – (sequences to compare on row and column and calculations within)
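For reference, the core of Smith-Waterman – filling the scoring matrix row by row and taking its maximum – fits in a few lines of Python (a sketch with assumed scoring parameters, scoring only, no traceback of the actual alignment):

```python
def smith_waterman(a, b, match=2, mismatch=-1, gap=-1):
    """Local alignment score of sequences a and b: fill the DP matrix,
    clamping at 0 (a local alignment can restart anywhere), return the max."""
    rows, cols = len(a) + 1, len(b) + 1
    h = [[0] * cols for _ in range(rows)]
    best = 0
    for i in range(1, rows):
        for j in range(1, cols):
            diag = h[i-1][j-1] + (match if a[i-1] == b[j-1] else mismatch)
            h[i][j] = max(0, diag, h[i-1][j] + gap, h[i][j-1] + gap)
            best = max(best, h[i][j])
    return best
```

The matrix is quadratic in the sequence lengths, which is why the NxN comparison above is so much more expensive than a single query.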

Amazon implementation
– 250 machines
– EC2
– run in 10 minutes for a single sequence. Runs in 24hrs for NxN comparison
– cost $40/hr

==> very cool 3D video of amazon ec2 nodes
– failing job due to 10% of nodes stuck on something (e.g. very long sequences)

Real-time Business Intelligence, Bradford Stephens

Topics
– Scalability and BI
– Costs and Abilities
– Search as BI

Tools: Zookeeper, Hbase, Katta (dist.search on Hadoop) and Bobo (faceted search for lucene)
– http://sourceforge.net/projects/bobo-browse/
– http://sourceforge.net/projects/katta/develop

100TB structured and unstructured data – Oracle 100M$, Hadoop and Katta 250K$

Building data cubes in real time (with faceted search)

Real-time Mapreduce on HBase
Search/BI as a platform – “google my datawarehouse”

Counting, Clustering and other data tricks, Derek Gottfried, New York Times

back in 2007 – would like to try as many EC2 instances as possible

Problem
– freeing up historical archives of NYTimes.com (1851-1922)
(in the public domain)

Currently:
– 2009 – web analytics
3 big data buckets:
1) registration/demographics
2) articles 1851-today
– a lot of metadata about each article
– unique data, extract people, places, .. to each article => high precision search
3) usage data/web logs
– biggest piece – piles up

How do we merge the 3 datasets?

Using EC2 – 20 machines
Hadoop 0.20.0
12 TB of data
Straight MR in Java
(mostly java + postprocessing in python)

combining weblog data with demographic data, e.g. twitter clicks backs by age group


