R语言 在数据争用管道中迭代多个文件

ryoqjall  于 2023-06-03  发布在  其他
关注(0)|答案(1)|浏览(174)

我正在处理大量(2,000多个)大zip文件(每个约300 MB)data from here。每个文件都具有相同的模式和结构。我希望迭代zip文件列表并执行以下操作:

  • 解压缩文件
  • 数据争用
  • 将文件写入Parquet

我的方法是将mapply通过管道传输到不同的数据处理函数(即clean_namesgroup_by),通过管道传输到write_dataset(参见下面的代码)。然而,这种方法并不是我想要的方式。它是解压缩所有的文件,并将它们导入到一个单一的csv第一,然后执行数据争吵和写入数据。这在一些数据集上是可以的,但是当处理数千个数据集时,内存将很快成为一个问题,因为每个文件有大约700万行。

zip_files <- dput(list.files("~/data/raw", pattern = ".zip", full.names = TRUE))

file_names <- dput(
  lapply(zip_files, unzip, list = TRUE) |>
    dplyr::bind_rows() |>
    dplyr::pull(Name)
)

mapply(unzip, zip_files, file_names) |>
  readr::read_csv() |>
  janitor::clean_names("all_caps") |>
  dplyr::group_by(YEAR = lubridate::year(BASE_DATE_TIME),
                  MONTH = lubridate::month(BASE_DATE_TIME),
                  DAY = lubridate::day(BASE_DATE_TIME)) |>
  arrow::write_dataset("~/data/parquet/ais_points",
                       format = "parquet",
                       max_rows_per_file = 1000000)

我想知道最好的方法是什么

  • 从列表中解压缩文件
  • 导入解压缩的csv
  • 执行数据处理功能
  • 写在 parquet 上
  • 移动到列表中的下一个文件(又名冲洗和重复)
5w9g7ksd

5w9g7ksd1#

遍历zip文件列表,阅读和处理从readr转移到arrow。这为可用的工具和库设置了一定的界限,但当前Map的函数列表相当令人印象深刻-https://arrow.apache.org/docs/r/reference/acero.html
使用4个文件进行测试,得到的数据集有30495179行。

library(arrow)
library(dplyr)
library(cli)
library(stringr)

ais_zips <- list.files("./", "zip$")

for (ais_zip in ais_zips){
  # date from filename
  ymd_lst <- str_extract_all(ais_zip, "\\d+")[[1]] %>% as.integer()
  cli_progress_step("unzip {ais_zip}")
  extr <- unzip(ais_zip)
  cli_progress_step("read_csv_arrow {extr}")
  # read as arrow table
  atbl <- arrow::read_csv_arrow(extr, as_data_frame = FALSE)
  cli_progress_step("process")
  atbl <- atbl %>% 
    mutate(year  = ymd_lst[1],
           month = ymd_lst[2],
           day   = ymd_lst[3]) %>% 
    rename_with(\(x) janitor::make_clean_names(x, "all_caps"))
    # ...
    # some additional wrangling
    # ...
  cli_progress_step("write_dataset")
  atbl %>% 
    group_by(YEAR, MONTH, DAY) %>% 
    write_dataset("./pq", format = "parquet")
  cli_progress_step("remove {extr}")
  file.remove(extr)
}
#> ✔ unzip AIS_2022_01_01.zip [11.9s]
#> ✔ read_csv_arrow ./AIS_2022_01_01.csv [12.2s]
#> ✔ process [706ms]
#> ✔ write_dataset [9.4s]
#> ✔ remove ./AIS_2022_01_01.csv [84ms]
#> ...
arrow::open_dataset("./pq") %>% glimpse()
#> FileSystemDataset with 4 Parquet files
#> 30,495,179 rows x 20 columns
#> $ MMSI                   <int64> 368084090, 368140160, 366941830, 316005971, 316…
#> $ BASE_DATE_TIME <timestamp[ms]> 2022-01-01 02:00:00, 2022-01-01 02:00:00, 2022-…
#> $ LAT                   <double> 29.93174, 30.33475, 29.30919, 46.50268, 46.5032…
#> $ LON                   <double> -89.99243, -87.14429, -94.79702, -84.35674, -84…
#> $ SOG                   <double> 6.0, 0.0, 0.0, 2.4, 0.3, 0.0, 0.0, 0.0, 4.7, 0.…
#> $ COG                   <double> 296.2, 312.0, 180.2, 258.6, 61.9, 215.3, 360.0,…
#> $ HEADING               <double> 299, 87, 511, 257, 511, 511, 511, 511, 511, 511…
#> $ VESSEL_NAME           <string> "LARRY B WHIPPLE", "TWISTED ANGEL", "SAN PATRIC…
#> $ IMO                   <string> NA, "IMO0000000", NA, "IMO9084047", "IMO8745333…
#> $ CALL_SIGN             <string> "WDK7401", "WDL5339", "WCX6675", "CFP2004", "VC…
#> $ VESSEL_TYPE            <int64> 57, 36, 31, 31, 31, 60, 36, 37, 36, 30, 60, 36,…
#> $ STATUS                 <int64> 12, NA, 5, 0, 0, 0, NA, NA, NA, NA, 0, NA, NA, …
#> $ LENGTH                 <int64> 23, 12, 18, 34, 24, 38, 13, 10, 10, 0, 35, 0, 0…
#> $ WIDTH                  <int64> 10, 7, 7, 10, 5, 13, 4, 4, 4, 0, NA, 0, 0, 6, 2…
#> $ DRAFT                 <double> 3.0, NA, NA, 5.3, 3.0, NA, NA, NA, NA, NA, NA, …
#> $ CARGO                  <int64> 57, NA, 57, 99, 50, 69, NA, NA, NA, NA, 31, NA,…
#> $ TRANSCEIVER_CLASS     <string> "A", "B", "A", "A", "A", "A", "B", "B", "B", "B…
#> $ YEAR                   <int32> 2022, 2022, 2022, 2022, 2022, 2022, 2022, 2022,…
#> $ MONTH                  <int32> 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,…
#> $ DAY                    <int32> 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,…

创建于2023-06-01使用reprex v2.0.2

相关问题