Pythonの高速データフレームライブラリFireDucksを使ってみた

pandasは,プログラミング言語Pythonにおいて,データ解析を支援する機能を提供するライブラリである. NECの研究所では高速化版pandasであるFireDucksというライブラリを開発している.

データの準備

ニューヨークのタクシーの乗降者履歴のデータを対象に分析を行う. データの出典は以下である:

https://www.nyc.gov/site/tlc/about/tlc-trip-record-data.page

大規模データの解析を行うため,上記リンクから「Yellow Taxi Trip Records」を2022年1月から2023年6月までのデータをダウンロードし結合した. データはparquet形式で提供されているが,日頃良く使うcsv形式で試すために変換しておく. 参考までにデータ準備用のスクリプトを付記しておく.

import pandas as pd
import os

dir = "xxx"
df_list = []
for year in [2022, 2023]:
for i in range(12):
month = str(i+1).zfill(2)
fn = f"yellow_tripdata_{year}-{month}.parquet"
file = os.path.join(dir, fn)
if not os.path.exists(file):
continue
df = pd.read_parquet(fn)
df_list.append(df)

all_df = pd.concat(df_list)
all_df.to_csv("taxi_all.csv")

データの中身は以下のような値が入っている(列は一部抜粋).

列名データ型説明
passenger_countint乗車人数
pu_location_Idstringタクシーメーターが作動し始めたTLCタクシーゾーン.
do_location_Idstringタクシーメーターが解除されたTLCタクシーゾーン.
tpep_dropoff_datetimestringメーターが解除された日時.
tpep_pickupdate_timestringメーターが作動し始めた日時.
trip_distancedoubleタクシーメーターによって報告された走行距離(マイル単位).
total_amountdouble乗客に請求される合計金額. ※現金のチップは含まれない.
extradoubleその他の割増料金と追加料金.現在,これには0.50ドルおよび1ドルのラッシュアワー料金と夜間料金のみが含まれる.
fare_amountdoubleメーターによって計算された時間距離併用運賃.

実際に行う前処理

用意したデータに対して,データ分析でよく利用される型変換,列追加,異常値削除などの一連の前処理計算を行う.

まず速度計測用のラッパーを用意しておく. _evaluate()については後述する.

from time import time
from functools import wraps

def timer(func):
@wraps(func)
def wp(*args, **kargs):
t = time()
ret = func(*args, **kargs)
print(f"{func.__name__} : {(time() - t):.5g} [sec]")
return ret
return wp

def evaluate(df):
if hasattr(df, "_evaluate"):
df._evaluate()

データの読み込み

まずはデータを読み込む. pandasをimportしてからread_csvで読み込む. 関数だけ定義しておいて,後で纏めて呼び出して計測する.

import pandas as pd
@timer
def file_read(fn, args={}):
df = pd.read_csv(fn, **args)
evaluate(df)
print(df.shape)
return df

データ処理

欠損値があるデータを削除する.

@timer
def drop_na(df):
df.dropna(how="all", inplace=True)
evaluate(df)
return df

乗車,降車の年月日時はstring型で読み込んだので,日付型に変換しておく.

@timer
def txt_to_date(df, low):
df[low] = pd.to_datetime(df[low])
evaluate(df)
return df

乗車数でgroupbyして分布を見てみる(printは性能評価に含めないので省略). 乗車数0人のデータがあることが分かるので,削除する.

# 一人以上乗車している
@timer
def check_passenger_c(df):
df_ = df.groupby("passenger_count").size()
evaluate(df_)
df = df[df["passenger_count"] > 0]
evaluate(df)
return df

乗車日時の日付データから,年,月,日,時の情報を取り出し,列を追加する. 乗車年や月の分布を見てみると不適切な値が含まれているので,削除する.

# 乗車年/月が正しい
@timer
def check_pu_date(df):
df['year'] =  df['tpep_pickup_datetime'].dt.year
df['month'] = df['tpep_pickup_datetime'].dt.month
df['date'] =  df['tpep_pickup_datetime'].dt.day
df['hour'] =  df['tpep_pickup_datetime'].dt.hour

