Unveiling the Optimization Benefit of FireDucks Lazy Execution: Part #3

In the previous article, we talked about how FireDucks lazy execution can take care of caching intermediate results in order to avoid recomputing an expensive operation. In today's article, we will focus on the efficient data flow optimization performed by its JIT compiler. We will first try to understand some best practices for performing large-scale data analysis in pandas, and then discuss how those can be taken care of automatically by the FireDucks lazy execution model.

Challenge #1

Let’s consider the following two queries solving the same problem: find the top 2 values of “A” based on the “B” column.

👉 Can you guess which one is better from performance point of view?

(1) version 1

res = df.sort_values(by="B")["A"].head(2)

(2) version 2

tmp = df[["A", "B"]]
res = tmp.sort_values(by="B")["A"].head(2)

Well, when we conducted this quiz at a recent data science event, 45% of the participants answered that the first one is more efficient, while the remaining 55% answered that the second one is.

Congratulations if your answer is (2) as well. 👏

In a real-world situation the target data might have many columns, and when we invoke the sort operation on the full df instance, it sorts the entire data, incurring a significant cost in terms of memory and computational power.

As depicted in the following diagram, if the data has columns ‘a’ to ‘j’, the first query also sorts columns ‘c’ to ‘j’, which are not of our interest. Hence, it is a wise call to create a view of only the part of the data we are interested in (as shown in the following figure) before performing a computationally intensive operation like sort, groupby, join, etc. This way we can save a significant amount of runtime memory and computational time. Such an optimization is typically known as projection pushdown.

(Figure: projection pushdown example)
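
To see the effect yourself, here is a minimal sketch on synthetic data (the DataFrame size, the column names beyond “A” and “B”, and the timing code are our own assumptions, not part of the original quiz):

# minimal sketch: comparing the two versions on synthetic data
import time

import numpy as np
import pandas as pd

rng = np.random.default_rng(0)
df = pd.DataFrame(rng.random((2_000_000, 10)), columns=list("ABCDEFGHIJ"))

t0 = time.perf_counter()
res1 = df.sort_values(by="B")["A"].head(2)              # version 1: sorts all 10 columns
t1 = time.perf_counter()
res2 = df[["A", "B"]].sort_values(by="B")["A"].head(2)  # version 2: sorts only the 2 needed columns
t2 = time.perf_counter()

print(f"version 1: {t1 - t0:.2f} s, version 2: {t2 - t1:.2f} s")
pd.testing.assert_series_equal(res1, res2)              # both versions produce the same answer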

Challenge #2

Let’s now consider another example:

m = employee.merge(country, on="C_Code")
f = m[m["Gender"] == "Male"]
r = f.groupby("C_Name")["E_Name"].count()

The following diagram illustrates the operations that take place while executing the query above:

(Figure: country-wise count of male employees)

👉 Can you guess the performance bottleneck involved in the above query?

You probably guessed it correctly!

The query wants to analyze only the male employees. Then why include all the employees in the very first step, while joining the two dataframes employee and country? We could simply filter only the male employees from the employee data and perform the rest of the operations like merge, groupby, etc. on the filtered result, as shown below. This way we can save significant execution time and memory during the expensive merge operation.

f = employee[employee["Gender"] == "Male"]
m = f.merge(country, on="C_Code")
r = m.groupby("C_Name")["E_Name"].count()

Such an optimization is typically known as predicate pushdown.

(Figure: predicate pushdown example)
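
As a quick sanity check (and a way to benchmark on your own data), the two orderings can be compared on synthetic data; the table sizes, country codes, and names below are our own assumptions:

# minimal sketch: merge-then-filter vs. filter-then-merge on synthetic data
import numpy as np
import pandas as pd

rng = np.random.default_rng(0)
n = 1_000_000
employee = pd.DataFrame({
    "E_Name": [f"emp{i}" for i in range(n)],
    "Gender": rng.choice(["Male", "Female"], size=n),
    "C_Code": rng.integers(0, 200, size=n),
})
country = pd.DataFrame({"C_Code": range(200), "C_Name": [f"country{i}" for i in range(200)]})

# original order: merge the full employee table, then filter
m = employee.merge(country, on="C_Code")
r1 = m[m["Gender"] == "Male"].groupby("C_Name")["E_Name"].count()

# pushed-down order: filter first, so the merge touches roughly half the rows
f = employee[employee["Gender"] == "Male"]
r2 = f.merge(country, on="C_Code").groupby("C_Name")["E_Name"].count()

pd.testing.assert_series_equal(r1, r2)  # identical result, cheaper merge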

Let’s follow these best practices

When dealing with large-scale data, sometimes we might not be interested in all parts of the data. Hence, it is always a best practice to reduce the scope of your data before applying an expensive operation on it, which can save a significant amount of runtime memory and computational time.

  • When it is known that an operation involves only some of the columns, it is recommended to project the target columns first to reduce the data in the horizontal direction.
  • Similarly, if your operation targets only some selected rows of the data, it is recommended to filter the target rows before performing the operation to reduce the data further in the vertical direction.

For example, consider the following query:

(df.sort_values("A")
   .query("B > 1")["E"]
   .head(2))

Let’s consider the data with the following color codes, where the expected sorted order is: yellow, red, green, blue.

Also, let’s assume B=1 for the darker shades and B=2 for the lighter shades. The flow of the above operations will be as follows:

(Figure: sample data flow)

As you can see, the columns C and D are carried through the first three steps, but they are never required in the final result. Also, the sort operation is performed on all the rows of the data, whereas we are only interested in the rows with the lighter shades.

Hence, the optimized data flow could be as follows:

(df.loc[:, ["A", "B", "E"]]
   .query("B > 1")
   .sort_values("A")["E"]
   .head(2))

It efficiently reduces the data in the horizontal (projection pushdown) and vertical (predicate pushdown) directions before applying the expensive sort operation, as depicted below:

(Figure: sample optimized data flow)
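
Such reordering is only safe when it does not change the result. A quick equivalence check on synthetic data (the values below, including the “B is 1 or 2” assumption, are ours) could look like this:

# sanity check: the optimized pipeline returns the same result as the original
import numpy as np
import pandas as pd

rng = np.random.default_rng(0)
n = 1_000
df = pd.DataFrame({
    "A": rng.random(n),               # sort key (floats, so no ties)
    "B": rng.integers(1, 3, size=n),  # filter key: 1 (darker) or 2 (lighter), as in the example
    "C": rng.random(n),
    "D": rng.random(n),
    "E": rng.random(n),
})

original = df.sort_values("A").query("B > 1")["E"].head(2)
optimized = (df.loc[:, ["A", "B", "E"]]
               .query("B > 1")
               .sort_values("A")["E"]
               .head(2))

pd.testing.assert_series_equal(original, optimized)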

Case Study

Now let’s understand how such optimization can be useful in real-world situations.

TPC-H is a decision support benchmark that consists of a suite of business-oriented ad-hoc queries and concurrent data modifications. In this demonstration we will use Query 3, which deals with three large tables, namely lineitem, customer, and orders, and involves complex join, groupby, and sort operations.

The original query was written in SQL. We can realize the following pandas equivalent of the same query:

import datetime  # for the date literals used in the filters below
# `pd` below refers to pandas (or fireducks.pandas, as imported later in this article)

def q3():
    (
        pd.read_parquet("customer.parquet")
          .merge(pd.read_parquet("orders.parquet"), left_on="c_custkey", right_on="o_custkey")
          .merge(pd.read_parquet("lineitem.parquet"), left_on="o_orderkey", right_on="l_orderkey")
          .pipe(lambda df: df[df["c_mktsegment"] == "BUILDING"])
          .pipe(lambda df: df[df["o_orderdate"] < datetime.date(1995, 3, 15)])
          .pipe(lambda df: df[df["l_shipdate"] > datetime.date(1995, 3, 15)])
          .assign(revenue=lambda df: df["l_extendedprice"] * (1 - df["l_discount"]))
          .groupby(["l_orderkey", "o_orderdate", "o_shippriority"], as_index=False)
          .agg({"revenue": "sum"})[["l_orderkey", "revenue", "o_orderdate", "o_shippriority"]]
          .sort_values(["revenue", "o_orderdate"], ascending=[False, True])
          .reset_index(drop=True)            
          .head(10)
          .to_parquet("result.parquet")     
    )

The above implementation doesn’t take care of the “best practices” discussed earlier. It loads the entire data from all three tables and directly merges them to construct a large table before performing the rest of the filter, groupby, etc. operations required by the query.

When we executed the above program in pandas for scale factor 10, it took around 203 seconds and the peak memory consumption was around 56 GB.

(Figure: pandas q3 metrics)
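
For reference, a simple way to collect such timing and peak-memory numbers on Linux is sketched below (this is our own measurement sketch, not necessarily the exact setup behind the figures above):

# minimal sketch: measure wall-clock time and peak resident memory of q3() on Linux
import resource
import time

start = time.perf_counter()
q3()
elapsed = time.perf_counter() - start

# ru_maxrss is reported in kilobytes on Linux (bytes on macOS)
peak_gb = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss / (1024 ** 2)
print(f"q3: {elapsed:.2f} s, peak memory ~{peak_gb:.1f} GB")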

Let’s now apply the best practices discussed in the previous section to manually optimize the query as follows:

def optimized_q3():
    # load only the required columns from the respective tables
    req_customer_cols = ["c_custkey", "c_mktsegment"]                                  # 2 of 8 columns
    req_lineitem_cols = ["l_orderkey", "l_shipdate", "l_extendedprice", "l_discount"]  # 4 of 16 columns
    req_orders_cols = ["o_custkey", "o_orderkey", "o_orderdate", "o_shippriority"]     # 4 of 9 columns
    customer = pd.read_parquet("customer.parquet", columns=req_customer_cols)
    lineitem = pd.read_parquet("lineitem.parquet", columns=req_lineitem_cols)
    orders = pd.read_parquet("orders.parquet", columns=req_orders_cols)
    
    # early filter: reduce the scope of the "customer" table to be processed
    f_cust = customer[customer["c_mktsegment"] == "BUILDING"]

    # early filter: reduce the scope of the "orders" table to be processed
    f_ord = orders[orders["o_orderdate"] < datetime.date(1995, 3, 15)]

    # early filter: reduce the scope of the "lineitem" table to be processed
    f_litem = lineitem[lineitem["l_shipdate"] > datetime.date(1995, 3, 15)]

    (
        f_cust.merge(f_ord, left_on="c_custkey", right_on="o_custkey")
              .merge(f_litem, left_on="o_orderkey", right_on="l_orderkey")
              .assign(revenue=lambda df: df["l_extendedprice"] * (1 - df["l_discount"]))
              .groupby(["l_orderkey", "o_orderdate", "o_shippriority"], as_index=False)
              .agg({"revenue": "sum"})[["l_orderkey", "revenue", "o_orderdate", "o_shippriority"]]
              .sort_values(["revenue", "o_orderdate"], ascending=[False, True])
              .reset_index(drop=True)
              .head(10)
              .to_parquet("result.parquet")
    )

Instead of loading all 8 columns from the customer table, all 16 columns from the lineitem table, and all 9 columns from the orders table, it loads only the columns required by the query, reducing the data in the horizontal direction (projection pushdown).

Also, since we need only specific rows from these tables based on the given conditions, we perform an early filtering on the loaded data to reduce it further in the vertical direction (predicate pushdown).

When we executed the above optimized implementation using pandas for scale factor 10, it took around 13 seconds and the peak memory consumption was around 5.5 GB.

(Figure: pandas optimized-q3 metrics)

From this experiment it is quite evident that simply restructuring a pandas program can improve its performance (here by roughly 15x) and reduce its memory consumption (here by roughly 10x).

👉 Q. Can such optimization be automated, so that one can focus on in-depth data analysis while relying on a tool or library for this kind of expert-level optimization?

The answer is “YES”. You can rely on FireDucks for such optimization for sure. 🚀

FireDucks Offerings

While being highly compatible with pandas, FireDucks can perform such expert-level optimizations automatically when using its default lazy execution mode.

To verify this, we executed both methods as follows:

# use FireDucks for all the processing
import fireducks.pandas as pd

q3()
optimized_q3()

The execution completed within 4-5 seconds in both cases, showing FireDucks’ strength in performing such optimizations automatically even when the program itself does not take care of them (as in q3).
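
If you would like to repeat the measurement yourself, a minimal timing loop like the one below can be used (our own sketch; the trailing to_parquet() call should force the lazy pipeline to be fully evaluated, so the measured time includes the actual computation):

# minimal timing sketch: run both implementations under the currently imported backend
import time

for fn in (q3, optimized_q3):
    start = time.perf_counter()
    fn()  # the to_parquet() at the end should trigger full evaluation under lazy execution
    print(f"{fn.__name__}: {time.perf_counter() - start:.2f} s")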

We used a v2-8 TPU instance from Google Colab for this evaluation; here are the findings in detail:

|              | pandas exec_time (s) | pandas memory (GB) | FireDucks exec_time (s) | FireDucks memory (GB) |
|--------------|----------------------|--------------------|-------------------------|-----------------------|
| q3           | 203.18               | 56                 | 4.24                    | 3.3                   |
| optimized_q3 | 12.97                | 5.5                | 4.81                    | 3.4                   |

You might like to try this notebook on Google Colab to reproduce these results.

Wrapping-up

Thank you for taking the time to read this article. We have discussed a couple of best practices that one should follow when performing large-scale data analysis in pandas, and how FireDucks can apply them automatically. The experimental results show that switching from pandas to FireDucks can improve the performance of a poorly written program by ~48x (203.18 s -> 4.24 s) while reducing its memory consumption by ~17x (56 GB -> 3.3 GB).

If you have any queries or an issue to report, please feel free to get in touch with us through any of your preferred channels mentioned below: