What better place than here, what better time than now?
Lately I've been studying how to write MapReduce programs. MapReduce is an algorithmic pattern for writing distributed divide-and-conquer programs. One of the main problems in parallelizing a program is how to split the input data among the processing nodes: if the pieces are too big, the individual nodes won't be able to process the parts assigned to them; if the pieces are too small, most of the computing and network resources will be spent on communication and coordination instead of on processing the data. This is one of the reasons why the automatic parallelization of programs, as performed in systems like NESL or Data Parallel Haskell, is very difficult.
For me, MapReduce is a way to overcome this problem by giving the programmer explicit control over the way the input data is split ... or you can also see it as leaving the problem to the programmer. Anyway, in MapReduce this is achieved very elegantly by reducing any problem to the task of defining two functions, a map function and a reduce function:
- The purpose of the map function is to put together the parts of the input that need to be processed together, i.e., the groups that are used for a distributed divide-and-conquer. This way the programmer specifies the granularity of the data split I mentioned above.
- Then the reduce function specifies how to process each of the groups defined by map. These groups are processed independently in different worker nodes.
This model is made concrete by constraining the signature of these functions as follows:
- The map function takes a key and a value corresponding to a piece of the input data, and generates a list of intermediate key-value pairs. The map function is executed in parallel and independently in several worker nodes, and then all the pairs generated in each of the nodes are shuffled by the MapReduce engine so that values corresponding to the same intermediate key are put together.
- Now we have a mapping from intermediate keys to lists of intermediate values. The reduce function takes an intermediate key and the corresponding list of values, and generates a list of output key-value pairs.
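To make the two signatures above a bit more tangible, here is a minimal sketch that writes them down as Python type aliases. The names `MapFn`, `ReduceFn`, `length_map` and `count_reduce` are made up for this illustration; they don't come from any MapReduce library, and the toy functions are only there to show the shape of the inputs and outputs.

```python
from typing import Callable, Iterable, List, Tuple, TypeVar

# Type variables for input, intermediate, and output keys and values.
K_in = TypeVar("K_in")
V_in = TypeVar("V_in")
K_mid = TypeVar("K_mid")
V_mid = TypeVar("V_mid")
K_out = TypeVar("K_out")
V_out = TypeVar("V_out")

# map: (input key, input value) -> intermediate (key, value) pairs
MapFn = Callable[[K_in, V_in], Iterable[Tuple[K_mid, V_mid]]]

# reduce: (intermediate key, all values for that key) -> output (key, value) pairs
ReduceFn = Callable[[K_mid, List[V_mid]], Iterable[Tuple[K_out, V_out]]]


def length_map(_key: None, line: str) -> Iterable[Tuple[int, str]]:
    """Toy mapper (hypothetical): group words by their length."""
    for word in line.split():
        yield (len(word), word)


def count_reduce(length: int, words: List[str]) -> Iterable[Tuple[int, int]]:
    """Toy reducer (hypothetical): count how many words share a length."""
    yield (length, len(words))
```

Note that both functions return an iterable of pairs, so a generator with `yield` fits the signature naturally.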
So let's finish settling these ideas with the canonical Hello World program for MapReduce: the word count problem. In this problem we have several texts and we want to count the number of times each word appears across all of the texts. As I'm a very pythonic person, I will represent an example input with the following Python list of pairs:
[(None, "hola que tal hola"), (None, "ey hola"), (None, "como estamos")]
What about the map function? We want to count words, so we have to put together all the occurrences of each word. That sounds easy, as in the following Python code for map, which defines a Python generator of key-value pairs where each key is a word that appears in an input text and the value is always 1, as a declaration that the word occurred once:
def map_f(_title, text):
    # Emit one (word, 1) pair per word occurrence; the title key is unused.
    for word in text.split():
        yield (word, 1)
Now that we have all the occurrences of each word together, all we have to do is add up the 1s for each word, and we've got our word counter program:
def reduce_f(word, counts):
    # counts holds every value shuffled to this word; their sum is the total.
    yield (word, sum(counts))
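To see how these two functions fit together, here is a minimal sequential sketch of the map, shuffle and reduce steps. This is not my emulator and not how a real engine distributes the work; `run_mapreduce` is a hypothetical helper written only for this illustration, and it does everything in a single process.

```python
from collections import defaultdict


def map_f(_title, text):
    for word in text.split():
        yield (word, 1)


def reduce_f(word, counts):
    yield (word, sum(counts))


def run_mapreduce(map_fn, reduce_fn, records):
    """Single-process stand-in for the engine: map, shuffle, reduce."""
    # Map + shuffle: group every intermediate value under its key.
    grouped = defaultdict(list)
    for key, value in records:
        for mid_key, mid_value in map_fn(key, value):
            grouped[mid_key].append(mid_value)
    # Reduce: process each group independently of the others.
    output = []
    for mid_key, mid_values in grouped.items():
        output.extend(reduce_fn(mid_key, mid_values))
    return output


records = [(None, "hola que tal hola"), (None, "ey hola"), (None, "como estamos")]
result = dict(run_mapreduce(map_f, reduce_f, records))
print(result["hola"])  # 3
```

Since each group is reduced without looking at any other group, the reduce loop is the part a real engine can spread over several worker nodes.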
Maybe that was a little too much for a very short introduction to MapReduce, but give me a break, this is my first blog post. And here comes my real motivation for writing this post. While I was reading about MapReduce, it occurred to me that writing Yet Another MapReduce Emulator would be i) a good way to internalize the mechanics of MapReduce and the way the map and reduce functions are invoked; and ii) a nice tool for the first stages of the development of MapReduce programs.
As I said, I'm a very pythonic person, so I developed the emulator in Python, also because there are nice debuggers like PyDev that could increase the usefulness of the emulator. Besides, Python has several functional programming tricks in its bag that I could use both for the emulator and for the MapReduce programs.
This post is already too long, so I won't explain the details of the emulator, but it's pretty simple and you can take a look at it in my GitHub repo. Here is the output of running the word count implementation above in my emulator:
Executing word_count for input: [(None, 'hola que tal hola'), (None, 'ey hola'), (None, 'como estamos')]
------------------------------
mappers_inputs:
#mappers: 3 | contents: [[(None, 'ey hola')], [(None, 'hola que tal hola')], [(None, 'como estamos')]]
combiners_inputs:
[{'ey': [1], 'hola': [1]}, {'tal': [1], 'que': [1], 'hola': [1, 1]}, {'estamos': [1], 'como': [1]}]
combiners_outputs:
[[('ey', 1), ('hola', 1)], [('tal', 1), ('que', 1), ('hola', 2)], [('estamos', 1), ('como', 1)]]
shuffled_pairs:
{'que': [1], 'como': [1], 'tal': [1], 'ey': [1], 'estamos': [1], 'hola': [1, 2]}
------------------------------
[('que', 1), ('como', 1), ('tal', 1), ('ey', 1), ('estamos', 1), ('hola', 3)]
The input list is randomly split into several lists in mappers_inputs to simulate the distribution of the input among several mapper nodes. The list of dictionaries combiners_inputs corresponds to the local execution of the combiners at the mapper nodes. The combiner is a secret ingredient for scaling MapReduce that I did not mention above, but the basic idea is extending this diagram for MapReduce by performing a kind of local reduce in the mapper nodes before sending the output pairs of map to the MapReduce engine for the shuffle. The combine function is another function with the same signature as reduce, but it only works with the pairs local to the worker node where it is invoked. In some situations the use of a combiner dramatically reduces the network usage during the shuffle phase, as the number of pairs emitted to the network is greatly reduced.
In the example above the combiner works between combiners_inputs and combiners_outputs: you can see how in the second mapper the combiner collapses the two occurrences of the word 'hola' into a single pair ('hola', 2). Without the combiner, two pairs ('hola', 1) would have been emitted instead. This doesn't sound very impressive, huh? Now imagine this with TBs of documents and the repetition frequency of words in real texts. Does it make more sense now?
I hope you enjoyed (at least some part of) this post. See you!
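The effect of the combiner in this run can be reproduced with a small sketch that counts how many pairs each mapper would send to the shuffle, with and without a local combine step. Again, this is an illustration and not the emulator's code: `mapper_output` is a hypothetical helper, and `combine_f` reuses the body of reduce, which I assume is fine here because adding up counts gives the same total no matter how the values are grouped.

```python
from collections import defaultdict


def map_f(_title, text):
    for word in text.split():
        yield (word, 1)


def combine_f(word, counts):
    # Same signature as reduce, applied only to pairs local to one mapper.
    yield (word, sum(counts))


def mapper_output(records, combiner=None):
    """Pairs one mapper node would emit to the shuffle (hypothetical helper)."""
    pairs = [pair for key, value in records for pair in map_f(key, value)]
    if combiner is None:
        return pairs
    # Local grouping by key, like combiners_inputs in the output above.
    local = defaultdict(list)
    for word, count in pairs:
        local[word].append(count)
    return [out for word, counts in local.items() for out in combiner(word, counts)]


# The same split into three mappers that appears in mappers_inputs above.
splits = [[(None, "ey hola")], [(None, "hola que tal hola")], [(None, "como estamos")]]
pairs_without = sum(len(mapper_output(s)) for s in splits)
pairs_with = sum(len(mapper_output(s, combine_f)) for s in splits)
print(pairs_without, pairs_with)  # 8 7
```

Saving one pair out of eight is nothing, but the saving grows with the number of repeated words each mapper sees.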