You have heard or read about MapReduce and its power to process big data in the cloud, but you have no idea how it actually works or how to use it. MapReduce is a complex technology because it performs its work in parallel and uses distributed computing as its foundation. In fact, MapReduce is often more complex to set up than to use, which can be intimidating to first-time users. By simulating MapReduce's parallel and distributed nature on your own laptop, you can keep the setup simple and make MapReduce's core concepts easier to understand.
MapReduce offers an approach to distributing big data processing tasks across multiple servers so that the task as a whole finishes faster. This is the essence of MapReduce, even though navigating its ABCs and setting it up can be intimidating for first-time users. MapReduce is a framework, so it's closely tied to programming tasks to achieve its work. In fact, this is how MapReduce gets its name: by applying map and reduce functions to data.
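The two names come from the map and reduce operations of functional programming, which Python itself happens to ship with. The following toy example (unrelated to any MapReduce framework) shows where the terminology comes from:

from functools import reduce

locations = ['Monmouth.OR', 'Toronto.Canada', 'Fairbanks.AK']

# 'map' applies a function to every element of a data set...
lengths = map(len, locations)

# ...and 'reduce' collapses the resulting sequence into a single value.
total_characters = reduce(lambda a, b: a + b, lengths)

print(total_characters)  # 37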
Before diving in head first to implement map and reduce functions on your big data, it's worth first answering a more basic question: when does it even make sense to consider MapReduce for your big data and cloud projects? After all, why go to the trouble of using MapReduce if you already have data in a database, or a data file ready to load into a database for application access?
If your current data is a means to an end (i.e. you're more interested in the data produced by processing or grouping your current big data), then MapReduce represents an ideal solution to speed up processing times and reduce the size of your data. Although data in a database or a CSV file can be processed or grouped into smaller subsets to satisfy an application's requirements, doing so with these storage technologies can be time consuming, particularly if the data universe you're working with runs into the terabytes or millions of records. MapReduce is designed to work in parallel, which means it can complete an equivalent processing or grouping task faster than a regular database or CSV file.
This is not to say MapReduce is a 'magic bullet' to speed up processing on data applications. One of the biggest drawbacks of MapReduce is that it's not a storage system, but rather a batch system. This means that unlike a database or CSV file in which data can be accessed and processed directly from an application, MapReduce requires that you process data as a separate step and then place its results on a storage system (e.g. database or CSV file) so it can be accessed by an application.
There is in fact a fine line to walk when deciding whether MapReduce is convenient to use. If gaining insight from a large data set stored in a database or CSV file requires processing times of minutes or even hours, incorporating MapReduce into an application's workflow can provide a significant performance boost, even though MapReduce implies using additional software and an extra ETL (Extract-Transform-Load) step from the storage system to MapReduce and back to a storage system. Table 1 contains a series of scenarios under which to consider MapReduce, including the type of source data, the functionality required by the map and reduce functions, and the end result.
Source data (millions of records or TB in size) | Map function | Reduce function | End result |
---|---|---|---|
Company departmental sales history | Map departments to sales | Reduce sales to grand total by department | Get total sales for each department |
Advertising banners to clicks | Map banners to clicks | Reduce clicks to total by banner | Get total clicks for each banner |
Earthquake locations | Map locations to earthquakes | Reduce earthquake locations to total count | Get total earthquakes for each location |
Historic Olympic medal winners | Map countries to medals | Reduce medals to total count by country | Get total medals for each country |
*Web page links | Map web pages to links | Reduce links to total by web page | Get total links per web page |
* This scenario is similar to how Google initially calculated the relevance for its search results. |
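To make the first scenario in table 1 concrete, here is a rough sketch of what that pair of functions could look like. The record layout (a department name plus a sale amount) is a hypothetical one I made up purely for illustration:

# Hypothetical sales record: ['Electronics', '199.99']
def map_sales(key, record):
    department, amount = record
    # Emit the department as the grouping key and the sale amount as the value
    yield department, float(amount)

def reduce_sales(department, amounts):
    # All the amounts for one department arrive together; collapse them into a grand total
    return sum(amounts)

# Quick check, without any MapReduce framework involved
print(list(map_sales(1, ['Electronics', '199.99'])))  # [('Electronics', 199.99)]
print(reduce_sales('Electronics', [200.0, 45.5]))     # 245.5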
Although a large ecosystem of tools and products has grown around MapReduce, often hiding some of its core building blocks (e.g. map and reduce functions) in favor of friendlier approaches -- many of which, I have to say, lead to the perceived complexity of MapReduce -- at its core MapReduce is just this: a batch process in which two custom functions are applied to big data sets.
So no matter how turn-key a MapReduce solution offered by a cloud provider appears to be (e.g. a 'one click' solution for 'a few dollars'), it's vital that you first firmly grasp MapReduce's core functionality, which is why the next section walks you through using MapReduce on your laptop in less than 10 minutes.
Since MapReduce is a framework, it's offered in a variety of programming languages and by many different vendors. To get you started with MapReduce I'll use the open source Python implementation called mincemeat. Among MapReduce implementations, mincemeat is one of the simplest to install. And considering some MapReduce implementations can take hours to install, mincemeat is a good choice if you're starting out with MapReduce.
You can get mincemeat at https://pypi.python.org/pypi/mincemeat or use the adapted mincemeat version I created for Python 3. Once you download mincemeat's source code from either location, unzip the contents of the file and install the source as you would any Python package (e.g. execute python setup.py install inside the main directory). Once you've set up mincemeat on your laptop, you can proceed to design a MapReduce task.
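If you want to confirm the installation worked, a quick check from the Python interpreter is enough (a minimal sanity check; the printed path will differ on your machine):

# If this import succeeds, mincemeat is installed and ready to use
import mincemeat

# Print where the module was installed, just as a confirmation
print(mincemeat.__file__)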
Besides being easy to install and lightweight, mincemeat doesn't require you to set up multiple servers right after installation like most MapReduce implementations do -- although mincemeat does support distributed servers. In addition, mincemeat keeps configuration and dependencies to a minimum, with no need to set up other supporting software like daemons or file systems, which can be off-putting to newcomers.
Listing 1 illustrates a MapReduce task built around mincemeat that counts locations (i.e. city.state and country) present in a CSV file -- you can get the source CSV file (~15MB) at AviationData.csv.
#!/usr/bin/env python
import mincemeat
import csv
import operator

# Define map function
def map_bigdata(k, v):
    for field in v:
        if field == '':
            continue
        yield field.upper(), 1

# Define reduce function
def reduce_bigdata(k, ones):
    return len(ones)

# Read source data from CSV file, using the csv module
# NOTE: You can get the AviationData.csv file (~15MB) at https://www.cloudstorageoptions.com/static/data/AviationData.csv
allrows = csv.reader(open('AviationData.csv', 'rb'), delimiter=',')

# Define dictionary placeholder for source data
source_data = {}

# Loop over source data
for index, row in enumerate(allrows):
    # Generate dictionary entry, with the row index as key and the city.state and country columns as value
    source_data[index] = [row[4], row[5]]

# Initiate mincemeat master server
s = mincemeat.Server()

# Assign datasource to mincemeat (can be any dictionary-like object)
s.datasource = source_data

# Assign map function to mincemeat
s.mapfn = map_bigdata

# Assign reduce function to mincemeat
s.reducefn = reduce_bigdata

# Run master server
results = s.run_server(password="bigdata")

# Results are an unordered dictionary, so sort its items by count
sorted_results = sorted(results.iteritems(), key=operator.itemgetter(1))

# Reverse the sorted list so the highest counts come first
sorted_results.reverse()

# Loop over the first 10 highest counts
for freq in sorted_results[:10]:
    # Print each item
    print(freq)
As you can likely tell from the first line #!/usr/bin/env python, listing 1 in its entirety is a Python script. That said, place the contents of listing 1 in a file called laptop_mapreduce.py to execute the MapReduce task like a regular Python script (e.g. python laptop_mapreduce.py), and also ensure you have the source CSV file AviationData.csv in the same directory.
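Note that listing 1 targets Python 2 -- the 'rb' file mode and the iteritems() call give that away. If you use the Python 3 adaptation mentioned earlier, the loading and sorting portions of the script would look roughly like the following sketch (I'm assuming here that the adapted module keeps the same Server API, so only the surrounding Python changes):

import csv
import operator

# Python 3: open the CSV in text mode with newline='' instead of 'rb'
allrows = csv.reader(open('AviationData.csv', 'r', newline=''), delimiter=',')
source_data = {index: [row[4], row[5]] for index, row in enumerate(allrows)}
# (source_data would be wired to mincemeat exactly as in listing 1)

# Python 3: dictionaries have no iteritems(); use items() and let
# sorted() produce the descending order directly
results = {'UNITED STATES': 3, 'CANADA': 1, 'MONMOUTH.OR': 1}  # stand-in for s.run_server(...)
sorted_results = sorted(results.items(), key=operator.itemgetter(1), reverse=True)
for freq in sorted_results[:10]:
    print(freq)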
After the first script line, we import three modules: mincemeat for MapReduce operations, csv to get data from a CSV file and operator to sort the MapReduce results. Next, you'll see the methods map_bigdata and reduce_bigdata, which are the Map and Reduce functions through which the data will run -- I'll describe these methods shortly as part of the MapReduce workflow.
The csv.reader statement is used to load data from a CSV file -- in this case the file named AviationData.csv -- located in the same directory as the Python script. Each row of CSV data is loaded as a Python list by the csv module. Mincemeat expects a Python dictionary as its source data, therefore we create a dictionary reference source_data. Next, a loop is done over the entire contents (i.e. rows) of the CSV data, and Python's enumerate is used to gain access to the loop index. On each iteration, the index variable contains the loop count and the row variable holds an actual CSV record (e.g. 20120423X32944, Accident, WPR12LA181, 04/22/2012, Monmouth.OR, United States, 44.841944). The index is then used as the key into the source_data dictionary, and a list with the fields at positions 4 and 5 of each row (i.e. the city.state and country columns) is used as the dictionary value.
After the previous sequence, the source_data dictionary used as the source data for mincemeat has contents like the following: {1:['Monmouth.OR','United States'],2:['Toronto.Canada','Canada'],3:['Fairbanks.AK','United States']}.
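If you'd like to verify that your own dictionary has this shape before handing it to mincemeat, a short inspection loop that reuses the loading code from listing 1 is enough (this is only a sanity check, not part of the MapReduce task itself):

import csv

# Same loading code as listing 1
allrows = csv.reader(open('AviationData.csv', 'rb'), delimiter=',')
source_data = {}
for index, row in enumerate(allrows):
    source_data[index] = [row[4], row[5]]

# Print the first three entries to confirm the key/value layout
for key in sorted(source_data)[:3]:
    print((key, source_data[key]))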
Next, we declare a mincemeat master server instance using mincemeat.Server() and assign it to the reference s. The s reference is important because it's also used to assign the source data, as well as the Map and Reduce methods applied to that data. The statement s.datasource = source_data assigns the source_data reference to the mincemeat master server, s.mapfn = map_bigdata assigns map_bigdata as its map method, and s.reducefn = reduce_bigdata assigns reduce_bigdata as its reduce method.
Finally, the mincemeat master server is started using the statement s.run_server(password="bigdata"), and the output of the MapReduce task is assigned to the results reference. Next, let's explore the actual Map and Reduce methods.
The Map method, called map_bigdata, receives two input parameters: k for each dictionary key in the source data and v for each dictionary value in the source data. Based on the source_data dictionary, this means the map method is invoked multiple times, once with each dictionary element (e.g. map_bigdata(1,['Monmouth.OR','United States']), map_bigdata(2,['Toronto.Canada','Canada']), map_bigdata(3,['Fairbanks.AK','United States'])). The map_bigdata method then loops over each field value and, if the value is an empty string, skips to the next iteration. If the field is not empty, the yield keyword is called with the field converted to upper case using the Python string method upper() -- in this case either a city.state or a country -- along with a 1 to indicate one occurrence of the field. See the sidebar in case you're unfamiliar with the Python yield keyword.
The yield keyword, generator methods and the return statement
By using the yield keyword, a method becomes what is known as a Python generator. A generator is a method that produces a sequence of results instead of a single value. Python methods typically use the return keyword to indicate the final result and the termination of the method's logic. With yield, a method instead produces one result, pauses, and resumes where it left off the next time its caller asks for a value, so over its lifetime it hands back an entire sequence of results.
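A tiny, self-contained example (nothing to do with mincemeat yet) shows the difference between the two keywords:

def one_value():
    # A regular method: a single result, and the method terminates
    return 1

def several_values():
    # A generator: each yield hands back one value and pauses until
    # the caller asks for the next one
    yield 1
    yield 2
    yield 3

print(one_value())             # 1
print(list(several_values()))  # [1, 2, 3]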
Given the way yield works, it's particularly well suited to how MapReduce operates; this will become more evident as I describe the MapReduce workflow in the following paragraphs.
The outcome of the map_bigdata method is a Map of values generated from the source data. Given how the yield keyword works -- see the previous sidebar -- map_bigdata is called repeatedly, once for every element in the source data, until it has produced a sequence of values. This sequence of values, or Map, is then passed as the input to the Reduce method. Listing 2 illustrates the results of map_bigdata for a certain set of source data.
# Source data for mincemeat
{1:['Monmouth.OR','United States'],2:['Toronto.Canada','Canada'],3:['Fairbanks.AK','United States'],4:['Janesville.WI','United States']}

# Final output map after calling map_bigdata in listing 1 with the previous source data
[('CANADA',[1]),('JANESVILLE.WI',[1]),('TORONTO.CANADA',[1]),('MONMOUTH.OR',[1]),('UNITED STATES',[1, 1, 1]),('FAIRBANKS.AK',[1])]
Notice in listing 2 how the source data is transformed when it passes through the map_bigdata method. For example, the value United States is converted to upper case, and for each occurrence in the source data a 1 is added to its list. Since Canada occurs only once in the source data, the output appears as ('CANADA',[1]). The upper case conversion and the 1 come from the statement yield field.upper(),1 in map_bigdata, and because the framework groups every yielded key/value pair by key, a 1 is appended to a key's list each time the field occurs.
Once the map_bigdata method finishes, its results are passed individually to the Reduce method, called reduce_bigdata. Based on the output in listing 2, this means the reduce method is invoked multiple times, once with each element (e.g. reduce_bigdata('CANADA',[1]), reduce_bigdata('JANESVILLE.WI',[1]), reduce_bigdata('UNITED STATES',[1, 1, 1])). Turning our attention back to listing 1, we can see the reduce_bigdata method just returns the length of its second argument, which in this case corresponds to the number of times each value (i.e. city.state or country) occurs.
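To see this workflow end to end without starting any servers, you can drive the two functions from listing 1 by hand. The following is a minimal sketch of what the framework does on your behalf; the grouping step in the middle is my own simplified stand-in for mincemeat's internals:

from collections import defaultdict

# Map and reduce functions copied verbatim from listing 1
def map_bigdata(k, v):
    for field in v:
        if field == '':
            continue
        yield field.upper(), 1

def reduce_bigdata(k, ones):
    return len(ones)

source_data = {1: ['Monmouth.OR', 'United States'],
               2: ['Toronto.Canada', 'Canada'],
               3: ['Fairbanks.AK', 'United States']}

# 1) Map step: call map_bigdata once per source element
# 2) Group step: collect the yielded 1s under their key (mincemeat does this for you)
grouped = defaultdict(list)
for k, v in source_data.items():
    for key, one in map_bigdata(k, v):
        grouped[key].append(one)

# 3) Reduce step: call reduce_bigdata once per grouped key
results = dict((key, reduce_bigdata(key, ones)) for key, ones in grouped.items())
print(results)  # e.g. {'UNITED STATES': 2, 'MONMOUTH.OR': 1, 'TORONTO.CANADA': 1, 'CANADA': 1, 'FAIRBANKS.AK': 1}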
After the reduce_bigdata method is applied to each element in the Map, the MapReduce task finishes, with its output assigned to the results reference. The final lines are standard Python sorting and looping sequences to extract the top entries from the MapReduce results. Listing 3 illustrates the execution of the mincemeat MapReduce Python script and its output.
$ python laptop_mapreduce.py
# WAITS_FOR_MAP_REDUCE_WORKERS
('UNITED STATES', 68230)
('ANCHORAGE.AK', 483)
('ALBUQUERQUE.NM', 230)
('CHICAGO.IL', 228)
('HOUSTON.TX', 227)
('MIAMI.FL', 226)
('FAIRBANKS.AK', 204)
('CANADA', 181)
('BAHAMAS', 179)
('PHOENIX.AZ', 173)
As illustrated in listing 3, the MapReduce script -- contained in listing 1 -- is executed like any other Python script. The output is a list with the 10 highest city.state or country counts found in the selected CSV columns. After invoking python laptop_mapreduce.py you'll notice the Python script waits. In fact, it will wait indefinitely until a MapReduce worker is activated. Listing 4 illustrates how to start a MapReduce worker with mincemeat.
mincemeat.py -p bigdata 127.0.0.1
The command in listing 4 starts a MapReduce worker on your laptop that connects to the master server at the IP address 127.0.0.1 and uses the password bigdata as a connection parameter. The IP address 127.0.0.1 is the loop-back address and always points to the local machine, whereas the password is the one hard-coded in the MapReduce script -- in listing 1 -- on the line s.run_server(password="bigdata"). Basically, listing 4 says: start a MapReduce worker against the master server at 127.0.0.1 and use the bigdata password to make the connection. Once the MapReduce worker is started, the script from listing 1 proceeds and produces the output shown in listing 3. I'll explain the reasoning behind this MapReduce worker design in the next section.
That's it, you've successfully run a MapReduce task on your laptop. Now that you've finished this MapReduce exercise, you might be left wondering what's so special about MapReduce. After all, couldn't a regular SQL query against a database with the same data do the same thing? Or a script -- written in any number of programming languages -- executed against a CSV file with the same data? The short answer to both questions is yes, but the longer answer is no. I held back two important features of MapReduce to make the concepts of Map and Reduce functions crystal clear, but now that you have a firm grasp of these functions, it's time to move on to MapReduce's distributed and parallel nature.
Keep in mind you can make use of any construct inside the Map and Reduce functions. In the previous example I only used some of the input parameters and a single loop and conditional, but you can leverage any input parameters, loops, conditionals or other Python syntax. As far as the mincemeat MapReduce implementation is concerned, the important thing is that the source data be provided as a Python dictionary -- which is the input to the Map method -- and that the result returned by the Map method be formatted to serve as the input for the Reduce method.
There's an important design concept described toward the end of the last exercise that makes MapReduce more powerful than what can be perceived from analyzing the MapReduce script in listing 1: The use of a MapReduce worker. MapReduce is designed to distribute its workload across multiple servers or MapReduce workers. In the previous exercise you only created a single MapReduce worker on the same machine as the master server, but it's also possible to create multiple MapReduce workers on different machines so the workload is done in parallel.
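For example, staying on a single laptop, you could launch several workers against the same master instead of just one. The sketch below assumes the mincemeat.py script from listing 4 is on your PATH; on a real cluster you would simply run the same worker command on different machines, pointing it at the master's IP address instead of 127.0.0.1:

import subprocess

# Start three mincemeat workers, each running the same command as listing 4
# (on Windows you may need to prefix the command with 'python')
workers = [subprocess.Popen(['mincemeat.py', '-p', 'bigdata', '127.0.0.1'])
           for _ in range(3)]

# Wait for every worker to exit once the master has handed out the last task
for worker in workers:
    worker.wait()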
The ability to do parallel processing is what makes MapReduce a more powerful alternative for processing or grouping big data vs. doing the same task against data on a regular database or CSV file. And it's both the Map and Reduce functions that can be delegated to different workers to expedite the processing of the MapReduce task as a whole. However, in order to do parallel processing of both the Map and Reduce functions across multiple workers, you also add complexity to the system because it becomes distributed.
Nevertheless, one of the beauties of MapReduce is that you don't need to worry -- too much, at least -- about provisioning your Map and Reduce functions to work in a distributed environment. As you saw in the last section, there wasn't anything specific to distributed or parallel processing in either the Map or Reduce functions; most of these details are taken care of 'under the hood' by the MapReduce framework itself. To give you an idea of the provisioning the MapReduce framework has to cover, table 2 lists what are known as the eight fallacies of distributed computing.
The eight fallacies of distributed computing |
---|
1. The network is reliable |
2. Latency is zero |
3. Bandwidth is infinite |
4. The network is secure |
5. Topology doesn't change |
6. There is one administrator |
7. Transport cost is zero |
8. The network is homogeneous |
The guiding principles in table 2 are called fallacies because designers of distributed computing applications often assume they're true. In reality, if multiple servers are to interact with one another and successfully achieve a common goal, you have to assume the antithesis of each principle: the network is not reliable, latency is never zero, ..., the network won't be homogeneous.
And it's precisely these principles that MapReduce is designed to take care of: what if a MapReduce worker suddenly becomes unreachable? What if a disk on a MapReduce worker suddenly fails? What if a new MapReduce worker is added in the middle of a task? All these questions have to be answered for a distributed system to work correctly. In the case of MapReduce, it's the master MapReduce server that distributes the workload to workers and keeps track of successful or failed tasks, whereas the workers do the actual processing and maintain communication with the master to report progress.
In a previous paragraph, I mentioned how with MapReduce you don't need to worry 'too much' about provisions for parallel and distributed computing. Let me elaborate on the 'too much' remark. While MapReduce itself is designed so tasks don't require explicit knowledge of, or code for, parallel or distributed environments, running a full-fledged MapReduce cluster with a master server and multiple workers can require a lot of supporting software to make MapReduce work correctly. This software can include daemons to establish communication between servers, distributed file systems to share data between servers and monitoring services to detect the load and health of workers, among other things.
It's due to this supporting software required alongside MapReduce that some MapReduce frameworks can take hours to install -- as I mentioned at the outset. Nevertheless, in order to truly achieve the full potential of MapReduce, it's necessary to deal with both the installation and configuration of MapReduce itself, as well as some of this other supporting software which is more in tune with system administration than actual application development.
Fortunately, with the emergence of cloud computing providers, the effort required to install and configure a full-fledged MapReduce cluster has dropped considerably. Many providers make this process as easy as uploading your big data sets and with a few clicks you can have a master server along with multiple workers running in a matter of minutes. And on top of the ease with which you can get started, there's also the fact that most providers offer a 'pay as you use' model where there's no need to make a strong upfront investment in hardware or software.
Daniel started working with technology fresh out of school for a Fortune 500 company; later he moved on to contract work as an independent technology consultant. Throughout the years he has continued to work at the code level as a developer and architect, as a CTO-level advisor on platform selection strategies, as a professional technical writer and as an entrepreneur.
Daniel has been CTO/technical founder for sites that include: cleandatasets.com -- a data cleansing platform; videoformatter.com -- a video processing engine; and awardfun.com -- an award reference site for film, music and television. Daniel's writing experience spans multiple articles and books, with his early years focusing on Java, Linux and Open source topics and his more recent focus being Python/Django, Cloud computing and JavaScript topics.