Working efficiently with data: Parquet files and the Needle in a Haystack problem
This is a guest post by Onomitra Ghosh, who is the Product Manager for MATLAB Data Analysis and Big Data workflows.
How big does data need to be before it can be called big? There is no definite threshold, but it's safe to say that when data becomes uncomfortably large for your computer's memory, it is getting big. One way to tackle big data is to throw a lot of processing power at it (either locally or in the cloud). But when working with big data, we often do not need all of it for our analysis, so a smarter first step is to read only the data that is needed. This is especially true for the "needle in a haystack" problem, where we need to find a small slice of information in an ocean of data. In this post, we will show how storing big data in Parquet files and filtering it at read time can make the subsequent analysis more efficient than storing and reading it from conventional file formats like CSV files or spreadsheets.


A quick introduction to Parquet
Before we get too far ahead, let's do a quick introduction to the Parquet file format. Parquet is an open-source, column-oriented data storage format developed and maintained as part of the Apache Software Foundation. Its use for big data has grown steadily since its inception because it is very efficient for storing and reading data.
At the lowest level, a Parquet file stores data in a columnar format. Unlike more traditional row-based formats, Parquet files store the values of each column together. As a result, if we want to read specific columns, we can read a contiguous set of data instead of spending time skipping and seeking, or loading the whole file and then selecting specific columns in memory. Reading only the selected columns in this way is called projection pushdown.
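As a small illustration of projection pushdown, the sketch below writes a synthetic table to a temporary Parquet file and reads back only two columns using the SelectedVariableNames option of parquetread. The data, the file name, and the ALT column are hypothetical stand-ins for the flight records, chosen only for illustration.

```matlab
% Synthetic stand-in for the flight data (hypothetical values; ALT is an invented column)
rng(0);
n = 1000;
T = table((1:n)', 40 + 5*rand(n,1), -80 + 10*rand(n,1), 1000*rand(n,1), ...
    'VariableNames', {'TripNo','LATP','LONP','ALT'});

% Write the table to a temporary Parquet file
f = fullfile(tempdir, "projection_demo.parquet");
parquetwrite(f, T);

% Read only the latitude and longitude columns; the other columns are not imported
Tsub = parquetread(f, "SelectedVariableNames", ["LATP","LONP"]);
disp(Tsub.Properties.VariableNames)
```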

Along with reading specific columns, we often need only a subset of rows for our analysis. To support this, Parquet allows writing the data into row groups. Within each row group, data is still laid out by column, but the row groups themselves split the rows into slices that can be selected or skipped when filtering on specific row values.

In addition, each Parquet file maintains metadata about the file, each row group, and each column. This can include the data type, codec, number of values, read offset, and column statistics such as minimum and maximum values. This metadata helps a reader quickly find the row groups that can satisfy a specific filtering condition and skip the rest, which is called predicate pushdown. These pushdown capabilities let us filter the data while reading it from the file, instead of reading the entire dataset into memory and then filtering it. Last but not least, Parquet supports efficient compression techniques. Together, these capabilities make it possible to store large amounts of data with a relatively small footprint and to read only the values that are needed.
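The sketch below simulates, in plain MATLAB, how per-row-group minimum and maximum statistics let a reader skip row groups. It does not touch a Parquet file; the simulated LATP values, the row group size, and the sorted layout are assumptions chosen to make the effect visible.

```matlab
% Simulated LATP column, sorted so each row group covers a narrow range (assumption)
rng(0);
lat = sort(30 + 20*rand(40000,1));
groupSize = 10000;
nGroups = numel(lat) / groupSize;
groups = reshape(lat, groupSize, nGroups);   % each column plays the role of one row group

% Per-row-group statistics, like the min/max metadata a Parquet writer can store
rgMin = min(groups);
rgMax = max(groups);

% Predicate: latLo < LATP < latHi (the Massachusetts latitude band)
latLo = 42.03;
latHi = 42.72;

% A row group can only contain matches if its [min, max] range overlaps the band
mayMatch = rgMax > latLo & rgMin < latHi;
fprintf("Row groups to read: %d of %d\n", nnz(mayMatch), nGroups);

% Scan only the candidate row groups
candidates = groups(:, mayMatch);
hits = candidates(candidates > latLo & candidates < latHi);
```

The pruned scan returns the same rows as a full scan. If the values were not sorted or clustered, each row group's range would likely span the band and no group could be skipped, which is one reason row group layout affects how much predicate pushdown can help.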
In this post, we are going to focus on how writing flat tabular data to Parquet files with row groups can speed up data analysis. To keep things simple, we will not explore parallelization (that's for a future blog). Parallelization using Parallel Computing Toolbox can make the code run even faster, but today we will measure and highlight the performance impact of simply using Parquet files instead of CSV while analyzing data with only the resources of a desktop machine.
Dataset
The data we are going to use is a modified subset of the Flight Data from Dashlink. We are going to work with the flight data for Tail 660. The dataset contains information on 4578 unique flights flown by this plane between 2001 and 2010. Each flight's data is stored in a separate CSV file. Individually, these files are not very big, but together they contain about 22 million rows of flight information. Each file stores readings from different flight sensors in its 87 columns. The entire dataset is about 15 GB in memory. While this is not terabytes or petabytes of data, it is still big enough to make analyzing it on a standard computer uncomfortable and frustrating.

Problem statement
For our analysis problem, we want to find out whether any of these 4578 flights flew over Massachusetts. Two of the flight sensors record latitude and longitude (the LATP and LONP columns), which we can use to track each flight's path. Let's read a sample file and plot its flight path.
exampleCSVFile = "data\csv\660200110191613_1HZ.csv";
tcsv = readtable(exampleCSVFile);
head(tcsv)
geoplot(tcsv.LATP,tcsv.LONP,'Color','b','LineWidth',3);
geobasemap colorterrain
It looks like this particular flight flew from Norfolk, VA to Minneapolis, MN. To find flights that flew over Massachusetts, we have to compare each flight's path (from each of those 4578 files!) against Massachusetts' coordinates. We will use the following latitude and longitude bounds as a rough estimate of Massachusetts.
MALatMin = 42.03;
MALatMax = 42.72;
MALonMin = -73.37;
MALonMax = -70.06;
MACoordinates = [MALatMin, MALatMax, MALonMin, MALonMax];
By the way, this is a true "needle in a haystack" problem. Only 301 of the roughly 22 million flight records (about 0.0014%), spread over these 4578 files, have latitudes and longitudes inside these Massachusetts bounds. Our exercise will be to find these 301 rows as efficiently as possible.
CSV vs. Parquet: Single file
Let's start by looking at the differences between CSV and Parquet for a single file. We have already seen how to read the CSV file into a MATLAB table. Now let's try the same with Parquet. First, let's write this flight information to a Parquet file. The simplest way to write a Parquet file is to call the parquetwrite function. We can then inspect the Parquet file's metadata using parquetinfo.
exampleParquetFile = "exampleparquet.parquet";
parquetwrite(exampleParquetFile, tcsv); 
parquetinfo(exampleParquetFile)
Let's read the file back. We can use the parquetread function to read data from a Parquet file into a MATLAB table.
tparquet = parquetread(exampleParquetFile);
head(tparquet)
The CSV file is over 8 MB!
s = dir(exampleCSVFile); 
disp("CSV file size: " + s.bytes/1024/1024 + "MB")
The Parquet file, by contrast, is only 0.67 MB.
s = dir(exampleParquetFile); 
disp("Parquet file size: " + s.bytes/1024/1024 + "MB")
We can immediately see one of the benefits of using Parquet: simply writing this dataset as a Parquet file reduces its size on disk by roughly 12x compared to CSV.
tstart = tic; readtable(exampleCSVFile); tend = toc(tstart)        % CSV read time
tstart = tic; parquetread(exampleParquetFile); tend = toc(tstart)  % Parquet read time
Reading is also about 6x faster from Parquet than from CSV. Note that the exact speedup and file size will depend on many factors, including the kind of data contained in the files and how the row groups are formed.
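A single tic/toc measurement can be noisy. A sketch that repeats the size and speed comparison on synthetic data uses dir for file sizes and timeit, which calls a function several times and returns the median run time. The table contents and file names are hypothetical, and the ratios you see will differ from those for the flight data.

```matlab
% Synthetic table (hypothetical data) written in both formats
rng(0);
n = 50000;
T = table((1:n)', 40 + 5*rand(n,1), -80 + 10*rand(n,1), ...
    'VariableNames', {'TripNo','LATP','LONP'});
csvFile = fullfile(tempdir, "timing_demo.csv");
parquetFile = fullfile(tempdir, "timing_demo.parquet");
writetable(T, csvFile);
parquetwrite(parquetFile, T);

% File sizes on disk, in MB (2^20 bytes, matching bytes/1024/1024 used above)
csvDir = dir(csvFile);
pqDir = dir(parquetFile);
fprintf("CSV: %.2f MB, Parquet: %.2f MB\n", csvDir.bytes/2^20, pqDir.bytes/2^20);

% timeit runs each read several times and reports the median time
tCSV = timeit(@() readtable(csvFile));
tParquet = timeit(@() parquetread(parquetFile));
fprintf("readtable: %.3f s, parquetread: %.3f s\n", tCSV, tParquet);
```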
Datastore, Tall and Rowfilter
While reading a single Parquet file was faster, the real benefit of Parquet is realized when the dataset is big. But readtable and parquetread each read a single file, so we cannot use them directly on data spread over thousands of files like ours. Instead, we will use the following MATLAB features, which help in working with big and distributed data.
1. Datastores are a way to read big and distributed data in MATLAB. A datastore does not read the data when it is created. Instead, it holds information about which files to read and how to read them when requested. This makes it possible to read a large collection of files iteratively instead of loading all of it at once. MATLAB has datastores for various file formats, including text, spreadsheet, image, Parquet, and even custom file formats. For a full list, see the documentation on available datastores. In this exercise, we will use tabularTextDatastore for the CSV files and parquetDatastore for the Parquet files.
2. Tall is a lazy evaluation framework in MATLAB that is built on top of a datastore. The advantage of using tall is that we can take the same code that runs on regular MATLAB tables and run it on a tall table backed by the datastore, so we don't need to learn many new syntaxes and functions to work with bigger data. Because of lazy evaluation, however, tall code is not executed immediately. Instead, when the gather function is called, tall reads the data from the underlying datastore and runs the deferred operations on it as it is read. Tall supports hundreds of data analysis and machine learning functions that work with MATLAB tables and timetables. Tall tables can also be indexed like regular tables to find the relevant rows and columns.
3. Rowfilter is a relatively new feature, introduced in MATLAB R2022a. A rowfilter is a MATLAB object that specifies which rows to import. For Parquet files, a rowfilter takes advantage of predicate pushdown and filters rows at the file level using Parquet's row group metadata. Rowfilters can be specified in parquetread and parquetDatastore. The nice thing about using tall, however, is that we don't need to specify a rowfilter explicitly. If we index a tall table to find relevant rows and columns (as we would a regular MATLAB table), tall automatically uses a rowfilter to perform read-time filtering on Parquet data, without any additional code.
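To make the two routes concrete, here is a sketch on three small synthetic Parquet files that builds the filter explicitly with rowfilter, attaches it to parquetDatastore together with SelectedVariableNames, and compares the result with the equivalent tall indexing expression. The folder, file names, and values are hypothetical, and the sketch assumes R2022a or later, where rowfilter and the RowFilter option are available.

```matlab
% Three small synthetic Parquet files (hypothetical folder, names, and values)
folder = fullfile(tempdir, "rowfilter_demo");
if ~isfolder(folder), mkdir(folder); end
rng(1);
n = 5000;
for k = 1:3
    T = table(repmat(k, n, 1), 40 + 5*rand(n,1), -75 + 6*rand(n,1), ...
        'VariableNames', {'TripNo','LATP','LONP'});
    parquetwrite(fullfile(folder, "part" + k + ".parquet"), T);
end
coords = [42.03 42.72 -73.37 -70.06];   % [latMin latMax lonMin lonMax]

% Explicit route: build a rowfilter and attach it to the datastore
rf = rowfilter(["LATP","LONP"]);
rf = rf.LATP > coords(1) & rf.LATP < coords(2) & ...
     rf.LONP > coords(3) & rf.LONP < coords(4);
ds = parquetDatastore(folder, "RowFilter", rf, ...
    "SelectedVariableNames", ["TripNo","LATP","LONP"]);
explicitResult = readall(ds);

% Tall route: index the tall table as if it were an in-memory table
tds = tall(parquetDatastore(folder));
tallResult = gather(tds(tds.LATP > coords(1) & tds.LATP < coords(2) & ...
    tds.LONP > coords(3) & tds.LONP < coords(4), ["TripNo","LATP","LONP"]));
fprintf("explicit rowfilter: %d rows, tall indexing: %d rows\n", ...
    height(explicitResult), height(tallResult));
```

Both routes should return the same rows; the tall version simply spares us from writing the rowfilter by hand.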
Data reading strategies
For this exercise, we will consider three different data reading strategies. For each of them, we first create a datastore (ds) and then a tall table (tds) on the datastore. We then use tds to index and read the data, as shown in the following code examples, where coordinates holds the bounds [latMin, latMax, lonMin, lonMax].
Read strategy 1: Read the entire dataset into memory, then find the flights over MA. (This is slow and ill-advised; it is included only for comparison.)
allData = gather(tds);
flightsOverMA = allData(allData.LATP>coordinates(1) & allData.LATP<coordinates(2) & allData.LONP>coordinates(3) & allData.LONP<coordinates(4),["TripNo","LATP","LONP"]);
Read strategy 2: Filter the data for the matching latitudes and longitudes at read-time and read all columns for the filtered rows. Then choose the relevant columns.
flightsOverMAtall = tds(tds.LATP>coordinates(1) & tds.LATP<coordinates(2) & tds.LONP>coordinates(3) & tds.LONP<coordinates(4),:); 
flightsOverMAAllColumns = gather(flightsOverMAtall);
flightsOverMA = flightsOverMAAllColumns(:,["TripNo","LATP","LONP"])
Read strategy 3: Read only relevant columns for matching latitudes and longitudes (all filtering happens at read-time)
flightsOverMAtall = tds(tds.LATP>coordinates(1) & tds.LATP<coordinates(2) & tds.LONP>coordinates(3) & tds.LONP<coordinates(4),["TripNo","LATP","LONP"]); 
flightsOverMA = gather(flightsOverMAtall);
Since we run the same reads twice (once for CSV and once for Parquet), the code above is wrapped in helper functions at the end of this post.
Also note that when a tall table is evaluated (using gather), MATLAB displays the time taken by each evaluation pass. However, the total analysis time for each strategy is slightly longer than the gather time alone, so we measure it using tic/toc around the complete read and filtering code.
CSV
First, let's try solving our problem using the CSV files. To read the dataset, we will create a tall table on a tabularTextDatastore. Note that neither of these objects actually reads the entire dataset: the datastore only stores information on how to read the files, and the tall table reads just the first few rows to show us what the data looks like.
ds = tabularTextDatastore("data\csv\*.csv")
tds = tall(ds)
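Under the hood, a datastore hands out data one block at a time, and tall performs a similar block-wise pass for us. A sketch of doing the same search by hand with hasdata and read on a tabularTextDatastore follows; the folder, file names, and values are synthetic assumptions. Every CSV block is parsed in full before filtering, which is why CSV cannot benefit from pushdown.

```matlab
% Three small synthetic CSV files (hypothetical folder, names, and values)
folder = fullfile(tempdir, "csv_loop_demo");
if ~isfolder(folder), mkdir(folder); end
rng(2);
n = 5000;
for k = 1:3
    T = table(repmat(k, n, 1), 40 + 5*rand(n,1), -75 + 6*rand(n,1), ...
        'VariableNames', {'TripNo','LATP','LONP'});
    writetable(T, fullfile(folder, "flight" + k + ".csv"));
end
coords = [42.03 42.72 -73.37 -70.06];   % [latMin latMax lonMin lonMax]

% Creating the datastore reads no data; read() returns one block at a time
ds = tabularTextDatastore(fullfile(folder, "*.csv"));
parts = {};
while hasdata(ds)
    chunk = read(ds);
    inBox = chunk.LATP > coords(1) & chunk.LATP < coords(2) & ...
            chunk.LONP > coords(3) & chunk.LONP < coords(4);
    parts{end+1} = chunk(inBox, ["TripNo","LATP","LONP"]); %#ok<AGROW>
end
matches = vertcat(parts{:});
fprintf("Found %d matching rows\n", height(matches));
```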
Let's try our 3 read strategies next.
CSV Read Strategy 1 - Read the entire data into memory
[flightsOverMA, csvTime1] = findFlightsAllData(tds, MACoordinates);
head(flightsOverMA)
height(flightsOverMA)
csvTime1
CSV Read Strategy 2 - Read all columns for matching latitudes and longitudes
[flightsOverMA, csvTime2] = findFlightsFilteredRowsAllColumns(tds, MACoordinates);
height(flightsOverMA)
csvTime2
CSV Read Strategy 3 - Read only relevant columns for matching latitudes and longitudes
[flightsOverMA, csvTime3] = findFlightsFilteredRowsAndColumns(tds,MACoordinates);
height(flightsOverMA)
csvTime3
Each read strategy found the 301 rows with Massachusetts coordinates, but reading and filtering the CSV dataset can take up to 10 minutes.
Parquet
Analyzing this data directly from CSV files was painfully slow. Let's explore how using Parquet files instead can make a significant difference in our analysis time. First, we will rewrite the data into Parquet files. There are different ways to write Parquet files in MATLAB. We could use the writeall method of the datastore to write all the data it reads to a new file format. However, this would write one Parquet file for each individual CSV file, and Parquet files that small are not very helpful. Instead, we would like to coalesce the flight information into larger Parquet files that can still be read efficiently into MATLAB. We can do this with the write method for tall arrays, using parquetwrite inside a custom write function.
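% dataWriterDefault, defined in the Helper functions section at the end of this post,
% writes each partition of the tall table to its own Parquet file:
%   function dataWriterDefault(info, data)
%       filename = info.SuggestedFilename;
%       parquetwrite(filename, data)
%   end
write("data/parquet/", tds, "WriteFcn", @dataWriterDefault);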
d = dir("data/parquet/");
struct2table(d)
When written this way, tall coalesced the entire dataset into 12 Parquet files, with a single row group in each file. Let's now run our tests on the Parquet files.
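With a single row group per file, a Parquet reader has few opportunities to skip data within a file. A sketch of writing a table with several row groups of a chosen maximum height follows. It assumes the RowGroupHeights name-value argument of parquetwrite is available in your release (check its documentation); the rowGroupHeights helper, the synthetic data, and the 50,000-row limit are hypothetical choices, not what this post used.

```matlab
% Hypothetical helper: full groups of maxRows rows, plus one shorter group for any remainder
rowGroupHeights = @(n, maxRows) [repmat(maxRows, 1, floor(n/maxRows)), ...
    repmat(mod(n, maxRows), 1, double(mod(n, maxRows) > 0))];

% Synthetic table (hypothetical data)
rng(3);
n = 120000;
T = table((1:n)', 40 + 5*rand(n,1), 'VariableNames', {'TripNo','LATP'});
f = fullfile(tempdir, "rowgroups_writer_demo.parquet");
heights = rowGroupHeights(n, 50000);   % [50000 50000 20000]
parquetwrite(f, T, "RowGroupHeights", heights);

% Confirm the layout recorded in the file metadata
info = parquetinfo(f);
fprintf("Row groups written: %d\n", info.NumRowGroups);
```

With tall, the same idea could be plugged into the write call, for example `write("data/parquet/", tds, "WriteFcn", @(info, data) parquetwrite(info.SuggestedFilename, data, "RowGroupHeights", rowGroupHeights(height(data), 500000)))`. The 500,000-row limit is arbitrary; whether smaller row groups help depends on how the filter values are distributed across them.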
Just as we used tabularTextDatastore to read the CSV files, we can use parquetDatastore to read the Parquet files and then create the tall table.
ds = parquetDatastore("data\parquet\*.parquet")
tds = tall(ds)
Now let's try the same read strategies on Parquet files.
Parquet Read Strategy 1 - Read the entire data into memory
[flightsOverMA, parquetTime1] = findFlightsAllData(tds, MACoordinates);
head(flightsOverMA)
height(flightsOverMA)
parquetTime1
Parquet Read Strategy 2 - Read all columns for matching latitudes and longitudes
This is where Parquet's predicate pushdown comes into play. When we index into the tall table using the latitude and longitude values, tall uses that information to create the corresponding rowfilter and then reads from the Parquet files.
[flightsOverMA, parquetTime2] = findFlightsFilteredRowsAllColumns(tds, MACoordinates);
height(flightsOverMA)
parquetTime2
Parquet Read Strategy 3 - Read only relevant columns for matching latitudes and longitudes
Now, in addition to predicate pushdown, let's read only the relevant columns (projection pushdown).
[flightsOverMA, parquetTime3] = findFlightsFilteredRowsAndColumns(tds,MACoordinates);
height(flightsOverMA)
parquetTime3
Results
As with the CSV reads, we found the 301 rows with Massachusetts coordinates in the Parquet files, but much faster. Let's review what we achieved by working with Parquet files instead of CSV.
plotResults([csvTime1, parquetTime1; csvTime2, parquetTime2; csvTime3, parquetTime3])
- Read Strategy 1: This was the brute-force scenario, where we read the entire dataset into memory. Simply rewriting the files as Parquet made this about 12x faster.
- Read Strategy 2: A better way to work with this data is to read only the rows that are needed. We used the same indexing code for our tall table on Parquet as we did for CSV. However, when working with Parquet files, tall used the indexing information for predicate pushdown to filter at read time. As a result, reading the Parquet files was 28x faster than reading CSV, and almost 2.5x faster than reading the whole Parquet dataset.
- Read Strategy 3: This is where we fully used Parquet's predicate and projection pushdown capabilities to read data efficiently. In addition to the row filtering in Read Strategy 2, we read only the relevant columns. Because Parquet files are column-oriented, the reader only needed to read contiguous blocks of the file instead of picking latitudes and longitudes out of rows spread all over it. As a result, we were able to read the data in under 2 seconds, a speedup of more than two orders of magnitude compared to doing the same with CSV.
Let's also check the actual flight path. It turns out that only one flight (out of 4578) flew over Massachusetts, and it just grazed the state's western border with New York. We will extract the complete flight information from the tall table and then plot the flight path.
flights = unique(flightsOverMA.TripNo)
completeFlightPath = gather(tds(tds.TripNo == flights,["LATP","LONP"])); 
geoplot(completeFlightPath.LATP,completeFlightPath.LONP,'Color','b','LineWidth',3);
geobasemap colorterrain
geolimits([34.3 50.8],[-97.6 -69.0])
Summary
This post gave a quick look at how Parquet can help when working with big data. By using Parquet files, we achieved a more than 100x performance improvement without needing additional processing power or parallelization. This kind of data engineering and analysis technique is a good first step when working with big data, before reaching for more power. However, here are a few things to keep in mind before we wrap up.
1. Parquet files store data in the form of a table. This works well in most cases, but if the data is not tabular to begin with (e.g., multi-dimensional data), it may not be suitable for Parquet files.
2. One disadvantage of Parquet files compared to CSV is that they cannot be opened directly in applications like Excel. This reduces readability and ease of use.
3. Parquet files do not provide the same level of efficiency for all data sizes; for smaller data, there might not be much benefit at all. Also, to keep this post introductory, our example wrote tabular data to Parquet files using the default row group sizing, and we did not dig into the impact of different row group sizes. We also did not discuss how parallelization can make the process faster. These choices do affect the performance that can be achieved with Parquet files. Let us know if you are interested in any of these topics (or other data-related topics), and we can cover them in future blogs.
4. Lastly, Parquet is not the only way to work with big data. While it's a promising file format, there can be good reasons to store and work with big data in other formats (CSV, spreadsheet, MAT, etc.) and sources (cloud, databases, data platforms, etc.). For more information on how MATLAB can work with different kinds of big data, please visit: https://www.mathworks.com/solutions/big-data-matlab.html
Helper functions
function dataWriterDefault(info, data ) 
    filename = info.SuggestedFilename;
    parquetwrite(filename, data)
end
function [flightsOverMA, t] = findFlightsAllData(tds, coordinates)
    tstart = tic;
    allData = gather(tds);
    flightsOverMA = allData(allData.LATP>coordinates(1) & allData.LATP<coordinates(2) & allData.LONP>coordinates(3) & allData.LONP<coordinates(4),["TripNo","LATP","LONP"]);
    t = toc(tstart);
end
function [flightsOverMA, t] = findFlightsFilteredRowsAllColumns(tds, coordinates)
    tstart = tic;
    flightsOverMAtall = tds(tds.LATP>coordinates(1) & tds.LATP<coordinates(2) & tds.LONP>coordinates(3) & tds.LONP<coordinates(4),:); 
    flightsOverMAAllColumns = gather(flightsOverMAtall);
    flightsOverMA = flightsOverMAAllColumns(:,["TripNo","LATP","LONP"]);
    t = toc(tstart);
end
function [flightsOverMA, t] = findFlightsFilteredRowsAndColumns(tds, coordinates)
    tstart = tic;
    flightsOverMAtall = tds(tds.LATP>coordinates(1) & tds.LATP<coordinates(2) & tds.LONP>coordinates(3) & tds.LONP<coordinates(4),["TripNo","LATP","LONP"]); 
    flightsOverMA = gather(flightsOverMAtall);
    t = toc(tstart);
end
function plotResults(data)
    b = bar(data);
    ylabel("seconds")
    xticklabels(["Read strategy 1","Read strategy 2","Read strategy 3"])
    xtips1 = b(1).XEndPoints;
    ytips1 = b(1).YEndPoints;
    labels1 = string(round(b(1).YData,1))+"s";
    text(xtips1,ytips1,labels1,'HorizontalAlignment','center',...
        'VerticalAlignment','bottom')
    xtips2 = b(2).XEndPoints;
    ytips2 = b(2).YEndPoints;
    labels2 = string(round(b(2).YData,1))+"s";
    text(xtips2,ytips2,labels2,'HorizontalAlignment','center',...
        'VerticalAlignment','bottom')
    legend(["csv","parquet"])
    ylim([0, data(1,1)*1.1])
end