我可以将数据从数据库或另一个文件流传输到一个分区的parquet(箭头)数据集中吗?

pbpqsu0x  于 2023-03-15  发布在  其他
关注(0)|答案(1)|浏览(142)

我处理的表大小为几十或几百千兆字节。它们位于postgres数据库中。我还将它们转储到CSV文件中。
我想建立一个分区的 parquet 数据集。我写了一个脚本,它对数据的一个小子集做了我想做的事情。我不想这样做整个数据集。1)将需要太多的内存。2)将非常慢。
我在dbplyr、arrow和duckdb文档中搜索了一种方法,可以直接将惰性表写入磁盘,而无需将其读入内存。
也许我可以分块收集数据,比如说 years,但是如何将它们写入同一个数据集呢?

twh00eeo

twh00eeo1#

因为arrow::write_dataset可以基于数据中的字段进行hive样式的分区(按子目录),所以我建议您编写“尽可能多的parquet文件”来完成这项工作,然后可以选择将它们组合在一起。
注意:关于“我们定义了多少个键”的问题,请注意,大量行数较少的文件将是低效的。https://arrow.apache.org/docs/r/articles/dataset.html#partitioning-performance-considerations有关分区/大小性能的讨论,请参见www.example.com,其中目前的指导方针是:

    • 避免小于20 MB和大于2GB的文件。*
    • 避免使用超过10,000个不同分区的分区布局。*

也就是说,如果数据中有两个键(比如key1key2),那么当你使用arrow::write_dataset时,它会把它们插入到subdir路径中,因为write_dataset的默认行为是覆盖或者在找到现有文件时出错,所以我们使用一个外部计数器并更新它们的basename_template,这样每次连续的写操作都会增加而不是替换。
以下是我建议的路径:

  • 批量提取数据,尽可能多地在内存中提取而不崩溃,但给予自己一些缓冲。假设每批提取10 Mi行。(这可以从数据库或CSV文件中提取。该大小高度依赖于列及其类的数量,而字符串占用的内存要多得多。)
  • 在对arrow::write_dataset的调用中使用模板的增量counter
  • 您可以选择预先删除大型对象变量并强制垃圾收集;我还没有进行基准测试,看看这是否真的很关键,但rm(dat); gc()可能会很有用。
  • 重复。

下面是假装的两个步骤:

counter <- 1L
dat <- data.frame(key1 = c("A","A","B"), key2=c("E","F","F"), val=1:3)
arrow::write_dataset(dat, "somedir", partitioning = c("key1", "key2"),
                     basename_template = sprintf("part-%s-{i}.pq", counter))
list.files("somedir", recursive = TRUE, full.names = TRUE)
# [1] "somedir/key1=A/key2=E/part-1-0.pq"
# [2] "somedir/key1=A/key2=F/part-1-0.pq"
# [3] "somedir/key1=B/key2=F/part-1-0.pq"

counter <- counter + 1L
dat <- data.frame(key1 = c("A","C","D"), key2=c("E","F","G"), val=2:4)
arrow::write_dataset(dat, "somedir", partitioning = c("key1", "key2"),
                     basename_template = sprintf("part-%s-{i}.pq", counter))
list.files("somedir", recursive = TRUE, full.names = TRUE)
# [1] "somedir/key1=A/key2=E/part-1-0.pq"
# [2] "somedir/key1=A/key2=E/part-2-0.pq"
# [3] "somedir/key1=A/key2=F/part-1-0.pq"
# [4] "somedir/key1=B/key2=F/part-1-0.pq"
# [5] "somedir/key1=C/key2=F/part-2-0.pq"
# [6] "somedir/key1=D/key2=G/part-2-0.pq"

"A"/"E"的第一行在第二批中重复,因此我们看到key1=A/key2=E中现在有两个文件。为了证明这一点,我们现在可以读取 * 目录 *,以查看两批数据中的每一批数据都可以在一个数据集连接中使用:

library(dplyr)
arrow::open_dataset("somedir/key1=A/key2=E/") %>%
  collect()
#   val
# 1   1
# 2   2

(注意,key1key2由路径暗示)。
或者我们可以读取整个内容(假设所有帧都有相同的模式),只过滤我们需要的关键字,然后 * collect()(这是数据被拉入内存的位置:

arrow::open_dataset("somedir/") %>%
  filter(key1 == "A", key2 == "E") %>%
  collect()
#   val key1 key2
# 1   1    A    E
# 2   2    A    E

(The key被保留,因为我们在子目录层次结构中打开了更高的位置。)
如果在某个时候你发现你有很多很多文件在这些子目录中的任何一个,那么我建议你可以手动地将它们组合在一起。根据每个 parquet 文件的行数,你可能无法加载一个子目录中的所有文件(例如,somedir/key1=A/key2=E/)一次加载到内存中,但同样,您可以批量处理多个文件,将它们加载到内存中,保存为单个 parquet 文件,然后把其他人赶走。

相关问题