df_ = df.groupby("year").size()
evaluate(df_)
df = df[(df['year'] == 2022) | (df['year'] == 2023)]

df_ = df.groupby("month").size()
evaluate(df_)
df = df[(df['month'] >= 1) & df['month'] <= 12]

df_ = df.groupby(["year", "month"]).size()
evaluate(df_)
evaluate(df)
return df

降車時間と乗車時間の差を分に変換して列を追加する. 非正の乗車時間や長すぎる乗車時間を削除する.

# 現実的な乗車時間(分)
@timer
def check_ride_time(df):
df["ride_time"] = (df["tpep_dropoff_datetime"] - df["tpep_pickup_datetime"]).dt.seconds / 60
df = df[(df["ride_time"] > 0) & (df["ride_time"] <= 180)]
evaluate(df)
return df

乗車距離や運賃についても非負や大きすぎる値を削除する.

# 現実的な距離
@timer
def check_trip_distance(df):
df = df[(df["trip_distance"] > 0) & (df["trip_distance"] <= 250)]
evaluate(df)
return df

# 現実的な運賃
@timer
def check_total_amount(df):
df = df[(df["total_amount"] > 0) & (df["total_amount"] <= 1000)]
evaluate(df)
return df

乗降車地点のIDから緯度経度を算出する. IDと地点の関係は以下で確認することができる.

https://d37ci6vzurychx.cloudfront.net/misc/taxi+_zone_lookup.csv

緯度経度は,

https://d37ci6vzurychx.cloudfront.net/misc/taxi_zones.zip

をもとに作成した変換テーブルをmergeすることで列を追加する.

緯度経度の情報からニューヨーク市外のデータを削除する.

# IDから緯度経度を割り出す
@timer
def add_coordinate(df, ID_df):
df = df.merge(ID_df.rename(columns={"longitude": "start_lon", "latitude": "start_lat"}),
left_on="PULocationID", right_on="LocationID", how="left").drop("LocationID", axis=1)
df = df.merge(ID_df.rename(columns={"longitude": "end_lon", "latitude": "end_lat"}),
left_on="DOLocationID", right_on="LocationID", how="left").drop("LocationID", axis=1)
evaluate(df)
return df

# NY内かをcheckする
@timer
def in_NY(df):
df = df[(df["start_lon"] <= -71.47) & (df["start_lon"] >= -79.45)]
df = df[(df["start_lat"] >= 40.29) & (df["start_lat"] <= 45)]
df = df[(df["end_lon"] <= -71.47) & (df["end_lon"] >= -79.45)]
df = df[(df["end_lat"] >= 40.29) & (df["end_lat"] <= 45)]
evaluate(df)
return df

def check_in_NY(df, ID_df):
df = add_coordinate(df, ID_df)
df = in_NY(df)
return df

以上のようにデータを読み込み,型変換,列追加,異常値削除をする一連の処理を用意した.

  1. ファイルの読み込み
  2. 日付データを文字列から日付型に変換
  3. 前処理
  4. 欠損値を削除
  5. 乗客数のチェック
  6. groupbyで分布を確認
  7. 1人以上を選択
  8. 乗車時刻をチェック
  9. 日付データから年月日時を取得し,列を追加
  10. 年でgroupbyして,確認 → 該当年のみを選択
  11. 月でgroupbyして,確認 → 1~12月のみを選択
  12. 年,月でgroupbyして分布を確認
  13. 乗車時間をcheck
  14. 降車時刻と乗車時刻の差をとり,分に変換して列を追加(dt.total_secondがFireDucks未対応)
  15. 現実的な乗車時間のデータを選択する
  16. 乗車距離をチェック
  17. 現実的な乗車距離のデータを選択する
  18. 料金をチェック
  19. 現実的な料金のデータを選択する
  20. NY市内のデータを選択
  21. 乗降車のIDと,緯度経度のテーブルをマージする
  22. 乗降車の緯度経度がNY市内となっているデータを選択

