
Implementing a recommender engine using Hadoop and Mahout

In continuation of my earlier blog post, in this post I will talk about how to implement a recommender engine using Mahout and Hadoop.

  • First, a brief introduction to MapReduce and how some computational algorithms have to be rewritten to take advantage of parallel processing.
  • Second, I will talk about how the recommender algorithm is restructured in Mahout so that it can take advantage of Hadoop.
  • Third, I will walk you through a use case of implementing a MapReduce recommender engine with Hadoop and Mahout.

One of the fundamental principles of parallel computing is that it works best when each computer in a cluster can process its task independently, without sharing data with the others. Consider sorting algorithms: traditional ones are not designed so that we can chunk a large dataset, hand the chunks off to many computers running in parallel, and then consolidate the returned results back into one sorted dataset. If we need to sort this way, we have to write the algorithm to take advantage of MapReduce. TeraSort is one sorting technique that takes advantage of MapReduce.
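To make that concrete, here is a minimal, hypothetical example of what "writing an algorithm for MapReduce" looks like with the Hadoop Java API. It is a simple word-count style job rather than TeraSort itself, shown only as a sketch of the shape of a MapReduce program: the map phase runs independently on each chunk of data, and the reduce phase consolidates the results.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCountSketch {

    // Map phase: each node independently turns its chunk of lines into (word, 1) pairs.
    public static class TokenMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        private static final IntWritable ONE = new IntWritable(1);
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            for (String word : value.toString().split("\\s+")) {
                if (!word.isEmpty()) {
                    context.write(new Text(word), ONE);
                }
            }
        }
    }

    // Reduce phase: the framework groups the pairs by word and this step consolidates the counts.
    public static class SumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable v : values) {
                sum += v.get();
            }
            context.write(key, new IntWritable(sum));
        }
    }

    public static void main(String[] args) throws Exception {
        // Wire the mapper and reducer into a job and point it at HDFS input/output paths.
        Job job = new Job(new Configuration(), "word count sketch");
        job.setJarByClass(WordCountSketch.class);
        job.setMapperClass(TokenMapper.class);
        job.setReducerClass(SumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}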

Similarly, in the context of a recommender engine, if we need to take advantage of MapReduce we have to write the algorithm differently. Mahout provides recommendation algorithms written to take advantage of MapReduce. The components of a recommender engine are:

  • User/Item based Recommender
  • Similarities (concept of neighborhood)

The “Hello World” equivalent of the recommendation problem is Netflix movie rating. The movie sample data is available here. In this test data you have a history of users who rented movies and rated them. When a user rents a new movie, the system suggests titles based on that user's renting pattern as well as on what other users with similar profiles are renting. This transactional data is typically stored in a database like MySQL. We can write a batch program to move the transaction data into the Hadoop filesystem. When you run the Mahout MapReduce job, it returns the top ‘n’ recommendations. If you look at the data, each line is a comma-separated userid,itemid,rating triple; a few made-up sample lines follow. The job assumes you maintain the user and item master tables in the transactional database, and that you will join against those two tables to present more informative results to the user.
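For illustration, a few lines in that format might look like the following (the ids and ratings are made-up values, not taken from the actual dataset):

 123,101,4.5
 123,205,3.0
 456,101,5.0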

Let me give you a glimpse of how an item-based recommendation algorithm works using MapReduce; for more on this, refer to Mahout in Action (Manning), chapter 6.

The recommender computation involves the following steps:

  • The first step is to use this data to build an item co-occurrence matrix. It answers the question: how many times have two items co-occurred when users were buying them? This is a square matrix of order n, where n is the number of items the users have bought.
  • The next step is to compute the user vector of the items a given user is buying, which is a single-column matrix.
  • The final step is to produce the recommendations by multiplying the co-occurrence matrix by the user vector. The items to recommend are the ones the user has not bought yet (the zero entries in the user vector) that end up with the highest scores; a small sketch of this math follows the list.
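The sketch below illustrates that multiplication step in plain Java. It is only a toy illustration of the math described above; the matrix and vector values are made up, and the real Mahout job distributes this computation across the cluster with MapReduce.

public class CooccurrenceSketch {
    public static void main(String[] args) {
        // Co-occurrence matrix: cooccurrence[i][j] = how often item i and item j
        // were bought together across all users (illustrative values).
        int[][] cooccurrence = {
            {4, 3, 1},
            {3, 5, 2},
            {1, 2, 3}
        };
        // User vector: 1 if this user already bought the item, 0 otherwise.
        int[] userVector = {1, 0, 1};

        // Multiply the co-occurrence matrix by the user vector to get a score per item.
        double[] scores = new double[userVector.length];
        for (int i = 0; i < cooccurrence.length; i++) {
            for (int j = 0; j < userVector.length; j++) {
                scores[i] += cooccurrence[i][j] * userVector[j];
            }
        }

        // Recommend the item the user has not bought (zero in the user vector)
        // that received the highest score.
        int best = -1;
        for (int i = 0; i < scores.length; i++) {
            if (userVector[i] == 0 && (best == -1 || scores[i] > scores[best])) {
                best = i;
            }
        }
        System.out.println("Recommend item " + best + " with score " + scores[best]);
    }
}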

Let us take a typical use case of an airline reservation system. When a user is purchasing flight tickets to a destination, he is offered recommendations relevant to his interests, based on his past purchase history and the purchase history of users with similar profiles. Assuming the transactional data is present in MySQL, we can use a Sqoop script to import the data into HDFS as below:

 sqoop import --connect jdbc:mysql://localhost/reservation --username krishna --password pass123  --table useritem
 

Next, we need to run the Mahout recommender job on Hadoop as below:

 hadoop jar mahout-core-0.6-job.jar org.apache.mahout.cf.taste.hadoop.item.RecommenderJob -Dmapred.input.dir=input/input.txt -Dmapred.output.dir=output --usersFile input/users.txt --booleanData --similarityClassname SIMILARITY_COOCCURRENCE
 

Finally, we need to run a Sqoop script to export the final analytical data back to MySQL as below:

 sqoop export --connect jdbc:mysql://localhost/reservation --username krishna --password pass123 --table useritem --export-dir useritem/part-m-00000
 

Now, in your application, you look up the useritem table and return the recommender results to the user.
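For completeness, here is a hedged sketch of that lookup using plain JDBC. The connection details come from the Sqoop commands above, but the column names (userid, itemid) and the exact shape of the exported recommendation rows are assumptions for illustration only.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;

public class RecommendationLookup {
    public static void main(String[] args) throws Exception {
        Class.forName("com.mysql.jdbc.Driver"); // MySQL JDBC driver must be on the classpath
        Connection conn = DriverManager.getConnection(
                "jdbc:mysql://localhost/reservation", "krishna", "pass123");
        // Assumed columns: userid and itemid; join with your item master table
        // if you want to show richer item details to the user.
        PreparedStatement stmt = conn.prepareStatement(
                "SELECT itemid FROM useritem WHERE userid = ?");
        stmt.setLong(1, 42L); // id of the logged-in user (illustrative)
        ResultSet rs = stmt.executeQuery();
        while (rs.next()) {
            System.out.println("Recommended item: " + rs.getLong("itemid"));
        }
        rs.close();
        stmt.close();
        conn.close();
    }
}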


Hadoop and Mahout in Data Mining

Introduction: Data Mining

This post talks about a typical solution architecture in which we use various Big Data components like Hadoop, Mahout, Sqoop, Flume and the Mongo-Hadoop Connector. Many successful companies have been around for at least 20 years and have invested heavily in IT infrastructure. Over that period they have accumulated data in the form of transaction records and log archives, and they are eager to analyse this data to see if they can improve business efficiency with it.

Below is the component diagram of how each of the above frameworks fits into the ecosystem.

[Component diagram: Hadoop, Mahout, Sqoop, Flume and Mongo-Hadoop Connector in the data mining ecosystem]

In a typical scenario, as customers use the company's IT systems to buy its products, customer data such as purchasing patterns, location, gender and how other similar customers are purchasing is stored for data mining. Based on this data, the company can help customers make buying decisions. If you look carefully, Netflix, Amazon, eBay and Gilt are already doing this.

Mahout is an open source framework with a built-in recommender engine. It runs on Hadoop using the MapReduce paradigm. In a typical scenario,

  1. User-item data stored in MySQL is imported into the Hadoop Distributed File System (HDFS) using an ETL tool like Sqoop
  2. User click information stored in log files is exported into HDFS using log-migration tools like Flume
  3. If the transaction data is stored in a NoSQL database, connectors like the Mongo-Hadoop Connector can export the data to HDFS

Once the data is in HDFS, we can run a Mahout batch job to do the data analysis and export the processed data back to the transactional database.

Hummingbird – real time data analytics

Recently I came across an interesting company called Gilt, one of the leading eCommerce companies. What is interesting about it is that the whole company runs on real-time analytics. Instead of me explaining what real-time analytics is, click on the Hummingbird Slideshare presentation, the Hummingbird Vimeo link, the Hummingbird website and the Hummingbird demo to understand more. The good news is that they donated the code that runs this to Git here. Setting up Hummingbird is described at the Git location. The Hummingbird tracking-pixel server by default starts on port 8000 and the monitor server starts on port 8080.

About Hummingbird

Hummingbird is built with Node.js and MongoDb. It works on the tracking-pixel concept, wherein the framework exposes a 1×1 tracking pixel. You can embed this pixel in your HTML pages as below:

<img id="tracking-pixel" src="http://localhost:8000/tracking_pixel.gif?events=scAdd&productId=10&ip=12.232.43.121" alt="data analytics" />

In the query string of the image, you pass whatever values you want to analyse, for example if you want to know how many times a product has been purchased and from which location. The data is stored in MongoDb, and later it can be analysed over a time range (a hypothetical query sketch follows the diagram below). The overall architecture is as below:

[Architecture diagram: Hummingbird real-time data analytics]
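As a rough, hypothetical illustration of that analysis step, here is how counting scAdd events for a product over a time window might look with the MongoDb Java driver. The database name, collection name and document shape below are assumptions based on the query-string parameters in the pixel above, not Hummingbird's actual storage schema.

import java.util.Date;

import com.mongodb.BasicDBObject;
import com.mongodb.DB;
import com.mongodb.DBCollection;
import com.mongodb.MongoClient;

public class PixelEventCount {
    public static void main(String[] args) throws Exception {
        MongoClient mongo = new MongoClient("localhost", 27017);
        DB db = mongo.getDB("hummingbird");               // assumed database name
        DBCollection events = db.getCollection("events"); // assumed collection name

        // Assumed event document shape, mirroring the tracking-pixel query string:
        // {event: "scAdd", productId: 10, ip: "12.232.43.121", ts: <Date>}
        Date from = new Date(System.currentTimeMillis() - 24L * 60 * 60 * 1000);
        BasicDBObject query = new BasicDBObject("event", "scAdd")
                .append("productId", 10)
                .append("ts", new BasicDBObject("$gte", from));

        System.out.println("scAdd events in the last 24h: " + events.count(query));
        mongo.close();
    }
}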

Understanding document based MongoDb NoSQL

Introduction: MongoDb NoSQL

This article touches upon various aspects of RDBMS and where RDBMS hits roadblocks in the enterprise world. It also talks about how we can overcome these roadblocks by relaxing a few key aspects of RDBMS, how a NoSQL-based solution fits in, and MongoDb as a popular NoSQL solution.

RDBMS as it relates to enterprise scalability, performance and flexibility

RDBMS evolved out of strong mathematical roots such as relational algebra and set theory. Some of its defining aspects are schema validation, normalized data to avoid duplication, atomicity, locking, concurrency, high availability and one version of the truth.

While these aspects have a lot of benefits in terms of data storage and retrieval, they can impact enterprise scalability, performance and flexibility. Let us consider a typical purchase order example. In the RDBMS world we would have two tables, OrderHeader and LineItem, with a one-to-many relationship between them.

Consider that we need to store a huge number of purchase orders and we start partitioning. One way to partition is to keep the OrderHeader table in one database instance and the LineItem information in another. If you want to insert or update an order, you need to update both tables atomically, and you need a transaction manager to ensure that atomicity. If you want to scale this further in terms of processing and data storage, you can only add more hard disk space and RAM.

In other words, the easy way to achieve scaling in an RDBMS is vertical scaling.


Let us consider another situation: because of a change in our business, we add a new column to the LineItem table called LineDesc, and imagine that this application is already running in production. To deploy this change we need to bring the server down for some time for the schema change to take effect.

Achieving enterprise scalability, performance and flexibility

The fundamental requirements of modern enterprise systems are:

  1. Flexibility in scaling the database, so that multiple instances of the database can process the information in parallel
  2. Flexibility to absorb changes to the database schema without long server downtime
  3. The application/middle tier does not have to handle the object-relational impedance mismatch; can we get away from it using techniques like JSON?

Let us go back to our PurchaseOrder example, relax some of the aspects of RDBMS like normalization (to avoid joining lots of rows) and atomicity, and see if we can achieve some of the above objectives.

Below is one example of how we can store the PurchaseOrder (there are other, better ways of storing the information):

orderheader: {
  orderdescription: "Krishna's Orders",
  date: "Sat Jul 24 2010 19:47:11 GMT-0700 (PDT)",
  lineitems: [
    {linename: "pendrive", quantity: "5"},
    {linename: "harddisk", quantity: "10"}
  ]
}

If you look carefully, the purchase order is stored in a JSON-document-like structure. Notice also that we don't need multiple tables, relationships or normalization, and hence there is no need for joins. And since the schema qualifiers are within the document itself, there is no separate table definition.

You can store these as a collection of objects/documents. Hypothetically, if we need to store several million PurchaseOrders, we can chunk them into groups and store them across several instances.

If you want to retrieve PurchaseOrders based on specific criteria, for example all the purchase orders in which one of the line items is a "pendrive", we can ask all the individual instances to search in parallel with the same criteria, and one of them can consolidate the list and return the information to the client. This is the concept of horizontal scaling; a query sketch follows below.
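A minimal sketch of such a criteria query, using the legacy MongoDb Java driver (the database and collection names are assumptions for illustration):

import com.mongodb.BasicDBObject;
import com.mongodb.DB;
import com.mongodb.DBCollection;
import com.mongodb.DBCursor;
import com.mongodb.MongoClient;

public class PendriveOrders {
    public static void main(String[] args) throws Exception {
        MongoClient mongo = new MongoClient("localhost", 27017);
        DB db = mongo.getDB("shop");                              // assumed database name
        DBCollection orders = db.getCollection("purchaseorders"); // assumed collection name

        // Match any order whose embedded lineitems array contains a pendrive.
        DBCursor cursor = orders.find(new BasicDBObject("lineitems.linename", "pendrive"));
        while (cursor.hasNext()) {
            System.out.println(cursor.next());
        }
        cursor.close();
        mongo.close();
    }
}

Against a sharded collection, the mongos router sends such a query to the relevant shards in parallel and merges the results, which is the horizontal scaling behaviour described above.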


Because there is no separate table schema and the schema definition is included in the JSON object itself, we can change the document structure and store and retrieve it with just a change in the application layer. This does not need a database restart.

Finally, since the object structure is JSON, we can present it directly to the web tier or a mobile device and they can render it.

NoSQL is a classification of databases designed with the above aspects in mind.

MongoDb: Document based NoSQL

MongoDb is a document-based NoSQL database which uses some of the above techniques to store and retrieve data. There are also NoSQL databases that are ordered key-value based, like Redis and Cassandra, which take similar approaches but are much simpler.

To give an RDBMS analogy, collections in MongoDb are similar to tables and documents are similar to rows. Internally MongoDb stores the information as binary-serialized JSON objects called BSON. MongoDb supports a JavaScript-style query syntax to retrieve BSON objects.


A typical document, and how it is saved and queried, looks as below:

post = {
  author: "Hergé",
  date: new Date(),
  text: "Destination Moon",
  tags: ["comic", "adventure"]
}

> db.posts.save(post)

------------

> db.posts.find()
{
  _id: ObjectId("4c4ba5c0672c685e5e8aabf3"),
  author: "Hergé",
  date: "Sat Jul 24 2010 19:47:11 GMT-0700 (PDT)",
  text: "Destination Moon",
  tags: ["comic", "adventure"]
}

In MongoDb, atomicity is guaranteed within a single document. If you have to achieve atomicity across documents, it has to be managed at the application level. Below is an example of modelling a many-to-many relationship across documents:

Many to many:

products: {
  _id: ObjectId("10"),
  name: "DestinationMoon",
  category_ids: [ObjectId("20"), ObjectId("30")]
}

categories: {
  _id: ObjectId("20"),
  name: "adventure"
}

// All products for a given category
> db.products.find({category_ids: ObjectId("20")})

// All categories for a given product
> product = db.products.findOne({_id: some_id})
> db.categories.find({
    _id: {$in: product.category_ids}
  })

In a typical stack that uses MongoDb, it makes a lot of sense to use a JavaScript-based framework. As a good web framework, we use the Express/Node.js/MongoDb stack. A good example of how to use this stack is here.

MongoDb also supports sharding, which enables parallel processing/horizontal scaling. For more details on how a typical Big Data system handles parallel processing/horizontal scaling, refer to Ricky Ho's link.
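As a rough sketch of turning sharding on from the Java driver (the database name, collection name and shard key below are purely illustrative assumptions), the admin commands look roughly like this:

import com.mongodb.BasicDBObject;
import com.mongodb.CommandResult;
import com.mongodb.DB;
import com.mongodb.MongoClient;

public class ShardingSetup {
    public static void main(String[] args) throws Exception {
        // Connect to a mongos router (assumed to be on localhost:27017).
        MongoClient mongo = new MongoClient("localhost", 27017);
        DB admin = mongo.getDB("admin");

        // Enable sharding for the database, then shard the collection on a key.
        CommandResult r1 = admin.command(new BasicDBObject("enableSharding", "shop"));
        CommandResult r2 = admin.command(
                new BasicDBObject("shardCollection", "shop.purchaseorders")
                        .append("key", new BasicDBObject("_id", 1)));

        System.out.println(r1);
        System.out.println(r2);
        mongo.close();
    }
}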

Typical use cases for MongoDb include event logging, real-time analytics, content management and eCommerce. Use cases where it is not a good fit are transaction-based banking systems and non-real-time data warehousing.

References: