Partitioning scheme to write files
Description
Partitioning schemes are used to write multiple files with the sink_* methods.
- pl$PartitionByKey(): Splits the data by the values of the key expressions. The number of files that can be written is not limited. However, beyond a certain number of files, the data for the remaining partitions is buffered before being written to file.
- pl$PartitionMaxSize(): Splits the data by a maximum size. When a file reaches the maximum size, it is closed and a new file is opened.
- pl$PartitionParted(): A specialized version of pl$PartitionByKey(). Whereas pl$PartitionByKey() accepts data in any order, this scheme expects the input data to be pre-grouped or pre-sorted. It incurs much less overhead, but may not always be applicable. Each new value of the key expressions starts a new partition, so a value that reappears later in the input may overwrite the partition written for it earlier.
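Because pl$PartitionParted() starts a new partition whenever the key value changes, one way to satisfy its ordering requirement is to sort by the key before sinking. The following is a minimal sketch under that assumption; the out_dir and parted_files names are illustrative, and the exact file names written may differ between polars versions.

```r
library(polars)

# Hypothetical output location; mkdir = TRUE creates it on write
out_dir <- tempfile("parted_")

# Sort by the key first so that each `cyl` value forms one contiguous run
as_polars_lf(mtcars)$sort("cyl")$sink_parquet(
  pl$PartitionParted(out_dir, by = "cyl"),
  mkdir = TRUE
)

# Inspect what was written
parted_files <- list.files(out_dir, recursive = TRUE)
parted_files
```

If the input cannot be sorted or grouped cheaply, pl$PartitionByKey() is the safer choice, since it accepts rows in any order.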
Usage
pl__PartitionByKey(
base_path,
...,
by,
include_key = TRUE,
per_partition_sort_by = NULL
)
pl__PartitionMaxSize(base_path, ..., max_size, per_partition_sort_by = NULL)
pl__PartitionParted(
base_path,
...,
by,
include_key = TRUE,
per_partition_sort_by = NULL
)
Arguments
| Argument | Description |
|---|---|
| base_path | The base path for the output files. Use the mkdir option of the sink_* methods to ensure directories in the path are created. |
| ... | These dots are for future extensions and must be empty. |
| by | Something that can be coerced to a list of expressions. Used to partition by. |
| include_key | A bool indicating whether to include the key columns in the output files. |
| per_partition_sort_by | Something that can be coerced to a list of expressions, or NULL (default). Used to sort the rows within each partition. Note that this might increase the memory consumption needed for each partition. |
| max_size | An integer-ish value indicating the maximum size, in rows, of each generated file. |
Examples
library("polars")
# Partitioning by columns
temp_dir_1 <- withr::local_tempdir()
as_polars_lf(mtcars)$sink_parquet(
  pl$PartitionByKey(
    temp_dir_1,
    by = c("cyl", "am"),
    include_key = FALSE,
  ),
  mkdir = TRUE
)
list.files(temp_dir_1, recursive = TRUE)
#> [1] "cyl=4.0/am=0.0/0.parquet" "cyl=4.0/am=1.0/0.parquet"
#> [3] "cyl=6.0/am=0.0/0.parquet" "cyl=6.0/am=1.0/0.parquet"
#> [5] "cyl=8.0/am=0.0/0.parquet" "cyl=8.0/am=1.0/0.parquet"
# Partitioning by maximum number of rows
temp_dir_2 <- withr::local_tempdir()
as_polars_lf(mtcars)$sink_csv(
  pl$PartitionMaxSize(
    temp_dir_2,
    max_size = 10,
  ),
  mkdir = TRUE
)
files <- list.files(temp_dir_2, full.names = TRUE)
files
#> [1] "/tmp/Rtmp9ADB8y/filead0561a24e4d/0.csv"
#> [2] "/tmp/Rtmp9ADB8y/filead0561a24e4d/1.csv"
#> [3] "/tmp/Rtmp9ADB8y/filead0561a24e4d/2.csv"
#> [4] "/tmp/Rtmp9ADB8y/filead0561a24e4d/3.csv"
# Number of rows in each file
lapply(files, \(x) nrow(read.csv(x)))
#> [[1]]
#> [1] 10
#>
#> [[2]]
#> [1] 10
#>
#> [[3]]
#> [1] 10
#>
#> [[4]]
#> [1] 2
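The per_partition_sort_by argument is not exercised above. The sketch below shows one way it might be used, assuming a list containing a single expression is an accepted input; the sorted_dir and sorted_files names are illustrative only.

```r
library(polars)

# Hypothetical output location; mkdir = TRUE creates it on write
sorted_dir <- tempfile("sorted_")

# Partition by `cyl` and sort the rows of each partition by `mpg`
as_polars_lf(mtcars)$sink_csv(
  pl$PartitionByKey(
    sorted_dir,
    by = "cyl",
    per_partition_sort_by = list(pl$col("mpg"))
  ),
  mkdir = TRUE
)

sorted_files <- list.files(sorted_dir, recursive = TRUE, full.names = TRUE)

# Read one partition back to check the row order
head(read.csv(sorted_files[[1]]))
```

Sorting happens per partition, so it can raise the memory needed for each partition, as noted in the argument description.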