pandasでの実行時間

まずはpandasでの実行時間を確認してみる. 測定には24コアXeonサーバー(Intel(R) Xeon(R) Gold 6226 CPU x 2,メインメモリ256GB)を用いた.

import os
data_path = "data_sets"
fn = os.path.join(data_path, "taxi_all.csv")
ID_file = os.path.join(data_path, "ID_to_coordinate.csv")

need_cols = ['tpep_pickup_datetime','tpep_dropoff_datetime', 'passenger_count', 'trip_distance',
'PULocationID', 'DOLocationID', 'total_amount', 'improvement_surcharge', 'extra', 'fare_amount', 'RatecodeID']

@timer
def Preprocessing():
df = file_read(fn, {"usecols": need_cols})
ID_df = file_read(ID_file)

df = txt_to_date(df, "tpep_pickup_datetime")
df = txt_to_date(df, "tpep_dropoff_datetime")

df = drop_na(df)
df = check_passenger_c(df)
df = check_pu_date(df)
df = check_ride_time(df)
df = check_trip_distance(df)
df = check_total_amount(df)
df = check_in_NY(df, ID_df)
evaluate(df)
return df

df = Preprocessing()

さて,pandasでの実行時間は下表のような結果になった. file_readが2件あるのはtaxiのデータの他に,マージ用の地点データも読み込んだためであり,txt_to_dateは乗車時間と降車時間の2件を変換したためである. 実行時間を見てみるとファイル読み込みに1分以上かかっていることが分かる. さらに列の追加やmerge処理を含むcheck_pu_dateadd_coordinateに30秒以上時間がかかっており,実装した前処理の実行時間は終了するまでに186秒かかっている.

FireDucksでの実行時間

importするライブラリをpandasに置き換えてFireDucksを用いた場合の実行時間の計測を行う.

pip install fireducks

pandasの前処理計算のために記述した上記スクリプトはFireDucksがpandasとの互換性を持っているためにFireDucksをimportすれば,そのまま利用可能である.

import fireducks.pandas as pd

注意点として,FireDucksはメソッドが呼ばれても処理をすぐには実行せず,結果が必要になった際にまとめて処理を実行する. そのため,各メソッドの実行時間を計測するためには_evaliate()を実行する必要がある.

FireDucksをimportし,同様の前処理計算を実行するとpandasとの実行時間との比較の表は以下のようになった.

関数pandas[sec]FireDucks[sec]速度向上率
file_read72.193.5220.49
file_read0.0030.010.38
txt_to_date9.0719.100.48
txt_to_date8.5720.570.42
drop_na3.130.704.47
check_passenger_c3.211.801.79
check_pu_date27.370.9927.64
check_ride_time7.022.003.51
check_trip_distance3.240.913.55
check_total_amount3.110.933.59
add_coordinate28.321.6716.97
in_NY20.752.717.65
Preprocessing186.0254.923.39

file_readについてpandasでは終了まで70秒以上かかっていたが約3.5秒となり20倍以上高速化ができていることが確認できた. またその他の時間がかかっていた処理(check_pu_dateadd_coordinate)についても大幅な時間短縮ができている.

txt_to_dateについてはFireDucksを用いることで計算時間が増えている. これは記事執筆時点においてto_datetime()関数はFireDucksによる高速化対応がされていないためである. しかし,to_datetime()のように高速化未対応な関数を呼ばれたときであってもFireDucksはpandasの関数を呼び出すことで計算を行うためエラーを返すことはない.

本記事の前処理計算においてトータルの計算時間はpandasが186秒であったのに対し,FireDucksでは55秒で約3.4倍の実行速度であった. さらにこの55秒のうち40秒程は高速化未対応の処理に要する時間であり,今後更なる高速化が見込まれる.