Scaling Market Basket Analysis with MapReduce
In an earlier post, today's guest blogger Toshi Takeuchi gave us an introduction to Market Basket Analysis. This week, he will discuss how to scale this technique using MapReduce to deal with larger data.
MapReduce in MATLAB 101
R2014b was a major update to core MATLAB functionality, and one of the exciting new features for me was MapReduce. I was primarily interested in using Market Basket Analysis to analyze clickstream data, and I knew web usage data extracted from web server logs would be very large.
MapReduce was developed to process massive datasets in a distributed parallel computation, and it is one of the key technologies that enabled Big Data analytics.
MapReduce is made up of mappers and reducers. Mappers read data from file storage one chunk at a time and parse it to generate key-value pairs. Reducers then receive those key-value pairs grouped by key and process the values associated with each key. Therefore what you need to do is (a minimal generic sketch follows the list):
- Use datastore to designate data sources
- Define mapper and reducer functions
- Use mapreduce with the datastore, mapper, and reducer to process the data
- Store the processed data for further analysis
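To make these steps concrete, here is a minimal, generic sketch of the pattern as a hypothetical word-count-style job that counts how many rows mention each page. The function names are made up for illustration, it assumes the datastore exposes a Page variable, and in practice the mapper and reducer would each live in their own file.

function pageCountMapper(data, info, intermKVStore)
    % emit one (page, 1) key-value pair for every row in this chunk
    addmulti(intermKVStore, data.Page, num2cell(ones(height(data),1)))
end

function pageCountReducer(key, intermValIter, outKVStore)
    % sum the 1's collected for this key from all of the mappers
    total = 0;
    while hasnext(intermValIter)
        total = total + getnext(intermValIter);
    end
    add(outKVStore, key, total)
end

% Driver: designate the source, run the job, read the result back
%   ds        = datastore('sampleData.csv');
%   result_ds = mapreduce(ds, @pageCountMapper, @pageCountReducer);
%   result    = readall(result_ds);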
Though mappers and reducers perform fairly simple operations, you can chain them together to handle more complex tasks. In the Apriori algorithm, the most time-consuming steps are the generation of transactions and 1-itemset data. So let's use MapReduce to address these bottlenecks.
We start by setting up the datastore. In this example, we are going to use a fairly small CSV file on a local drive, but datastore can
- read data that is too large to fit in memory on a single computer, or
- read files from multiple locations on a cluster, including a Hadoop Distributed File System (HDFS), with the appropriate add-on products.
It is important to start out with a small subset of the data to prototype and test your algorithm before you use it on really big data. MATLAB makes it really easy to prototype your algorithm on your local machine and then scale it up to the cluster or cloud later.
Set up source datastore.
source_ds = datastore('sampleData.csv','ReadVariableNames',false,...
    'NumHeaderLines',1,'VariableNames',{'VisitorCookieID','Page','Visits'});
source_ds.SelectedVariableNames = {'VisitorCookieID','Page'};
The data is a list of Visitor Cookie IDs and the pages associated with the IDs.
Let's review the data.
disp(preview(source_ds))
reset(source_ds)
             VisitorCookieID                                               Page
    __________________________________    ___________________________________________________________________________
    '3821337fdad7a6132253b10b602c4616'    '/matlabcentral/answers/'
    '3821337fdad7a6132253b10b602c4616'    '/matlabcentral/answers/152931-how-to-translate-the-following-code-from...'
    '3821337fdad7a6132253b10b602c4616'    '/matlabcentral/answers/153109-number-greater-than-the-largest-positive...'
    '3821337fdad7a6132253b10b602c4616'    '/help/matlab/ref/realmax.html'
    '3821337fdad7a6132253b10b602c4616'    '/matlabcentral/answers/153201-how-to-evaluate-large-factorials'
    '3821337fdad7a6132253b10b602c4616'    '/help/matlab/matlab_prog/floating-point-numbers.html'
    '3821337fdad7a6132253b10b602c4616'    '/matlabcentral/answers/contributors/5560534-tigo/questions'
    '3821337fdad7a6132253b10b602c4616'    '/matlabcentral/newsreader/view_thread/297433'
Step 1: Group items by transaction
If you think of a visitor as a shopper, you can think of pages visited as items in the shopping cart (a transaction). A visitor can visit the same page multiple times, but such repeated visits are not factored in itemset counting.
One of the important considerations in designing a MapReduce algorithm is to minimize the number of keys you generate. For this reason, a good starting point would be to group items by transactions by using VisitorCookieID as the key, because we have a finite set of visitors but they can visit a larger number of pages.
type transactionMapper
function transactionMapper(data, info, intermKVStore)
    tid = data.VisitorCookieID;
    item = data.Page;
    % get unique tids
    u_tid = unique(tid);
    % iterate over the data chunk to map multiple items to a unique tid
    items = cell(size(u_tid));
    for i = 1:length(tid)
        row = ismember(u_tid,tid{i});
        items{row}{end+1} = item{i};
    end
    % use addmulti to speed up the process
    addmulti(intermKVStore, u_tid, items)
end
The mapper maps the pages visited in each chunk to their unique VisitorCookieID and then passes these key-value pairs to the reducers.
type transactionReducer
function transactionReducer(key, value, outKVStore)
    items = {};
    % concatenate items from different mappers for the same key
    while hasnext(value)
        items = [items, getnext(value)];
    end
    % eliminate duplicates
    u_items = unique(items);
    % save the data to a key-value store
    add(outKVStore, key, u_items);
end
The reducer receives key-value pairs by key, and merges multiple cell arrays with the same key into a single cell array and removes any duplicates. We can then store the result in a new datastore.
Now let's run this job.
Group items by transaction.
transaction_ds = mapreduce(source_ds, @transactionMapper, @transactionReducer);
transactions = readall(transaction_ds);
disp(transactions(1:5,:))
Parallel mapreduce execution on the local cluster:
********************************
*      MAPREDUCE PROGRESS      *
********************************
Map   0% Reduce   0%
Map 100% Reduce  50%
Map 100% Reduce 100%
                   Key                       Value
    __________________________________    ___________
    '00927996b5566e6347e55463dc7c6273'    {1x16 cell}
    '01c5f379d2e475d727eec2c711fb83f8'    {1x11 cell}
    '0717615157c000c4955c21b958b7866d'    {1x1 cell}
    '0f29597bff2a611013a4333f027b4f1a'    {1x12 cell}
    '13027f74a9c7402bf7b8f699b557815f'    {1x12 cell}
Step 2: Generate 1-itemsets
We now know that all items in a transaction are unique. All we need to do is count the number of times an item appears across the transactions to see how many transactions contained that item. The pages were stored as the values in the previous step, so we simply need to retrieve those values and count their contents.
type oneItemsetMapper
function oneItemsetMapper(data, info, intermKVStore)
    % keys are in a cell array
    keys = data.Value{1};
    % create a cell array of 1's
    values = num2cell(ones(size(keys)));
    % save the data to a key-value store
    addmulti(intermKVStore, keys, values)
end
The mapper emits each page as a key with a value of 1.
type oneItemsetReducer
function oneItemsetReducer(key, value, outKVStore)
    count = 0;
    while hasnext(value)
        count = count + getnext(value);
    end
    add(outKVStore, key, count);
end
The reducer then collects the counts of a given page and sums them. Now let's run this job and read the completed results into memory.
% Get 1-itemsets
oneItemset_ds = mapreduce(transaction_ds, @oneItemsetMapper, @oneItemsetReducer);
% Read the result into memory
oneItemsets = readall(oneItemset_ds);
disp(oneItemsets(655:659,:))
Parallel mapreduce execution on the local cluster:
********************************
*      MAPREDUCE PROGRESS      *
********************************
Map   0% Reduce   0%
Map  50% Reduce   0%
Map 100% Reduce  50%
Map 100% Reduce 100%
                                   Key                                    Value
    __________________________________________________________________    _____
    '/loren/'                                                              [2]
    '/loren/2006/07/05/when-is-a-numeric-result-not-a-number/'             [1]
    '/loren/2009/10/02/using-parfor-loops-getting-up-and-running/'         [1]
    '/loren/2011/11/14/generating-c-code-from-your-matlab-algorithms/'     [1]
    '/loren/2012/02/06/using-gpus-in-matlab/'                              [1]
Generate Frequent Itemsets
Now we are ready to feed the transactions and oneItemsets data to findFreqItemsets, which also accepts a table of 1-itemsets as an optional input. The code is available if you go to the earlier post.
Let's generate frequent itemsets based on a minimum support threshold of 0.02, which means we keep only the patterns that appear among at least 2% of the visitors.
minSup = 0.02;
fprintf('Processing dataset with minimum support threshold = %.2f\n...\n', minSup)
[F,S,items] = findFreqItemsets(transactions,minSup,oneItemsets);
fprintf('Frequent Itemsets Found: %d\n', sum(arrayfun(@(x) size(x.freqSets,1), F)))
fprintf('Max Level : k = %d\n', length(F))
fprintf('Number of Support Data : %d\n\n', length(S))
Processing dataset with minimum support threshold = 0.02
...
Frequent Itemsets Found: 151
Max Level : k = 4
Number of Support Data : 4107
Generate Rules
This step is no different from the example in the earlier post. Let's use a minimum confidence threshold of 0.6.
minConf = 0.6;
rules = generateRules(F,S,minConf);
fprintf('Minimum Confidence : %.2f\n', minConf)
fprintf('Rules Found : %d\n\n', length(rules))
Minimum Confidence : 0.60
Rules Found : 99
Visualize rules by support, confidence, and lift
Now we can visualize the rules we generated. This sample dataset is very tiny, so the number of rules is limited.
conf = arrayfun(@(x) x.Conf, rules); % get conf as a vector
lift = arrayfun(@(x) x.Lift, rules); % get lift as a vector
sup = arrayfun(@(x) x.Sup, rules);   % get support as a vector
colormap cool
scatter(sup,conf,lift*5, lift, 'filled')
xlabel('Support'); ylabel('Confidence')
t = colorbar('peer',gca);
set(get(t,'ylabel'),'String', 'Lift');
title('Sample Data')
Microsoft Web Data
Unfortunately we can’t share this sample data for you to try, but it is easy to adapt this code to a publicly available dataset such as the Anonymous Microsoft Web Data Data Set from the UCI Machine Learning Repository. That data has already been preprocessed from raw clickstream logs, so we need to process it back into the raw format we used for the sample data. You can see the code used to process this dataset in the appendix below.
When you apply Market Basket Analysis to this dataset, you should get something like this.
Rule #1 {Windows 95} => {Windows Family of OSs} Conf: 0.91, Lift: 6.46
Rule #2 {Windows95 Support} => {isapi} Conf: 0.84, Lift: 5.16
Rule #3 {SiteBuilder Network Membership} => {Internet Site Construction for Developers} Conf: 0.80, Lift: 8.17
Rule #5 {Knowledge Base, isapi} => {Support Desktop} Conf: 0.71, Lift: 5.21
Rule #18 {Windows Family of OSs, isapi} => {Windows95 Support} Conf: 0.64, Lift: 11.66
The sample code also includes the MapReduce steps on a reduced dataset. I reduced the dataset because my MapReduce code is not optimal for this data and runs extremely slowly on it. My code assumes there are more pages than visitors, but this Microsoft dataset has more visitors than pages.
If you would like to use MapReduce on the full Microsoft dataset, I strongly recommend that you write your own MapReduce code that is better optimized for this dataset.
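As one possible direction, here is a hypothetical sketch, not the code used in this post: it replaces the per-row ismember loop in transactionMapper with a single grouping step, which helps when a chunk contains many distinct visitors. The key-value pairs it emits are the same as before, so the rest of the pipeline is unchanged.

function fastTransactionMapper(data, info, intermKVStore)
    % group the row indices of this chunk by VisitorCookieID in one pass,
    % instead of calling ismember once per row
    [u_tid, ~, grp] = unique(data.VisitorCookieID);
    items = accumarray(grp, (1:numel(grp))', [], @(rows) {data.Page(rows)'});
    % emit one (VisitorCookieID, pages) pair per visitor, as before
    addmulti(intermKVStore, u_tid, items)
end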
Summary and the Next Step
Now that we have prototyped and tested the algorithm, I can see whether this process will give me the insight I am looking for. Once I am happy with it, I still need to figure out where to store the larger data on a cluster or in the cloud. That is more of a business challenge than a technical one, and I am still in the middle of it.
Programming MapReduce in MATLAB is very straightforward. The source data was local in this example, but you can also use it for a larger dataset that sits across multiple locations, such as files on HDFS. You can prototype your MapReduce algorithm locally, then change the configuration to scale it up to a larger dataset.
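For example, switching execution environments is a matter of calling mapreducer before running the job; the mapper and reducer code stays the same. This is only a sketch: the folder and file paths below are placeholders, and the parallel and Hadoop configurations require the appropriate add-on products.

% run mapreduce serially inside MATLAB
mapreducer(0);

% run it on a local parallel pool (Parallel Computing Toolbox)
mapreducer(gcp);

% run it against a Hadoop cluster, with the source data on HDFS
hadoopCluster = parallel.cluster.Hadoop('HadoopInstallFolder','/usr/lib/hadoop');
mapreducer(hadoopCluster);
source_ds = datastore('hdfs:///mydata/clickstream/*.csv');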
In this example, MapReduce was used only for the initial data processing, and the rest was still done in memory. As long as the processed result fits in memory, we can use this approach.
However, if the processed data gets larger, then we need to make more use of MapReduce in other steps of the Apriori algorithm.
The key is to adapt the algorithm to parallel processing. In the current incarnation, the candidate pruning stage runs in a single thread.
Instead, we could subdivide the dataset into several chunks and complete the entire process, through rule generation, within each chunk. If we do so, we need to adjust the minimum support to account for the smaller transaction counts in each subset. Then we can combine the final outputs. This may not produce exactly the same result as the single-threaded process, but it should be fairly close.
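Here is a minimal sketch of that chunk-and-combine idea, not the author's implementation. It assumes that findFreqItemsets can mine the cell array of transactions held in one chunk of the transaction datastore, that its third output lists the item (page) names, and that the rule structs carry the Ante, Conseq, and Conf fields used earlier.

function chunkRuleMapper(data, info, intermKVStore)
    % run the entire pipeline, through rule generation, on this chunk
    minSup = 0.02;    % applied within the chunk, so it may need adjusting
    minConf = 0.6;
    [F,S,items] = findFreqItemsets(data.Value, minSup);
    rules = generateRules(F, S, minConf);
    for i = 1:length(rules)
        % serialize the rule into a string key; its confidence is the value
        ante = strjoin(items(rules(i).Ante), ', ');
        conseq = strjoin(items(rules(i).Conseq), ', ');
        add(intermKVStore, ['{' ante '} => {' conseq '}'], rules(i).Conf)
    end
end

function chunkRuleReducer(key, value, outKVStore)
    % combine the per-chunk outputs: average a rule's confidence across
    % the chunks in which it was found
    confs = [];
    while hasnext(value)
        confs = [confs, getnext(value)];
    end
    add(outKVStore, key, mean(confs))
end

Averaging the confidence across chunks is just one way to combine the outputs; it is also what makes the result only approximately equal to the single-threaded computation.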
Your thoughts?
Do you see ways in which you might leverage MapReduce to handle larger data analysis projects? Let us know here.
Appendix - Process Microsoft Web Data
Here is the code I used to process the Microsoft dataset.
type processMicrosoftWebData.m
%% Let's load the dataset
% The UCI dataset is not in the raw log format, but in a special
% non-tabular ASCII format. We need to process the data into a format we
% can use. This is not a typical process you do when you work with actual
% raw clickstream data.

% clear everything
clearvars; close all; clc

% the URL of the source data
filename = 'anonymous-msweb.data';

% if the file doesn't exist, download it from the UCI website
if exist(filename,'file') == 0
    url = 'http://archive.ics.uci.edu/ml/machine-learning-databases/anonymous/anonymous-msweb.data';
    filepath = websave('anonymous-msweb.data',url);
end

% accumulators
aid = [];              % attribute id
vroot = {};            % vroot - page name
url = {};              % url of the page
vid = [];              % visitor id
visits = [];           % aid of vroots visited by a visitor
VisitorCookieID = {};  % same as vid but as string

% open the file
fid = fopen(filename);
% read the first line
tline = fgetl(fid);
% if the line contains a string
while ischar(tline)
    % if the line contains attribute
    if strcmp(tline(1),'A')
        c = textscan(tline,'%*s%d%*d%q%q','Delimiter',',');
        aid = [aid;c{1}];
        vroot = [vroot;c{2}];
        url = [url;c{3}];
    % if the line contains case
    elseif strcmp(tline(1),'C')
        user = textscan(tline,'%*s%*q%d','Delimiter',',');
    % if the line contains vote
    elseif strcmp(tline(1),'V')
        vid = [vid;user{1}];
        vote = textscan(tline,'%*s%d%*d','Delimiter',',');
        visits = [visits; vote{1}];
        VisitorCookieID = [VisitorCookieID;['id',num2str(user{1})]];
    end
    tline = fgetl(fid);
end
% close the file
fclose(fid);

% sort attributes by aid
[~,idx] = sort(aid);
aid = aid(idx);
vroot = vroot(idx);
url = url(idx);

% populate |Page| with vroot based on aid
Page = cell(size(visits));
for i = 1:length(aid)
    Page(visits == aid(i)) = vroot(i);
end

% create table and write it to disk if it doesn't exist
transactions = table(VisitorCookieID,Page);
if exist('msweb.transactions.csv','file') == 0
    % just keep the first 318 rows to use as sample data
    transactions(319:end,:) = []; % comment out to keep the whole thing
    writetable(transactions,'msweb.transactions.csv')
end

%% Association Rule Mining without MapReduce
% Since we already have all necessary pieces of data in the workspace, we
% might as well do the analysis now.
% get unique uid
uniq_uid = unique(vid);
% create a cell array of visits that contains vroots visited
transactions = cell(size(uniq_uid));
for i = 1:length(uniq_uid)
    transactions(i) = {visits(vid == uniq_uid(i))'};
end

% find frequent itemsets of vroots from the visits
minSup = 0.02;
fprintf('Processing dataset with minimum support threshold = %.2f\n...\n', minSup)
[F,S,items] = findFreqItemsets(transactions,minSup);
fprintf('Frequent Itemsets Found: %d\n', sum(arrayfun(@(x) size(x.freqSets,1), F)))
fprintf('Max Level : k = %d\n', length(F))
fprintf('Number of Support Data : %d\n\n', length(S))

% generate rules
minConf = 0.6;
rules = generateRules(F,S,minConf);
fprintf('Minimum Confidence : %.2f\n', minConf)
fprintf('Rules Found : %d\n\n', length(rules))

% plot the rules
conf = arrayfun(@(x) x.Conf, rules); % get conf as a vector
lift = arrayfun(@(x) x.Lift, rules); % get lift as a vector
sup = arrayfun(@(x) x.Sup, rules);   % get support as a vector
colormap cool
scatter(sup,conf,lift*5, lift, 'filled')
xlabel('Support'); ylabel('Confidence')
t = colorbar('peer',gca);
set(get(t,'ylabel'),'String', 'Lift');
title('Microsoft Web Data')

% display the selected rules
selected = [1,2,3,5,18];
for i = 1:length(selected)
    fprintf('Rule #%d\n', selected(i))
    lenAnte = length(rules(selected(i)).Ante);
    if lenAnte == 1
        fprintf('{%s} => {%s}\nConf: %.2f, Lift: %.2f\n\n',...
            vroot{rules(selected(i)).Ante(1)},vroot{rules(selected(i)).Conseq},...
            rules(selected(i)).Conf,rules(selected(i)).Lift)
    elseif lenAnte == 2
        fprintf('{%s, %s} => {%s}\nConf: %.2f, Lift: %.2f\n\n',...
            vroot{rules(selected(i)).Ante(1)},vroot{rules(selected(i)).Ante(2)},...
            vroot{rules(selected(i)).Conseq},rules(selected(i)).Conf,rules(selected(i)).Lift)
    end
end

%% MapReduce
% My MapReduce code is not well suited for this dataset and runs extremely
% slowly if you use the whole dataset. I will use just a subset for
% demonstration purposes. If you want to try this on the full dataset, you
% should write your own MapReduce code optimized for it.

% clear everything
clearvars; close all; clc

% set up source datastore
source_ds = datastore('msweb.transactions.csv');
disp(preview(source_ds))

% step 1: Group items by transaction
transaction_ds = mapreduce(source_ds, @transactionMapper, @transactionReducer);
transactions = readall(transaction_ds);

% step 2: Generate 1-itemsets
oneItemset_ds = mapreduce(transaction_ds, @oneItemsetMapper, @oneItemsetReducer);
oneItemsets = readall(oneItemset_ds);