Unveiling the Optimization Benefit of FireDucks Lazy Execution: Part #1
The availability of runtime memory is often a challenge when processing larger-than-memory datasets with pandas. To solve the problem, one can either move to a system with more memory or switch to a library that supports distributed data processing, such as Dask or PySpark.
Did you know that when working with data stored in file formats like CSV or Parquet, and only part of the data needs to be processed, manual optimization is possible even in pandas? For example, suppose the data below is stored in a Parquet file named sample_data.parquet (or in a CSV file named sample_data.csv):
   a    b   c  x   y   z
0  1  0.1   1  0  t1  10
1  2  0.2   4  1  t2  20
2  3  0.3   9  1  t3  30
3  4  0.4  16  0  t1  40
4  5  0.5  25  1  t2  50
5  6  0.6  36  1  t1  60
6  7  0.7  49  0  t2  70
7  8  0.8  64  1  t3  80
Suppose you want to compute the sum of column “c” over the rows where column “x” is 1. You might simply write the program as follows:
import pandas as pd
df = pd.read_parquet("sample_data.parquet")
res = df[df["x"] == 1]["c"].sum()  # keep rows where "x" is 1, then sum column "c" of the filtered frame
print(res)
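To reproduce the numbers without a Parquet file, the sample table can be built in memory. This is only a sketch for checking the result; the values are copied from the table above. Rows 1, 2, 4, 5 and 7 have “x” equal to 1, so the query returns 4 + 9 + 25 + 36 + 64 = 138.

```python
import pandas as pd

# The sample table from above, built in memory instead of read from sample_data.parquet.
df = pd.DataFrame({
    "a": [1, 2, 3, 4, 5, 6, 7, 8],
    "b": [0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8],
    "c": [1, 4, 9, 16, 25, 36, 49, 64],
    "x": [0, 1, 1, 0, 1, 1, 0, 1],
    "y": ["t1", "t2", "t3", "t1", "t2", "t1", "t2", "t3"],
    "z": [10, 20, 30, 40, 50, 60, 70, 80],
})

# Same filter-then-sum as the query above.
res = df[df["x"] == 1]["c"].sum()
print(res)  # 138
```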
The problem arises when the Parquet file is too large to fit in your system memory, even though you are only interested in part of it (columns “x” and “c”).
Fortunately, the read_parquet() method has a parameter named columns, with which you can specify the target columns to be loaded from the input Parquet file:
import pandas as pd
df = pd.read_parquet("sample_data.parquet", columns=["x", "c"])  # load only columns "x" and "c"
res = df[df["x"] == 1]["c"].sum()  # keep rows where "x" is 1, then sum column "c" of the filtered frame
print(res)
Similarly, read_csv() has a parameter named usecols that can be used to load only the target columns from a CSV file.
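A minimal sketch of usecols on the same data follows. To keep it self-contained, the CSV text is held in a string and wrapped in io.StringIO, which stands in for sample_data.csv. Note that pandas returns the selected columns in file order, not in the order given to usecols.

```python
import io
import pandas as pd

# The sample table as CSV text; io.StringIO stands in for sample_data.csv.
csv_text = """a,b,c,x,y,z
1,0.1,1,0,t1,10
2,0.2,4,1,t2,20
3,0.3,9,1,t3,30
4,0.4,16,0,t1,40
5,0.5,25,1,t2,50
6,0.6,36,1,t1,60
7,0.7,49,0,t2,70
8,0.8,64,1,t3,80
"""

# Only columns "x" and "c" are materialized; the other four are skipped while parsing.
df = pd.read_csv(io.StringIO(csv_text), usecols=["x", "c"])
print(list(df.columns))  # ['c', 'x']

res = df[df["x"] == 1]["c"].sum()
print(res)  # 138
```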
FireDucks Offerings
Although such parameters can be specified manually to reduce runtime memory consumption in pandas, it can be difficult to know at the very beginning of an analysis which columns will be required. An automatic optimization for such cases would be very useful for users of pandas-like libraries.
Since FireDucks 1.1.1, this optimization is performed automatically by its internal JIT compiler. Even when such parameters are not specified manually, the JIT compiler can inspect which columns are projected at the various stages of a program and specify these parameters automatically when generating the optimized code. This optimization is commonly known as projection pushdown. By setting the environment variable FIRE_LOG_LEVEL=3, you can inspect the program before and after optimization for the example below.
$ cat read_parquet_opt_demo.py
import pandas as pd
df = pd.read_parquet("sample_data.parquet")
r1 = df[df["x"] == 1]["c"].sum()
print(r1)
Execute the program as follows:
$ FIRE_LOG_LEVEL=3 python -mfireducks.pandas read_parquet_opt_demo.py
It will then show the intermediate representation (IR) generated for the above program before execution as follows:
2024-12-04 13:12:40.618398: 543780 fireducks/lib/fireducks_core.cc:64] Input IR:
func @main() {
%t0 = read_parquet('sample_data.parquet', []) <- load the input parquet file
%t1 = project(%t0, 'x') <- project "x" column from loaded data (df["x"])
%t2 = eq.vector.scalar(%t1, 1) <- generate mask with equality check with scalar value, 1 (mask = df["x"] == 1)
%t3 = filter(%t0, %t2) <- perform filter with computed mask (fdf = df[mask])
%t4 = project(%t3, 'c') <- project "c" column from filtered data (fdf["c"])
%v5 = aggregate_column.scalar(%t4, 'sum') <- calculate sum of projected column (fdf["c"].sum())
return(%t4, %v5)
}
The optimized IR (the target for execution) is as follows. It is mostly the same as the input IR; the main difference is that the read_parquet() instruction now specifies the columns to be loaded (['c', 'x']) for computing this specific result (r1). In addition, only column 'c' is projected before the filter, so the filter is applied to a single column instead of the whole frame.
2024-12-04 13:12:40.619360: 543780 fireducks/lib/fireducks_core.cc:73] Optimized IR:
func @main() {
%t0 = read_parquet('sample_data.parquet', ['c', 'x'])
%t1 = project(%t0, 'x')
%t2 = eq.vector.scalar(%t1, 1)
%t3 = project(%t0, ['c'])
%t4 = filter(%t3, %t2)
%t5 = project(%t4, 'c')
%v6 = aggregate_column.scalar(%t5, 'sum')
return(%t5, %v6)
}
The optimized IR above is executed by the FireDucks multi-threaded kernel. Its Python equivalent is as follows:
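import pandas as pd
df = pd.read_parquet("sample_data.parquet", columns=["c", "x"])  # load only the columns required for the analysis
t1 = df["x"]    # project the column used in the equality check
t2 = (t1 == 1)  # boolean mask
t3 = df[["c"]]  # project only the column to be filtered (a one-column DataFrame, like project(%t0, ['c']))
t4 = t3[t2]     # filter the rows with the mask
t5 = t4["c"]    # project "c" from the filtered frame
v6 = t5.sum()   # aggregate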
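As a sanity check, the translation of the optimized IR can be compared with the original program on the sample table. This sketch builds the data in memory, so selecting ["c", "x"] from the full frame stands in for read_parquet(columns=...). The list-based df[["c"]] matters: it keeps a one-column DataFrame, so that t4["c"] selects a column rather than looking up a row label.

```python
import pandas as pd

# Sample table from above, built in memory instead of read from sample_data.parquet.
full = pd.DataFrame({
    "a": [1, 2, 3, 4, 5, 6, 7, 8],
    "b": [0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8],
    "c": [1, 4, 9, 16, 25, 36, 49, 64],
    "x": [0, 1, 1, 0, 1, 1, 0, 1],
    "y": ["t1", "t2", "t3", "t1", "t2", "t1", "t2", "t3"],
    "z": [10, 20, 30, 40, 50, 60, 70, 80],
})

# Original program: filter the whole frame, then take "c".
r1 = full[full["x"] == 1]["c"].sum()

# Translation of the optimized IR: only "c" and "x" are ever touched.
df = full[["c", "x"]]
t2 = df["x"] == 1
t4 = df[["c"]][t2]  # filter a one-column frame instead of all six columns
v6 = t4["c"].sum()

print(r1, v6)  # 138 138
```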
⚠️ Please note that this environment variable is intended mainly for developers, and the way IRs are represented might change in future releases. For now, though, it is a handy way for users to check that the optimization has been applied.
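To make the idea of projection pushdown concrete, here is a toy sketch. It is not FireDucks' implementation: the Op tuple, the op kinds and the push_down_projection() and require() helpers are all hypothetical, chosen only to mirror the input IR shown earlier. The pass walks the program backwards from the returned values, records which columns of each value are used downstream, and finally narrows the read to those columns (an empty list means "all columns", as in the log). Unlike the real optimized IR, it only rewrites the read and does not insert the extra project before the filter.

```python
from typing import NamedTuple

ALL = None  # sentinel: every column of the value is needed (no pruning possible)


class Op(NamedTuple):
    out: str     # name of the value this op defines, e.g. "t0"
    kind: str    # "read", "project", "eq", "filter" or "sum" in this toy IR
    args: tuple  # operand names first, then literal arguments


def require(needed, value, cols):
    """Record that `cols` (a set of names, or ALL) of `value` are used downstream."""
    if value not in needed:
        needed[value] = cols
    elif needed[value] is ALL or cols is ALL:
        needed[value] = ALL
    else:
        needed[value] = needed[value] | cols


def push_down_projection(program, returns):
    """Walk the program backwards and narrow each read to the columns it feeds."""
    needed = {}
    for value in returns:
        require(needed, value, ALL)
    rewritten = []
    for op in reversed(program):
        if op.out not in needed:
            pass  # nothing downstream uses this value, so it adds no requirements
        elif op.kind == "read":
            if needed[op.out] is not ALL:
                op = op._replace(args=(op.args[0], sorted(needed[op.out])))
        elif op.kind == "project":
            src, col = op.args
            require(needed, src, {col})
        elif op.kind == "filter":
            src, mask = op.args
            require(needed, src, needed[op.out])  # a row filter keeps the same columns
            require(needed, mask, ALL)
        elif op.kind in ("eq", "sum"):
            require(needed, op.args[0], ALL)  # these read their whole single-column input
        rewritten.append(op)
    return rewritten[::-1]


# The input IR from the log, transcribed into the toy format.
program = [
    Op("t0", "read", ("sample_data.parquet", [])),
    Op("t1", "project", ("t0", "x")),
    Op("t2", "eq", ("t1", 1)),
    Op("t3", "filter", ("t0", "t2")),
    Op("t4", "project", ("t3", "c")),
    Op("v5", "sum", ("t4",)),
]

optimized = push_down_projection(program, returns=["t4", "v5"])
print(optimized[0])  # Op(out='t0', kind='read', args=('sample_data.parquet', ['c', 'x']))
```

The backward walk is what makes this work: column "c" is discovered through the filter that feeds the final projection, and column "x" through the equality check, so the read ends up with the same ['c', 'x'] list that appears in the optimized IR.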
Let’s take it for a test drive
You can refer to the notebook, which demonstrates the performance benefit of this optimization on a real dataset. Feel free to experiment with the query to see the effect of the FireDucks optimization for yourself. For one sample query, FireDucks ran 45x faster than pandas, without any modification to the source program and without affecting the result.
It also explains some dos and don’ts for running queries on notebook-like platforms. In a notebook, code is executed cell by cell, so when intermediate results are kept in variables, the FireDucks compiler assumes that they might be used at a later stage and keeps all of them alive, which hinders the optimization. Therefore, it is highly recommended to write a query as a chained expression when using a notebook.
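The two styles can be contrasted in a short sketch. It runs as plain pandas, where both versions give the same answer; the difference described above only matters under the FireDucks compiler, and whether a particular chain is fully optimized is an assumption to check with FIRE_LOG_LEVEL=3. The small CSV string is a made-up stand-in for a real file.

```python
import io
import pandas as pd

CSV = "c,x,z\n1,0,10\n4,1,20\n9,1,30\n16,0,40\n"  # hypothetical stand-in for a data file

# Pattern to avoid in a notebook: each intermediate is bound to a name, so the
# whole loaded frame (df) and the filtered frame stay referenced after the cell runs.
df = pd.read_csv(io.StringIO(CSV))
filtered = df[df["x"] == 1]
res_split = filtered["c"].sum()

# Recommended: one chained expression from load to result, with no named intermediates.
res_chained = pd.read_csv(io.StringIO(CSV)).query("x == 1")["c"].sum()

print(res_split, res_chained)  # 13 13
```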
Wrapping-up
Thank you for taking the time to read this article. If you have any questions or an issue to report, please feel free to get in touch with us through any of your preferred channels listed below: