How we processed over 100 GB of data with 16 GB of RAM
If I told you that it is possible to process 100 GB of data on a personal computer with 16 GB of RAM, would you believe it? I didn't either, but this write-up explains how.
It makes sense to represent data as a dataframe of rows and columns. You can manipulate this data in various ways, such as computing a mean or grouping by a column in order to transform the subgroups or obtain statistics about them. Software like MySQL and MATLAB works this way. In recent years, however, pandas has become very popular in the Python community. One reason is the open source community that maintains the code base. Another is how well optimized it is for vectorized operations (which is encouraging given that Python is slow compared to C) and how many ready-made tools it has for data analysis and exploration. Despite all these advantages, it comes with a pitfall: you have to load all your data into memory. This is not a problem if you have less than 4 GB of data, as many personal computers have about 16 GB of RAM these days. But what happens if your data is about 100 GB, which is very common given the proliferation of data?
One of the solutions to this problem is distributed computing: you process the data on multiple computers. Software like Apache Spark and Dask does exactly this. I was working on a machine learning toolbox project with data between 40 GB and 100 GB, which naturally couldn't fit into the memory of any of our computers, and we decided to solve this problem with Dask. We chose Dask because it integrates very well with pandas and you don't necessarily have to change your code base. You can read more on the differences between Dask and Apache Spark here.
The question then is: what is Dask? Dask is a parallel and distributed computing library; for our purposes, it handles tabular datasets that can't fit into memory. The upside is that even though it makes the most sense on a cluster of computers, you can also use a local machine with about 16 GB of RAM to process datasets of about 100 GB. In the paragraphs below I outline what we did to make use of Dask after much trial and error and many sleepless nights.
One of the main ideas behind distributed computing tools is partitioning of data. If we have a dataset as big as 100 GB and try to fit it into 16 GB of memory, it is going to fail. However, if we split the data into 20 equal partitions, each partition is about 5 GB and can easily fit into 16 GB of memory. The question then arises of how to compute, say, the mean of a column that resides in different partitions. The implementation of Dask is out of scope for this write-up, but the key is to index the data so that we know which partition holds which rows.
Another idea that optimises this process is data storage. The most common file format is CSV, but it is clunky for our purposes and forces us to read all the data even when we only want to work on one column. This is the problem columnar file formats solve. If you have a table of age, height and income and want the mean age, a CSV file forces you to load the whole dataset, whereas a columnar format like Parquet lets you load only the relevant column into memory.
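To make the question about a mean across partitions concrete, here is a minimal sketch of the general idea: each partition reports only its sum and its row count, and those small results are combined. The toy data and the function names are hypothetical, and this is an illustration of the principle rather than a description of how Dask is implemented.

```python
# Hypothetical toy data: one column of ages split across three partitions.
partitions = [
    [23, 35, 41],
    [52, 29],
    [60, 18, 33, 47],
]


def partial_stats(values):
    """Return the (sum, count) pair for a single partition."""
    return sum(values), len(values)


def combined_mean(parts):
    """Combine per-partition (sum, count) pairs into one global mean."""
    stats = [partial_stats(p) for p in parts]
    total = sum(s for s, _ in stats)
    count = sum(n for _, n in stats)
    return total / count


mean_age = combined_mean(partitions)

# Same answer as loading everything at once, but only one partition
# has to be in memory at any given time.
all_ages = [age for part in partitions for age in part]
print(mean_age == sum(all_ages) / len(all_ages))
```

Only two numbers per partition ever need to be held together, which is why the full column never has to fit in memory.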
Indexing
You can't escape the subject of indexing in Dask. Just as in relational databases, if you want optimal performance you have to index on the columns you use most often in your queries. In Dask, indexing your dataframe intelligently will save you a lot of shuffles and headaches. For example, if you know a particular column is used heavily for grouping or for merging with other dataframes, that is the column to index by. This makes processing at the partition level similar to working in pandas.
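One way to picture why a sorted index avoids shuffles: once the partition boundaries (Dask calls them divisions) are known, finding the partition that holds a given key is a simple lookup. The sketch below uses only the standard library; the boundary values and the helper name `partition_for` are made up for illustration.

```python
from bisect import bisect_right

# Hypothetical divisions for a frame indexed by col1 with three partitions:
# partition 0 holds [90_000, 93_000), partition 1 holds [93_000, 96_000),
# partition 2 holds [96_000, 99_999] (the last boundary is inclusive).
divisions = [90_000, 93_000, 96_000, 99_999]


def partition_for(key, divisions):
    """Return the number of the partition that would hold `key`."""
    if not divisions[0] <= key <= divisions[-1]:
        raise KeyError(f"{key} is outside the indexed range")
    n_partitions = len(divisions) - 1
    # bisect_right gives the position of the first boundary greater than key.
    return min(bisect_right(divisions, key) - 1, n_partitions - 1)


print(partition_for(90_000, divisions))
print(partition_for(95_999, divisions))
print(partition_for(99_999, divisions))
```

Because every row with the same key lands in the same partition, a group-by on that key can run inside each partition on its own.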
Partitioning
Using the right partition size helps to limit overhead and the amount of memory spent processing your dataframe. According to the Dask documentation, your partitions should be smaller than 1 gigabyte.
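As a rough aid for choosing a partition count, the arithmetic can be written down directly. This is a minimal sketch; the helper name `partitions_needed` is hypothetical, and the sizes are the in-memory estimates used in the example later in this article.

```python
def partitions_needed(total_bytes, target_partition_bytes):
    """Smallest partition count that keeps every partition at or under the target."""
    # Integer ceiling division, so a remainder gets its own partition.
    return (total_bytes + target_partition_bytes - 1) // target_partition_bytes


total = 24 * 10**9  # about 24 GB of data in memory

print(partitions_needed(total, 240 * 10**6))  # 240 MB partitions
print(partitions_needed(total, 20 * 10**6))   # 20 MB partitions
print(partitions_needed(100 * 10**9, 10**9))  # 100 GB at 1 GB per partition
```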
Map_partitions
This is a very important subject when refactoring your pandas project to use Dask. The first temptation is to rewrite the project using Dask's pandas-like functions, but this didn't work for us. The best option was always a small rewrite of the pandas function, which we then passed to Dask's map_partitions method. This processes your data partition by partition, in parallel if you provided threads when configuring your cluster. This is why you need the right partition size: the data in each partition must fit comfortably in memory. A wrong choice of partitions produces a Dask warning similar to distributed.utils_perf - WARNING - full garbage collections took 11% CPU time recently (threshold: 10%). Imagine we have a pandas function written like the code snippet below.
def mapped_fun(data):
    for lag in range(4):
        data[f"col_{lag}"] = data.groupby("col1")["col3"].transform(lambda x: x.shift(lag)).apply(lambda x: np.log(x))
We want to lag col3 by each lag given by the range function and then take the log of the series. This code is sufficient if the data fits into memory. If the data doesn't fit into memory and we refactor to Dask, it turns into
def mapped_fun(data):
    for lag in range(4):
        data[f"col_{lag}"] = data.groupby("col1")["col3"].transform(lambda x: x.shift(lag)).apply(lambda x: np.log(x))
    return data

ddf = ddf.map_partitions(mapped_fun)
The only change we need to make to the pandas function is to return the dataframe. We then pass this function to Dask's map_partitions method, and that is it.
Group by and merging
These two operations are, in my opinion, the ones you need to take the most care with when working in Dask. The strategy is always to make sure you index by at least one of the columns you want to group by, and then to do the grouping inside pandas. The logic is that Dask will then have all the rows of each group inside one partition and can perform the group-by without shuffling the data, i.e. without going into other partitions to find the corresponding rows. Merging is similar: as already mentioned, you need to index your dataframe by one of the columns you want to merge on, but in the case of a merge you have to do it in Dask and not in pandas. The code below shows an example illustrating all the points mentioned above. I will subsequently explain what the code does, the alternatives and the logic behind what I have done.
import pandas as pd
from dask.distributed import Client, LocalCluster
import dask.dataframe as dd
import numpy as np
import gc


def create_files():
    size = 10_000_000
    for i in range(100):
        df = pd.DataFrame({"col1": np.random.randint(90_000, 100_000, size), "col2": np.random.randint(101, 20_000, size), "col3": np.random.uniform(0, 10_000, size)})
        # Select appropriate partitions
        ddf = dd.from_pandas(df, npartitions=1)
        ddf.to_parquet(f"test", ignore_divisions=True, engine="fastparquet", overwrite= i==0, append= i>0)
    print("Created first file")
    #size = size / 10
    for i in range(10):
        df = pd.DataFrame({"col1": np.random.randint(90_000, 100_000, size), "col2": np.random.randint(101, 20_000, size), "col4": np.random.uniform(0, 10_000, size)})
        # Select appropriate partitions
        ddf = dd.from_pandas(df, npartitions=1)
        ddf.to_parquet(f"test2", ignore_divisions=True, engine="fastparquet", overwrite= i== 0, append= i>0)
    print("Created second file")
    print("-------------------------------------------------------------")


def index_reparttion():
    print("-------------------------------------------------------------")
    print("About to repartition")
    ddf = dd.read_parquet("test")
    ddf = ddf.repartition(npartitions=1200)
    ddf = ddf.set_index("col1")
    ddf = _rebalance_ddf(ddf)
    print("save parquet")
    ddf.to_parquet("test")
    ddf = dd.read_parquet("test2")
    ddf = ddf.repartition(npartitions=500)
    ddf = ddf.set_index("col1")
    ddf = _rebalance_ddf(ddf)
    print("save parquet 2")
    ddf.to_parquet("test2")
    print("-------------------------------------------------------------")


# https://stackoverflow.com/questions/52642966/repartition-dask-dataframe-to-get-even-partitions
def _rebalance_ddf(ddf):
    """Repartition dask dataframe to ensure that partitions are roughly equal size.
    Assumes `ddf.index` is already sorted.
    """
    if not ddf.known_divisions:  # e.g. for read_parquet(..., infer_divisions=False)
        ddf = ddf.reset_index().set_index(ddf.index.name, sorted=True)
    index_counts = ddf.map_partitions(lambda _df: _df.index.value_counts().sort_index()).compute()
    index = np.repeat(index_counts.index, index_counts.values)
    divisions, _ = dd.io.io.sorted_division_locations(index, npartitions=ddf.npartitions)
    return ddf.repartition(divisions=divisions)


def main():
    ddf = dd.read_parquet("test")
    print(ddf.memory_usage_per_partition(index=True, deep=True).compute())
    print(ddf.memory_usage(deep=True).sum().compute())
    ddf2 = dd.read_parquet("test2")
    print(ddf2.memory_usage_per_partition(index=True, deep=True).compute())
    print(ddf2.memory_usage(deep=True).sum().compute())

    def mapped_fun(data):
        for lag in range(4):
            data[f"col_{lag}"] = data.groupby("col1")["col3"].transform(lambda x: x.shift(lag)).apply(lambda x: np.log(x))
        return data

    # Process the group by transformation in pandas but wrapped with Dask if you use the Dask functions to do this you will
    # have a variety of issues.
    ddf = ddf.map_partitions(mapped_fun)
    print("About to save")
    ddf = ddf.merge(ddf2, on=["col1", "col2"], how="left")
    ddf.to_parquet("result", engine="fastparquet")


if __name__ == "__main__":
    cluster = LocalCluster(
        n_workers=4,
        threads_per_worker=2,
        #processes=False,
        memory_limit='auto'
    )
    client = Client(cluster)
    create_files()
    index_reparttion()
    main()
The first snippet of code I'll comment on will be the last giving credence to the biblical saying that the first shall be the last and the last shall be the first. I joke and digress...
if __name__ == "__main__":
    cluster = LocalCluster(n_workers=4,
                           threads_per_worker=2,
                           memory_limit='auto')
    client = Client(cluster)
    main()
Here we create a local cluster and configure Dask to use 4 workers with 2 threads each, and we assign a memory limit to each worker. The threads let each worker process the tasks of several partitions in parallel. For the memory limit, you can optionally pass a specific value like '8GB'. This means each of your workers will have 8 GB of memory and you'll use a total of 32 GB, so if your RAM is below this limit you'll run out of memory when your workers have a lot of work to process. The auto keyword lets Dask set the workers' memory limits based on your RAM. Lastly, this piece of code gives you a monitoring dashboard that you can view in your browser at http://localhost:8787/status.
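The memory arithmetic above is easy to check before starting a cluster. The following is a minimal sketch with hypothetical helper names; it only multiplies the per-worker limit by the number of workers and does not query Dask or the operating system.

```python
def total_worker_memory_gb(n_workers, memory_limit_gb):
    """Upper bound on the memory the cluster may claim if every worker fills its limit."""
    return n_workers * memory_limit_gb


def fits_in_ram(n_workers, memory_limit_gb, ram_gb):
    """True if the combined worker limits stay within the machine's RAM."""
    return total_worker_memory_gb(n_workers, memory_limit_gb) <= ram_gb


print(total_worker_memory_gb(4, 8))  # 4 workers at 8 GB each
print(fits_in_ram(4, 8, 16))         # does that fit in 16 GB of RAM?
print(fits_in_ram(4, 4, 16))         # 4 GB per worker instead
```

On our 16 GB machine with 4 workers, a limit of about 4 GB per worker is what keeps the total within RAM.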
def create_files():
    print("-------------------------------------------------------------")
    size = 10_000_000
    for i in range(100):
        df = pd.DataFrame({"col1": np.random.randint(90_000, 100_000, size), "col2": np.random.randint(101, 20_000, size), "col3": np.random.uniform(0, 10_000, size)})
        # Select appropriate partitions
        ddf = dd.from_pandas(df, npartitions=1)
        ddf.to_parquet(f"test", ignore_divisions=True, engine="fastparquet", overwrite= i==0, append= i>0)
    print("Created first file")
    #size = size / 10
    for i in range(10):
        df = pd.DataFrame({"col1": np.random.randint(90_000, 100_000, size), "col2": np.random.randint(101, 20_000, size), "col4": np.random.uniform(0, 10_000, size)})
        # Select appropriate partitions
        ddf = dd.from_pandas(df, npartitions=1)
        ddf.to_parquet(f"test2", ignore_divisions=True, engine="fastparquet", overwrite= i== 0, append= i>0)
    print("Created second file")
    print("-------------------------------------------------------------")
This piece of code creates the files that we are going to process. It is important to understand the underlying data structures in your dataframe as this helps you to know how much memory the whole dataframe consumes, how you can reduce this memory footprint if needed and how many partitions you need to create.
This article tells us about the different pandas data types; in our case we are using float64 for the floats and int64 for the integers. These data types take 8 bytes each, which gives us 24 bytes for each row. This can be reduced to save memory by executing the code below. We don't use it for this example, however.
cast_types = {
    'col_0' : np.float16,
    'col_1' : np.float16,
    'col_2' : np.float16
}
# Reduce memory footprint
ddf = ddf.astype(cast_types)
At 24 bytes per row and 10 million rows, each partition written in the first loop holds 24e7 bytes, which is 240 MB, giving a total of 24 GB for the whole dataframe. The second loop is worth 2.4 GB by the same logic. This is not a dramatic difference from our 16 GB of RAM but should illustrate the point without taking hours to execute.
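The size estimate can be reproduced from the dtypes alone. This is a small sketch that assumes the three columns are stored as int64, int64 and float64, as stated above, and ignores the small overhead of the index.

```python
import numpy as np

# Column dtypes of the first dataset, as described in the article.
dtypes = {"col1": np.int64, "col2": np.int64, "col3": np.float64}

bytes_per_row = sum(np.dtype(t).itemsize for t in dtypes.values())
rows_per_file = 10_000_000

bytes_per_file = bytes_per_row * rows_per_file
first_total = bytes_per_file * 100   # first loop writes 100 files
second_total = bytes_per_file * 10   # second loop writes 10 files

print(bytes_per_row, "bytes per row")
print(bytes_per_file / 1e6, "MB per file")
print(first_total / 1e9, "GB for the first dataset")
print(second_total / 1e9, "GB for the second dataset")
```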
def index_reparttion():
    print("-------------------------------------------------------------")
    print("About to repartition")
    ddf = dd.read_parquet("test")
    ddf = ddf.repartition(npartitions=1200)
    ddf = ddf.set_index("col1")
    ddf = _rebalance_ddf(ddf)
    print("save parquet")
    ddf.to_parquet("test")
    ddf = dd.read_parquet("test2")
    ddf = ddf.repartition(npartitions=500)
    ddf = ddf.set_index("col1")
    ddf = _rebalance_ddf(ddf)
    print("save parquet 2")
    ddf.to_parquet("test2")
    print("-------------------------------------------------------------")


# https://stackoverflow.com/questions/52642966/repartition-dask-dataframe-to-get-even-partitions
def _rebalance_ddf(ddf):
    """Repartition dask dataframe to ensure that partitions are roughly equal size.
    Assumes `ddf.index` is already sorted.
    """
    if not ddf.known_divisions:  # e.g. for read_parquet(..., infer_divisions=False)
        ddf = ddf.reset_index().set_index(ddf.index.name, sorted=True)
    index_counts = ddf.map_partitions(lambda _df: _df.index.value_counts().sort_index()).compute()
    index = np.repeat(index_counts.index, index_counts.values)
    divisions, _ = dd.io.io.sorted_division_locations(index, npartitions=ddf.npartitions)
    return ddf.repartition(divisions=divisions)
In the second block of code, we repartition the data because, the way we created the files, each partition holds 240 MB of data. Our workers, which have 4 GB of memory each, can handle this, but we can do better. We can create 20 MB partitions by repartitioning the data from 100 to 1200 partitions. This is way below the memory allocation for our workers and is overkill, as 300 partitions would still do the trick. We then index the dataframes by col1. We do this because this column will be used for grouping and merging. If you don't do this, the program is most likely not going to work for very large datasets.
Finally, Dask does not necessarily produce equally sized partitions, so you can end up with one partition that is very large and another that is very small. To fix this problem we use the _rebalance_ddf function. We are now ready to run our workload.
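The idea behind rebalancing can be illustrated without Dask: count the rows per index value, then cut the sorted keys into groups of roughly equal row count while never splitting a key across two partitions. The greedy function below is a simplified, hypothetical illustration of that idea and is not the algorithm Dask uses in sorted_division_locations.

```python
def balanced_groups(key_counts, n_partitions):
    """Greedily group sorted keys into partitions of roughly equal row count.

    key_counts maps an index value to the number of rows carrying it.
    Returns one list of keys per partition; a key is never split.
    """
    target = sum(key_counts.values()) / n_partitions
    groups, current, rows = [], [], 0
    for key in sorted(key_counts):
        current.append(key)
        rows += key_counts[key]
        # Close the partition once it reaches the target size,
        # but leave the final partition open for the remaining keys.
        if rows >= target and len(groups) < n_partitions - 1:
            groups.append(current)
            current, rows = [], 0
    if current:
        groups.append(current)
    return groups


# Hypothetical row counts per col1 value.
counts = {1: 50, 2: 10, 3: 10, 4: 30, 5: 50, 6: 50}
groups = balanced_groups(counts, 4)
sizes = [sum(counts[k] for k in g) for g in groups]
print(groups)
print(sizes)
```

With skewed keys the result will not be perfectly even, since a single very frequent key still has to live in one partition.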
import pandas as pd
import numpy as np
from memory_profiler import profile
@profile  # report memory usage line by line for this function
def load_parquet():
    df = pd.read_parquet('test/part.0.parquet')
    print(f'Memory footprint: {df.memory_usage(deep=True).sum() // 1e6}MB')
    df[f"col_{1}"] = df.groupby("col1")["col3"].transform(lambda x: x.shift(1)).apply(lambda x: np.log(x))
    df2 = pd.read_parquet('test2/part.0.parquet')
    print(f'Memory footprint: {df2.memory_usage(deep=True).sum() // 1e6}MB')
    df = df.merge(df2, on=["col1", "col2"], how="left")
    return df

if __name__ == '__main__':
    df = load_parquet()
    print(f'Memory footprint: {df.memory_usage(deep=True).sum() // 1e6}MB')
Line #     Mem usage           Increment  Occurrences   Line Contents
=====================================================================
     4    57.168 MiB          57.168 MiB            1   @profile
     5                                                  def load_parq():
     6   109.711 MiB          52.543 MiB            1       df = pd.read_parquet('test/part.0.parquet')
     7   109.824 MiB           0.113 MiB            1       print(f'Memory footprint: {df.memory_usage(deep=True).sum() // 1e6}MB')
     8   170.449 MiB  -161439139.406 MiB      1595569       df[f"col_{1}"] = df.groupby("col1")["col3"].transform(lambda x: x.shift(1)).apply(lambda x: np.log(x))
     9    65.977 MiB        -104.473 MiB            1       df2 = pd.read_parquet('test2/part.0.parquet')
    10    66.738 MiB           0.762 MiB            1       print(f'Memory footprint: {df.memory_usage(deep=True).sum() // 1e6}MB')
    11   174.879 MiB         108.141 MiB            1       df = df.merge(df2, on=["col1", "col2"], how="left")
    12
    13   174.879 MiB           0.000 MiB            1       return df
This block of code isn't part of the main program; however, it is a very important debugging tool. It helps us see what the optimal partition size could be and gives an idea of how our pandas code runs under the hood of map_partitions. As explained earlier, Dask, especially with respect to dataframes, is basically a wrapper around pandas code, and this script illustrates that. From the partitions we have created, we load part 0 of both dataframes into memory to see how much memory pandas needs to load them. The workload also performs a groupby and a merge, so we replicate these here to get an idea of how much it will cost each worker to process a partition.
We expect each partition to hold about 20 MB of data; however, executing this code we discover that the memory cost of the frame is actually more than double that, at about 52 MB. We also notice big increases for the groupby and merge operations, but these are far smaller than our allocated worker memory of 4 GB.
From experience, if this script takes a long time to execute, you might need to increase the number of partitions, and you should also make sure that the memory footprint in pandas is below 200-300 MB if possible. In the worst case it should be less than a gigabyte if your program is to run to the end without errors. It is also worthwhile to test a snippet of the pandas commands you'll run in your workload in this script.
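A small helper makes this check repeatable. The sketch below measures the in-memory footprint of a pandas frame and compares it with a budget; the function names and the 300 MB default are assumptions taken from the rule of thumb above, and the sample frame is synthetic rather than one of our real partitions.

```python
import numpy as np
import pandas as pd


def footprint_mb(df):
    """In-memory size of a pandas frame in megabytes, including the index."""
    return df.memory_usage(index=True, deep=True).sum() / 1e6


def within_budget(df, budget_mb=300):
    """True if the frame stays under the assumed per-partition budget."""
    return footprint_mb(df) <= budget_mb


# Synthetic stand-in for one partition: 100,000 rows with the article's columns.
rng = np.random.default_rng(0)
sample = pd.DataFrame({
    "col1": rng.integers(90_000, 100_000, 100_000),
    "col2": rng.integers(101, 20_000, 100_000),
    "col3": rng.uniform(0, 10_000, 100_000),
})

print(f"{footprint_mb(sample):.2f} MB, within budget: {within_budget(sample)}")
```

In practice you would pass the frame returned by pd.read_parquet for a single part file instead of the synthetic sample.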
def main():
    ddf = dd.read_parquet("test")
    print(ddf.memory_usage_per_partition(index=True, deep=True).compute())
    print(ddf.memory_usage(deep=True).sum().compute())
    ddf2 = dd.read_parquet("test2")
    print(ddf2.memory_usage_per_partition(index=True, deep=True).compute())
    print(ddf2.memory_usage(deep=True).sum().compute())

    def mapped_fun(data):
        for lag in range(4):
            data[f"col_{lag}"] = data.groupby("col1")["col3"].transform(lambda x: x.shift(lag)).apply(lambda x: np.log(x))
        return data

    # Process the group by transformation in pandas but wrapped with Dask if you use the Dask functions to do this you will
    # have a variety of issues.
    ddf = ddf.map_partitions(mapped_fun)
    print("About to save")
    ddf = ddf.merge(ddf2, on=["col1", "col2"], how="left")
    ddf.to_parquet("result", engine="fastparquet")
Lastly, we run the main function. If you have followed all the steps above, that is, indexed your dataframe properly, created the right partitions, persisted your data correctly and wrapped your pandas code with Dask, then your workload will probably run through. It might be slow and spill over to disk, but it'll run through eventually.
The lines of code related to memory are diagnostics to make sure we have enough worker memory to hold this data. It is worth noting that calling compute executes the Dask graph and in some cases brings a lot of data into memory. For example, if you add the command print(ddf.compute()) immediately after loading the test data into Dask, you'll run out of memory.
The rest of the code groups by col1, lags col3 within each group and takes its log, wraps this in map_partitions, and merges ddf with ddf2 on col1 and col2. This is valid given that col1 is the index. If, on the other hand, we tried to merge on col3 and col2, for example, there is no guarantee it would run on a machine with 16 GB of RAM, depending of course on how big the dataset is.
Finally, we save the data.
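To see why grouping inside each partition is only safe when the data is partitioned by the grouping column, here is a pandas-only sketch. The toy frame and the function `lag_within_group` are hypothetical; the two ways of splitting the frame stand in for partitions indexed by col1 and for partitions cut at arbitrary positions.

```python
import pandas as pd


def lag_within_group(data):
    """Shift col3 by one row within each col1 group (plain pandas)."""
    data = data.copy()
    data["col3_lag1"] = data.groupby("col1")["col3"].shift(1)
    return data


df = pd.DataFrame({
    "col1": [1, 1, 2, 2, 3, 3],
    "col3": [10.0, 20.0, 30.0, 40.0, 50.0, 60.0],
})
whole = lag_within_group(df)

# Split by key: every group sits entirely inside one piece,
# which is the situation after indexing by col1.
by_key = pd.concat([lag_within_group(df[df["col1"] <= 2]),
                    lag_within_group(df[df["col1"] > 2])])

# Split by position: group 2 is cut in half, so its lag is lost at the boundary.
by_position = pd.concat([lag_within_group(df.iloc[:3]),
                         lag_within_group(df.iloc[3:])])

print(whole.equals(by_key))
print(whole.equals(by_position))
```

Only the split by key reproduces the result computed on the whole frame, which is the reason for indexing by col1 before calling map_partitions.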
Summary
To recap, in order to refactor your pandas workload with Dask you need to make sure you do the following:
- Index your data by the column you are going to group by or merge on.
- Don't let the task graph grow too large; that is, save the data or call compute after doing a considerable amount of work.
- Partition the data intelligently, run a pandas script on one partition and, above all, make sure it uses less than the worker memory with some free room to boot.
- Use map_partitions to wrap your pandas functions instead of using the similar pandas-like functions in Dask.
I believe that we can process dataframes of up to 500 GB with 16 GB of RAM, given enough patience and enough disk space for the spill-over. We have not tested this. We have, however, tested datasets of about 220 GB. Admittedly it took a long time to process and spilled to disk a lot, but it worked out in the end. Happy dasking!!!