ScalaTest a MapReduce using Akka

For people in a hurry, here is the MapReduce code with ScalaTest and Akka, along with the steps.

I was trying to learn Scala, and I wanted to kill several birds with one stone. Let me tell you, I am not disappointed; I feel comfortable working with Scala. If you are coming from the Java world, Scala is comparatively more complex, but once you get past the initial hurdle you will like it. I wanted to learn

One use case I wanted to try out was a simple word count MapReduce, which is the "hello world" of MapReduce. MapReduce is a functional programming technique popularly associated with parallel computing in Hadoop. There is a good MapReduce example using Java and Akka, and there are a few other decent MapReduce examples as well. The diagram below, from that source, describes the flow:

Word Count MapReduce with Akka and Java by Munish Gupta

In this example, I am taking advantage of Akka's actor support to break the job into chunks of tasks, process them in parallel, and aggregate the final results.
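Before bringing in actors, it can help to see the three phases as plain function calls. The following is a minimal sequential sketch in plain Scala (no Akka). The names mapLine, reduce, aggregate, and wordCount are hypothetical and only mirror the roles of the map, reduce, and aggregate actors; this is not the project's actual code.

```scala
object WordCountSketch {
  // One intermediate record emitted by the map phase: a word and a count
  final case class Result(word: String, count: Int)

  // Map phase: split a line into words and emit (word, 1) for each one
  def mapLine(line: String): List[Result] =
    line.split("\\s+").toList.filter(_.nonEmpty).map(w => Result(w, 1))

  // Reduce phase: collapse one mapper's output into per-word totals
  def reduce(results: List[Result]): Map[String, Int] =
    results.groupBy(_.word).map { case (word, rs) => word -> rs.map(_.count).sum }

  // Aggregate phase: merge partial maps by summing the counts for each word
  def aggregate(partials: Seq[Map[String, Int]]): Map[String, Int] =
    partials.foldLeft(Map.empty[String, Int]) { (acc, partial) =>
      partial.foldLeft(acc) { case (m, (word, n)) =>
        m.updated(word, m.getOrElse(word, 0) + n)
      }
    }

  // The whole pipeline, one line at a time
  def wordCount(lines: Seq[String]): Map[String, Int] =
    aggregate(lines.map(line => reduce(mapLine(line))))
}
```

In the actor version, each mapLine/reduce call becomes a message handled by a map or reduce actor, so different lines can be processed concurrently while the aggregate actor performs the final merge.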

For starters, SBT is a build tool similar to Maven and is used extensively for Scala development. Refer to project/plugins.sbt, which adds the integration with the Eclipse IDE. Once you get the code from GitHub, run the command below; you will notice that two files, .project and .classpath, get generated.


sbt eclipse

Now import the project into Eclipse via Import => “Existing Projects into Workspace”. Once the project is imported, we can take advantage of code completion and other IDE features, which makes developing the application much easier than writing Scala code in TextPad.

As always, I will start by writing a test. The lowest-level test is the aggregation test: the aggregate actor takes a map of words to the number of times each has occurred and adds it to a running total. I used ScalaTest's WordSpec for this test, with Akka's TestKit supplying system and expectMsg, as below:

"Aggregate actor" must {
  "send back Map message" in {
    // create the aggregate Actor
    val aggregateActor = system.actorOf(Props[AggregateActor])
    // send two partial word-count maps to be merged
    val map: Map[String, Int] = Map("ak" -> 1, "kp" -> 2)
    aggregateActor ! map
    val map1: Map[String, Int] = Map("ak" -> 1, "kp" -> 2)
    aggregateActor ! map1
    Thread.sleep(1000)
    // every count should now be doubled
    val output = Map("kp" -> 4, "ak" -> 2)
    aggregateActor ! "DISPLAY_LIST"
    expectMsg(output)
  }
}
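What the aggregate actor does with each message can be modelled without Akka. The sketch below is a hypothetical stand-in (AggregateState is not a class from the project) for the state an aggregate actor might keep between messages: add corresponds to receiving a Map message and snapshot to answering "DISPLAY_LIST". Because an actor handles one message at a time, a plain mutable map is safe inside it.

```scala
import scala.collection.mutable

// Hypothetical model of an aggregate actor's internal state, not the project's code
final class AggregateState {
  // running word -> count totals across all partial maps received so far
  private val totals = mutable.HashMap.empty[String, Int]

  // Corresponds to receiving a Map message: add its counts to the running totals
  def add(partial: Map[String, Int]): Unit =
    partial.foreach { case (word, n) =>
      totals(word) = totals.getOrElse(word, 0) + n
    }

  // Corresponds to "DISPLAY_LIST": hand out an immutable copy so callers cannot mutate the state
  def snapshot: Map[String, Int] = totals.toMap
}
```

Feeding it the same two maps as the test above yields Map("kp" -> 4, "ak" -> 2).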

Next, I write a Reduce unit test. The reduce actor takes a list of Result objects, builds a Map from them, and publishes it to the aggregate actor for further aggregation.

"Reduce actor" must {
  "send back Map message" in {
    // create the aggregate Actor
    val aggregateActor = system.actorOf(Props[AggregateActor])
    // create a single reduce Actor that forwards its results to the aggregate Actor
    val reduceRouter = system.actorOf(Props(new ReduceActor(aggregateActor)))
    val list: List[Result] = List(new Result("kp", 1), new Result("ak", 2))
    reduceRouter ! list
    val list1: List[Result] = List(new Result("ak", 1), new Result("kp", 2))
    reduceRouter ! list1
    // give the reduce Actor time to forward both maps before querying the totals
    Thread.sleep(1000)
    val output = Map("kp" -> 3, "ak" -> 3)
    aggregateActor ! "DISPLAY_LIST"
    expectMsg(output)
  }
}

Next, write a Map unit test, which takes a line and creates a Result object for each word. If you look carefully, the map and reduce actors are created behind Akka's RoundRobinRouter, so incoming messages are handed to several actor instances in turn and processed concurrently.

"Map actor" must {
  "send back Map message" in {
    // create the aggregate Actor
    val aggregateActor = system.actorOf(Props[AggregateActor])
    // create the list of reduce Actors
    val reduceRouter = system.actorOf(Props(new ReduceActor(aggregateActor)).withRouter(RoundRobinRouter(nrOfInstances = 2)))
    // create the list of map Actors
    val mapRouter = system.actorOf(Props(new MapActor(reduceRouter)).withRouter(RoundRobinRouter(nrOfInstances = 2)))
    val line = "Aditya Krishna Kartik Manjula"
    mapRouter ! line
    Thread.sleep(1000)
    val output = Map("Kartik" -> 1, "Krishna" -> 1, "Aditya" -> 1, "Manjula" -> 1)
    aggregateActor ! "DISPLAY_LIST"
    expectMsg(output)
  }
}
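To picture what "round-robin" means here, the sketch below models it conceptually: with nrOfInstances routees, message i goes to routee i % nrOfInstances. RoundRobinSketch and assign are hypothetical names; this is not Akka's router implementation, and in a real actor system the routees finish their work in no guaranteed order.

```scala
// Conceptual model of round-robin routing (not Akka's actual router code)
object RoundRobinSketch {
  // Returns, for each routee index, the messages it would receive, in arrival order
  def assign[A](messages: Seq[A], nrOfInstances: Int): Map[Int, Seq[A]] = {
    require(nrOfInstances > 0, "need at least one routee")
    messages.zipWithIndex
      .groupBy { case (_, i) => i % nrOfInstances }      // message i -> routee i % n
      .map { case (routee, pairs) => routee -> pairs.map(_._1) }
  }
}
```

With two map routees and five lines, routee 0 would receive lines 0, 2, and 4 while routee 1 receives lines 1 and 3, so both can work at the same time.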

Then we write a controller test that runs the whole pipeline end to end, integrating the map, reduce, and aggregate actors, and asserts the final counts:

"List Reader Controller actor" must {
  "send back Map message" in {
    // create the aggregate Actor
    val aggregateActor = system.actorOf(Props[AggregateActor])
    // create the list of reduce Actors
    val reduceRouter = system.actorOf(Props(new ReduceActor(aggregateActor)).withRouter(RoundRobinRouter(nrOfInstances = 2)))
    // create the list of map Actors
    val mapRouter = system.actorOf(Props(new MapActor(reduceRouter)).withRouter(RoundRobinRouter(nrOfInstances = 2)))
    val controller = system.actorOf(Props(new ControllerActor(aggregateActor, mapRouter)))
    val lineReadActor = system.actorOf(Props[LineReadActor])
    val list = List[String](
      "Aditya Krishna Kartik Manjula",
      "Manjula Anand Aditya Kartik",
      "Anand Vani Phani Aditya",
      "Kartik Krishna Manjula Aditya",
      "Vani Phani Anand Manjula")
    lineReadActor.tell(list, controller)
    Thread.sleep(1000)
    val output = Map("Anand" -> 3, "Kartik" -> 3, "EOF" -> 1, "Krishna" -> 2, "Vani" -> 2, "Phani" -> 2, "Aditya" -> 4, "Manjula" -> 4)
    aggregateActor ! "DISPLAY_LIST"
    expectMsg(output)
  }
}

Finally, we will write a fileReaderActor and pass a large file to it to run the MapReduce over its contents.
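The file-reading step itself needs nothing from Akka. As a minimal sketch, assuming the reader hands lines to the pipeline in batches (FileLinesSketch, readBatches, and batchSize are hypothetical names, not from the project), the standard library's scala.io.Source is enough:

```scala
import scala.io.Source

object FileLinesSketch {
  // Read a UTF-8 text file and group its lines into batches of batchSize,
  // e.g. so each batch could be sent as one List[String] message
  def readBatches(path: String, batchSize: Int): List[List[String]] = {
    require(batchSize > 0, "batchSize must be positive")
    val source = Source.fromFile(path, "UTF-8")
    // materialize everything with toList before the file is closed
    try source.getLines().grouped(batchSize).map(_.toList).toList
    finally source.close()
  }
}
```

For a truly large file you would avoid the final toList and send each batch as soon as it is read, so the whole file never sits in memory at once.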

Now if you look at the actual code in MapActor.scala, you will see the keyword yield. A for-comprehension with yield builds a new collection from an existing one by transforming (and, here, also filtering) each element. The syntax is as below:

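def evaluateExpression(line: String): List[Result] = {
  // for each word in the line that is not a stop word, yield a Result(word, 1);
  // STOP_WORDS is defined elsewhere in MapActor.scala
  for {
    word <- line.split(" ").toList
    if !STOP_WORDS.contains(word.toLowerCase)
  } yield new Result(word, 1)
}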
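To see what yield actually does, the sketch below puts the for-comprehension next to the withFilter/map chain the compiler roughly turns it into. The Result case class and the small STOP_WORDS set here are hypothetical stand-ins; the real ones live in the project's sources and are not shown in this post.

```scala
object YieldSketch {
  // Hypothetical stand-in for the project's Result class
  final case class Result(word: String, noOfInstances: Int)

  // Hypothetical stop-word list; the real STOP_WORDS in MapActor.scala is not shown here
  val STOP_WORDS: Set[String] = Set("a", "an", "the", "of")

  // for/yield version: keep non-stop words and wrap each one in Result(word, 1)
  def withYield(line: String): List[Result] =
    for {
      word <- line.split(" ").toList
      if !STOP_WORDS.contains(word.toLowerCase)
    } yield Result(word, 1)

  // Roughly what the compiler rewrites the for-comprehension into: withFilter + map
  def desugared(line: String): List[Result] =
    line.split(" ").toList
      .withFilter(word => !STOP_WORDS.contains(word.toLowerCase))
      .map(word => Result(word, 1))
}
```

Both versions return a List because the generator starts from a List; yield builds a collection of the same kind as the one being traversed.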

In ReduceActor.scala you will find the code "result => ...", which is called a lambda expression (an anonymous function). Lambda expressions are fundamental to functional programming languages like Scala:

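// new HashMap plus "+=" on an entry implies the mutable Map, so import it explicitly
import scala.collection.mutable.{HashMap, Map}

def reduce(list: List[Result]): Map[String, Int] = {
  // mutable map from each word to its running count
  val results: Map[String, Int] = new HashMap[String, Int]

  // "result => { ... }" is the lambda applied to every Result in the list
  list.foreach(result => {
    if (results.contains(result.word)) {
      results(result.word) += result.noOfInstances
    } else {
      results(result.word) = result.noOfInstances
    }
  })
  results
}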
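The same reduction can also be written without any mutable state by threading an immutable Map through foldLeft, where the lambda receives the accumulator and the next Result. This is an alternative sketch, not the project's code; ReduceSketch and its Result case class are hypothetical stand-ins.

```scala
object ReduceSketch {
  // Hypothetical stand-in for the project's Result class
  final case class Result(word: String, noOfInstances: Int)

  // Immutable alternative: each step returns a new Map with the word's total updated
  def reduce(list: List[Result]): Map[String, Int] =
    list.foldLeft(Map.empty[String, Int]) { (acc, result) =>
      acc.updated(result.word, acc.getOrElse(result.word, 0) + result.noOfInstances)
    }
}
```

Returning an immutable Map also means the message sent on to the aggregate actor cannot be changed after it has been sent.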

Conclusion

In this example, I have used and learned:

  • Scala programming features such as collections, the yield keyword, and lambda expressions
  • Akka's round-robin routers for threading and concurrency
  • SBT for integration with Eclipse
  • ScalaTest for TDD

I hope this blog post helps.
