在sparklyr中完成Dataframe

relj7zay  于 2021-05-19  发布在  Spark
关注(0)|答案(2)|浏览(573)

我正试图复制 tidyr:complete 在SparkyR中运行。我有一个缺少值的Dataframe,我必须填写这些行。在dplyr/tidyr中,我可以做:

data <- tibble(
  "id" = c(1,1,2,2),
  "dates" = c("2020-01-01", "2020-01-03", "2020-01-01", "2020-01-03"),
  "values" = c(3,4,7,8))

# A tibble: 4 x 3

     id dates      values
  <dbl> <chr>       <dbl>
1     1 2020-01-01      3
2     1 2020-01-03      4
3     2 2020-01-01      7
4     2 2020-01-03      8

data %>% 
  mutate(dates = as_date(dates)) %>% 
  group_by(id) %>% 
  complete(dates = seq.Date(min(dates), max(dates), by="day"))

# A tibble: 6 x 3

# Groups:   id [2]

     id dates      values
  <dbl> <date>      <dbl>
1     1 2020-01-01      3
2     1 2020-01-02     NA
3     1 2020-01-03      4
4     2 2020-01-01      7
5     2 2020-01-02     NA
6     2 2020-01-03      8

然而 complete 中不存在函数 sparklyr .

data_spark %>% 
  mutate(dates = as_date(dates)) %>% 
  group_by(id) %>% 
  complete(dates = seq.Date(min(dates), max(dates), by="day"))

Error in UseMethod("complete_") : 
no applicable method for 'complete_' applied to an object of class "c('tbl_spark', 'tbl_sql', 'tbl_lazy', 'tbl')"

有没有办法设定一个自定义项或达到类似的效果?
谢谢您

zphenhs4

zphenhs41#

这里有一个方法可以完成spark中的所有工作。

library(sparklyr)

sc <- spark_connect(master = "local")

data <- tibble(
  id = c(1, 1, 2, 2),
  dates = c("2020-01-02", "2020-01-04", "2020-01-01", "2020-01-03"),
  values = c(1, 2, 3, 4)
)

data_spark <- copy_to(sc, data)

我们需要生成 dates 以及 id . 要做到这一点,我们需要知道总天数和第一个日期。

days_info <-
  data_spark %>%
  summarise(
    first_date = min(dates),
    total_days = datediff(max(dates), min(dates))
  ) %>%
  collect()
days_info

# > # A tibble: 1 x 2

# >   first_date total_days

# >   <chr>           <int>

# > 1 2020-01-01          3

``` `sdf_seq` 可用于在spark中生成序列。这可以用来得到 `dates` 以及 `id` .

dates_id_combinations <-
sdf_seq(
sc,
from = 0,
to = days_info$total_days,
repartition = 1
) %>%
transmute(
dates = date_add(local(days_info$first_date), id),
join_by = TRUE
) %>%
full_join(data_spark %>% distinct(id) %>% mutate(join_by = TRUE)) %>%
select(dates, id)
dates_id_combinations

> # Source: spark<?> [?? x 2]

> dates id

>

> 1 2020-01-01 1

> 2 2020-01-01 2

> 3 2020-01-02 1

> 4 2020-01-02 2

> 5 2020-01-03 1

> 6 2020-01-03 2

> 7 2020-01-04 1

> 8 2020-01-04 2

``` full_join 原始Dataframe和组合Dataframe。然后根据 min / max 每组的日期。

data_spark %>%
  group_by(id) %>%
  mutate(first_date = min(dates), last_date = max(dates)) %>%
  full_join(dates_id_combinations) %>%
  filter(dates >= min(first_date), dates <= max(last_date)) %>%
  arrange(id, dates) %>%
  select(id, dates)

# > # Source:     spark<?> [?? x 2]

# > # Groups:     id

# > # Ordered by: id, dates

# >      id dates

# >   <dbl> <chr>

# > 1     1 2020-01-02

# > 2     1 2020-01-03

# > 3     1 2020-01-04

# > 4     2 2020-01-01

# > 5     2 2020-01-02

# > 6     2 2020-01-03
7d7tgy0s

7d7tgy0s2#

在引擎盖下 tidyr::complete 只执行一个完全连接,然后是可选的na填充。你可以通过使用 sdf_copy_to 创建一个仅为一列的新sdf seq.Date 在开始日期和结束日期之间,然后执行 full_join 在这和你的数据集之间。

相关问题