Apache Arrow 鬼はええ!
このままCSV全部Parquetに
変換していこうぜ!

2022-03-19 第97回R勉強会@東京
@eitsupi

はじめに

自己紹介

  • @eitsupi
  • 製造業勤務
    • Excelが嫌になりRを触り初めて3年
  • Dockerイメージrocker/r-ver他のメンテナー
  • VSCode派
    • Remote-Containersばかり使っている
  • このスライドでQuartoに挑戦

今日の話

  • 数十分かけて読み込んでいたCSVファイル群をParquetに置換する際に調べたこと(数十秒~数分で読めるようになった)
  • ArrowとParquetのことを少しでも知ってもらい、試すきっかけになれば……

対象かも

  • ✅データはCSVファイル
  • ✅大量のファイルを読む
  • ✅読み込みに数十分かかる

対象外かも

  • ✅データはDB上
  • ✅少数のファイルを読む
  • ✅読み込みは数秒で終わる

結論

Q. CSVをParquetにするとどのくらい早くなる?

A. 場合による(ようなので試してみましょう!)

Apache Parquet と
Apache Arrow

Apache Parquet

  • 2013年~1
  • Apache Hadoop用に作られた列指向ファイルフォーマット
    • 列方向に圧縮されるため大量のレコードを圧縮しやすい
    • 列単位でベクトル化した計算を行う分析処理と相性が良い

Apache Arrow

  • 2016年~2
  • A high-performance cross-system data layer for columnar in-memory analytics
  • 言語に寄らない列指向インメモリフォーマットの標準を目指しているプロジェクト
    • Arrowを介することで、ある列指向データから別の列指向データへの変換を個別に実装する必要はなくなる

ParquetとFeather

  • 2016年時点でArrowはファイル形式を提供しなかったため、ファイル形式およびそのファイルを読み書きするFeatherライブラリが試験的に作られる
  • 2020年にArrow IPC (Inter-Process Communication)フォーマットをLZ4かZSTDで圧縮した通称FeatherV2がArrow本体に組み込まれる3
    • featherと呼ばれたりarrowと呼ばれたりipcと呼ばれたり……
    • 2021年に決まった正式な拡張子は.arrow4
  • 一方2017年にはParquetをArrowライブラリで読み書きできるようになっており、Parquetの利用が推奨されている5

Arrow R Package

  • パッケージ名はarrow
  • Apache Arrow C++ライブラリ(libarrow)のRバインディング
  • ソースインストールするとlibarrowのビルドに長時間がかかることに注意!
  • RockerプロジェクトのDockerイメージではrocker/tidyverseにインストール済(なのでこのスライド内のサンプルコードはrocker/tidyverseで動くはず)

こちらもどうぞ

arrowによるファイル読み込み

ファイルの読み書き

arrowパッケージは独自にファイルを読み書きする関数を持っている

表 1: データ読み込み関数の比較
対象ファイル utils readr arrow
csv 単体 read.csv read_csv read_csv_arrow
csv 複数 - read_csv open_dataset
parquet 単体 - - read_parquet
parquet 複数 - - open_dataset

CSVの読み込み 1/3

file_csv <- readr::readr_example("mtcars.csv")

arrow::read_csv_arrow(file_csv, as_data_frame = FALSE)
Table
32 rows x 11 columns
$mpg <double>
$cyl <int64>
$disp <double>
$hp <int64>
$drat <double>
$wt <double>
$qsec <double>
$vs <int64>
$am <int64>
$gear <int64>
$carb <int64>

