Map Reduce is a programming model for writing algorithms that process large quantities of data in a (relatively) short time. The building blocks for the programs are very simple map and reduce functions. Writing programs that do more and more complex tasks to data based on those simple functions becomes harder and harder and thus requires more thorough testing in early stages. This blog attempts to outline a simple method for testing the algorithm of a Map-Reduce program based on scoobi.
Let’s look at the simple example on the scoobi website:
[scala highlight=”11,12,13,14″]
import com.nicta.scoobi.Scoobi._
import com.nicta.scoobi.DList._
import com.nicta.scoobi.io.text.TextInput._
import com.nicta.scoobi.io.text.TextOutput._
object WordCount {
def main(allArgs: Array[String]) = withHadoopArgs(allArgs) { args =>
val lines: DList[String] = fromTextFile(args(0))
val counts: DList[(String, Int)] = lines.flatMap(_.split(" "))
.map(word => (word, 1))
.groupByKey
.combine(_+_)
persist(toTextFile(counts, args(1)))
}
}
[/scala]
The example program counts the words in the input file. The beauty of the Scoobi framework is that it allows developers to work with collection abstractions to express their Map-Reduce algorithm, without having to deal with the gory details of writing mappers and reducers directly. The details of the Hadoop framework that underlies scoobi are nicely hidden behind the simple looking statements on lines 9 and 16. Without going into too much detail, the workings of scoobi are summarized in this quote on the scoobi website:
…calling DList methods will not immediately result in data being generated in HDFS. This is because, behind the scenes, Scoobi implements a staging compiler. The purpose of DList methods are to construct a graph of data transformations. Then, the act of persisting a DList triggers the compilation of the graph into one or more MapReduce jobs and their execution.
Now my suggestion is to take this scoobi approach one step further and not only develop the algorithm with a collection abstraction, but test it with simple collections as well!
From a testing point of view, there are two assumptions that I would like to ensure by testing the code above:
- Running the code should produce and run correct Hadoop Mappers and Reducers.
- The algorithm should do the task it is designed for: counting the words in the input file.
The first is a integration type test that can be done by running the jobs on Hadoop in stand-alone mode. To do this, the program needs to be run to produce the Hadoop code and the produced jobs need to be run on Hadoop.
The second can be achieved by running the algorithm (the four lines) against any collection-like object that implements the required functions. Compared to the above described test, this is more of a unit-type test. It should be much easier to write and execute and prove that a small -but functionally rather important- part of the entire program runs correctly.
Let’s be clear: A good unit test can never replace an integration test. Especially in a situation like this, where the unit test runs on an entirely different platform, ensuring that the end product works as expected is very important. Testing the algorithm on simple collections however, enables you to develop complex hadoop jobs in the true spirit of TDD, ensuring that parts of the system work correctly before deploying and running it.
Let’s look at the example. The algorithm for counting words is neatly described in the four highlighted lines above. The algorithm basically consists of four simple collection manipulations, that are implemented in the scoobi type DList, but most of them are well-known higher-order functions that are also implemented by Scala’s collection types.
To be able to unit test it, we have to extract the algorithm to a separate function, that operates on an abstraction that implements the same operations. This abstraction can be implemented on top of one of the Scala collection types for unittesting and on top of Scoobi’s DList type for the production situation.
[scala highlight=”11,12,13,14,15″]
object WordCount {
def main(allArgs: Array[String]) = withHadoopArgs(allArgs) { args =>
val lines: DList[String] = fromTextFile(args(0))
val counts: DList[(String, Int)] = algorithm(lines)
persist(toTextFile(counts, args(1)))
}
def algorithm(lines : ListWrapper[String]) : ListWrapper[(String, Int)] =
lines.flatMap(_.split(" "))
.map(word => (word, 1))
.groupByKey
.combine(_+_)
}
[/scala]
Now we can write unittests like this:
[scala]
class WordCountSuite extends FunSuite {
test("should count single line with same word") {
val result: List[(String, Int)] = WordCount.algorithme("word word word" :: Nil)
assert(result === ("word", 3) :: Nil)
}
test("should count multiple times same line") {
val result: List[(String, Int)] = WordCount.algorithme("a word is a word" ::
"a word is a word" :: Nil)
assert(result.toMap === Map("a" -> 4, "word" -> 4, "is" -> 2))
}
test("should count some lines") {
val result: List[(String, Int)] = WordCount.algorithme("this is a line" ::
"and another line" :: Nil)
assert(result.toMap === Map("this" -> 1, "is" -> 1, "a" -> 1, "line" -> 2,
"and" -> 1, "another" -> 1))
}
}
[/scala]
Behind the scenes a different implementation for the ListWrapper type is used in the test and in the actual program. This is done through implicit conversions. A crude implementation of the necessary classes can be found here.
Writing unit tests for this logic now becomes easy. It is really important however to understand what is being tested: The tests run against a simple in-memory collection. The assurance that the production code will do the same on an Hadoop cluster comes from the assumption that the higher-order functions have the same results when run against the distributed collection as their in-memory counterparts. This however, is the core target of the Scoobi framework: to enable developers to write their hadoop jobs as simple collection operations.