If you compare Example 23-1 with Example 23-4, you'll find that the simple task of counting words has been buried under lots of details about managing parallelism. If we can somehow separate the details of the original problem from the details of parallelization, we may be able to produce a general parallelization library or system that can be applied not just to this word-counting problem, but also to other large-scale processing problems. The parallelization pattern that we are using is:
For each input record, extract the set of key/value pairs that we care about.
For each extracted key/value pair, combine it with other values that share the same key (perhaps filtering, aggregating, or transforming values in the process).
Let's rewrite our program to implement the application-specific logic of counting word frequencies for each document and summing these counts across documents in two functions that we'll call Map and Reduce. The result is Example 23-5.
void Map(string document) {
  for each word w in document {
    EmitIntermediate(w, "1");
  }
}

void Reduce(string word, list<string> values) {
  int count = 0;
  for each v in values {
    count += StringToInt(v);
  }
  Emit(word, IntToString(count));
}
A simple driver program that uses these routines to accomplish the desired task on a single machine would look like Example 23-6.
map<string, list<string> > intermediate_data;

void EmitIntermediate(string key, string value) {
  intermediate_data[key].append(value);
}

void Emit(string key, string value) {
  ... write key/value to final data file ...
}

void Driver(MapFunction mapper, ReduceFunction reducer) {
  for each input item do {
    mapper(item);
  }
  for each key k in intermediate_data {
    reducer(k, intermediate_data[k]);
  }
}

main() {
  Driver(Map, Reduce);
}
The Map function is called once for each input record. Any intermediate key/value pairs emitted by the Map function are collected together by the driver code. Then, the Reduce function is called for each unique intermediate key, together with a list of intermediate values associated with that key.
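To make that control flow concrete, here is a minimal compilable C++ sketch of the same single-machine flow. It is an illustration under stated assumptions, not the chapter's implementation: the names `Emitter`, `MapFn`, `ReduceFn`, `WordCountMap`, `WordCountReduce`, and `RunOnOneMachine` are hypothetical, the emit routines are passed in as callbacks rather than being global functions, words are assumed to be whitespace-separated, and the final output is collected in an in-memory map instead of being written to a data file.

```cpp
#include <functional>
#include <map>
#include <sstream>
#include <string>
#include <vector>

// Hypothetical types for this sketch; they are not part of any real library.
using Emitter = std::function<void(const std::string&, const std::string&)>;
using MapFn = std::function<void(const std::string&, const Emitter&)>;
using ReduceFn = std::function<void(const std::string&,
                                    const std::vector<std::string>&,
                                    const Emitter&)>;

// Map: emit (word, "1") for every whitespace-separated word in one document.
void WordCountMap(const std::string& document, const Emitter& emit_intermediate) {
  std::istringstream words(document);
  std::string w;
  while (words >> w) {
    emit_intermediate(w, "1");
  }
}

// Reduce: sum all counts emitted for one word and emit the total.
void WordCountReduce(const std::string& word,
                     const std::vector<std::string>& values,
                     const Emitter& emit) {
  int count = 0;
  for (const std::string& v : values) {
    count += std::stoi(v);
  }
  emit(word, std::to_string(count));
}

// Driver: call the mapper once per input record, group the intermediate
// values by key, then call the reducer once per unique key.
std::map<std::string, std::string> RunOnOneMachine(
    const std::vector<std::string>& inputs,
    const MapFn& mapper,
    const ReduceFn& reducer) {
  std::map<std::string, std::vector<std::string>> intermediate;
  for (const std::string& item : inputs) {
    mapper(item, [&](const std::string& k, const std::string& v) {
      intermediate[k].push_back(v);
    });
  }

  // Assumption: keep final results in memory rather than in a data file.
  std::map<std::string, std::string> output;
  for (const auto& entry : intermediate) {
    reducer(entry.first, entry.second,
            [&](const std::string& k, const std::string& v) { output[k] = v; });
  }
  return output;
}
```

Calling `RunOnOneMachine(docs, WordCountMap, WordCountReduce)` on a vector of document strings returns one entry per distinct word, with its total count as a string. Passing the emit callbacks as arguments, rather than relying on global state, is one possible design choice that keeps the sketch self-contained.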
We're now back to an implementation that runs on a single machine. However, with things separated in this manner, we can now change the implementation of the driver program to make it deal with distribution, automatic parallelization, and fault tolerance without affecting the application-specific logic in the Map and Reduce functions. Furthermore, the driver is independent of the particular application logic implemented by the Map and Reduce functions, and therefore the same driver program can be reused with other Map and Reduce functions to solve different problems. Finally, notice that the Map and Reduce functions that implement the application-specific logic are nearly as understandable as the simple sequential code shown in Example 23-1.
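The reuse claim can be illustrated with a small C++ sketch in which one generic driver runs two unrelated pairs of functions. Everything named here is hypothetical and chosen only for illustration: the `Run` driver, the `Emit` and `Values` aliases, and the second application (finding the longest word for each initial letter), which does not appear in the chapter. Words are assumed to be whitespace-separated, and results are kept in memory.

```cpp
#include <functional>
#include <map>
#include <sstream>
#include <string>
#include <vector>

using Emit = std::function<void(const std::string&, const std::string&)>;
using Values = std::vector<std::string>;

// Generic single-machine driver (hypothetical): it knows nothing about
// what the mapper and reducer compute.
template <typename Mapper, typename Reducer>
std::map<std::string, std::string> Run(const std::vector<std::string>& inputs,
                                       Mapper mapper, Reducer reducer) {
  std::map<std::string, Values> intermediate;
  Emit collect = [&](const std::string& k, const std::string& v) {
    intermediate[k].push_back(v);
  };
  for (const std::string& item : inputs) mapper(item, collect);

  std::map<std::string, std::string> output;
  Emit write = [&](const std::string& k, const std::string& v) { output[k] = v; };
  for (const auto& entry : intermediate) reducer(entry.first, entry.second, write);
  return output;
}

// Application 1: word frequencies.
void CountMap(const std::string& doc, const Emit& emit) {
  std::istringstream words(doc);
  std::string w;
  while (words >> w) emit(w, "1");
}
void CountReduce(const std::string& word, const Values& counts, const Emit& emit) {
  int total = 0;
  for (const std::string& c : counts) total += std::stoi(c);
  emit(word, std::to_string(total));
}

// Application 2 (illustrative only): longest word per initial letter.
void LongestMap(const std::string& doc, const Emit& emit) {
  std::istringstream words(doc);
  std::string w;
  while (words >> w) emit(std::string(1, w[0]), w);
}
void LongestReduce(const std::string& letter, const Values& words, const Emit& emit) {
  std::string longest;
  for (const std::string& w : words) {
    if (w.size() > longest.size()) longest = w;  // first seen wins on ties
  }
  emit(letter, longest);
}
```

Both `Run(inputs, CountMap, CountReduce)` and `Run(inputs, LongestMap, LongestReduce)` go through the same driver code. Only the two small application functions change, which is the property that would let a distributed driver be swapped in without touching them.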