Partitioning scheme to write files

Description

[Experimental]

Partitioning schemes are used to write multiple files with sink_* methods.

  • pl$PartitionByKey(): Split by the values of keys. The number of files that can be written is not limited. However, when writing beyond a certain number of files, the data for the remaining partitions is buffered before being written to file.
  • pl$PartitionMaxSize(): Split with a maximum size. When the current file reaches the maximum size, it is closed and a new file is opened.
  • pl$PartitionParted(): This is a specialized version of pl$PartitionByKey(). Whereas pl$PartitionByKey() accepts data in any order, this scheme expects the input data to be pre-grouped or pre-sorted. This scheme has much less overhead, but may not always be applicable. Each new value of the key expressions starts a new partition, so a key value that reappears later in the input may overwrite a previously written partition.
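As a rough illustration of the pre-sorted requirement of pl$PartitionParted(), the sketch below sorts the data by the key column before sinking, so that each key value forms one contiguous run. It assumes the same calling pattern as the pl$PartitionByKey() example in the Examples section. The directory name is arbitrary, and the resulting file names are not shown because they are not documented here.

```r
library("polars")

# Hypothetical output location; any writable directory would do.
temp_dir_parted <- tempfile("parted_")

# Sort by the key first so that equal `cyl` values are contiguous.
# Without this, a key value that reappears later in the input may
# overwrite the partition written for it earlier.
as_polars_lf(mtcars)$sort("cyl")$sink_parquet(
  pl$PartitionParted(
    temp_dir_parted,
    by = "cyl"
  ),
  mkdir = TRUE
)

# Inspect what was written under the base path.
parted_files <- list.files(temp_dir_parted, recursive = TRUE)
parted_files
```

If the input cannot be sorted or grouped cheaply, pl$PartitionByKey() is the safer choice, at the cost of more overhead.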

Usage

pl__PartitionByKey(
  base_path,
  ...,
  by,
  include_key = TRUE,
  per_partition_sort_by = NULL
)

pl__PartitionMaxSize(base_path, ..., max_size, per_partition_sort_by = NULL)

pl__PartitionParted(
  base_path,
  ...,
  by,
  include_key = TRUE,
  per_partition_sort_by = NULL
)

Arguments

base_path The base path for the output files. Use the mkdir option of the sink_* methods to ensure directories in the path are created.
... These dots are for future extensions and must be empty.
by Something that can be coerced to a list of expressions. Used to partition the data.
include_key A bool indicating whether to include the key columns in the output files.
per_partition_sort_by Something that can be coerced to a list of expressions, or NULL (default). Used to sort the rows within each partition. Note that this might increase the memory consumption needed for each partition.
max_size An integer-ish value indicating the maximum size in rows of each of the generated files.
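A minimal sketch of per_partition_sort_by follows. It assumes that a column name given as a string is coerced to an expression in the same way as for by; the directory name and the helper variables are illustrative only. Each written file is read back with base R to check whether its mpg column is non-decreasing.

```r
library("polars")

# Hypothetical output location; any writable directory would do.
temp_dir_sorted <- tempfile("sorted_")

# Partition by `cyl` and request that each partition be sorted by `mpg`.
# Passing the column name as a string is an assumption based on how
# `by` is used in the examples.
as_polars_lf(mtcars)$sink_csv(
  pl$PartitionByKey(
    temp_dir_sorted,
    by = "cyl",
    per_partition_sort_by = "mpg"
  ),
  mkdir = TRUE
)

# Read each file back and check that `mpg` is non-decreasing within it.
sorted_files <- list.files(temp_dir_sorted, recursive = TRUE, full.names = TRUE)
is_sorted <- vapply(sorted_files, \(x) !is.unsorted(read.csv(x)$mpg), logical(1))
is_sorted
```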

Examples

library("polars")


# Partitioning by columns
temp_dir_1 <- withr::local_tempdir()
as_polars_lf(mtcars)$sink_parquet(
  pl$PartitionByKey(
    temp_dir_1,
    by = c("cyl", "am"),
    include_key = FALSE,
  ),
  mkdir = TRUE
)
list.files(temp_dir_1, recursive = TRUE)
#> [1] "cyl=4.0/am=0.0/0.parquet" "cyl=4.0/am=1.0/0.parquet"
#> [3] "cyl=6.0/am=0.0/0.parquet" "cyl=6.0/am=1.0/0.parquet"
#> [5] "cyl=8.0/am=0.0/0.parquet" "cyl=8.0/am=1.0/0.parquet"
# Partitioning by max row size
temp_dir_2 <- withr::local_tempdir()
as_polars_lf(mtcars)$sink_csv(
  pl$PartitionMaxSize(
    temp_dir_2,
    max_size = 10,
  ),
  mkdir = TRUE
)

files <- list.files(temp_dir_2, full.names = TRUE)
files
#> [1] "/tmp/Rtmp9ADB8y/filead0561a24e4d/0.csv"
#> [2] "/tmp/Rtmp9ADB8y/filead0561a24e4d/1.csv"
#> [3] "/tmp/Rtmp9ADB8y/filead0561a24e4d/2.csv"
#> [4] "/tmp/Rtmp9ADB8y/filead0561a24e4d/3.csv"
lapply(files, \(x) nrow(read.csv(x)))
#> [[1]]
#> [1] 10
#> 
#> [[2]]
#> [1] 10
#> 
#> [[3]]
#> [1] 10
#> 
#> [[4]]
#> [1] 2
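
The row counts above are consistent with simple arithmetic, assuming files are filled in order with at most max_size rows each. The base R sketch below reproduces them for the 32 rows of mtcars. The variable names are illustrative only and are not part of polars.

```r
n_rows <- nrow(mtcars) # 32 rows in the built-in dataset
max_size <- 10

# Number of files needed when each file holds at most `max_size` rows.
n_files <- ceiling(n_rows / max_size)

# Rows in each file: full files first, then whatever remains.
rows_per_file <- pmin(max_size, n_rows - (seq_len(n_files) - 1) * max_size)
rows_per_file
#> [1] 10 10 10  2
```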