
An insight on Big data

OLTP: Refers to Online Transaction Processing

OLTP is a class of systems that manage transaction-oriented applications (insert, update, delete and get). Most OLTP applications are fast because the database is designed using 3NF. OLTP systems are typically scaled vertically.

OLAP:  Refers to Online Analytical Processing

OLAP is used for business analytics and data warehousing workloads. Data processing is comparatively slow because users analyze multidimensional data interactively from multiple data sources. OLAP applications typically use a star schema (facts and dimensions; normalization is relaxed and redundancy is accepted). OLAP systems are typically scaled horizontally.

OLTP vs. OLAP: There are some major differences between OLTP and OLAP.

Data Sharding:

Traditional database architecture relies on vertical scaling. Tables may also be split by columns, with the column groups kept separately in a physical or logical grouping (tree structure). This leads to performance trouble as the data grows: memory, CPU and disk space have to be increased every time a performance problem is hit.

To eliminate this problem, data sharding (the shared-nothing concept) evolved. The database is scaled horizontally instead of vertically, using a master/slave architecture: the database is broken into shards, and the shards are spread across a number of servers, each of which can itself be scaled vertically.

The data sharding concept is discussed in detail in this link.
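As a rough illustration of the idea, the sketch below routes rows to shards by hashing a key. The shard count, the `shard_for` helper and the in-memory dictionaries standing in for servers are assumptions made for the example, not part of any particular product.

```python
import hashlib

NUM_SHARDS = 4  # assumed number of shard servers


def shard_for(key: str, num_shards: int = NUM_SHARDS) -> int:
    """Map a key to a shard index with a stable hash."""
    digest = hashlib.md5(key.encode("utf-8")).hexdigest()
    return int(digest, 16) % num_shards


# Each dict stands in for one independent (shared-nothing) server.
shards = [dict() for _ in range(NUM_SHARDS)]


def put(key: str, row: dict) -> None:
    """Store the row on the shard that owns the key."""
    shards[shard_for(key)][key] = row


def get(key: str):
    """Look the row up on the one shard that can hold it."""
    return shards[shard_for(key)].get(key)


for i in range(100):
    put(f"user{i}", {"id": i})
```

A lookup only touches the one shard that owns the key, which is why capacity can grow by adding servers. A simple modulo scheme like this moves most keys when the shard count changes, which real systems usually avoid with range-based or consistent-hash schemes.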

MPP (Massively Parallel Processing systems): Refers to the use of a large number of processors (or separate computers) to perform a set of coordinated computations in parallel.

MPP is also known as cluster computing or the shared-nothing architecture discussed above.

Examples of MPP systems are Teradata and Greenplum.

Vertical and Horizontal Scaling:

Horizontal scaling means that you scale by adding more machines to your pool of resources, whereas vertical scaling means that you scale by adding more power (CPU, RAM) to your existing machine.

In the database world, horizontal scaling is often based on partitioning the data, i.e. each node contains only part of the data. In vertical scaling the data resides on a single node, and scaling is done through multi-core, i.e. by spreading the load across the CPU and RAM resources of that machine.

With horizontal scaling it is often easier to scale dynamically by adding more machines to the existing pool. Vertical scaling is often limited to the capacity of a single machine.

Below are the differences between vertical and horizontal scaling.

CAP Theorem: The description for the CAP Theorem is discussed in this link.

Some insight on CAP:

Greenplum:  Greenplum Database is a massively parallel processing (MPP) database server based on PostgreSQL open-source technology. MPP (also known as shared nothing architecture) refers to systems with two or more processors which cooperate to carry out an operation – each processor with its own memory, operating system and disks.

The high-level overview of GreenPlum is discussed in this link.

Some Interesting Links:

HBase:  HBase is the Hadoop database. It is a distributed, scalable big data store. It provides real-time read and write access to large data sets and uses a cluster (master/slave) architecture to store and retrieve data.

  • Hadoop is open source software developed by Apache to store large data.
    • MapReduce is used for distributed processing of large data sets on clusters (master/slave). It takes care of scheduling tasks, monitoring them and re-executing any failed tasks. The primary objective of MapReduce is to split the input data set into independent chunks and send them to the cluster, where the map tasks process them in parallel. The framework sorts the outputs of the maps, which are then the input to the reduce tasks. Typically, both the input and the output of the job are stored in a file system.
    • The Hadoop Distributed File System (HDFS): the primary objective of HDFS is to store data reliably even in the presence of failures. HDFS uses a master/slave architecture in which one device (the master) controls one or more other devices (the slaves). An HDFS cluster consists of one or more slaves, which actually hold the file data, and a master server, which manages the file system namespace and regulates access to files.
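To make the map, sort and reduce flow a little more concrete, here is a minimal single-process sketch of a word count. It only imitates the shape of a MapReduce job; the function names are made up for this example and nothing here uses the Hadoop API.

```python
from collections import defaultdict


def map_phase(chunk):
    """Emit (word, 1) for every word in one independent input chunk."""
    return [(word.lower(), 1) for word in chunk.split()]


def shuffle(pairs):
    """Group values by key and sort by key, as the framework does between map and reduce."""
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return sorted(groups.items())


def reduce_phase(key, values):
    """Combine all values seen for one key."""
    return key, sum(values)


def word_count(chunks):
    # In a real cluster every chunk would be mapped on a different node.
    mapped = [pair for chunk in chunks for pair in map_phase(chunk)]
    return dict(reduce_phase(key, values) for key, values in shuffle(mapped))


result = word_count(["big data big", "data store"])
```

Because each chunk is mapped without looking at any other chunk, the map calls can run on different machines; only the grouping step needs to bring data together.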

Hbase documentation is provided in this link.

Some Interesting Links:

1. Does Greenplum achieve MPP? Does Greenplum use the Hadoop file system?

A)     Greenplum vs. MPP: Greenplum follows the MPP (Massively Parallel Processing) architecture. The architecture is discussed with a use case here.

B)      Greenplum vs. Hadoop: Yes, Greenplum can use Hadoop internally. The use case is discussed in detail in this link.

2. What are the different storage methodologies? Compare them.

Data storage methodologies: There are three different methodologies.

  • Row based storage
  • Column based storage
    • The differences between row and column based databases are described in this link.
  • NoSQL: described in detail in this link. It has three different concepts:
    • Key Value
    • Document Store
    • Column Store

Some key points on different storage methodologies are,

| Storage Methodology | Description | Common Use Case | Strength | Weakness | Size of DB | Key Players |
|---|---|---|---|---|---|---|
| Row-based | Data structured or stored in rows. | Transaction processing, interactive transaction applications. | Robust, proven technology to capture intermediate transactions. | Scalability and query processing time for huge data. | | Sybase, Oracle, MySQL, DB2 |
| Column-based | Data is vertically partitioned and stored in columns. | Historical data analysis, data warehousing and business intelligence. | Faster queries (especially ad-hoc queries) on large data. | Not suitable for transactions; import/export speed and heavy computing resource utilization. | Several GB to 50 TB | Infobright, Aster Data, Vertica, Sybase IQ, ParAccel |
| NoSQL: Key Value Store | Data stored in memory with some persistent backup. | Cache for storing frequently requested data in applications. | Scalable, faster retrieval of data, supports unstructured and partially structured data. | All data should fit in memory; does not support complex queries. | Several GBs to several TBs | Amazon S3, Memcached, Redis, Voldemort |
| NoSQL: Document Store | Persistent storage of unstructured or semi-structured data along with some SQL querying functionality. | Web applications or any application which needs better performance and scalability without defining columns in an RDBMS. | Persistent store with scalability and better query support than a key-value store. | Lack of sophisticated query capabilities. | Several TBs to PBs | MongoDB, CouchDB, SimpleDB |
| NoSQL: Column Store | Very large data store that supports MapReduce. | Real-time data logging in finance and web analytics. | Very high throughput for Big Data, strong partitioning support, random read-write access. | Complex queries, availability of APIs, response time. | Several TBs to PBs | HBase, Bigtable, Cassandra |
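The difference between the row and column layouts can be sketched with plain Python structures. The sample records and helper names below are invented for illustration and say nothing about how any of the listed products store data internally.

```python
rows = [
    {"id": 1, "city": "Boston", "amount": 10},
    {"id": 2, "city": "Bangalore", "amount": 25},
    {"id": 3, "city": "Boston", "amount": 5},
]

# Row-based layout: one complete record per key, convenient for reading
# or writing a whole transaction at once.
row_store = {record["id"]: record for record in rows}

# Column-based layout: one list per column, convenient for scanning a
# single column across all records.
column_store = {name: [record[name] for record in rows] for name in rows[0]}


def fetch_row(store, row_id):
    """Transactional access: a single lookup returns the whole record."""
    return store[row_id]


def total_amount(store):
    """Analytical access: only the "amount" column is read."""
    return sum(store["amount"])
```

Fetching one order is a single lookup in the row layout, while summing a column reads one contiguous list in the column layout. That matches the transaction versus analysis split in the comparison above.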

Implementing a recommender engine using Hadoop and Mahout

In continuation of my earlier blog, in this blog I will talk about how to implement a recommender engine using Mahout and Hadoop.

  • First, a brief introduction to MapReduce and how some computational algorithms have to be rewritten to take advantage of parallel processing.
  • Second, I will talk about how the recommender algorithm has to be adapted to work in Mahout and take advantage of Hadoop.
  • Third, I will walk you through a use case showing how we can implement a MapReduce recommender engine with Hadoop and Mahout.

One of the fundamental principles of parallel computing is that it pays off when each computer in a cluster can process the same task independently, without sharing any data with the others. Consider sort algorithms: they are not designed to work in a way where we chunk a large dataset, hand the chunks off to many computers running in parallel, and consolidate the large dataset once the data is returned. If we need to do sorting, we need to write the algorithm to take advantage of MapReduce. TeraSort is one of the sorting techniques that takes advantage of MapReduce.

Similarly, in the context of a recommender engine, if we need to take advantage of MapReduce we have to write it differently. Mahout supports recommendation algorithms that take advantage of MapReduce. The components of a recommender engine are
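A simplified sketch in the spirit of TeraSort is shown below: sample the input to pick range boundaries, send every value to the partition that owns its range, sort each partition independently, and concatenate. This is an illustration under those assumptions, not Hadoop's actual TeraSort code, and all function names are made up.

```python
import bisect
import random


def pick_split_points(data, num_partitions, sample_size=100, seed=0):
    """Sample the input and choose num_partitions - 1 range boundaries."""
    rng = random.Random(seed)
    sample = sorted(rng.sample(data, min(sample_size, len(data))))
    step = len(sample) / num_partitions
    return [sample[int(step * i)] for i in range(1, num_partitions)]


def partition(data, split_points):
    """Send every value to the partition that owns its key range."""
    parts = [[] for _ in range(len(split_points) + 1)]
    for value in data:
        parts[bisect.bisect_right(split_points, value)].append(value)
    return parts


def distributed_sort(data, num_partitions=4):
    if not data:
        return []
    splits = pick_split_points(data, num_partitions)
    parts = partition(data, splits)
    # Each partition could be sorted on its own node, with no shared data.
    sorted_parts = [sorted(part) for part in parts]
    # Partitions cover increasing key ranges, so concatenation is enough.
    return [value for part in sorted_parts for value in part]
```

The key point is that the partitions cover disjoint, ordered ranges, so no merge step across machines is needed once each one has sorted its own share.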

  • User/Item based Recommender
  • Similarities (concept of neighborhood)

The “Hello World” equivalent for the recommendation problem is Netflix movie rating. The movie sample data is available here. In this test data you have a history of users who rented movies and rated them. When you rent a new movie, the system gives you suggestions based on your renting pattern, as well as on what other users with similar profiles are renting. This data is stored in a transactional database like MySQL. We can write a batch program to take the transaction data and move it to the Hadoop filesystem. When you run the Mahout MapReduce job, it returns the top ‘n’ recommendations. If you look at the data, it has comma-separated userid,itemid,rating. It assumes you maintain user and item master tables in the transactional database and combine the two to give the user more informative results.

Let me give you a glimpse of how an item-based recommendation algorithm works using MapReduce; for more on this, refer to Mahout in Action by Manning, chapter 6.

A recommender system involves the following steps:

  • The first step is to use this data to build an item co-occurrence matrix. It answers the question of how many times two items have co-occurred in users' purchases. It is a square matrix of order n, where n is the number of distinct items the users have bought.
  • The next step is to compute the user vector of the items the user has bought. It is a single-column matrix.
  • The final step is to produce the recommendation by multiplying the co-occurrence matrix by the user vector. The item to recommend is the one with the highest resulting score among the items whose entry in the user vector is zero, i.e. items the user has not bought yet.
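The three steps can be sketched in a few lines of plain Python. This is a single-machine illustration with invented sample data and function names, not the Mahout implementation, and it assumes boolean purchase data (bought or not bought).

```python
from collections import defaultdict
from itertools import permutations


def build_cooccurrence(purchases):
    """purchases: {user: set(items)} -> {item_a: {item_b: times bought together}}."""
    matrix = defaultdict(lambda: defaultdict(int))
    for items in purchases.values():
        for a, b in permutations(items, 2):
            matrix[a][b] += 1
    return matrix


def recommend(purchases, user, top_n=1):
    matrix = build_cooccurrence(purchases)
    owned = purchases[user]  # the non-zero entries of the user vector
    scores = defaultdict(int)
    # With a 0/1 user vector, the matrix-vector product reduces to summing
    # the co-occurrence counts of the items the user already has.
    for item in owned:
        for other, count in matrix[item].items():
            scores[other] += count
    # Only items the user has not bought yet are candidates.
    candidates = {item: score for item, score in scores.items() if item not in owned}
    return sorted(candidates, key=lambda item: (-candidates[item], item))[:top_n]


purchases = {
    "u1": {"A", "B", "C"},
    "u2": {"A", "B"},
    "u3": {"B", "C"},
    "u4": {"A"},
}
```

For user `u4`, who has only bought `A`, item `B` co-occurs with `A` twice and `C` once, so `B` is recommended first. The diagonal of the matrix is left out here because items the user already owns are excluded anyway.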

Let us take a typical use case of an airline reservation system. When a user purchases flight tickets for a location, he is offered recommendations relevant to his interests, based on his past purchase history and the purchase history of users with similar profiles. Assuming the transactional data is present in MySQL, we can use a Sqoop script to import the data into HDFS as below,

 sqoop import --connect jdbc:mysql://localhost/reservation --username krishna --password pass123  --table useritem

We need to run the Mahout job in Hadoop as below,

 hadoop jar mahout-core-0.6-job.jar org.apache.mahout.cf.taste.hadoop.item.RecommenderJob -Dmapred.input.dir=input/input.txt -Dmapred.output.dir=output --usersFile input/users.txt --booleanData --similarityClassname SIMILARITY_COOCCURRENCE

Finally, we need to run a Sqoop script to export the final analytical data back to MySQL as below,

 sqoop export --connect jdbc:mysql://localhost/reservation --username krishna --password pass123 --table useritem --export-dir useritem/part-m-00000

Now in your application, you do a lookup on the useritem table and return the recommender results to the user.

Hadoop and Mahout in Data Mining

Introduction: Data Mining

This blog talks about a typical solution architecture where we use various BigData components like Hadoop, Mahout, Sqoop, Flume and the Mongo-Hadoop Connector. A lot of successful companies have been around for at least 20 years and have invested in IT infrastructure. Over time they have accumulated data in the form of transaction data and log archives, and they are eager to analyse this data to see if they can improve business efficiency with it.

Below is the component diagram of how each of the above frameworks fits into the ecosystem.

Data Mining

In a typical scenario, as customers use the company's IT systems to buy the company's products, customer data such as purchasing patterns, location, gender and how other similar customers are purchasing is stored for data mining. Based on this data, the company can help customers make buying decisions. If you look carefully, Netflix, Amazon, eBay and Gilt are already doing this.

Mahout is an open source framework that has a built-in recommender engine. It also runs on Hadoop using the Map/Reduce paradigm. In a typical scenario,

  1. User item data that is stored in MySQL is imported into the Hadoop Distributed File System (HDFS) using an ETL tool like Sqoop
  2. User click information that is stored in log files is exported into HDFS using log migration tools like Flume
  3. If the transaction data is stored in NoSQL, there are connectors to export the data to HDFS

Once the data is in HDFS, we can use a Mahout batch job to run the data analysis and export the processed data back to the transactional database.

Hummingbird – real time data analytics

Recently I came across an interesting company called Gilt, which is one of the leading eCommerce companies. What is interesting about it is that the whole company runs on real time analytics. Instead of me explaining what real time analytics is, click on the Hummingbird Slideshare presentation, Hummingbird Vimeo link, Hummingbird website and Hummingbird Demo to understand more. The good news is that they donated the code that runs this to Git here. Setting up Hummingbird is described at the Git location. By default the Hummingbird tracking pixel server starts on port 8000 and the monitor server starts on port 8080.

About Hummingbird

Hummingbird is built with Node.js and MongoDb. It works on the tracking pixel concept: the framework exposes a 1×1 tracking pixel, which you can embed in your HTML pages as below,

<img id="tracking-pixel" src="http://localhost:8000/tracking_pixel.gif?events=scAdd&productId=10&ip=" alt="data analytics" />

In the query string of the image, pass whatever values you want to analyse, for example if you want to know how many times a product has been purchased and from which location. The data is stored in MongoDb and can later be analysed over a time range. The overall architecture is as below,
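To show what the server side of a tracking pixel has to do with such a request, here is a small sketch that parses the query string and counts events. Hummingbird itself is written in Node.js, so this Python is only an illustration; the helper names are made up, and only the URL and parameter names come from the pixel tag above.

```python
from collections import Counter
from urllib.parse import parse_qs, urlparse


def parse_pixel_request(url):
    """Return the query parameters of a tracking-pixel request as a flat dict."""
    query = parse_qs(urlparse(url).query, keep_blank_values=True)
    return {name: values[0] for name, values in query.items()}


def count_events(urls):
    """Count (event, productId) pairs across many pixel requests."""
    counts = Counter()
    for url in urls:
        params = parse_pixel_request(url)
        counts[(params.get("events"), params.get("productId"))] += 1
    return counts


hit = "http://localhost:8000/tracking_pixel.gif?events=scAdd&productId=10&ip="
```

Each page view triggers one image request, so counting parsed requests per event and product gives the kind of metric the monitor displays.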

data analytics

Understanding document based MongoDb NoSQL

Introduction: MongoDb NoSQL

This article touches upon various aspects of RDBMS and where RDBMS hits a roadblock in the enterprise world. It also talks about how we can overcome these roadblocks by relaxing a few key aspects of RDBMS, and how a NoSQL based solution, in particular the popular MongoDb, fits into this.

RDBMS related to enterprise scalabilities, performance and flexibilities

RDBMS evolved out of strong roots in math, like relational and set theory. Some of its aspects are schema validation, normalized data to avoid duplication, atomicity, locking, concurrency, high availability and one version of the truth.

While these aspects have a lot of benefits in terms of data storage and retrieval, they can impact enterprise scalability, performance and flexibility. Let us consider a typical purchase order example. In the RDBMS world we will have 2 tables with a one-to-many relationship as below,

Consider that we need to store a huge number of purchase orders and we start partitioning. One way to partition is to have the OrderHeader table in one Db instance and the LineItem information in another. If you want to insert or update an order, you need to update both tables atomically, and you need a transaction manager to ensure atomicity. If you want to scale this further in terms of processing and data storage, you can only increase hard disk space and RAM.

The easy way to achieve scaling in RDBMS is vertical scaling.


Let us consider another situation: because of a change in our business, we add a new column called LineDesc to the LineItem table, and this application is running in production. To deploy this change, we need to bring down the server for some time for the change to take effect.

Achieving enterprise scalabilities, performance and flexibility

Fundamental requirements of modern enterprise systems are,

  1. Flexibility in scaling the database, so that multiple instances of the database can process the information in parallel
  2. Flexibility in changing the database, so that changes can be absorbed without long server downtimes
  3. The application/middle tier does not have to handle the object-relational impedance mismatch. Can we get away from it using techniques like JSON?

Let us go back to our PurchaseOrder example and relax some of the aspects of RDBMS, like normalization (avoiding joins over a lot of rows) and atomicity, and see if we can achieve some of the above objectives.

Below is an example of how we can store the PurchaseOrder (there are other, better ways of storing the information).

orderdescription: "Krishna's Orders",
date: "Sat Jul 24 2010 19:47:11 GMT-0700(PDT)",
{linename:"pendrive", quantity:"5"}, {linename:"harddisk", quantity:"10"}

If you look carefully, the purchase order is stored in a JSON document-like structure. You will also notice that we don’t need multiple tables, relationships or normalization, and hence there is no need to join. And since the schema qualifiers are within the document, there is no table definition.

You can store them as a collection of objects/documents. Hypothetically, if we need to store several million PurchaseOrders, we can chunk them into groups and store them in several instances.

If you want to retrieve PurchaseOrders based on specific criteria, for example all the purchase orders in which one of the line items is a “pendrive”, we can ask all the individual instances to search in “parallel” using the same criteria, and one of them can consolidate the list and return the information to the client. This is the concept of horizontal scaling.
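The retrieve-in-parallel-and-consolidate idea can be sketched as follows. The three lists stand in for three database instances, and the `orderid` and `lineitems` field names are assumptions made for this example; they are not taken from the document shown earlier or from MongoDb itself.

```python
from concurrent.futures import ThreadPoolExecutor

# Three lists stand in for three database instances, each holding a chunk
# of the purchase orders. Field names are assumed for illustration.
instances = [
    [{"orderid": 1, "lineitems": [{"linename": "pendrive", "quantity": 5}]}],
    [{"orderid": 2, "lineitems": [{"linename": "harddisk", "quantity": 10}]}],
    [{"orderid": 3, "lineitems": [{"linename": "pendrive", "quantity": 1},
                                  {"linename": "harddisk", "quantity": 2}]}],
]


def query_instance(orders, linename):
    """Run the same filter locally on one instance."""
    return [order for order in orders
            if any(item["linename"] == linename for item in order["lineitems"])]


def find_orders(linename):
    """Scatter the query to all instances in parallel, then gather the results."""
    with ThreadPoolExecutor(max_workers=len(instances)) as pool:
        partials = pool.map(lambda orders: query_instance(orders, linename), instances)
        merged = [order for part in partials for order in part]
    return sorted(merged, key=lambda order: order["orderid"])
```

Calling `find_orders("pendrive")` returns the orders with ids 1 and 3. Each instance only scans its own chunk, so adding instances spreads both the storage and the query work.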


Because there is no separate table schema and the schema definition is included in the JSON object, we can change the document structure and store and retrieve documents with just a change in the application layer. This does not need a database restart.

Finally, since the object structure is JSON, we can present it directly to the web tier or a mobile device, and they will render it.

NoSQL is a classification of databases designed with the above aspects in mind.

MongoDb: Document based NoSQL

The MongoDb NoSQL database is document based and uses some of the above techniques to store and retrieve data. There are a few NoSQL databases that are ordered key-value based, like Redis and Cassandra, which also take these approaches but are much simpler.

To give an RDBMS analogy, collections in MongoDb are similar to tables and documents are similar to rows. Internally MongoDb stores the information as binary serialized JSON objects called BSON. MongoDb supports a JavaScript-style query syntax to retrieve BSON objects.


A typical document looks as below,

{ date: new Date(),
text: "Destination Moon",
tags: ["comic", "adventure"] }

> db.posts.find()
{
_id: ObjectId("4c4ba5c0672c685e5e8aabf3"),
date: "Sat Jul 24 2010 19:47:11 GMT-0700(PDT)",
text: "Destination Moon",
tags: ["comic", "adventure"] }

In MongoDb, atomicity is guaranteed within a document. If you have to achieve atomicity outside of the document, it has to be managed at the application level. Below is an example,

Many to many:


//All products for a given category


//All categories for a given product
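As a minimal sketch of one way to model a many-to-many relationship in documents, assume each product document embeds a list of category ids. The collections, field names and sample data below are made up for illustration, and plain Python lists stand in for MongoDb collections.

```python
categories = [
    {"_id": "c1", "name": "storage"},
    {"_id": "c2", "name": "usb"},
]
products = [
    {"_id": "p1", "name": "pendrive", "category_ids": ["c1", "c2"]},
    {"_id": "p2", "name": "harddisk", "category_ids": ["c1"]},
]


def products_for_category(category_id):
    """All products for a given category: match on the embedded id list."""
    return [p["name"] for p in products if category_id in p["category_ids"]]


def categories_for_product(product_id):
    """All categories for a given product: read its id list, then look the ids up."""
    product = next(p for p in products if p["_id"] == product_id)
    return [c["name"] for c in categories if c["_id"] in product["category_ids"]]
```

Because the category ids live inside the product document, adding or removing a category for a product is a change to a single document, which keeps that update inside the document-level atomicity described above.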



In a typical stack that uses MongoDb, it makes a lot of sense to use a JavaScript based framework. As a web framework, we use the Express/Node.js/MongoDb stack. A good example of how to use this stack is here.

MongoDb NoSQL also supports sharding, which enables parallel processing/horizontal scaling. For more details on how a typical BigData system handles parallel processing/horizontal scaling, refer to Ricky Ho’s link.

Typical use cases for MongoDb include event logging, real-time analytics, content management and ecommerce. Use cases where it is not a good fit are transaction-based banking systems and non-real-time data warehousing.


Datawarehouse implementation using Hadoop+Hbase+Hive+SpringBatch – Part 2

The svn codebase for this article is here.

In continuation to part 1, this section covers,

  1. Setup of a Hadoop, Hbase, Hive on a new Ubuntu VM
  2. Run Hadoop, Hbase and Hive as services
  3. Setup the Spring batch project and run the tests
  4. Some useful commands/tips

To begin with, let me tell you that the reason for choosing Hive was not to use it as a JDBC equivalent. It was more to understand how to use Hive as a powerful data warehouse analytics engine.

Setup of a Hadoop, Hbase, Hive on a new Ubuntu VM

Download the latest Hadoop, HBase and Hive from the Apache websites. You can also go to the Cloudera website, get the Cloudera Ubuntu VM and use apt-get to install Hadoop, HBase and Hive. It did not work for me; if you are adventurous you can try that. You can also try MapR’s VMs. Both Cloudera and MapR have good documentation and tutorials.

Unzip the files in the home directory, then open the .profile file and add the bin directories to the path as below,


sudo mkdir -p /app/hadoop/tmp

sudo chown <login user>:<machine name> /app/hadoop/tmp

hadoop namenode -format

Set the HADOOP_HOME, HBASE_HOME and HIVE_HOME environment variables.
Run ifconfig and get the IP address; it will be something like,

Go to the /etc/hosts file and add an entry like, <machine name>

Run Hadoop, Hbase and Hive as services

Go to hadoop root folder and run the below command,

Open a browser and access http://localhost:50060/; it will open the Hadoop admin console. If there are issues, execute the below command and see if there are any exceptions

tail -f $HADOOP_HOME/logs/hadoop-<login username>-namenode-<machine name>.log

Hadoop is running on port 54310 by default.
Go to the HBase root folder and run the below command,
tail -f $HBASE_HOME/logs/hbase--master-.log

See if there are any errors. HBase is running on port 60000 by default.
Go to the Hive root folder and run the below command,

hive --service hiveserver -hiveconf hbase.master=localhost:60000 -hiveconf mapred.job.tracker=local

Notice that by giving the HBase reference we have integrated Hive with HBase. Also, Hive's default port is 10000. Now run Hive as a command line client as follows,

hive -h localhost

Create the seed table as below,

  CREATE TABLE weblogs(key int, client_ip string, day string, month string, year string, hour string,
    minute string, second string, user string, loc  string) row format delimited fields terminated by '\t';

  CREATE TABLE hbase_weblogs_1(key int, client_ip string, day string, month string, year string, hour string,
    minute string, second string, user string, loc  string)
    STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
    WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,cf1:client_ip,cf2:day,cf3:month,cf4:year,cf5:hour,cf6:minute,cf7:second,cf8:user,cf9:loc")
    TBLPROPERTIES ("" = "hbase_weblog");

  LOAD DATA LOCAL INPATH '/home/hduser/batch-wordcount/weblogs_parse1.txt' OVERWRITE INTO TABLE weblogs;

  INSERT OVERWRITE TABLE hbase_weblogs_1 SELECT * FROM weblogs;

Setup the Spring batch project and run the tests
To set up this project, get the latest code from the SVN location mentioned at the beginning. Download Gradle and set up the path in the .profile. Now run the below command to load the data,

  gradle test

Run the below junit to get the analysis data,

  gradle test

hadoopVersion is 1.0.2. build.gradle file looks as below,

repositories {     // Public Spring artefacts
  maven { url "" }
  maven { url "" }
  maven { url "" }
  maven { url "" }
  maven { url "" }
  maven { url "" }
  maven { url "" }
}

dependencies {
  compile ("$version")
  { exclude group: 'org.apache.thrift', module: 'thrift' }
  compile "org.apache.hadoop:hadoop-examples:$hadoopVersion"
  compile "org.springframework.batch:spring-batch-core:$springBatchVersion"
  // update the version that comes with Batch
  compile "org.springframework:spring-tx:$springVersion"
  compile "org.apache.hive:hive-service:0.9.0"
  compile "org.apache.hive:hive-builtins:0.9.0"
  compile "org.apache.thrift:libthrift:0.8.0"
  runtime "org.codehaus.groovy:groovy:$groovyVersion"
  // see HADOOP-7461
  runtime "org.codehaus.jackson:jackson-mapper-asl:$jacksonVersion"
  testCompile "junit:junit:$junitVersion"
  testCompile "org.springframework:spring-test:$springVersion"
}

Spring Data Hadoop configuration looks as below,

<!-- The value after the question mark is the default value if another value for hd.fs is not provided -->${hd.fs:hdfs://localhost:9000}

<hive-client host="localhost" port="10000" />

Spring Batch job looks as below,

<batch:job id="job1">
  <batch:step id="import">
    <batch:tasklet ref="hive-script"/>
  </batch:step>
</batch:job>

Spring Data Hive script for loading the data is as below,

<hive-tasklet id="hive-script">
<script>
LOAD DATA LOCAL INPATH '/home/hduser/batch-analysis/weblogs_parse.txt' OVERWRITE INTO TABLE weblogs;
INSERT OVERWRITE TABLE hbase_weblogs_1 SELECT * FROM weblogs;
</script>
</hive-tasklet>

Spring Data Hive script for analyzing the data is as below,

<hive-tasklet id="hive-script">
<script>    SELECT client_ip, count(user) FROM hbase_weblogs_1 GROUP by client_ip;   </script>
</hive-tasklet>

Some useful commands/tips

For querying Hadoop DFS you can use Unix-style file commands like,

hadoop dfs -ls /
hadoop dfs -mkdir /hbase

If Hadoop has entered safe mode and is not starting up, you can execute the below command,

hadoop dfsadmin -safemode leave

If you want to find errors in the Hadoop filesystem, you can execute the below command,

hadoop fsck /

Datawarehouse implementation using Hadoop+Hbase+Hive+SpringBatch – Part 1

In continuation of my earlier article, I went a little further in understanding what these technologies are, where they are used, and how to implement them technically. I want to present this material in 2 parts,

  1. Introduction to some of the terminologies and explaining the use case we are going to implement
  2. Implementation details of the use case

Hadoop is primarily a distributed file system (DFS) where you can store terabytes/petabytes of data on low end commodity computers. This is similar to how companies like Yahoo and Google store their page feeds.

HBase is a BigTable implementation which can use either the local file system or Hadoop DFS to store the data. It is similar to an RDBMS like Oracle. There are 2 main differences between them,

  1. It is designed to store sparse data; the use case is where there are lots of columns and no data is stored in the majority of them
  2. It has powerful versioning capabilities; a typical use case is to track the trajectory of a rocket over a period of time
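Both differences can be illustrated with a toy in-memory table: only cells that were written take up space, and every cell keeps its history by timestamp. The class and method names below are invented for the sketch and are not the HBase client API.

```python
import time
from collections import defaultdict


class SparseVersionedTable:
    """Toy table: only written cells exist, and each cell keeps its versions."""

    def __init__(self):
        # row key -> column name -> list of (timestamp, value), oldest first
        self._cells = defaultdict(lambda: defaultdict(list))

    def put(self, row, column, value, timestamp=None):
        ts = time.time() if timestamp is None else timestamp
        versions = self._cells[row][column]
        versions.append((ts, value))
        versions.sort(key=lambda pair: pair[0])

    def get(self, row, column, at=None):
        """Latest value, or the latest value written at or before `at`."""
        versions = self._cells.get(row, {}).get(column, [])
        if at is not None:
            versions = [v for v in versions if v[0] <= at]
        return versions[-1][1] if versions else None

    def history(self, row, column):
        """All (timestamp, value) versions of one cell, oldest first."""
        return list(self._cells.get(row, {}).get(column, []))


table = SparseVersionedTable()
table.put("rocket1", "pos:altitude", 0, timestamp=1)
table.put("rocket1", "pos:altitude", 1200, timestamp=2)
table.put("rocket1", "pos:altitude", 5400, timestamp=3)
```

Reading `pos:altitude` returns the newest value, reading it with `at=2` returns the value as of that time, and a column that was never written, such as `pos:speed`, simply does not exist for that row.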

Hive is built on the Map Reduce concept and is built on top of Hadoop. It supports SQL like query language. Its main purpose is datawarehouse analysis and adhoc querying. Hive can also be built on top of Hbase.

Spring Batch is a Spring framework for managing a batch workflow.


Let us say you have a website which has a lot of traffic. We need to build an application which analyses the traffic, such as the number of times a user has come to the website and the location from which the website is accessed most. Below is the data flow,

Log file feed structure, 07:45 07:46 07:48 07:49

There will be a lookup table in HBase as below,

kp  Boston
kk  Bangalore
ak  Bangalore

Now we write 2 applications:

  1. Spring Batch will run a command line job to pull the log feed and push the IP address feed to HBase
  2. A Java command line application will pull the HBase information using Hive to display the metrics as follows

presentation useranalysis
presentation locationanalysis
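The two metrics can be sketched in plain Python before any Hadoop pieces are involved. The lookup table is the one given above; the log records are an assumed layout of (user, time of visit) pairs, since the exact feed structure is not shown, and the function names are made up.

```python
from collections import Counter

# Lookup table from the article: user -> location.
user_location = {"kp": "Boston", "kk": "Bangalore", "ak": "Bangalore"}

# Assumed log records (user, time of visit); the real feed layout may differ.
log_feed = [("kp", "07:45"), ("kk", "07:46"), ("kp", "07:48"), ("ak", "07:49")]


def user_analysis(feed):
    """Number of visits per user."""
    return Counter(user for user, _ in feed)


def location_analysis(feed, lookup):
    """Number of visits per location, resolved through the lookup table."""
    return Counter(lookup.get(user, "unknown") for user, _ in feed)
```

With this sample feed, user `kp` has two visits and Boston and Bangalore have two visits each. In the actual solution the same grouping would be expressed as Hive queries over the data stored in HBase.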

Now that you know the technology pieces and high level solution, in my next article I will write how we can implement this with working code.