我正在尝试使用 sparklyr 在 Spark DataFrame 中对缺失值进行插值和填充。下面的代码在普通的 R data frame 上可以正常运行,希望有人能帮我把它改写成可以在 Spark 上运行的版本。以下是我尝试过的代码:
library(tidyverse)
library(sparklyr)
# One row per second from 2020-01-01 to 2020-01-10; the date strings are
# parsed by as.POSIXct() in the session time zone.
ts <- tibble(
  timestamp = seq(
    from = as.POSIXct("2020/01/01"),
    to = as.POSIXct("2020/01/10"),
    by = "1 sec"
  )
)
# Identifiers of the locomotives to simulate.
loco_list <- seq_len(25L)
# Number of dummy observations to draw.
n_obs <- 1000
# R data frame version ----------------------------------------------------
# Dummy observations: n_obs distinct timestamps drawn from `ts`, each paired
# with a random loco, a uniform `mp` in [10, 25], a uniform `speed` in
# [23, 88] and a random lower-case letter as `section`.
data <- tibble(
  timestamp = sample(ts[["timestamp"]], size = n_obs),
  loco = sample(loco_list, size = n_obs, replace = TRUE),
  mp = runif(n_obs, min = 10, max = 25),
  speed = runif(n_obs, min = 23, max = 88),
  # `letters` is already a character vector, so no conversion is needed.
  section = sample(letters, size = n_obs, replace = TRUE)
)
# Full timestamp x loco grid; joining the observations onto it exposes the
# missing values that need to be filled in.
grid <- expand.grid(
  timestamp = ts[["timestamp"]],
  loco = loco_list
)
# Join the observations onto the full grid, then fill the gaps per loco:
#   * section_fill / mp_fill: last observation carried forward,
#   * speed_fill: interpolated between observed speeds.
data_fill <- grid %>%
  left_join(data, by = c("loco", "timestamp")) %>%
  group_by(loco) %>%
  # Rows must be in time order within each loco before filling downwards.
  arrange(loco, timestamp) %>%
  mutate(
    section_fill = as.character(section),
    mp_fill = as.character(mp),
    # na_interpolation() lives in imputeTS, which this script never attaches;
    # qualify the call so it does not fail with "could not find function".
    speed_fill = imputeTS::na_interpolation(speed)
  ) %>%
  fill(section_fill, mp_fill, .direction = "down") %>%
  # Drop the grouping so later verbs do not silently operate per loco.
  ungroup()
# Spark version -----------------------------------------------------------
sc <- spark_connect(master = "local")
data_sp <- copy_to(sc, data)
ts_sp <- copy_to(sc, ts)

# Build the timestamp x loco grid inside Spark instead of copying the large
# local `grid` over the connection (very slow for a dataset of this size).
# spark_apply() is not the right tool here: it expects a Spark DataFrame and
# a function, and `$` on a Spark table does not return the column's data.
# sdf_expand_grid() cross-joins its inputs directly in Spark; both inputs are
# single-column Spark tables whose columns already carry the target names.
loco_sp <- distinct(data_sp, loco)
grid_sp <- sdf_expand_grid(
  sc,
  timestamp = ts_sp,
  loco = loco_sp,
  # Do not eagerly cache the (large) cross product.
  memory = FALSE
)
# Join the observations onto the Spark grid and fill the gaps per loco.
data_fill_sp <- grid_sp %>%
  # Join against the Spark copy of the observations: `data` is a local
  # tibble, and joining it to a Spark table fails unless copy = TRUE is set.
  left_join(data_sp, by = c("loco", "timestamp")) %>%
  group_by(loco) %>%
  arrange(loco, timestamp) %>%
  mutate(
    section_fill = as.character(section),
    mp_fill = as.character(mp),
    # NOTE(review): na_interpolation() is an R function (imputeTS). dplyr
    # verbs on a Spark table are translated to Spark SQL, which presumably
    # has no such function, so this step is likely to fail at execution and
    # may need spark_apply(..., group_by = "loco") or a window-function
    # implementation instead -- TODO confirm.
    speed_fill = na_interpolation(speed)
  ) %>%
  fill(section_fill, .direction = "down") %>%
  fill(mp_fill, .direction = "down") %>%
  # Drop the grouping so later verbs do not silently operate per loco.
  ungroup()
暂无答案!
目前还没有任何答案,快来回答吧!