How we processed over 100GB of data with 16GB of RAM

What I learnt using Dask

If I told you that it is possible to process data as large as 100GB on your personal computer with 16GB of RAM, would you believe it? I didn't either, but I'll tell you how in this write-up.

It makes sense to represent data as a dataframe of rows. You can manipulate this data in various ways, like computing the mean, or grouping by a certain column in order to transform the sub-groups or obtain some statistical information about them. Software like MySQL and MATLAB works this way. In recent times, however, pandas has become very popular in the Python community. One reason is the open source community that maintains the code base; another is how optimized it is for vectorized operations (which is encouraging given that Python is slow compared to C) and how many readily implemented tools it offers for data analysis and exploration. Despite all these advantages, it comes with a pitfall: you have to load all your data into memory. This is not a problem if you have less than 4GB of data, as many personal computers have about 16GB of RAM these days, but what happens if your data is about 100GB, which, given the proliferation of data, is very common these days?

One of the solutions to this problem is distributed computing: you can process the data on multiple computers. Software like Apache Spark and Dask does exactly this. I was working on a machine learning toolbox project with data between 40GB and 100GB, which naturally couldn't fit on any of our computers, and we decided to solve the problem with Dask. We chose Dask because it integrates very well with pandas and you don't necessarily have to change your code base. You can read more on the differences between Dask and Apache Spark here.

The question then is: what is Dask? Dask is a distributed computing library for tabular datasets that can't fit into memory. The upside is that even though it makes the most sense to use it on a cluster of computers, you can provision your local machine with about 16GB of RAM to run datasets of about 100GB. In the paragraphs below I will outline what we did in order to use Dask, after much trial and error and many sleepless nights.

One of the main ideas behind distributed computing tools is partitioning of data. If we have a dataset as big as 100GB and try to fit it into 16GB of memory, it is going to fail. However, if we make 20 equal partitions of this data, we'll have about 5GB per partition, and each partition can easily fit into 16GB of memory. The question then arises of how we can compute, say, the mean of a column whose values reside in different partitions; even though the implementation of Dask is out of scope for this write-up, the short answer is that we index our data so that we know where to find it and combine the per-partition results. Another idea used to optimise this process is the storage format. The most common file format is CSV, but this is not only clunky for our purposes, it also forces us to import all the data even when we only want to work on one column. This is what columnar file formats bring. If you had tabular data with age, height and income and you wanted to obtain the mean age, with a CSV file you would be forced to load the whole dataset, but with a columnar format like Parquet you load only the relevant column into memory.
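As a small illustration of the columnar idea, here is a minimal sketch; the file path and column names are made up for the example:

```python
import dask.dataframe as dd

# Only the 'age' column is read from disk; 'height' and 'income' are never loaded.
ages = dd.read_parquet("people/*.parquet", columns=["age"])
print(ages["age"].mean().compute())
```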

Indexing

You can't escape the subject of indexing in Dask. Just like in relational databases, if you want optimal performance you have to index on the columns you use often in your queries. In Dask, indexing your dataframe intelligently will save you a lot of shuffles and headaches. For example, if you know a particular column is used heavily for grouping or for merging with another dataframe, this should be the column you index on. This makes processing your code at the partition level feel much like working in plain pandas.
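A minimal sketch of setting such an index, assuming a dataframe with a col1 column as in the example later in this post:

```python
import dask.dataframe as dd

ddf = dd.read_parquet("data/*.parquet")
# set_index sorts and shuffles the data once so that all rows with the same
# col1 value end up in the same partition; later groupbys and merges on col1
# can then run partition by partition without further shuffling.
ddf = ddf.set_index("col1")
```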

Partitioning

Using the right partition size helps to limit overhead and the amount of memory you spend processing your dataframe. According to the Dask documentation, your partitions should be less than a gigabyte each.
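One way to eyeball and adjust partition sizes; the target partition count here is just an example:

```python
import dask.dataframe as dd

ddf = dd.read_parquet("data/*.parquet")
# number of rows per partition, the usual idiom for checking partition sizes
print(ddf.map_partitions(len).compute())

# split the data into more, smaller partitions if they are too big
ddf = ddf.repartition(npartitions=1200)
print(ddf.npartitions)
```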

Map_partitions

This is a very important subject when refactoring your pandas project to use Dask. The first temptation is to rewrite the project with the pandas-like Dask functions, but this didn't work for us. The best option was always a small rewrite of the pandas function, wrapping it in Dask's map_partitions method. This processes your data partition by partition, with parallelisation if you provided threads when configuring your workload. This is why you need the right partition size, to make sure the data in each partition fits comfortably in memory. A wrong choice of partitions will give you a Dask warning similar to distributed.utils_perf - WARNING - full garbage collections took 11% CPU time recently (threshold: 10%). Imagine we have a pandas function written like the code snippet below.
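The original snippet is not reproduced here, so this is a reconstruction based on the description that follows; the lag range and the toy data are assumptions:

```python
import numpy as np
import pandas as pd

def lag_and_log(df):
    # lag col3 by each step given by the range function...
    for lag in range(1, 4):
        df[f"col3_lag_{lag}"] = df["col3"].shift(lag)
    # ...then take the log of the series
    df["col3_log"] = np.log(df["col3"])

df = pd.DataFrame({"col1": [1, 1, 2, 2], "col3": [1.0, 2.0, 3.0, 4.0]})
lag_and_log(df)  # plain pandas: mutates df in place, nothing is returned
```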

We want to lag col3 by the lags given by the range function and afterwards take the log of the series. This piece of code is sufficient if the data fits into memory; however, if we wanted to refactor for Dask in the scenario where the data doesn't fit in memory, this piece of code would turn into something like the snippet below.
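Again a reconstruction along the same lines, with the file path assumed:

```python
import numpy as np
import dask.dataframe as dd

def lag_and_log(df):
    for lag in range(1, 4):
        df[f"col3_lag_{lag}"] = df["col3"].shift(lag)
    df["col3_log"] = np.log(df["col3"])
    return df  # the only change on the pandas side: return the dataframe

ddf = dd.read_parquet("data/*.parquet")
# Dask applies the plain pandas function to one partition at a time.
ddf = ddf.map_partitions(lag_and_log)
```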

The only change we need to make on the pandas side is to return the dataframe. We then wrap this function inside Dask's map_partitions and that is it.

Group by and merging

These two operations, in my opinion, are the ones you need to take the most care with when working in Dask. The strategy is always to make sure you are indexed by at least one of the columns you want to group by, and then to do the groupby inside pandas. The logic is that Dask will then have each of your groups inside one partition and can perform the groupby without shuffling the data, i.e. without going into other partitions to find the corresponding rows. Merging is similar: as already mentioned, you need to index your dataframe by one of the columns you want to merge on, but in the case of merge you have to do it in Dask and not in pandas. The code below shows an example illustrating all the points mentioned above. I will subsequently explain what the code does, the alternatives and the logic behind what I have done.
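The original listing isn't reproduced here, so the sketch below is a reconstruction from the description in the rest of the post. The cluster settings, partition counts and column names follow the text; the file paths, lag values and random test data are assumptions:

```python
import os

import numpy as np
import pandas as pd
import dask.dataframe as dd
from dask.distributed import Client, LocalCluster


def lag_and_log(df):
    # Plain pandas, applied to one partition at a time by map_partitions.
    # col1 is the index, so every group sits entirely inside this partition.
    for lag in range(1, 4):
        df[f"col3_lag_{lag}"] = df.groupby("col1")["col3"].shift(lag).to_numpy()
    df["col3_log"] = np.log(df["col3"])
    return df


def create_test_data():
    # First loop: 100 files of 10 million rows x 3 columns x 8 bytes, ~240MB each (~24GB total).
    # Second loop: the smaller dataframe we merge against (~2.4GB).
    for folder, n_files in [("data", 100), ("data2", 10)]:
        os.makedirs(folder, exist_ok=True)
        for i in range(n_files):
            pd.DataFrame({
                "col1": np.random.randint(0, 100_000_000, 10_000_000),
                "col2": np.random.randint(0, 1_000, 10_000_000),
                "col3": np.random.random(10_000_000),
            }).to_parquet(f"{folder}/part_{i}.parquet")


def main():
    create_test_data()

    # Repartition from 100 partitions of ~240MB down to 1200 partitions of ~20MB,
    # then index by col1, the column used for grouping and merging, and persist.
    ddf = dd.read_parquet("data/*.parquet").repartition(npartitions=1200)
    ddf = ddf.set_index("col1").persist()
    ddf2 = dd.read_parquet("data2/*.parquet").set_index("col1").persist()

    # Memory diagnostics: how much space the persisted dataframes take up.
    print(ddf.memory_usage(deep=True).sum().compute())
    print(ddf2.memory_usage(deep=True).sum().compute())

    # The pandas logic, wrapped in map_partitions.
    ddf = ddf.map_partitions(lag_and_log)

    # Merge along the shared index (col1); the original workload also matched on col2.
    result = ddf.merge(ddf2, left_index=True, right_index=True, how="left")

    # Save the result, which also forces the graph to be computed.
    result.to_parquet("output/")


if __name__ == "__main__":
    # 4 workers with 2 threads each; 'auto' splits the machine's RAM between the
    # workers (about 4GB each on a 16GB machine). The dashboard is served on
    # http://localhost:8787/status.
    cluster = LocalCluster(n_workers=4, threads_per_worker=2, memory_limit="auto")
    client = Client(cluster)
    main()
```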

 

The first snippet of code I'll comment on will be the last, giving credence to the biblical saying that the first shall be last and the last shall be first. I joke, and digress...

Here we create a local cluster, configuring Dask to use 4 workers with 2 threads per worker, and we assign a memory limit to each worker. The threads are used to parallelize tasks within each partition. For the memory limit, you can optionally pass a specific value like '8GB'. This means each of your workers will have 8GB of memory, for a total of 32GB, so if your machine's RAM is below this limit you'll run out of memory once your workers have a lot of jobs to process. The 'auto' keyword lets Dask use your machine's RAM specification to set the memory limits for the workers. Lastly, this piece of code gives you a monitoring dashboard that you can view in your browser at http://localhost:8787/status.

This piece of code creates the files that we are going to process. It is important to understand the underlying data structures in your dataframe, as this helps you know how much memory the whole dataframe consumes, how you can reduce this memory footprint if needed, and how many partitions you need to create.

This article tells us about the different pandas data types; in our case we are using float64 for the floats and int64 for the integers. These data types are 8 bytes each, which gives us 24 bytes per row. This can be reduced to save memory by executing the code below, although we don't use it for this example.
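A sketch of what that downcasting might look like; the exact dtypes you can downcast to depend on the value ranges in your data:

```python
import numpy as np
import pandas as pd

df = pd.DataFrame({
    "col1": np.random.randint(0, 1_000, 1_000),
    "col2": np.random.randint(0, 1_000, 1_000),
    "col3": np.random.random(1_000),
})
print(df.memory_usage(deep=True).sum())  # 24 bytes per row with int64/float64

# downcast to smaller dtypes where the value ranges allow it
df = df.astype({"col1": "int32", "col2": "int32", "col3": "float32"})
print(df.memory_usage(deep=True).sum())  # roughly half the footprint
```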

This gives us a total of 24e7 bytes for each partition created in the loop (10 million rows at 24 bytes each), which is 240MB, giving a total of 24GB for the whole dataframe. The second loop is worth 2.4GB by the same logic. This is not dramatically larger than our 16GB of RAM, but it should illustrate the point without taking hours to execute.

In the second block of code, we repartition the data, because the way we created the partitions each one holds 240MB of data. Our workers, which have 4GB of memory each, can handle this, but we can do better. We can create 20MB partitions by repartitioning the data from 100 partitions to 1200. This is way below the memory allocation of our workers and frankly overkill, as repartitioning to 300 would still do the trick. We then index the dataframes by col1. We do this because this column will be used for grouping and merging. If you don't do this, the program is most likely not going to work for very large datasets.

Finally, Dask for some reason doesn't always give equal partitions, so you can end up with situations where one partition is very large and another very small. To fix this problem we use the _rebalance function. With that, we are ready to run our workload.

This block of code isn't included in the main code; however, it is a very important debugging tool. It helps us see what the optimal partition size could be and gives an idea of how our pandas code runs under the hood of map_partitions. As explained earlier, Dask, especially with respect to dataframes, is basically a wrapper around pandas, and this piece of code illustrates that. From the partitions we have created, we want to load part 0 of both dataframes into memory to see how much memory it takes pandas to load them. We also perform a groupby and a merge in the workload, so we replicate them here just to get an idea of how much it will cost each worker to process each partition.
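A sketch of such a check, assuming the ddf and ddf2 from the sketch above; get_partition pulls a single partition back into plain pandas once you call compute:

```python
# pull the first partition of each dataframe into pandas
part0 = ddf.get_partition(0).compute()
part0_2 = ddf2.get_partition(0).compute()
print(part0.memory_usage(deep=True).sum() / 1e6, "MB")

# replicate the workload's groupby and merge on just this partition
lagged = part0.groupby("col1")["col3"].shift(1)
merged = part0.merge(part0_2, left_index=True, right_index=True, how="left")
print(merged.memory_usage(deep=True).sum() / 1e6, "MB")
```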

We expect each partition to hold about 20MB worth of data; however, executing this code we discover that the memory cost of the frame in pandas is actually more than double that, at about 52MB. We also notice big increases for the groupby and merge operations, but these are still way smaller than our allocated worker memory of 4GB.

From experience, if this script takes a long time to execute, you might need to increase your partition size, and you should also make sure that the memory footprint in pandas is less than 200-300MB if possible. In the worst case it should be less than a gigabyte if your program is going to run to the end without errors. It is also worthwhile to test a snippet of the pandas commands you'll run in your workload in this script.

Lastly, we run the main function. If you have followed all the above steps, that is, indexed your dataframe properly, created the right partitions, persisted your data correctly and wrapped your pandas code in Dask, then your workload will probably run through. It might be slow and spill over to disk, but it'll run through eventually.

The lines of code related to memory are basically diagnostics to make sure we have enough worker memory to hold this data. It is worth noting that calling compute processes Dask graphs and in some cases brings a lot of data into memory. For example, if you add print(ddf.compute()) immediately after loading the test data into Dask, you'll run out of memory.

The rest of the code groups by col1, transforms and lags col3, wraps this in map_partitions, and merges ddf with ddf2 on col1 and col2, which is valid given that col1 is an index. If, on the other hand, we tried to merge by col3 and col2, for example, there is no guarantee this would run on a machine with 16GB, depending of course on how big the dataset is.

Finally, we save the data.

Summary

To recap, in order to refactor your pandas workload with Dask you need to make sure you do the following:

  1. Index your data by the column you are going to group by or merge on.
  2. Don't leave large graphs, that is, save the data or call compute after doing a considerable amount of work.
  3. Partition the data intelligently, run a pandas script on one partition, and above all make sure each partition is smaller than the worker memory with some room to spare.
  4. Use map_partitions instead of using the similar pandas-like functions in Dask.

I believe that we could process dataframes of up to 500GB with 16GB of RAM if you have enough patience and disk space for the spill-over to disk. We have not tested this. We have, however, tested datasets of about 220GB. Admittedly it took a long time to process and spilled to disk a lot, but it worked out in the end. Happy Dasking!!!

 