as_data_frame引数はRのdata.frameに変換するかarrow::Tableのままにするかを制御する(デフォルトはTRUE

CSVの読み込み 2/3

同じ列構造を持つ複数ファイルを読み込みたい場合はopen_dataset()によりデータセットとして開く

c(file_csv, file_csv) |> arrow::open_dataset(format = "csv")
FileSystemDataset with 2 csv files
mpg: double
cyl: int64
disp: double
hp: int64
drat: double
wt: double
qsec: double
vs: int64
am: int64
gear: int64
carb: int64

この段階では矩形データの構造(スキーマ)を読み込んだだけで、データ全体を読み込んではいない

CSVの読み込み 3/3

computecollectでデータをメモリ上に読み込む

ds <- c(file_csv, file_csv) |> arrow::open_dataset(format = "csv")

Table

ds |> dplyr::compute()
Table
64 rows x 11 columns
$mpg <double>
$cyl <int64>
$disp <double>
$hp <int64>
$drat <double>
$wt <double>
$qsec <double>
$vs <int64>
$am <int64>
$gear <int64>
$carb <int64>

data.frame

ds |> dplyr::collect()
# A tibble: 64 × 11
     mpg   cyl  disp    hp  drat    wt  qsec    vs    am  gear  carb
   <dbl> <int> <dbl> <int> <dbl> <dbl> <dbl> <int> <int> <int> <int>
 1  21       6  160    110  3.9   2.62  16.5     0     1     4     4
 2  21       6  160    110  3.9   2.88  17.0     0     1     4     4
 3  22.8     4  108     93  3.85  2.32  18.6     1     1     4     1
 4  21.4     6  258    110  3.08  3.22  19.4     1     0     3     1
 5  18.7     8  360    175  3.15  3.44  17.0     0     0     3     2
 6  18.1     6  225    105  2.76  3.46  20.2     1     0     3     1
 7  14.3     8  360    245  3.21  3.57  15.8     0     0     3     4
 8  24.4     4  147.    62  3.69  3.19  20       1     0     4     2
 9  22.8     4  141.    95  3.92  3.15  22.9     1     0     4     2
10  19.2     6  168.   123  3.92  3.44  18.3     1     0     4     4
# … with 54 more rows

Parquetファイルの読み込み 1/2

write_parquetでParquetファイルを書き込んで、read_parquetで読み込む

arrow::write_parquet(mtcars, "mtcars.parquet")

arrow::read_parquet("mtcars.parquet", as_data_frame = FALSE)
Table
32 rows x 11 columns
$mpg <double>
$cyl <double>
$disp <double>
$hp <double>
$drat <double>
$wt <double>
$qsec <double>
$vs <double>
$am <double>
$gear <double>
$carb <double>

See $metadata for additional Schema metadata

Parquetファイルの読み込み 2/2

Parquetの場合も複数ファイルの場合はopen_datasetを使う
format引数のデフォルトは"parquet"なので指定しなくてもよい

c("mtcars.parquet", "mtcars.parquet") |>
  arrow::open_dataset(format = "parquet") |>
  dplyr::compute()
Table
64 rows x 11 columns
$mpg <double>
$cyl <double>
$disp <double>
$hp <double>
$drat <double>
$wt <double>
$qsec <double>
$vs <double>
$am <double>
$gear <double>
$carb <double>

See $metadata for additional Schema metadata

dplyr query

dplyrの基本おさらい

多くの関数はdata.frameを第一引数にとりdata.frameを返す

class(mtcars)
[1] "data.frame"
class(dplyr::select(mtcars, cyl))
[1] "data.frame"

もしくはパイプ演算子を使って

mtcars |>
  dplyr::select(cyl) |>
  class()
[1] "data.frame"

arrow_dplyr_query

TableDatasetをそれらのdplyrの関数の第一引数に渡すとarrow_dplyr_queryクラスオブジェクトが返ってくる

arrow::open_dataset("mtcars.parquet") |>
  dplyr::select(cyl) |>
  class()
[1] "arrow_dplyr_query"
  • arrow_dplyr_queryを第一引数に渡した場合も同じ挙動
    • data.frameのようにパイプラインを繋げていける
  • computecollectに渡すとクエリが実行される(dbplyrに類似)
    • Arrowインメモリフォーマットのままdplyrで記述した処理を実行できる

遅延評価

データセットはcomputecollectに繋げるまで読み込まれない

c("mtcars.parquet", "mtcars.parquet") |>
  arrow::open_dataset(format = "parquet") |>
  dplyr::collect()

遅延評価

クエリもcomputecollectに繋げるまで評価されない

c("mtcars.parquet", "mtcars.parquet") |>
  arrow::open_dataset(format = "parquet") |>
  dplyr::filter(cyl == 6) |>
  dplyr::select(dplyr::starts_with("d")) |>
  dplyr::collect()
  • dplyrクエリはarrowパッケージによって翻訳されlibarrowがクエリを実行する
  • 翻訳可能な関数はarrowに登録されているもののみなので、非対応の関数を含めるとエラーになる(データセットに対するクエリの場合)
  • 対応している関数は徐々に増えており、NEWSで確認可能

プッシュダウン

クエリもcomputecollectに繋げるまで評価されない

c("mtcars.parquet", "mtcars.parquet") |>
  arrow::open_dataset(format = "parquet") |>
  dplyr::filter(cyl == 6) |>
  dplyr::select(dplyr::starts_with("d")) |>
  dplyr::collect()
  • Parquetデータセットに対してクエリを実行するとき、クエリを解析し必要な列と行のみをファイルから読み込む(プッシュダウン
    • CSVと比べた場合のParquetの大きな利点
      • 読み込むデータの少ないほど読み込み時間は短縮される
      • 読み込むデータの少ないほど省メモリで処理できる

実行結果

クエリもcomputecollectに繋げるまで評価されない

c("mtcars.parquet", "mtcars.parquet") |>
  arrow::open_dataset(format = "parquet") |>
  dplyr::filter(cyl == 6) |>
  dplyr::select(dplyr::starts_with("d")) |>
  dplyr::collect()
    disp drat
1  160.0 3.90
2  160.0 3.90
3  258.0 3.08
4  225.0 2.76
5  167.6 3.92
6  167.6 3.92
7  145.0 3.62
8  160.0 3.90
9  160.0 3.90
10 258.0 3.08
11 225.0 2.76
12 167.6 3.92
13 167.6 3.92
14 145.0 3.62

データセットの作成

列の値毎に分割した複数のParquetファイルをデータセットとして書き込める(パーティショニング)

fs::dir_create("test_data")

c("mtcars.parquet") |>
  arrow::open_dataset(format = "parquet") |>
  arrow::write_dataset("test_data", partitioning = "cyl")

なお上記のようにデータセットからデータセットに直接変換する場合等はバッチ毎に逐次処理されるので、メモリに乗り切らないデータを加工可能6……かもしれない7

Hive-style パーティション

fs::dir_create("test_data")
arrow::write_dataset(mtcars, "test_data", partitioning = "cyl")

上のコードを実行すると以下のような複数のディレクトリとParquetファイルが生成される

$ tree test_data
test_data
├── cyl=4
│   └── part-0.parquet
├── cyl=6
│   └── part-0.parquet
└── cyl=8
    └── part-0.parquet

3 directories, 3 files

Parquetファイルにはcyl列が含まれておらず代わりにディレクトリ名がkey=valueの形式になっている

パーティショニングを利用する場合の注意

  • Parquetファイルのサイズとパーティション数8
  • そのまま読み込み可能なツールは限られる
>>> import pyarrow.dataset as ds
>>> ds.dataset("test_data", partitioning="hive")
<pyarrow._dataset.FileSystemDataset object at 0x7f1162853730>

dplyrクエリ中での型変更

as.integer等の一般的な関数は登録済みのものが多い

任意のArrowタイプに変換するにはmutate等の中でcastを使用する
castは単体の関数として存在しないため、ヘルプを検索してもヒットしない

mtcars |>
  arrow::arrow_table() |>
  dplyr::transmute(cyl = cast(cyl, arrow::int8())) |>
  dplyr::compute()
Table
32 rows x 1 columns
$cyl <int8>

See $metadata for additional Schema metadata

duckdbとの連携

DuckDB

  • 2019年~9
  • しばしば The SQLite for Analytics と紹介されている、SQLiteのような使い勝手を目指した列指向の分析用RDBMS
  • Parquetファイル(複数可)に対してクエリを実行できる
    • 現段階ではHiveスタイルのパーティションには非対応
    • Snappy圧縮のみ対応
  • シングルバイナリのCLIや、公式Python、Rパッケージ等から実行
    • PythonとRにはArrowとDuckDBの相互変換機能があり、Arrowオブジェクトに対してDuckDBのクエリを実行可能10

to_arrowとto_duckdb

dplyrのパイプライン中でarrowduckdbのクエリを相互切り替え可能

arrow7.0.0の対応していないslice_minをduckdb側で処理する例

arrow::open_dataset("mtcars.parquet") |>
  dplyr::select(mpg, cyl) |>
  dplyr::group_by(cyl) |>
  arrow::to_duckdb() |>
  dplyr::slice_min(mpg, n = 3) |>
  arrow::to_arrow() |>
  dplyr::compute()
Table
10 rows x 2 columns
$mpg <double>
$cyl <double>
  • arrowから見たメリット:非対応クエリをduckdb側で処理できる
  • duckdbから見たメリット:arrowのファイル読み込みを利用できる

まとめ

  • Apache Arrowはこわくない
  • Apache Parquetはすぐ試せる
  • dplyrは心強い

Enjoy!

脚注

  1. Announcing Parquet 1.0: Columnar Storage for Hadoop

  2. The Apache® Software Foundation Announces Apache Arrow™ as a Top-Level Project

  3. Apache Arrow 0.17.0 Release

  4. Apache Arrowデータのメディアタイプ(MIMEタイプ)

  5. Feather V2 with Compression Support in Apache Arrow 0.17.0

  6. PythonライブラリのドキュメントWriting large amounts of dataに記載あり

  7. 現時点ではParquetスキャンに大量のメモリを消費するようで、私のノートPCはクラッシュした

  8. Partitioning performance considerations

  9. This is the first preview release of DuckDB.

  10. DuckDB quacks Arrow: A zero-copy data integration between Apache Arrow and DuckDB