Jeffrey Dean and Sanjay Ghemawat
This chapter describes the design and implementation of MapReduce, a programming system for large-scale data processing problems. MapReduce was developed as a way of simplifying the development of large-scale computations at Google. MapReduce programs are automatically parallelized and executed on a large cluster of commodity machines. The runtime system takes care of the details of partitioning the input data, scheduling the program's execution across a set of machines, handling machine failures, and managing the required intermachine communication. This allows programmers without any experience with parallel and distributed systems to easily use the resources of a large distributed system.
Suppose that you have 20 billion documents, and you want to generate a count of how often each unique word occurs in the documents. With an average document size of 20 KB, just reading through the 400 terabytes of data on one machine will take roughly four months. Assuming we were willing to wait that long and that we had a machine with sufficient memory, the code would be relatively simple. Example 23-1 (all the examples in this chapter are pseudocode) shows a possible algorithm.
map<string, int> word_count;
for each document d {
  for each word w in d {
    word_count[w]++;
  }
}
... save word_count to persistent storage ...
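For comparison, here is a runnable C++ counterpart of Example 23-1. It is a sketch under simplifying assumptions of our own: the documents are already in memory as strings, a "word" is any whitespace-separated token (no case folding or punctuation stripping), and `CountWords` is a hypothetical name. Saving the result to persistent storage is left to the caller.

```cpp
#include <cassert>
#include <map>
#include <sstream>
#include <string>
#include <vector>

// Sequential word count. Assumption: documents are in memory and words are
// whitespace-separated tokens.
std::map<std::string, int> CountWords(const std::vector<std::string>& documents) {
  std::map<std::string, int> word_count;
  for (const std::string& d : documents) {
    std::istringstream in(d);
    std::string w;
    while (in >> w) {
      word_count[w]++;  // operator[] inserts a missing word with count 0 first
    }
  }
  return word_count;
}
```

Calling `CountWords({"the quick fox", "the lazy dog"})` yields a map in which `"the"` maps to 2 and every other word maps to 1.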
One way to speed up this computation is to process the documents in parallel, performing the same computation on each document, as shown in Example 23-2.
Mutex lock; // Protects word_count
map<string, int> word_count;
for each document d in parallel {
  for each word w in d {
    lock.Lock();
    word_count[w]++;
    lock.Unlock();
  }
}
... save word_count to persistent storage ...
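Example 23-2 can be made concrete with `std::thread` and a single `std::mutex`. In this sketch (with the same assumptions about in-memory documents and whitespace-separated words), the pseudocode's `in parallel` loop is realized through a hypothetical `num_threads` parameter: thread t handles documents t, t + num_threads, and so on. `ParallelCountWords` is likewise our own name.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <mutex>
#include <sstream>
#include <string>
#include <thread>
#include <vector>

// Parallel word count guarded by one global lock. num_threads is a
// hypothetical tuning parameter, not part of the original pseudocode.
std::map<std::string, int> ParallelCountWords(
    const std::vector<std::string>& documents, int num_threads) {
  assert(num_threads > 0);
  std::mutex lock;  // protects word_count
  std::map<std::string, int> word_count;
  std::vector<std::thread> workers;
  for (int t = 0; t < num_threads; t++) {
    workers.emplace_back([&, t] {
      // Thread t processes documents t, t + num_threads, t + 2*num_threads, ...
      for (std::size_t i = t; i < documents.size(); i += num_threads) {
        std::istringstream in(documents[i]);
        std::string w;
        while (in >> w) {
          std::lock_guard<std::mutex> guard(lock);  // every increment contends for this one lock
          word_count[w]++;
        }
      }
    });
  }
  for (std::thread& worker : workers) worker.join();
  return word_count;
}
```

Because every increment acquires the same mutex, adding threads mostly adds waiters for that lock, which is the contention problem the next paragraph describes.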
The preceding code nicely parallelizes the input side of the problem. In reality, the code to start up threads would be a bit more complex, since the pseudocode hides a number of details. One problem with Example 23-2 is that it uses a single global data structure for keeping track of the generated counts. As a result, there is likely to be significant lock contention, making the word_count data structure the bottleneck. This problem can be fixed by partitioning the word_count data structure into a number of buckets, each protected by its own lock, as shown in Example 23-3.
struct CountTable {
  Mutex lock;
  map<string, int> word_count;
};
const int kNumBuckets = 256;
CountTable tables[kNumBuckets];
for each document d in parallel {
  for each word w in d {
    int bucket = hash(w) % kNumBuckets;
    tables[bucket].lock.Lock();
    tables[bucket].word_count[w]++;
    tables[bucket].lock.Unlock();
  }
}
for (int b = 0; b < kNumBuckets; b++) {
  ... save tables[b].word_count to persistent storage ...
}
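A corresponding sketch of Example 23-3 gives each of the 256 buckets its own `std::mutex`, so threads counting words that land in different buckets no longer wait on each other. `std::hash<std::string>` stands in for the unspecified `hash()` in the pseudocode, and the function merges the per-bucket tables before returning purely for convenience. As before, `BucketedCountWords` and `num_threads` are hypothetical names.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <map>
#include <mutex>
#include <sstream>
#include <string>
#include <thread>
#include <vector>

struct CountTable {
  std::mutex lock;  // protects word_count for this bucket only
  std::map<std::string, int> word_count;
};

constexpr std::size_t kNumBuckets = 256;

std::map<std::string, int> BucketedCountWords(
    const std::vector<std::string>& documents, int num_threads) {
  assert(num_threads > 0);
  // vector(n) default-constructs the tables in place, so the non-movable
  // std::mutex member is not a problem.
  std::vector<CountTable> tables(kNumBuckets);
  std::vector<std::thread> workers;
  for (int t = 0; t < num_threads; t++) {
    workers.emplace_back([&, t] {
      for (std::size_t i = t; i < documents.size(); i += num_threads) {
        std::istringstream in(documents[i]);
        std::string w;
        while (in >> w) {
          // Pick the bucket by hash; only this bucket's lock is taken.
          CountTable& table = tables[std::hash<std::string>{}(w) % kNumBuckets];
          std::lock_guard<std::mutex> guard(table.lock);
          table.word_count[w]++;
        }
      }
    });
  }
  for (std::thread& worker : workers) worker.join();
  // Each word lives in exactly one bucket, so merging never combines counts.
  std::map<std::string, int> merged;
  for (CountTable& table : tables) {
    merged.insert(table.word_count.begin(), table.word_count.end());
  }
  return merged;
}
```

Because `std::hash` returns an unsigned `size_t`, the modulo always yields a valid bucket index; a `hash()` returning a signed value would need extra care with negative results.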
The program is still quite simple. However, it cannot scale beyond the number of processors in a single machine. Most affordable machines have eight or fewer processors, so even with perfect scaling, this approach will still require multiple weeks of processing to complete. Furthermore, we have been glossing over the problem of where the input data is stored and how fast it can be read by one machine.
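The timing estimates above follow from simple arithmetic. The worked version below uses decimal units (1 KB = 10^3 bytes) and takes four months as 120 days; the single-machine read rate on the third line is implied by the text's figures rather than stated in it, and the last line assumes perfect scaling across eight processors.

```latex
\begin{aligned}
\text{input size} &= (2 \times 10^{10}\ \text{documents}) \times (2 \times 10^{4}\ \text{bytes per document}) = 4 \times 10^{14}\ \text{bytes} = 400\ \text{TB} \\
\text{four months} &\approx 120 \times 86{,}400\ \text{s} = 1.0368 \times 10^{7}\ \text{s} \\
\text{implied read rate} &\approx \frac{4 \times 10^{14}\ \text{bytes}}{1.0368 \times 10^{7}\ \text{s}} \approx 3.86 \times 10^{7}\ \text{bytes/s} \approx 39\ \text{MB/s} \\
\text{time on 8 processors} &\approx \frac{120\ \text{days}}{8} = 15\ \text{days} \approx 2\ \text{weeks}
\end{aligned}
```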
Further scaling requires that we distribute the data and the computation across multiple machines. For the moment, let's assume that the machines do not fail. One way to achieve this is to start many processes on a cluster of networked machines. We will have many input processes, each one responsible for reading and processing a subset of the documents. We will also have many output processes, each responsible for managing one of the word_count buckets. Example 23-4 shows the algorithm.
const int M = 1000; // Number of input processes
const int R = 256;  // Number of output processes
main() {
  // Compute the number of documents to assign to each process
  const int D = number of documents / M;
  for (int i = 0; i < M; i++) {
    // The last process also takes any leftover documents
    int end = (i == M - 1) ? number of documents : (i + 1) * D;
    fork InputProcess(i * D, end);
  }
  for (int i = 0; i < R; i++) {
    fork OutputProcess(i);
  }
  ... wait for all processes to finish ...
}
void InputProcess(int start_doc, int end_doc) {
  map<string, int> word_count[R]; // Separate table per output process
  for each doc d in range [start_doc .. end_doc-1] do {
    for each word w in d {
      int b = hash(w) % R;
      word_count[b][w]++;
    }
  }
  for (int b = 0; b < R; b++) {
    string s = EncodeTable(word_count[b]);
    ... send s to output process b ...
  }
}
void OutputProcess(int bucket) {
  map<string, int> word_count;
  for each input process p {
    string s = ... read message from p ...
    map<string, int> partial = DecodeTable(s);
    for each <word, count> in partial do {
      word_count[word] += count;
    }
  }
  ... save word_count to persistent storage ...
}
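Example 23-4 can be simulated on a single machine by running each "process" as a thread. This is our own illustration, not the authors' code: `messages[p][b]` stands in for the message that input process p sends to output process b, and waiting for every input thread to finish before starting the outputs replaces real network delivery. The text format used by `EncodeTable`/`DecodeTable`, the name `DistributedCountWords`, and the use of `std::hash` for `hash()` are all assumptions. The code requires C++17 for structured bindings.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <map>
#include <sstream>
#include <string>
#include <thread>
#include <vector>

using Table = std::map<std::string, int>;

// Hypothetical wire format: one "word count" pair per line.
std::string EncodeTable(const Table& table) {
  std::ostringstream out;
  for (const auto& [word, count] : table) out << word << ' ' << count << '\n';
  return out.str();
}

Table DecodeTable(const std::string& s) {
  Table table;
  std::istringstream in(s);
  std::string word;
  int count;
  while (in >> word >> count) table[word] += count;
  return table;
}

// Returns one table per output process (what each would save to storage).
std::vector<Table> DistributedCountWords(const std::vector<std::string>& documents,
                                         int M, int R) {
  assert(M > 0 && R > 0);
  // messages[p][b]: the encoded table input process p "sends" to output b.
  std::vector<std::vector<std::string>> messages(M, std::vector<std::string>(R));
  const std::size_t D = documents.size() / M;
  std::vector<std::thread> inputs;
  for (int p = 0; p < M; p++) {
    std::size_t start = p * D;
    // The last input process also takes any leftover documents.
    std::size_t end = (p == M - 1) ? documents.size() : (p + 1) * D;
    inputs.emplace_back([&, p, start, end] {
      std::vector<Table> word_count(R);  // separate table per output process
      for (std::size_t i = start; i < end; i++) {
        std::istringstream in(documents[i]);
        std::string w;
        while (in >> w) word_count[std::hash<std::string>{}(w) % R][w]++;
      }
      for (int b = 0; b < R; b++) messages[p][b] = EncodeTable(word_count[b]);
    });
  }
  for (std::thread& t : inputs) t.join();  // stands in for message delivery

  std::vector<Table> results(R);
  std::vector<std::thread> outputs;
  for (int b = 0; b < R; b++) {
    outputs.emplace_back([&, b] {
      for (int p = 0; p < M; p++) {
        for (const auto& [word, count] : DecodeTable(messages[p][b])) {
          results[b][word] += count;
        }
      }
    });
  }
  for (std::thread& t : outputs) t.join();
  return results;
}
```

Each thread writes only to its own slots in `messages` or `results`, so no locks are needed; hash partitioning guarantees that every word appears in exactly one of the returned tables.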
This approach scales nicely on a network of workstations, but it is significantly more complicated and harder to understand (even though we've hidden the details of marshaling and unmarshaling, as well as of starting and synchronizing the different processes). It also does not deal gracefully with machine failures. To handle failures, we would extend Example 23-4 to re-execute processes that fail before completing. To avoid double-counting data when an input process is re-executed, we would tag each piece of intermediate data with the generation number of the input process that produced it and modify the output processing to use these generation numbers to discard duplicates. As you can imagine, adding this failure-handling support would complicate things further.
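The duplicate-discarding step on the output side might look like the following sketch. The message layout and the policy (for each input process, keep only the message with the highest generation number, then sum) are our assumptions; the text says only that generation numbers are used to discard duplicates. `Message` and `MergeWithoutDuplicates` are hypothetical names, and the code requires C++17.

```cpp
#include <cassert>
#include <map>
#include <string>
#include <vector>

using Table = std::map<std::string, int>;

// Hypothetical message: the partial table produced by one execution
// (generation) of one input process.
struct Message {
  int input_process;
  int generation;  // assumed to increase each time the input process is re-executed
  Table partial;
};

// Assumed policy: keep only the highest-generation message per input
// process, so a re-executed process's data is counted exactly once.
Table MergeWithoutDuplicates(const std::vector<Message>& received) {
  std::map<int, const Message*> latest;  // input process -> chosen message
  for (const Message& m : received) {
    assert(m.generation >= 0);
    auto it = latest.find(m.input_process);
    if (it == latest.end() || m.generation > it->second->generation) {
      latest[m.input_process] = &m;
    }
  }
  Table word_count;
  for (const auto& [process, message] : latest) {
    for (const auto& [word, count] : message->partial) word_count[word] += count;
  }
  return word_count;
}
```

Because the output side buffers one message per input process before summing, the result does not depend on whether the original or the re-executed message arrives first; the cost is holding every partial table in memory until all input processes have reported.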