{"id":1085,"date":"2015-01-15T08:57:34","date_gmt":"2015-01-15T13:57:34","guid":{"rendered":"https:\/\/blogs.mathworks.com\/loren\/?p=1085"},"modified":"2016-08-04T09:15:40","modified_gmt":"2016-08-04T14:15:40","slug":"process-big-data-in-matlab-with-mapreduce","status":"publish","type":"post","link":"https:\/\/blogs.mathworks.com\/loren\/2015\/01\/15\/process-big-data-in-matlab-with-mapreduce\/","title":{"rendered":"Process &#8220;Big Data&#8221; in MATLAB with mapreduce"},"content":{"rendered":"<div class=\"content\"><!--introduction--><p>Today I&#8217;d like to introduce guest blogger Ken Atwell who works for the MATLAB Development team here at MathWorks. Today, Ken will be discussing with you the <a href=\"https:\/\/www.mathworks.com\/help\/matlab\/mapreduce.html\">MapReduce<\/a> programming technique now available in the R2014b release of MATLAB. MapReduce provides a way to process large amounts of file-based data on a single computer in MATLAB. For very large data sets, the same MATLAB code written using MapReduce can also be run on the \"big data\" platform, Hadoop&reg;.<\/p><!--\/introduction--><h3>Contents<\/h3><div><ul><li><a href=\"#aeddf289-0ef0-4a14-9ba7-b4a2d4ed39c2\">About the data<\/a><\/li><li><a href=\"#131b2229-e114-4e5f-9926-458d3267f819\">Introduction to <tt>mapreduce<\/tt><\/a><\/li><li><a href=\"#cfa24854-7e63-42f1-b88a-d90207cd7f72\">Use <tt>mapreduce<\/tt> to perform a computation<\/a><\/li><li><a href=\"#67956932-c689-4835-aab0-13388a0e5a8b\">Create a datastore<\/a><\/li><li><a href=\"#fc43c52f-089d-405c-a446-e38011b0133f\">Create a map function<\/a><\/li><li><a href=\"#ea0b630b-b5b0-4783-9fe1-fee1bcd775dd\">Create a reduce function<\/a><\/li><li><a href=\"#0ecc7c98-5412-4215-a5c0-ab92fc2b3e7a\">Run <tt>mapreduce<\/tt><\/a><\/li><li><a href=\"#948da166-1d50-4663-a7aa-f908c27ff3c2\">Use of keys in <tt>mapreduce<\/tt><\/a><\/li><li><a href=\"#040c2eef-a0e5-4f51-86e1-67c2fb889022\">Calculating group-wise metrics with 
<tt>mapreduce<\/tt><\/a><\/li><li><a href=\"#78dee4ed-303c-44b9-a97d-89f9e4e789db\">Visualizing the results<\/a><\/li><li><a href=\"#fec857e9-b85b-4a84-b6e4-10ce031ee04d\">Running <tt>mapreduce<\/tt> on Hadoop<\/a><\/li><li><a href=\"#8c9f3c37-3f8b-455e-b98a-512965539de4\">Conclusion<\/a><\/li><\/ul><\/div><h4>About the data<a name=\"aeddf289-0ef0-4a14-9ba7-b4a2d4ed39c2\"><\/a><\/h4><p>The data set we will be using consists of records containing flight performance metrics for USA domestic airlines for the years 1987 through 2008. Each year consists of a separate file. If you have experimented with \"big data\" before, you may already be familiar with this data set.  The full data set can be downloaded from <a href=\"http:\/\/stat-computing.org\/dataexpo\/2009\/the-data.html\">here<\/a>. A small subset of the data set, airlinesmall.csv, is also included with MATLAB&reg; to allow you to run this and other examples without downloading the entire data set.<\/p><h4>Introduction to <tt>mapreduce<\/tt><a name=\"131b2229-e114-4e5f-9926-458d3267f819\"><\/a><\/h4><p>MapReduce is a programming technique that is used to \"divide and conquer\" big data problems.  In MATLAB, the <a href=\"https:\/\/www.mathworks.com\/help\/matlab\/ref\/mapreduce.html\"><tt>mapreduce<\/tt><\/a> function requires three input arguments:<\/p><div><ol><li>A <a href=\"https:\/\/www.mathworks.com\/help\/matlab\/ref\/datastore.html\"><tt>datastore<\/tt><\/a> for reading data into the \"map\" function in a chunk-wise fashion.<\/li><li>A \"map\" function that operates on the individual chunks of data. The output of the map function is a partial calculation.  <tt>mapreduce<\/tt> calls the map function one time for each chunk of data in the <tt>datastore<\/tt>, with each call operating independently from other map calls.<\/li><li>A \"reduce\" function that is given the aggregate outputs from the map function.  
The reduce function finishes the computation begun by the map function, and outputs the final answer.<\/li><\/ol><\/div><p>This is an over-simplification to some extent, since the output of a call to the map function can be shuffled and combined in interesting ways before being passed to the reduce function.  This will be examined later.<\/p><h4>Use <tt>mapreduce<\/tt> to perform a computation<a name=\"cfa24854-7e63-42f1-b88a-d90207cd7f72\"><\/a><\/h4><p>Let's look at a straightforward example to illustrate how <tt>mapreduce<\/tt> is used. In this example we want to find the longest flight time out of all the flights recorded in the entire airline data set.  To do this we will:<\/p><div><ol><li>Create a <tt>datastore<\/tt> object to reference our airline data set<\/li><li>Create a \"map\" function that computes the maximum flight time in each chunk of data in the <tt>datastore<\/tt>.<\/li><li>Create a \"reduce\" function that computes the maximum value among all of the maxima computed by the calls to the map function.<\/li><\/ol><\/div><h4>Create a datastore<a name=\"67956932-c689-4835-aab0-13388a0e5a8b\"><\/a><\/h4><p><tt>datastore<\/tt> is used to access collections of tabular text files stored on a local disk, or the Hadoop&reg; Distributed File System (HDFS&#8482;). It is also the mechanism for providing data in a chunk-wise manner to the map calls when using <tt>mapreduce<\/tt>. <a href=\"https:\/\/blogs.mathworks.com\/loren\/2014\/12\/03\/reading-big-data-into-matlab\/\">A previous blog post<\/a> explained how <tt>datastore<\/tt> works and how it is used for reading collections of data that are too large to fit in memory.<\/p><p>Let's create our <tt>datastore<\/tt> and first preview the data set. This allows us to peek into the data set, and identify the format of the data and the columns that contain the data we are interested in. 
The preview will normally provide a small chunk of data that contains all the columns present in the data set, although I will only show a handful of columns to make our article more readable.<\/p><pre class=\"codeinput\">ds = datastore(<span class=\"string\">'airlinesmall.csv'<\/span>, <span class=\"string\">'DatastoreType'<\/span>, <span class=\"string\">'tabulartext'<\/span>, <span class=\"keyword\">...<\/span>\r\n    <span class=\"string\">'TreatAsMissing'<\/span>, <span class=\"string\">'NA'<\/span>);\r\ndata = preview(ds);\r\ndata(:,[1,2,3,9,12])\r\n<\/pre><pre class=\"codeoutput\">ans = \r\n    Year    Month    DayofMonth    UniqueCarrier    ActualElapsedTime\r\n    ____    _____    __________    _____________    _________________\r\n    1987    10       21            'PS'              53              \r\n    1987    10       26            'PS'              63              \r\n    1987    10       23            'PS'              83              \r\n    1987    10       23            'PS'              59              \r\n    1987    10       22            'PS'              77              \r\n    1987    10       28            'PS'              61              \r\n    1987    10        8            'PS'              84              \r\n    1987    10       10            'PS'             155              \r\n<\/pre><p>For the computation we are looking to perform, we only need to look at the \"ActualElapsedTime\" column: it contains the actual flight time. Let's configure our <tt>datastore<\/tt> to only provide this one column to our map function.<\/p><pre class=\"codeinput\">ds.SelectedVariableNames = {<span class=\"string\">'ActualElapsedTime'<\/span>};\r\n<\/pre><h4>Create a map function<a name=\"fc43c52f-089d-405c-a446-e38011b0133f\"><\/a><\/h4><p>Now we will write our map function (<tt>MaxTimeMapper.m<\/tt>). 
Three input arguments will be provided to our map function by <tt>mapreduce<\/tt>:<\/p><div><ol><li>The input data, \"ActualElapsedTime\", which is provided as a MATLAB <a href=\"https:\/\/www.mathworks.com\/help\/matlab\/ref\/table.html\"><tt>table<\/tt><\/a> by <tt>datastore<\/tt>.<\/li><li>A collection of configuration and contextual information, <tt>info<\/tt>. This can be ignored in most cases, as it is here.<\/li><li>An intermediate data storage object, where the results of the calculations from the map function are stored. Use the <a href=\"https:\/\/www.mathworks.com\/help\/matlab\/ref\/add.html\"><tt>add<\/tt><\/a> function to add key\/value pairs to this intermediate output. In this example, we have arbitrarily chosen the name of the key (<tt>'MaxElapsedTime'<\/tt>).<\/li><\/ol><\/div><p>Our map function finds the maximum value in the table <tt>'data'<\/tt>, and saves a single key (<tt>'MaxElapsedTime'<\/tt>) and corresponding value to the intermediate data storage object. We will now go ahead and save the following map function (<tt>MaxTimeMapper.m<\/tt>) to our current folder.<\/p><pre class=\"language-matlab\"><span class=\"keyword\">function<\/span> MaxTimeMapper(data, ~, intermediateValuesOut)\r\nmaxTime = max(data{:,:});\r\nadd(intermediateValuesOut, <span class=\"string\">'MaxElapsedTime'<\/span>, maxTime);\r\n<span class=\"keyword\">end<\/span>\r\n<\/pre><h4>Create a reduce function<a name=\"ea0b630b-b5b0-4783-9fe1-fee1bcd775dd\"><\/a><\/h4><p>Next, we create the reduce function (<tt>MaxTimeReducer.m<\/tt>). Three input arguments will also be provided to our reduce function by <tt>mapreduce<\/tt>:<\/p><div><ol><li>A set of input \"keys\".  Keys will be discussed further below, but they can be ignored in some simple problems, as they are here.<\/li><li>An intermediate data input object that <tt>mapreduce<\/tt> passes to the reduce function. This data is the output of the map function and is in the form of key\/value pairs. 
We will use the <a href=\"https:\/\/www.mathworks.com\/help\/matlab\/ref\/hasnext.html\"><tt>hasnext<\/tt><\/a> and <a href=\"https:\/\/www.mathworks.com\/help\/matlab\/ref\/getnext.html\"><tt>getnext<\/tt><\/a> functions to iterate through the values.<\/li><li>A final output data storage object where the results of the reduce calculations are stored. Use the <tt>add<\/tt> and <a href=\"https:\/\/www.mathworks.com\/help\/matlab\/ref\/addmulti.html\"><tt>addmulti<\/tt><\/a> functions to add key\/value pairs to the output.<\/li><\/ol><\/div><p>Our reduce function will receive a list of values associated with the <tt>'MaxElapsedTime'<\/tt> key which were generated by the calls to our map function. The reduce function will iterate through these values to find the maximum. We create the following reduce function (<tt>MaxTimeReducer.m<\/tt>) and save it to our current folder.<\/p><pre class=\"language-matlab\"><span class=\"keyword\">function<\/span> MaxTimeReducer(~, intermediateValuesIn, finalValuesOut)\r\nmaxElapsedTime = -inf;\r\n<span class=\"keyword\">while<\/span> hasnext(intermediateValuesIn)\r\n   maxElapsedTime = max(maxElapsedTime, getnext(intermediateValuesIn));\r\n<span class=\"keyword\">end<\/span>\r\nadd(finalValuesOut, <span class=\"string\">'MaxElapsedTime'<\/span>, maxElapsedTime);\r\n<span class=\"keyword\">end<\/span>\r\n<\/pre><h4>Run <tt>mapreduce<\/tt><a name=\"0ecc7c98-5412-4215-a5c0-ab92fc2b3e7a\"><\/a><\/h4><p>Once the map and reduce functions are written and saved in our current folder, we can call <tt>mapreduce<\/tt> and reference the <tt>datastore<\/tt>, map function, and reduce function to run our calculations on our data. 
The <a title=\"https:\/\/www.mathworks.com\/help\/matlab\/ref\/datastore.readall.html (link no longer works)\"><tt>readall<\/tt><\/a> function is used here to display the results of the MapReduce algorithm.<\/p><pre class=\"codeinput\">result = mapreduce(ds, @MaxTimeMapper, @MaxTimeReducer);\r\nreadall(result)\r\n<\/pre><pre class=\"codeoutput\">Parallel mapreduce execution on the local cluster:\r\n********************************\r\n*      MAPREDUCE PROGRESS      *\r\n********************************\r\nMap   0% Reduce   0%\r\nMap 100% Reduce 100%\r\nans = \r\n          Key           Value \r\n    ________________    ______\r\n    'MaxElapsedTime'    [1650]\r\n<\/pre><p>If Parallel Computing Toolbox is available, MATLAB will automatically start a pool and parallelize execution of the map functions. Since the number of calls to the map function by <tt>mapreduce<\/tt> corresponds to the number of chunks in the <tt>datastore<\/tt>, parallel execution of map calls may speed up overall execution.<\/p><h4>Use of keys in <tt>mapreduce<\/tt><a name=\"948da166-1d50-4663-a7aa-f908c27ff3c2\"><\/a><\/h4><p>The use of keys is an important feature of <tt>mapreduce<\/tt>. Each call to the map function can add intermediate results to one or more named \"buckets\", called keys.<\/p><p>If the map function adds values to multiple keys, this leads to multiple calls to the reduce function, with each reduce call working on only one key's intermediate values. The <tt>mapreduce<\/tt> function automatically manages this data movement between the map and reduce phases of the algorithm.<\/p><h4>Calculating group-wise metrics with <tt>mapreduce<\/tt><a name=\"040c2eef-a0e5-4f51-86e1-67c2fb889022\"><\/a><\/h4><p>The behavior of the map function in this case is more complex and will illustrate the benefits to using multiple keys to store your intermediate data. For every airline found in the input data, use the <tt>add<\/tt> function to add a vector of values.  
This vector is a count of the number of flights for that carrier for each day in the 21+ years of data.  The carrier code is the key for this vector of values.  This ensures that all of the data for each carrier will be grouped together when <tt>mapreduce<\/tt> passes it to the reduce function.<\/p><p>Here's our new map function (<tt>CountFlightsMapper.m<\/tt>).<\/p><pre class=\"language-matlab\"><span class=\"keyword\">function<\/span> CountFlightsMapper(data, ~, intermediateValuesOut)\r\ndayNumber = days((datetime(data.Year, data.Month, data.DayofMonth) - datetime(1987,10,1)))+1;\r\ndaysSinceEpoch = days(datetime(2008,12,31) - datetime(1987,10,1))+1;\r\n<\/pre><pre class=\"language-matlab\">[airlineName, ~, airlineIndex] = unique(data.UniqueCarrier);\r\n<\/pre><pre class=\"language-matlab\"><span class=\"keyword\">for<\/span> i = 1:numel(airlineName)\r\n   dayTotals = accumarray(dayNumber(airlineIndex==i), 1, [daysSinceEpoch, 1]);\r\n   add(intermediateValuesOut, airlineName{i}, dayTotals);\r\n<span class=\"keyword\">end<\/span>\r\n<span class=\"keyword\">end<\/span>\r\n<\/pre><p>The reduce function is less complex.  It simply iterates over the intermediate values and adds the vectors together.  At completion, it outputs the values in this accumulated vector.  
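<\/p><p>To make the counting concrete, here is a minimal sketch using made-up day numbers. The names <tt>numDays<\/tt>, <tt>chunk1<\/tt>, <tt>chunk2<\/tt> and <tt>total<\/tt> are hypothetical and are not part of the example files. It assumes that two map calls each saw three flights for the same carrier over a five-day period: <tt>accumarray<\/tt> turns each chunk's day numbers into per-day counts, and adding the two vectors is the work the reduce function does for that carrier's key.<\/p><pre class=\"language-matlab\">numDays = 5;                                       % toy period length (assumption)\r\nchunk1 = accumarray([1; 1; 3], 1, [numDays, 1]);   % counts from one map call: [2; 0; 1; 0; 0]\r\nchunk2 = accumarray([3; 5; 5], 1, [numDays, 1]);   % counts from another map call: [0; 0; 1; 0; 2]\r\ntotal = chunk1 + chunk2;                           % what the reducer accumulates: [2; 0; 2; 0; 2]\r\n<\/pre><p>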
Note that the reduce function does not need to sort or examine the <tt>intermediateKeysIn<\/tt> values; each call to the reduce function by <tt>mapreduce<\/tt> passes only the values for one airline.<\/p><p>Here's our new reduce function (<tt>CountFlightsReducer.m<\/tt>).<\/p><pre class=\"language-matlab\"><span class=\"keyword\">function<\/span> CountFlightsReducer(intermediateKeysIn, intermediateValuesIn, finalValuesOut)\r\ndaysSinceEpoch = days(datetime(2008,12,31) - datetime(1987,10,1))+1;\r\ndayArray = zeros(daysSinceEpoch, 1);\r\n<\/pre><pre class=\"language-matlab\"><span class=\"keyword\">while<\/span> hasnext(intermediateValuesIn)\r\n   dayArray = dayArray + getnext(intermediateValuesIn);\r\n<span class=\"keyword\">end<\/span>\r\nadd(finalValuesOut, intermediateKeysIn, dayArray);\r\n<span class=\"keyword\">end<\/span>\r\n<\/pre><p>To run this new analysis, just reset the <tt>datastore<\/tt> and select the active variables of interest. In this case we want the date (year, month, day) and the airline carrier name. Once the map and reduce functions are written and saved in our current folder, <tt>mapreduce<\/tt> is called referencing the updated <tt>datastore<\/tt>, map function, and reduce function.<\/p><pre class=\"codeinput\">reset(ds);\r\nds.SelectedVariableNames = {<span class=\"string\">'Year'<\/span>, <span class=\"string\">'Month'<\/span>, <span class=\"string\">'DayofMonth'<\/span>, <span class=\"string\">'UniqueCarrier'<\/span>};\r\n\r\nresult = mapreduce(ds, @CountFlightsMapper, @CountFlightsReducer);\r\nresult = result.readall();\r\n<\/pre><pre class=\"codeoutput\">Parallel mapreduce execution on the local cluster:\r\n********************************\r\n*      MAPREDUCE PROGRESS      *\r\n********************************\r\nMap   0% Reduce   0%\r\nMap 100% Reduce  50%\r\nMap 100% Reduce 100%\r\n<\/pre><p>Up to this point we have analyzed only the sample data set (airlinesmall.csv). 
To look at the number of flights each day over the full dataset, let's load the results of running our new MapReduce algorithm on the entire data set.<\/p><pre class=\"codeinput\">load <span class=\"string\">airlineResults<\/span>\r\n<\/pre><h4>Visualizing the results<a name=\"78dee4ed-303c-44b9-a97d-89f9e4e789db\"><\/a><\/h4><p>Before looking at the number of flights per day for the top 7 carriers, let us first apply a filter to the data to smooth out the effects of weekend travel.  This would otherwise clutter the visualization.<\/p><pre class=\"codeinput\">lines = result.Value;\r\nlines = horzcat(lines{:});\r\n[~,sortOrder] = sort(sum(lines), <span class=\"string\">'descend'<\/span>);\r\nlines = lines(:,sortOrder(1:7));\r\nresult = result(sortOrder(1:7),:);\r\n\r\nlines(lines==0) = nan;\r\n<span class=\"keyword\">for<\/span> carrier=1:size(lines,2)\r\n    lines(:,carrier) = filter(repmat(1\/7, [7 1]), 1, lines(:,carrier));\r\n<span class=\"keyword\">end<\/span>\r\n<\/pre><pre class=\"codeinput\">figure(<span class=\"string\">'Position'<\/span>,[1 1 800 800]);\r\nplot(datetime(1987,10,1):caldays(1):datetime(2008,12,31),lines)\r\ntitle (<span class=\"string\">'Domestic airline flights per day per carrier'<\/span>)\r\nxlabel(<span class=\"string\">'Date'<\/span>)\r\nylabel(<span class=\"string\">'Flights per day (7-day moving average)'<\/span>)\r\n\r\n<span class=\"keyword\">try<\/span>\r\n    carrierList = readtable(<span class=\"string\">'carriers.csv.txt'<\/span>, <span class=\"string\">'ReadRowNames'<\/span>, true);\r\n    result.Key = cellfun(@(x) carrierList(x, :).Description, result.Key);\r\n<span class=\"keyword\">catch<\/span>\r\n<span class=\"keyword\">end<\/span>\r\nlegend(result.Key, <span class=\"string\">'Location'<\/span>, <span class=\"string\">'SouthOutside'<\/span>)\r\n<\/pre><img decoding=\"async\" vspace=\"5\" hspace=\"5\" src=\"https:\/\/blogs.mathworks.com\/images\/loren\/2015\/airlineExample_01.png\" alt=\"\"> <p>The interesting 
characteristic highlighted by the plot is the growth of Southwest Airlines (WN) during this time period.<\/p><h4>Running <tt>mapreduce<\/tt> on Hadoop<a name=\"fec857e9-b85b-4a84-b6e4-10ce031ee04d\"><\/a><\/h4><p>We wrote this code on our desktop computer, and it lets us analyze data that would not normally fit into our machine's memory. But what if our data were stored on the \"big data\" platform, Hadoop&reg;, and the data set were too large to reasonably transfer to our desktop computer?<\/p><p><a href=\"https:\/\/www.mathworks.com\/help\/distcomp\/run-mapreduce-on-a-hadoop-cluster.html\">Using Parallel Computing Toolbox along with MATLAB&reg; Distributed Computing Server&#8482;<\/a>, this same code can be run on a remote Hadoop cluster. The map and reduce functions would remain unchanged, but two configuration changes are required:<\/p><div><ol><li>Our <tt>datastore<\/tt> is updated to reference the location of the data files in the Hadoop&reg; Distributed File System (HDFS&#8482;).<\/li><li>The <a href=\"https:\/\/www.mathworks.com\/help\/matlab\/ref\/mapreducer.html\"><tt>mapreducer<\/tt><\/a> object is updated to reference Hadoop as the runtime environment.<\/li><\/ol><\/div><p>After these configuration changes, the algorithm can then be run on Hadoop.<\/p><p>MATLAB Compiler can be used to deploy applications created using MapReduce, and it can also generate Hadoop-specific components for use in production settings.<\/p><h4>Conclusion<a name=\"8c9f3c37-3f8b-455e-b98a-512965539de4\"><\/a><\/h4><p>Learn more about MapReduce in MATLAB with our <a href=\"https:\/\/www.mathworks.com\/help\/matlab\/import_export\/build-effective-algorithms-with-mapreduce.html\">algorithm examples<\/a>, and let us know <a href=\"https:\/\/blogs.mathworks.com\/loren\/?p=1085#respond\">here<\/a> how you might use MapReduce to analyze your big data.<\/p><script language=\"JavaScript\"> <!-- \r\n    function 
grabCode_26118cb2265044fe969e4693fdae97f3() {\r\n        \/\/ Remember the title so we can use it in the new page\r\n        title = document.title;\r\n\r\n        \/\/ Break up these strings so that their presence\r\n        \/\/ in the Javascript doesn't mess up the search for\r\n        \/\/ the MATLAB code.\r\n        t1='26118cb2265044fe969e4693fdae97f3 ' + '##### ' + 'SOURCE BEGIN' + ' #####';\r\n        t2='##### ' + 'SOURCE END' + ' #####' + ' 26118cb2265044fe969e4693fdae97f3';\r\n    \r\n        b=document.getElementsByTagName('body')[0];\r\n        i1=b.innerHTML.indexOf(t1)+t1.length;\r\n        i2=b.innerHTML.indexOf(t2);\r\n \r\n        code_string = b.innerHTML.substring(i1, i2);\r\n        code_string = code_string.replace(\/REPLACE_WITH_DASH_DASH\/g,'--');\r\n\r\n        \/\/ Use \/x3C\/g instead of the less-than character to avoid errors \r\n        \/\/ in the XML parser.\r\n        \/\/ Use '\\x26#60;' instead of '<' so that the XML parser\r\n        \/\/ doesn't go ahead and substitute the less-than character. 
\r\n        code_string = code_string.replace(\/\\x3C\/g, '\\x26#60;');\r\n\r\n        copyright = 'Copyright 2015 The MathWorks, Inc.';\r\n\r\n        w = window.open();\r\n        d = w.document;\r\n        d.write('<pre>\\n');\r\n        d.write(code_string);\r\n\r\n        \/\/ Add copyright line at the bottom if specified.\r\n        if (copyright.length > 0) {\r\n            d.writeln('');\r\n            d.writeln('%%');\r\n            if (copyright.length > 0) {\r\n                d.writeln('% _' + copyright + '_');\r\n            }\r\n        }\r\n\r\n        d.write('<\/pre>\\n');\r\n\r\n        d.title = title + ' (MATLAB code)';\r\n        d.close();\r\n    }   \r\n     --> <\/script><p style=\"text-align: right; font-size: xx-small; font-weight:lighter;   font-style: italic; color: gray\"><br><a href=\"javascript:grabCode_26118cb2265044fe969e4693fdae97f3()\"><span style=\"font-size: x-small;        font-style: italic;\">Get \r\n      the MATLAB code <noscript>(requires JavaScript)<\/noscript><\/span><\/a><br><br>\r\n      Published with MATLAB&reg; R2014b<br><\/p><\/div><!--\r\n26118cb2265044fe969e4693fdae97f3 ##### SOURCE BEGIN #####\r\n%% Process \"Big Data\" in MATLAB with |mapreduce|\r\n% Today I'd like to introduce guest blogger Ken Atwell who works for the MATLAB\r\n% Development team here at MathWorks. Today, Ken will be discussing with you the\r\n% <https:\/\/www.mathworks.com\/help\/matlab\/mapreduce.html MapReduce>\r\n% programming technique now available in the R2014b release of MATLAB.\r\n% MapReduce provides a way to process large amounts of file-based\r\n% data on a single computer in MATLAB. For very large data sets, the same\r\n% MATLAB code written using MapReduce can also be run on the \"big data\"\r\n% platform, Hadoop(R).\r\n \r\n%% About the data\r\n% The data set we will be using consists of records containing flight\r\n% performance metrics for USA domestic airlines for the years 1987\r\n% through 2008. 
Each year consists of a separate file. If you have\r\n% experimented with \"big data\" before, you may already be familiar with\r\n% this data set.  The full data set can be downloaded from\r\n% <http:\/\/stat-computing.org\/dataexpo\/2009\/the-data.html here>. A small\r\n% subset of the data set, airlinesmall.csv, is also included with MATLAB(R) to allow you to\r\n% run this and other examples without downloading the entire data set.\r\n\r\n%% Introduction to |mapreduce|\r\n% MapReduce is a programming technique that is used to \"divide and conquer\" big data\r\n% problems.  In MATLAB, the <https:\/\/www.mathworks.com\/help\/matlab\/ref\/mapreduce.html |mapreduce|>\r\n% function requires three input arguments:\r\n%\r\n% # A <https:\/\/www.mathworks.com\/help\/matlab\/ref\/datastore.html |datastore|>\r\n% for reading data into the \"map\" function in a chunk-wise fashion.\r\n% # A \"map\" function that operates on the individual chunks of data.\r\n% The output of the map function is a partial calculation.  |mapreduce| calls\r\n% the map function one time for each chunk of data in the |datastore|, with each\r\n% call operating independently from other map calls.\r\n% # A \"reduce\" function that is given the aggregate outputs from the\r\n% map function.  The reduce function finishes the computation begun by\r\n% the map function, and outputs the final answer.\r\n%\r\n% This is an over-simplification to some extent, since the output of a call\r\n% to the map function can be shuffled and combined in interesting ways\r\n% before being passed to the reduce function.  This will be examined\r\n% later.\r\n\r\n%% Use |mapreduce| to perform a computation\r\n% Let's look at a straightforward example to illustrate how |mapreduce| is\r\n% used. In this example we want to find the longest flight time out of all\r\n% the flights recorded in the entire airline data set.  
To do this we will:\r\n%\r\n% # Create a |datastore| object to reference our airline data set\r\n% # Create a \"map\" function that computes the maximum flight time in each\r\n% chunk of data in the |datastore|.\r\n% # Create a \"reduce\" function that computes the maximum value among all of the\r\n% maxima computed by the calls to the map function.\r\n\r\n%% Create a datastore\r\n% |datastore| is used to access collections of tabular text files stored\r\n% on a local disk, or the Hadoop(R) Distributed File System (HDFS(TM)). It\r\n% is also the mechanism for providing data in a chunk-wise manner to the\r\n% map calls when using |mapreduce|.\r\n% <https:\/\/blogs.mathworks.com\/loren\/2014\/12\/03\/reading-big-data-into-matlab\/ A previous blog post>\r\n% explained how |datastore| works and how it is used for reading collections\r\n% of data that are too large to fit in memory.\r\n%\r\n% Let's create our |datastore| and first preview the data set. This allows us\r\n% to peek into the data set, and identify the format of the data and the\r\n% columns that contain the data we are interested in. The preview will\r\n% normally provide a small chunk of data that contains all the columns\r\n% present in the data set, although I will only show a handful of columns to\r\n% make our article more readable.\r\nds = datastore('airlinesmall.csv', 'DatastoreType', 'tabulartext', ...\r\n    'TreatAsMissing', 'NA');\r\ndata = preview(ds);\r\ndata(:,[1,2,3,9,12])\r\n\r\n%%\r\n% For the computation we are looking to perform, we only need to look at\r\n% the \"ActualElapsedTime\" column: it contains the actual flight time.\r\n% Let's configure our |datastore| to only provide this one column to our map\r\n% function.\r\nds.SelectedVariableNames = {'ActualElapsedTime'};\r\n\r\n%% Create a map function\r\n% Now we will write our map function (|MaxTimeMapper.m|). 
Three input\r\n% arguments will be provided to our map function by |mapreduce|:\r\n%\r\n% # The input data, \"ActualElapsedTime\", which is provided as a MATLAB\r\n% <https:\/\/www.mathworks.com\/help\/matlab\/ref\/table.html |table|> by |datastore|.\r\n% # A collection of configuration and contextual information, |info|. This\r\n% can be ignored in most cases, as it is here.\r\n% # An intermediate data storage object, where the results of the\r\n% calculations from the map function are stored. Use the <https:\/\/www.mathworks.com\/help\/matlab\/ref\/add.html |add|>\r\n% function to add key\/value pairs to this intermediate output. In this example,\r\n% we have arbitrarily chosen the name of the key (|'MaxElapsedTime'|).\r\n% \r\n% Our map function finds the maximum value in the table |'data'|,\r\n% and saves a single key (|'MaxElapsedTime'|) and corresponding value to\r\n% the intermediate data storage object.\r\n% We will now go ahead and save the following map function\r\n% (|MaxTimeMapper.m|) to our current folder.\r\n%\r\n%   function MaxTimeMapper(data, ~, intermediateValuesOut)\r\n%   maxTime = max(data{:,:});\r\n%   add(intermediateValuesOut, 'MaxElapsedTime', maxTime);\r\n%   end\r\n%\r\n\r\n%% Create a reduce function\r\n% Next, we create the reduce function (|MaxTimeReducer.m|). Three input\r\n% arguments will also be provided to our reduce function by |mapreduce|:\r\n%\r\n% # A set of input \"keys\".  Keys will be discussed further below, but they\r\n% can be ignored in some simple problems, as they are here.\r\n% # An intermediate data input object that |mapreduce| passes to the reduce\r\n% function. This data is the output of the map function and\r\n% is in the form of key\/value pairs. 
We will use the <https:\/\/www.mathworks.com\/help\/matlab\/ref\/hasnext.html |hasnext|>\r\n% and <https:\/\/www.mathworks.com\/help\/matlab\/ref\/getnext.html |getnext|>\r\n% functions to iterate through the values.\r\n% # A final output data storage object where the results of the reduce\r\n% calculations are stored. Use the |add| and <https:\/\/www.mathworks.com\/help\/matlab\/ref\/addmulti.html |addmulti|>\r\n% functions to add key\/value pairs to the output.\r\n%\r\n% Our reduce function will receive a list of values associated with the\r\n% |'MaxElapsedTime'| key which were generated by the calls to our map\r\n% function. The reduce function will iterate through these values to find\r\n% the maximum.\r\n% We create the following reduce function (|MaxTimeReducer.m|) and save it to\r\n% our current folder.\r\n%\r\n%   function MaxTimeReducer(~, intermediateValuesIn, finalValuesOut)\r\n%   maxElapsedTime = -inf;\r\n%   while hasnext(intermediateValuesIn)\r\n%      maxElapsedTime = max(maxElapsedTime, getnext(intermediateValuesIn));\r\n%   end\r\n%   add(finalValuesOut, 'MaxElapsedTime', maxElapsedTime);\r\n%   end\r\n%\r\n\r\n%% Run |mapreduce|\r\n% Once the map and reduce functions are written and saved in our\r\n% current folder, we can call |mapreduce| and reference the |datastore|, map\r\n% function, and reduce function to run our calculations on our data. \r\n% The <https:\/\/www.mathworks.com\/help\/matlab\/ref\/datastore.readall.html |readall|>\r\n% function is used here to display the results of the MapReduce algorithm.\r\nresult = mapreduce(ds, @MaxTimeMapper, @MaxTimeReducer);\r\nreadall(result)\r\n\r\n%%\r\n% If <https:\/\/www.mathworks.com\/help\/distcomp\/run-mapreduce-on-a-local-cluster.html Parallel Computing Toolbox>\r\n% is available, MATLAB will automatically start a pool and parallelize execution of the map\r\n% functions. 
Since the number of calls to the map function by |mapreduce|\r\n% corresponds to the number of chunks in the |datastore|, parallel execution\r\n% of map calls may speed up overall execution.\r\n\r\n%% Use of keys in |mapreduce|\r\n% The use of keys is an important feature of |mapreduce|. Each\r\n% call to the map function can add intermediate results to one or more\r\n% named \"buckets\", called keys.\r\n%\r\n% If the map function adds values to multiple keys, this leads to\r\n% multiple calls to the reduce function, with each reduce call working on\r\n% only one key's intermediate values. The |mapreduce| function automatically\r\n% manages this data movement between the map and reduce phases of the\r\n% algorithm.\r\n%\r\n\r\n%% Calculating group-wise metrics with |mapreduce|\r\n% The behavior of the map function in this case is more complex and\r\n% will illustrate the benefits to using multiple keys to store your\r\n% intermediate data.\r\n% For every airline found in the input data, use the |add| function\r\n% to add a vector of values.  This vector is a count of the number of\r\n% flights for that carrier for each day in the 21+ years of data.  The\r\n% carrier code is the key for this vector of values.  
This ensures that all\r\n% of the data for each carrier will be grouped together when |mapreduce|\r\n% passes it to the reduce function.\r\n%\r\n% Here's our new map function (|CountFlightsMapper.m|).\r\n%\r\n%   function CountFlightsMapper(data, ~, intermediateValuesOut)\r\n%   dayNumber = days((datetime(data.Year, data.Month, data.DayofMonth) - datetime(1987,10,1)))+1;\r\n%   daysSinceEpoch = days(datetime(2008,12,31) - datetime(1987,10,1))+1;\r\n%   \r\n%   [airlineName, ~, airlineIndex] = unique(data.UniqueCarrier);\r\n%   \r\n%   for i = 1:numel(airlineName)\r\n%      dayTotals = accumarray(dayNumber(airlineIndex==i), 1, [daysSinceEpoch, 1]);\r\n%      add(intermediateValuesOut, airlineName{i}, dayTotals);\r\n%   end\r\n%   end\r\n%\r\n\r\n\r\n%%\r\n% The reduce function is less complex.  It simply iterates over the\r\n% intermediate values and adds the vectors together.  At completion, it\r\n% outputs the values in this accumulated vector.  Note that the reduce\r\n% function does not need to sort or examine the |intermediateKeysIn| values;\r\n% each call to the reduce function by |mapreduce| only passes the values\r\n% for one airline.\r\n%\r\n% Here's our new reduce function (|CountFlightsReducer.m|).\r\n%\r\n%   function CountFlightsReducer(intermediateKeysIn, intermediateValuesIn, finalValuesOut)\r\n%   daysSinceEpoch = days(datetime(2008,12,31) - datetime(1987,10,1))+1;\r\n%   dayArray = zeros(daysSinceEpoch, 1);\r\n% \r\n%   while hasnext(intermediateValuesIn)\r\n%      dayArray = dayArray + getnext(intermediateValuesIn);\r\n%   end\r\n%   add(finalValuesOut, intermediateKeysIn, dayArray);\r\n%   end\r\n%\r\n\r\n%%\r\n% To run this new analysis, just reset the |datastore| and select the\r\n% active variables of interest. In this case we want the date (year, month, day)\r\n% and the airline carrier name. 
Once the map and reduce functions are\r\n% written and saved in our current folder, |mapreduce| is called\r\n% referencing the updated |datastore|, map function, and reduce function.\r\n\r\nreset(ds);\r\nds.SelectedVariableNames = {'Year', 'Month', 'DayofMonth', 'UniqueCarrier'};\r\n\r\nresult = mapreduce(ds, @CountFlightsMapper, @CountFlightsReducer);\r\nresult = result.readall();\r\n\r\n%%\r\n% Up to this point we have only been analyzing the sample data set\r\n% (airlinesmall.csv). To look at the number of flights each day over the\r\n% full data set, let's load the results of running our new MapReduce\r\n% algorithm on the entire data set.\r\nload airlineResults\r\n\r\n%% Visualizing the results\r\n% Before looking at the number of flights per day for the top 7 carriers,\r\n% let us first apply a filter to the data to smooth out the effects of\r\n% weekend travel.  This would otherwise clutter the visualization.\r\n\r\nlines = result.Value;\r\nlines = horzcat(lines{:});\r\n[~,sortOrder] = sort(sum(lines), 'descend');\r\nlines = lines(:,sortOrder(1:7));\r\nresult = result(sortOrder(1:7),:);\r\n\r\nlines(lines==0) = nan;\r\nfor carrier=1:size(lines,2)\r\n    lines(:,carrier) = filter(repmat(1\/7, [7 1]), 1, lines(:,carrier));\r\nend\r\n\r\n%%\r\n% \r\nfigure('Position',[1 1 800 800]);\r\nplot(datetime(1987,10,1):caldays(1):datetime(2008,12,31),lines)\r\ntitle ('Domestic airline flights per day per carrier')\r\nxlabel('Date')\r\nylabel('Flights per day (7-day moving average)')\r\n\r\ntry\r\n    carrierList = readtable('carriers.csv.txt', 'ReadRowNames', true);\r\n    result.Key = cellfun(@(x) carrierList(x, :).Description, result.Key);\r\ncatch\r\nend\r\nlegend(result.Key, 'Location', 'SouthOutside')\r\n\r\n%%\r\n% The interesting characteristic highlighted by the plot is the growth of\r\n% Southwest Airlines (WN) during this time period.\r\n\r\n%% Running |mapreduce| on Hadoop\r\n% The code we have just written runs on our desktop computer, and it\r\n% 
allows us to analyze data that normally would not fit into the memory of\r\n% our machine. But what if our data was stored on the \"big data\" platform,\r\n% Hadoop(R) and the data set is too large to reasonably transfer to our\r\n% desktop computer?\r\n%\r\n% <https:\/\/www.mathworks.com\/help\/distcomp\/run-mapreduce-on-a-hadoop-cluster.html Using Parallel Computing Toolbox along with MATLAB(R) Distributed\r\n% Computing Server(TM)>, this same code can be run on a remote Hadoop cluster.\r\n% The map and reduce functions would remain unchanged, but two\r\n% configuration changes are required:\r\n%\r\n% # Our |datastore| is updated to reference the location of the data files\r\n% in the Hadoop(R) Distributed File System (HDFS(TM)).\r\n% # The <https:\/\/www.mathworks.com\/help\/matlab\/ref\/mapreducer.html |mapreducer|>\r\n% object is updated to reference Hadoop as the runtime environment.\r\n%\r\n% After these configuration changes, the algorithm can then be run on Hadoop.\r\n\r\n%%\r\n% MATLAB Compiler can be used to deploy applications created using\r\n% MapReduce, and it can also generate Hadoop specific components for use\r\n% in production settings.\r\n\r\n%% Conclusion\r\n% Learn more about MapReduce in MATLAB with our\r\n% <https:\/\/www.mathworks.com\/help\/matlab\/import_export\/build-effective-algorithms-with-mapreduce.html\r\n% algorithm examples> and let us know how you might see MapReduce being\r\n% used to analyze your big data\r\n% <https:\/\/blogs.mathworks.com\/loren\/?p=1085#respond here>.\r\n%\r\n\r\n\r\n\r\n\r\n\r\n##### SOURCE END ##### 26118cb2265044fe969e4693fdae97f3\r\n-->","protected":false},"excerpt":{"rendered":"<div class=\"overview-image\"><img decoding=\"async\"  class=\"img-responsive\" src=\"https:\/\/blogs.mathworks.com\/images\/loren\/2015\/airlineExample_01.png\" onError=\"this.style.display ='none';\" \/><\/div><!--introduction--><p>Today I&#8217;d like to introduce guest blogger Ken Atwell who works for the MATLAB Development 
team here at MathWorks. Today, Ken will be discussing with you the <a href=\"https:\/\/www.mathworks.com\/help\/matlab\/mapreduce.html\">MapReduce<\/a> programming technique now available in the R2014b release of MATLAB. MapReduce provides a way to process large amounts of file-based data on a single computer in MATLAB. For very large data sets, the same MATLAB code written using MapReduce can also be run on the \"big data\" platform, Hadoop&reg;.... <a class=\"read-more\" href=\"https:\/\/blogs.mathworks.com\/loren\/2015\/01\/15\/process-big-data-in-matlab-with-mapreduce\/\">read more >><\/a><\/p>","protected":false},"author":39,"featured_media":0,"comment_status":"open","ping_status":"closed","sticky":false,"template":"","format":"standard","meta":[],"categories":[63,10,45,6],"tags":[],"_links":{"self":[{"href":"https:\/\/blogs.mathworks.com\/loren\/wp-json\/wp\/v2\/posts\/1085"}],"collection":[{"href":"https:\/\/blogs.mathworks.com\/loren\/wp-json\/wp\/v2\/posts"}],"about":[{"href":"https:\/\/blogs.mathworks.com\/loren\/wp-json\/wp\/v2\/types\/post"}],"author":[{"embeddable":true,"href":"https:\/\/blogs.mathworks.com\/loren\/wp-json\/wp\/v2\/users\/39"}],"replies":[{"embeddable":true,"href":"https:\/\/blogs.mathworks.com\/loren\/wp-json\/wp\/v2\/comments?post=1085"}],"version-history":[{"count":3,"href":"https:\/\/blogs.mathworks.com\/loren\/wp-json\/wp\/v2\/posts\/1085\/revisions"}],"predecessor-version":[{"id":1967,"href":"https:\/\/blogs.mathworks.com\/loren\/wp-json\/wp\/v2\/posts\/1085\/revisions\/1967"}],"wp:attachment":[{"href":"https:\/\/blogs.mathworks.com\/loren\/wp-json\/wp\/v2\/media?parent=1085"}],"wp:term":[{"taxonomy":"category","embeddable":true,"href":"https:\/\/blogs.mathworks.com\/loren\/wp-json\/wp\/v2\/categories?post=1085"},{"taxonomy":"post_tag","embeddable":true,"href":"https:\/\/blogs.mathworks.com\/loren\/wp-json\/wp\/v2\/tags?post=1085"}],"curies":[{"name":"wp","href":"https:\/\/api.w.org\/{rel}","templated":true}]}}