Tag Archives: Hadoop

Implementing a recommender engine using Hadoop and Mahout

In continuation of my earlier blog post, in this post I will talk about how to implement a recommender engine using Mahout and Hadoop.

  • First, a brief introduction to MapReduce and how some computational algorithms have to be rewritten to take advantage of parallel processing.
  • Second, I will talk about how a recommender algorithm has to be reworked in Mahout to take advantage of Hadoop.
  • Third, I will walk you through a use case of how we can implement a MapReduce recommender engine with Hadoop and Mahout.

One of the fundamental principles of parallel computing is that it pays off when each computer in a cluster can process its task independently, without sharing data with the others. Consider traditional sort algorithms: they are not designed so that we can chunk a large dataset, hand the chunks to many computers running in parallel, and then consolidate the results back into one sorted dataset. If we need to sort this way, we need to write the algorithm to take advantage of MapReduce; TeraSort is one such sorting technique built on MapReduce.

Similarly, in the context of a recommender engine, if we want to take advantage of MapReduce we have to write the algorithm differently. Mahout provides recommendation algorithms written to take advantage of MapReduce. The components of a recommender engine are:

  • User/Item based Recommender
  • Similarities (concept of neighborhood)

The “Hello World” equivalent of the recommendation problem is Netflix movie ratings. The sample movie data is available here. In this test data you have a history of users who rented movies and rated them. When you rent a new movie, the system gives you suggestions based on your renting pattern, as well as on what other users with similar profiles are renting; this data is stored in a transactional database like MySQL. We can write a batch program to take the transaction data and move it to the Hadoop filesystem. When you run the Mahout MapReduce job, it returns the top ‘n’ recommendations. If you look at the data, it is comma-separated userid,itemid,rating. It assumes you maintain users and items master tables in the transactional database, and you will join these two back in to present more meaningful information to the user.
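
For example, a few lines of this input (with made-up user IDs, item IDs and ratings, purely for illustration) would look like:

 1,101,5.0
 1,102,3.0
 2,101,2.0
 2,103,4.5
 3,102,4.0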

Let me give you a glimpse of how an item-based recommendation algorithm works using MapReduce; for more on this, refer to chapter 6 of Mahout in Action (Manning).

A recommender system involves the following steps:

  • The first step is to use this data to build an item co-occurrence matrix. It answers the question of how many times two items have co-occurred when users were buying them. It is basically a square matrix of order n, where n is the number of items the users have bought.
  • The next step is to compute the user vector of the items this user is buying, which is a single-column matrix.
  • The final step is to produce the recommendations by multiplying the co-occurrence matrix with the user vector. The items to recommend are those the user has not yet bought (the zero entries in the user vector) that end up with the highest scores (see the sketch below).
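
To make the last step concrete, here is a minimal, self-contained sketch in plain Java (not Mahout's actual implementation; the matrix and vector values are made up purely for illustration) of multiplying the co-occurrence matrix with the user vector and picking candidate items:

// Plain-Java illustration of: recommendation scores = co-occurrence matrix x user vector
public class CooccurrenceRecommendationSketch {
    public static void main(String[] args) {
        // cooccurrence[i][j] = how many times item i and item j were bought together (made-up numbers)
        int[][] cooccurrence = {
            {4, 3, 0, 1, 2},
            {3, 4, 1, 0, 1},
            {0, 1, 3, 2, 0},
            {1, 0, 2, 3, 1},
            {2, 1, 0, 1, 3}
        };
        // User vector: the user's rating per item; zeros are items not yet bought
        double[] userVector = {5.0, 0.0, 0.0, 2.0, 4.0};

        // score[i] = sum over j of cooccurrence[i][j] * userVector[j]
        double[] scores = new double[userVector.length];
        for (int i = 0; i < cooccurrence.length; i++) {
            for (int j = 0; j < userVector.length; j++) {
                scores[i] += cooccurrence[i][j] * userVector[j];
            }
        }

        // Candidates are the items the user has not bought yet (zero entries); pick the highest scores
        for (int i = 0; i < scores.length; i++) {
            if (userVector[i] == 0.0) {
                System.out.println("Candidate item " + i + " score = " + scores[i]);
            }
        }
    }
}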

Let us take a typical use case: an airline reservation system. When a user is purchasing flight tickets to a destination, he is offered recommendations relevant to his interests, based on his past purchase history and the purchase history of users with similar profiles. Assuming the transactional data is present in MySQL, we can use a Sqoop script to import the data into HDFS as below:

 sqoop import --connect jdbc:mysql://localhost/reservation --username krishna --password pass123  --table useritem
 

We then need to run the Mahout recommender job on Hadoop as below:

 hadoop jar mahout-core-0.6-job.jar org.apache.mahout.cf.taste.hadoop.item.RecommenderJob -Dmapred.input.dir=input/input.txt -Dmapred.output.dir=output --usersFile input/users.txt --booleanData --similarityClassname SIMILARITY_COOCCURRENCE
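
If the job runs successfully, the part files in the output directory typically contain one line per user, roughly of the form userID followed by a list of itemID:score pairs; the IDs and scores below are made up for illustration:

 3	[103:4.5,102:4.0,105:3.5]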
 

Finally, we need to run a Sqoop script to export the final analytical data back to MySQL as below:

 sqoop export --connect jdbc:mysql://localhost/reservation --username krishna --password pass123 --table useritem --export-dir useritem/part-m-00000
 

Now, in your application, you do a lookup on the useritem table and return the recommendation results to the user.
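
A minimal sketch of that application-side lookup using plain JDBC is shown below. It assumes the exported useritem table keeps the same (userid, itemid, rating) columns as the sample data and that the MySQL driver is on the classpath; adjust the query to your real schema.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;

public class RecommendationLookup {
    public static void main(String[] args) throws Exception {
        // Connection details match the Sqoop examples above; the userId is a made-up value
        String url = "jdbc:mysql://localhost/reservation";
        long userId = 3L;

        try (Connection conn = DriverManager.getConnection(url, "krishna", "pass123");
             PreparedStatement ps = conn.prepareStatement(
                 "SELECT itemid, rating FROM useritem WHERE userid = ? ORDER BY rating DESC LIMIT 10")) {
            ps.setLong(1, userId);
            try (ResultSet rs = ps.executeQuery()) {
                while (rs.next()) {
                    // Join itemid against the items master table to show a friendlier name to the user
                    System.out.println("Recommend item " + rs.getLong("itemid")
                            + " (score " + rs.getDouble("rating") + ")");
                }
            }
        }
    }
}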


Hadoop and Mahout in Data Mining

Introduction: Data Mining

This blog post talks about a typical solution architecture where we use various BigData components like Hadoop, Mahout, Sqoop, Flume and the Mongo-Hadoop connector. A lot of successful companies have been around for at least 20 years and have invested heavily in IT infrastructure. Over that period they have accumulated data in the form of transaction records and log archives, and they are eager to analyse this data to see if they can improve business efficiency with it.

Below is the Component Diagram of how each of the above frameworks fit into the ecosystem.

(Component diagram: Data Mining)

In a typical scenario, as customers use the company's IT systems to buy its products, customer data such as purchasing patterns, location, gender, and how similar customers are purchasing is stored for data mining. Based on this data, the company can help customers make buying decisions. If you look carefully, Netflix, Amazon, eBay and Gilt are already doing this.

Mahout is an open source framework that has a built-in recommender engine. It runs on Hadoop using the MapReduce paradigm. In a typical scenario,

  1. User/item data stored in MySQL is imported into the Hadoop Distributed File System (HDFS) using an ETL tool like Sqoop
  2. User click information stored in log files is exported into HDFS using log collection tools like Flume (see the Flume sketch below)
  3. If the transaction data is stored in a NoSQL store, there are connectors (such as the Mongo-Hadoop connector) to export the data to HDFS

Once the data is in HDFS, we can run a Mahout batch job to do the analysis and export the processed data back to the transactional database.
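
As an illustration of step 2, a minimal Flume NG agent configuration that tails a web server log and writes the events into HDFS could look like the sketch below; the agent name, log path and HDFS URL are assumptions for illustration only.

# flume.conf (sketch): tail an access log and sink the events into HDFS
agent1.sources  = weblog-source
agent1.channels = mem-channel
agent1.sinks    = hdfs-sink

agent1.sources.weblog-source.type = exec
agent1.sources.weblog-source.command = tail -F /var/log/httpd/access_log
agent1.sources.weblog-source.channels = mem-channel

agent1.channels.mem-channel.type = memory

agent1.sinks.hdfs-sink.type = hdfs
agent1.sinks.hdfs-sink.hdfs.path = hdfs://localhost:9000/flume/weblogs
agent1.sinks.hdfs-sink.hdfs.fileType = DataStream
agent1.sinks.hdfs-sink.channel = mem-channel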

Datawarehouse implementation using Hadoop+Hbase+Hive+SpringBatch – Part 2

The svn codebase for this article is here.

In continuation of Part 1, this section covers:

  1. Setup of a Hadoop, Hbase, Hive on a new Ubuntu VM
  2. Run Hadoop, Hbase and Hive as services
  3. Setup the Spring batch project and run the tests
  4. Some useful commands/tips

To begin with, let me say that the choice of Hive was not to use it as a JDBC equivalent; it was to understand how to use Hive as a powerful datawarehouse analytics engine.

Setup of a Hadoop, Hbase, Hive on a new Ubuntu VM

Download the latest Hadoop, Hbase and Hive from the Apache websites. You can also go to the Cloudera website, get the Cloudera Ubuntu VM and use apt-get to install hadoop, hbase and hive; it did not work for me, but if you are adventurous you can try that. You can also try MapR's VMs. Both Cloudera and MapR have good documentation and tutorials.

Unzip the files in the home directory, then open the .profile file and add the bin directories to the path as below:

export HADOOP_HOME=<HADOOP HOME>
export HBASE_HOME=<HBASE HOME>
export HIVE_HOME=<HIVE HOME>
export PATH=$PATH:$HADOOP_HOME/bin:$HBASE_HOME/bin:$HIVE_HOME/bin

sudo mkdir -p /app/hadoop/tmp

sudo chown <login user>:<login group> /app/hadoop/tmp
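
Hadoop also needs to know about this temp directory and the filesystem URI before the NameNode is formatted. A minimal conf/core-site.xml for a single-node setup might look like the sketch below (port 54310 matches the default mentioned later; adjust the values to your environment):

<configuration>
  <property>
    <name>hadoop.tmp.dir</name>
    <value>/app/hadoop/tmp</value>
  </property>
  <property>
    <name>fs.default.name</name>
    <value>hdfs://localhost:54310</value>
  </property>
</configuration>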

hadoop namenode -format

Make sure the HADOOP_HOME, HBASE_HOME and HIVE_HOME environment variables are set (for example, by sourcing .profile again).
Run ifconfig and note the IP address; it will be something like 192.168.45.129.

Go to the /etc/hosts file and add an entry like:

192.168.45.129 <machine name>

Run Hadoop, Hbase and Hive as services

Go to the Hadoop root folder and run the below command:

start-all.sh

Open a browser and access http://localhost:50070/; it will open the NameNode web console. If there are issues, execute the below command and see if there are any exceptions:

tail -f $HADOOP_HOME/logs/hadoop-<login username>-namenode-<machine name>.log

Hadoop's filesystem is running on port 54310 by default.
Go to the Hbase root folder and run the below commands:

start-hbase.sh
 tail -f $HBASE_HOME/logs/hbase-<login username>-master-<machine name>.log

See if there are any errors. The Hbase master runs on port 60000 by default.
Go to the Hive root folder and run the below command:

 hive --service hiveserver -hiveconf hbase.master=localhost:60000 -hiveconf mapred.job.tracker=local

Notice that by passing the Hbase master reference we have integrated Hive with Hbase. Also, the Hive server's default port is 10000. Now run Hive as a command line client as follows:

hive -h localhost

Create the seed tables and load the data as below:

  CREATE TABLE weblogs(key int, client_ip string, day string, month string, year string, hour string,
    minute string, second string, user string, loc  string) row format delimited fields terminated by '\t';

  CREATE TABLE hbase_weblogs_1(key int, client_ip string, day string, month string, year string, hour string,
    minute string, second string, user string, loc  string)
    STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
    WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,cf1:client_ip,cf2:day,cf3:month,cf4:year,cf5:hour,cf6:minute,cf7:second,cf8:user,cf9:loc")
    TBLPROPERTIES ("hbase.table.name" = "hbase_weblog");

  LOAD DATA LOCAL INPATH '/home/hduser/batch-wordcount/weblogs_parse1.txt' OVERWRITE INTO TABLE weblogs;

  INSERT OVERWRITE TABLE hbase_weblogs_1 SELECT * FROM weblogs;
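
For reference, each line of weblogs_parse1.txt is expected to be tab-delimited in the same column order as the weblogs table; a couple of made-up sample rows could look like this:

  1	192.168.45.129	01	05	2012	07	45	10	kp	Boston
  2	192.168.45.126	01	05	2012	07	46	22	kk	Bangalore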

Setup the Spring batch project and run the tests

To set up this project, get the latest code from the SVN location mentioned at the beginning. Download Gradle and set up the path in .profile. Now run the below command to load the data:

  gradle -Dtest=org.springframework.data.hadoop.samples.DataloadWorkflowTests test

Run the below JUnit test to get the analysis data:

  gradle -Dtest=org.springframework.data.hadoop.samples.AnalyzeWorkflowTests test

hadoopVersion is 1.0.2. build.gradle file looks as below,

repositories {     // Public Spring artefacts
  mavenCentral()
  maven { url "http://repo.springsource.org/libs-release" }
  maven { url "http://repo.springsource.org/libs-milestone" }
  maven { url "http://repo.springsource.org/libs-snapshot" }
  maven { url "http://www.datanucleus.org/downloads/maven2/" }
  maven { url "http://oss.sonatype.org/content/repositories/snapshots" }
  maven { url "http://people.apache.org/~rawson/repo" }
  maven { url "https://repository.cloudera.com/artifactory/cloudera-repos/"}
}

dependencies {
  compile ("org.springframework.data:spring-data-hadoop:$version")
  { exclude group: 'org.apache.thrift', module: 'thrift' }
  compile "org.apache.hadoop:hadoop-examples:$hadoopVersion"
  compile "org.springframework.batch:spring-batch-core:$springBatchVersion"
  // update the version that comes with Batch
  compile "org.springframework:spring-tx:$springVersion"
  compile "org.apache.hive:hive-service:0.9.0"
  compile "org.apache.hive:hive-builtins:0.9.0"
  compile "org.apache.thrift:libthrift:0.8.0"
  runtime "org.codehaus.groovy:groovy:$groovyVersion"
  // see HADOOP-7461
  runtime "org.codehaus.jackson:jackson-mapper-asl:$jacksonVersion"
  testCompile "junit:junit:$junitVersion"
  testCompile "org.springframework:spring-test:$springVersion"
}

Spring Data Hadoop configuration looks as below,

<configuration>
  <!-- The value after the colon is the default used when another value for hd.fs is not provided -->
  fs.default.name=${hd.fs:hdfs://localhost:9000}
  mapred.job.tracker=local
</configuration>

<hive-client host="localhost" port="10000" />

Spring Batch job looks as below,

<batch:job id="job1">
  <batch:step id="import">
    <batch:tasklet ref="hive-script"/>
  </batch:step>
</batch:job>

Spring Data Hive script for loading the data is as below,

<hive-tasklet id="hive-script">
  <script>
LOAD DATA LOCAL INPATH '/home/hduser/batch-analysis/weblogs_parse.txt' OVERWRITE INTO TABLE weblogs;
INSERT OVERWRITE TABLE hbase_weblogs_1 SELECT * FROM weblogs;
</script>
</hive-tasklet>

Spring Data Hive script for analyzing the data is as below,

<hive-tasklet id="hive-script">
<script>    SELECT client_ip, count(user) FROM hbase_weblogs_1 GROUP by client_ip;   </script>
</hive-tasklet>

Some useful commands/tips

For querying the Hadoop DFS you can use familiar file-based Unix-style commands like:

hadoop dfs -ls /
hadoop dfs -mkdir /hbase

If Hadoop has entered safe mode and is not coming out of it, you can execute the below command:

hadoop dfsadmin -safemode leave

If you want to find errors in the Hadoop filesystem, you can execute the below command:

hadoop fsck /

Datawarehouse implementation using Hadoop+Hbase+Hive+SpringBatch – Part 1

In continuation of my earlier article, I went a little further in understanding what these technologies are, where they are used, and how to build a technical implementation with them. I want to present this material in 2 parts:

  1. Introduction to some of the terminologies and explaining the use case we are going to implement
  2. Implementation details of the use case

Hadoop is primarily a distributed file system (DFS) where you can store terabytes/petabytes of data on low end commodity computers. This is similar to how companies like Yahoo and Google store their page feeds.

Hbase is a BigTable implementation which can use either the local file system or the Hadoop DFS to store its data. It is similar to an RDBMS like Oracle, with 2 main differences:

  1. It is designed to store sparse data; the use case is where there are lots of columns and no data is stored in the majority of them
  2. It has powerful versioning capabilities; a typical use case is to track the trajectory of a rocket over a period of time

Hive is built on the MapReduce concept and runs on top of Hadoop. It supports an SQL-like query language. Its main purpose is datawarehouse analysis and ad hoc querying. Hive can also be layered on top of Hbase.

Spring Batch is a Spring framework for managing a batch workflow.

Use case:

Let us say you have a website with a lot of traffic. We need to build an application which analyses that traffic: for example, the number of times each user has come to the website, and which locations the website is accessed from the most. Below is the data flow.

Log file feed structure,

192.168.45.129 07:45
192.168.45.126 07:46
192.168.45.127 07:48
192.168.45.129 07:49

There will be a lookup table in Hbase mapping each IP address to a user and a location, as below,

192.168.45.129 kp Boston
192.168.45.126 kk Bangalore
192.168.45.127 ak Bangalore
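
A quick way to create and seed such a lookup table is the HBase shell; the table and column family names below are placeholders for illustration, not from the original article:

create 'ip_lookup', 'cf'
put 'ip_lookup', '192.168.45.129', 'cf:user', 'kp'
put 'ip_lookup', '192.168.45.129', 'cf:loc', 'Boston'
put 'ip_lookup', '192.168.45.126', 'cf:user', 'kk'
put 'ip_lookup', '192.168.45.126', 'cf:loc', 'Bangalore'
scan 'ip_lookup'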

Now we write 2 applications:

  1. A Spring Batch job will run from the command line to pull the log feed and push the IP address feed into Hbase
  2. A Java command line application will pull the Hbase information using Hive to display the metrics as follows (see the query sketch below)

(Sample output: user analysis and location analysis.)
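
As a rough sketch of what those two metrics might look like in HiveQL (assuming the hbase_weblogs_1 table defined in Part 2, which has user and loc columns; these are not the exact queries from the original post):

SELECT user, count(*) FROM hbase_weblogs_1 GROUP BY user;
SELECT loc, count(*) FROM hbase_weblogs_1 GROUP BY loc;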

Now that you know the technology pieces and the high-level solution, in my next article I will describe how we can implement this with working code.

SpringData-Hadoop: Jumpstart Hadoop with Spring

These days there is a lot of hype around jargon like Hadoop, HBase, Hive, Pig and BigData. I was itching to learn what these terms mean and how I can see them used in the real world. I set up 2 goals for myself:

  1. Create Hadoop Single Node instance
  2. Of course figure out how it is integrated with Spring/Spring Batch

As usual, I googled how to quickly set up and learn these tools. The journey was not smooth. For a Windows user there are 2 ways you can set up a Hadoop single-node cluster on your machine.

  1. Cygwin: The first approach is not easy to set up; I spent a few days struggling through it without much result on my Windows 7 machine.
  2. Open source and commercial VMs: EMC-Greenplum (commercial) and Cloudera/Yahoo (open source) have created VMware instances with Hadoop and Hive bundled into the VM, and they claim these work out of the box. The Yahoo VM partially worked on my machine but it is outdated and does not integrate with Spring. The Cloudera VM did not work on my machine because of some 64-bit conflicts.
  3. I then got another, 32-bit VM instance from Cloudera and it worked. This is an Ubuntu VM instance with all the above tools installed and preconfigured.

I started with option 3: you can start the VM and do some quick tests as described in the tutorial. If you are in a real hurry, you can open the terminal and run these commands:

cd /usr/lib/hadoop
hadoop jar hadoop-examples.jar pi 10 1000000

Congratulations, you just ran your first Hadoop job.

Now, in the same VM, download Gradle and the SpringData-Hadoop distribution. Unzip both of these in your cloudera home directory. Go to your .profile file and add the below line at the end:

export PATH=$PATH:/home/cloudera/gradle-1.0-rc-3/bin

Note: your Gradle version may be different and you should change the path accordingly.

Now go to <SpringData-Hadoop Home>/samples/batch-wordcount, open the build.gradle file, remove the existing repositories entries and add the following lines:

repositories {
// Public Spring artefacts
mavenCentral()
maven { url "http://repo.springsource.org/libs-release" }
maven { url "http://repo.springsource.org/libs-milestone" }
maven { url "http://repo.springsource.org/libs-snapshot" }
maven { url "http://www.datanucleus.org/downloads/maven2/" }
maven { url "http://oss.sonatype.org/content/repositories/snapshots" }
maven { url "http://people.apache.org/~rawson/repo" }
maven { url "https://repository.cloudera.com/artifactory/cloudera-repos/" }
}

Open <SpringData-Hadoop Home>/samples/batch-wordcount/gradle.properties and modify

hadoopVersion = 0.20.2-cdh3u3

Open <SpringData-Hadoop Home>/samples/batch-wordcount/src/main/resources/hadoop.properties and edit the below lines

hd.fs=hdfs://localhost:8020
mapred.job.tracker=localhost:8021

Now go to the command prompt and run gradle test; the test should be successful. Here is the documentation/tutorial on Spring Hadoop integration.

If you want to learn more about Hadoop, there are good tutorials from Cloudera and YDN; please go through them.