Pythonの高速データフレームライブラリFireDucksを使ってみた
pandasは,プログラミング言語Pythonにおいて,データ解析を支援する機能を提供するライブラリである. NECの研究所では高速化版pandasであるFireDucksというライブラリを開発している.
データの準備
ニューヨークのタクシーの乗降者履歴のデータを対象に分析を行う. データの出典は以下である:
https://www.nyc.gov/site/tlc/about/tlc-trip-record-data.page
大規模データの解析を行うため,上記リンクから「Yellow Taxi Trip Records」を2022年1月から2023年6月までのデータをダウンロードし結合した. データはparquet形式で提供されているが,日頃良く使うcsv形式で試すために変換しておく. 参考までにデータ準備用のスクリプトを付記しておく.
import pandas as pd
import os
dir = "xxx"
df_list = []
for year in [2022, 2023]:
for i in range(12):
month = str(i+1).zfill(2)
fn = f"yellow_tripdata_{year}-{month}.parquet"
file = os.path.join(dir, fn)
if not os.path.exists(file):
continue
df = pd.read_parquet(fn)
df_list.append(df)
all_df = pd.concat(df_list)
all_df.to_csv("taxi_all.csv")
データの中身は以下のような値が入っている(列は一部抜粋).
列名 | データ型 | 説明 |
---|---|---|
passenger_count | int | 乗車人数 |
pu_location_Id | string | タクシーメーターが作動し始めたTLCタクシーゾーン. |
do_location_Id | string | タクシーメーターが解除されたTLCタクシーゾーン. |
tpep_dropoff_datetime | string | メーターが解除された日時. |
tpep_pickupdate_time | string | メーターが作動し始めた日時. |
trip_distance | double | タクシーメーターによって報告された走行距離(マイル単位). |
total_amount | double | 乗客に請求される合計金額. ※現金のチップは含まれない. |
extra | double | その他の割増料金と追加料金.現在,これには0.50ドルおよび1ドルのラッシュアワー料金と夜間料金のみが含まれる. |
fare_amount | double | メーターによって計算された時間距離併用運賃. |
実際に行う前処理
用意したデータに対して,データ分析でよく利用される型変換,列追加,異常値削除などの一連の前処理計算を行う.
まず速度計測用のラッパーを用意しておく.
_evaluate()
については後述する.
from time import time
from functools import wraps
def timer(func):
@wraps(func)
def wp(*args, **kargs):
t = time()
ret = func(*args, **kargs)
print(f"{func.__name__} : {(time() - t):.5g} [sec]")
return ret
return wp
def evaluate(df):
if hasattr(df, "_evaluate"):
df._evaluate()
データの読み込み
まずはデータを読み込む.
pandasをimportしてからread_csv
で読み込む.
関数だけ定義しておいて,後で纏めて呼び出して計測する.
import pandas as pd
@timer
def file_read(fn, args={}):
df = pd.read_csv(fn, **args)
evaluate(df)
print(df.shape)
return df
データ処理
欠損値があるデータを削除する.
@timer
def drop_na(df):
df.dropna(how="all", inplace=True)
evaluate(df)
return df
乗車,降車の年月日時はstring型で読み込んだので,日付型に変換しておく.
@timer
def txt_to_date(df, low):
df[low] = pd.to_datetime(df[low])
evaluate(df)
return df
乗車数でgroupbyして分布を見てみる(printは性能評価に含めないので省略). 乗車数0人のデータがあることが分かるので,削除する.
# 一人以上乗車している
@timer
def check_passenger_c(df):
df_ = df.groupby("passenger_count").size()
evaluate(df_)
df = df[df["passenger_count"] > 0]
evaluate(df)
return df
乗車日時の日付データから,年,月,日,時の情報を取り出し,列を追加する. 乗車年や月の分布を見てみると不適切な値が含まれているので,削除する.
# 乗車年/月が正しい
@timer
def check_pu_date(df):
df['year'] = df['tpep_pickup_datetime'].dt.year
df['month'] = df['tpep_pickup_datetime'].dt.month
df['date'] = df['tpep_pickup_datetime'].dt.day
df['hour'] = df['tpep_pickup_datetime'].dt.hour
df_ = df.groupby("year").size()
evaluate(df_)
df = df[(df['year'] == 2022) | (df['year'] == 2023)]
df_ = df.groupby("month").size()
evaluate(df_)
df = df[(df['month'] >= 1) & df['month'] <= 12]
df_ = df.groupby(["year", "month"]).size()
evaluate(df_)
evaluate(df)
return df
降車時間と乗車時間の差を分に変換して列を追加する. 非正の乗車時間や長すぎる乗車時間を削除する.
# 現実的な乗車時間(分)
@timer
def check_ride_time(df):
df["ride_time"] = (df["tpep_dropoff_datetime"] - df["tpep_pickup_datetime"]).dt.seconds / 60
df = df[(df["ride_time"] > 0) & (df["ride_time"] <= 180)]
evaluate(df)
return df
乗車距離や運賃についても非負や大きすぎる値を削除する.
# 現実的な距離
@timer
def check_trip_distance(df):
df = df[(df["trip_distance"] > 0) & (df["trip_distance"] <= 250)]
evaluate(df)
return df
# 現実的な運賃
@timer
def check_total_amount(df):
df = df[(df["total_amount"] > 0) & (df["total_amount"] <= 1000)]
evaluate(df)
return df
乗降車地点のIDから緯度経度を算出する. IDと地点の関係は以下で確認することができる.
https://d37ci6vzurychx.cloudfront.net/misc/taxi+_zone_lookup.csv
緯度経度は,
をもとに作成した変換テーブルをmergeすることで列を追加する.
緯度経度の情報からニューヨーク市外のデータを削除する.
# IDから緯度経度を割り出す
@timer
def add_coordinate(df, ID_df):
df = df.merge(ID_df.rename(columns={"longitude": "start_lon", "latitude": "start_lat"}),
left_on="PULocationID", right_on="LocationID", how="left").drop("LocationID", axis=1)
df = df.merge(ID_df.rename(columns={"longitude": "end_lon", "latitude": "end_lat"}),
left_on="DOLocationID", right_on="LocationID", how="left").drop("LocationID", axis=1)
evaluate(df)
return df
# NY内かをcheckする
@timer
def in_NY(df):
df = df[(df["start_lon"] <= -71.47) & (df["start_lon"] >= -79.45)]
df = df[(df["start_lat"] >= 40.29) & (df["start_lat"] <= 45)]
df = df[(df["end_lon"] <= -71.47) & (df["end_lon"] >= -79.45)]
df = df[(df["end_lat"] >= 40.29) & (df["end_lat"] <= 45)]
evaluate(df)
return df
def check_in_NY(df, ID_df):
df = add_coordinate(df, ID_df)
df = in_NY(df)
return df
以上のようにデータを読み込み,型変換,列追加,異常値削除をする一連の処理を用意した.
- ファイルの読み込み
- 日付データを文字列から日付型に変換
- 前処理
- 欠損値を削除
- 乗客数のチェック
- groupbyで分布を確認
- 1人以上を選択
- 乗車時刻をチェック
- 日付データから年月日時を取得し,列を追加
- 年でgroupbyして,確認 → 該当年のみを選択
- 月でgroupbyして,確認 → 1~12月のみを選択
- 年,月でgroupbyして分布を確認
- 乗車時間をcheck
- 降車時刻と乗車時刻の差をとり,分に変換して列を追加(
dt.total_second
がFireDucks未対応) - 現実的な乗車時間のデータを選択する
- 乗車距離をチェック
- 現実的な乗車距離のデータを選択する
- 料金をチェック
- 現実的な料金のデータを選択する
- NY市内のデータを選択
- 乗降車のIDと,緯度経度のテーブルをマージする
- 乗降車の緯度経度がNY市内となっているデータを選択
pandasでの実行時間
まずはpandasでの実行時間を確認してみる. 測定には24コアXeonサーバー(Intel(R) Xeon(R) Gold 6226 CPU x 2,メインメモリ256GB)を用いた.
import os
data_path = "data_sets"
fn = os.path.join(data_path, "taxi_all.csv")
ID_file = os.path.join(data_path, "ID_to_coordinate.csv")
need_cols = ['tpep_pickup_datetime','tpep_dropoff_datetime', 'passenger_count', 'trip_distance',
'PULocationID', 'DOLocationID', 'total_amount', 'improvement_surcharge', 'extra', 'fare_amount', 'RatecodeID']
@timer
def Preprocessing():
df = file_read(fn, {"usecols": need_cols})
ID_df = file_read(ID_file)
df = txt_to_date(df, "tpep_pickup_datetime")
df = txt_to_date(df, "tpep_dropoff_datetime")
df = drop_na(df)
df = check_passenger_c(df)
df = check_pu_date(df)
df = check_ride_time(df)
df = check_trip_distance(df)
df = check_total_amount(df)
df = check_in_NY(df, ID_df)
evaluate(df)
return df
df = Preprocessing()
さて,pandasでの実行時間は下表のような結果になった.
file_read
が2件あるのはtaxiのデータの他に,マージ用の地点データも読み込んだためであり,txt_to_date
は乗車時間と降車時間の2件を変換したためである.
実行時間を見てみるとファイル読み込みに1分以上かかっていることが分かる.
さらに列の追加やmerge処理を含むcheck_pu_date
やadd_coordinate
に30秒以上時間がかかっており,実装した前処理の実行時間は終了するまでに186秒かかっている.
FireDucksでの実行時間
importするライブラリをpandasに置き換えてFireDucksを用いた場合の実行時間の計測を行う.
pip install fireducks
pandasの前処理計算のために記述した上記スクリプトはFireDucksがpandasとの互換性を持っているためにFireDucksをimportすれば,そのまま利用可能である.
import fireducks.pandas as pd
注意点として,FireDucksはメソッドが呼ばれても処理をすぐには実行せず,結果が必要になった際にまとめて処理を実行する.
そのため,各メソッドの実行時間を計測するためには_evaliate()
を実行する必要がある.
FireDucksをimportし,同様の前処理計算を実行するとpandasとの実行時間との比較の表は以下のようになった.
関数 | pandas[sec] | FireDucks[sec] | 速度向上率 |
---|---|---|---|
file_read | 72.19 | 3.52 | 20.49 |
file_read | 0.003 | 0.01 | 0.38 |
txt_to_date | 9.07 | 19.10 | 0.48 |
txt_to_date | 8.57 | 20.57 | 0.42 |
drop_na | 3.13 | 0.70 | 4.47 |
check_passenger_c | 3.21 | 1.80 | 1.79 |
check_pu_date | 27.37 | 0.99 | 27.64 |
check_ride_time | 7.02 | 2.00 | 3.51 |
check_trip_distance | 3.24 | 0.91 | 3.55 |
check_total_amount | 3.11 | 0.93 | 3.59 |
add_coordinate | 28.32 | 1.67 | 16.97 |
in_NY | 20.75 | 2.71 | 7.65 |
Preprocessing | 186.02 | 54.92 | 3.39 |
file_read
についてpandasでは終了まで70秒以上かかっていたが約3.5秒となり20倍以上高速化ができていることが確認できた.
またその他の時間がかかっていた処理(check_pu_date
,add_coordinate
)についても大幅な時間短縮ができている.
txt_to_date
についてはFireDucksを用いることで計算時間が増えている.
これは記事執筆時点においてto_datetime()
関数はFireDucksによる高速化対応がされていないためである.
しかし,to_datetime()
のように高速化未対応な関数を呼ばれたときであってもFireDucksはpandasの関数を呼び出すことで計算を行うためエラーを返すことはない.
本記事の前処理計算においてトータルの計算時間はpandasが186秒であったのに対し,FireDucksでは55秒で約3.4倍の実行速度であった. さらにこの55秒のうち40秒程は高速化未対応の処理に要する時間であり,今後更なる高速化が見込まれる.