sparkr:从数字列生成密度数据

c9x0cxw0  于 2021-07-12  发布在  Spark
关注(0)|答案(1)|浏览(408)

我想知道如何针对 Spark DataFrame 的数值列生成一个数据对象,就像对本地数据框调用 stats::density(df$variable) 得到的结果那样?
我正在研究 SparkR::spark.lapply,但我想我遗漏了什么。我在下面写了一个小示例。如果有人能帮忙,我将非常感激。
最好的,nf
例子:

df <- iris

#' Estimate a kernel density for every numeric column of a local data frame.
#'
#' @param df A base R `data.frame`.
#' @return An unnamed list holding one `stats::density()` result per numeric
#'   (double or integer) column, in column order. The list is empty when
#'   `df` has no numeric column.
gen_density_data <- function(df) {
  # vapply() always yields exactly one logical per column, whereas
  # sapply(df, class) changes shape when a column carries several classes.
  is_num <- vapply(df, is.numeric, logical(1))
  lapply(names(df)[is_num], function(col_name) {
    # Splice the column name into the call rather than parsing pasted text,
    # so names that are not syntactic identifiers (e.g. "a b") still work
    # and the recorded call still shows which column was used.
    eval(bquote(stats::density(df[[.(col_name)]])))
  })
}

res <- gen_density_data(df)

# And for Spark:

sdf <- SparkR::createDataFrame(iris)

#' Estimate a kernel density for every numeric column of a SparkDataFrame.
#'
#' `stats::density()` needs a plain numeric vector, but `sdf$col` is a SparkR
#' Column (an unevaluated column expression), so it cannot be passed to
#' `density()` directly. Each numeric column is therefore collected to the
#' driver on its own and the density is estimated locally.
#'
#' NOTE(review): every selected column is pulled into driver memory, one at a
#' time. That is fine for data that fits locally; larger data needs an
#' estimate computed inside Spark.
#'
#' @param sdf A SparkDataFrame.
#' @return A list with one `stats::density()` result per numeric/integer
#'   column, in column order, or `NULL` (invisibly) when there is no such
#'   column.
gen_spark_density_data <- function(sdf) {
  col_types <- SparkR::coltypes(sdf)
  # Whitelist the numeric R types; an exclusion list lets through every type
  # that is not named in it (for example "Date").
  numeric_cols <- SparkR::colnames(sdf)[col_types %in% c("numeric", "integer")]
  if (length(numeric_cols) >= 1) {
    tres <- lapply(numeric_cols, function(col_name) {
      # One-column select keeps the amount of collected data to a minimum.
      local_values <- SparkR::collect(SparkR::select(sdf, col_name))[[1]]
      stats::density(local_values)
    })
    return(tres)
  }
}

tst <- gen_spark_density_data(sdf = sdf)
btxsgosb

btxsgosb1#

我想出了一个还不错的解决办法,绘图使用的是 highcharter。我认为数据分区的管理方式还可以进一步改进。目前,对于大型数据集,或者最小值与最大值相差很大的列,这可能不是最具可伸缩性的解决方案。或许还应该加上一些条件检查,但作为示例,我目前是这样做的。注:核密度的计算方法改编自 https://rpubs.com/mcocam12/kdf_byhand 。非常感谢马克的例子。
数据:

# Stack ten copies of iris on top of each other to get a larger example set.
df <- do.call(rbind, rep(list(iris), 10))

# Move the data into Spark, then repartition it using the row count as the
# number of partitions.
sdf <- SparkR::createDataFrame(df)
sdf <- SparkR::repartition(sdf, numPartitions = nrow(sdf))

功能:

#' Evaluate a Gaussian kernel density estimate for one Spark column on a grid.
#'
#' Every value in the first column of `sdf` is cross-joined with every grid
#' point. Each pair contributes a Gaussian kernel of bandwidth `h`; the
#' kernels are summed per grid point and divided by `num_values`.
#'
#' @param sdf A SparkDataFrame. Only its first column is used; pass a
#'   one-column frame so the cross join stays as small as possible.
#' @param num_values Divisor applied to the summed kernels (the caller is
#'   expected to pass the number of observations).
#' @param h Kernel bandwidth.
#' @param range_pad How far the grid extends below the observed minimum and
#'   above the observed maximum.
#' @param grid_step Spacing between neighbouring grid points. The grid has
#'   roughly `(max - min + 2 * range_pad) / grid_step` points, and the cross
#'   join has that many rows per observation, so widen the step for columns
#'   with a large value range.
#' @return A SparkDataFrame with columns `Range`, `bell_sum` and
#'   `kernel_density`, sorted by ascending `Range`.
gen_sdf_kernel_density_points <- function(sdf, num_values, h = 1,
                                          range_pad = 5, grid_step = 0.01) {
  x_col <- SparkR::colnames(sdf)[1]
  if (identical(x_col, "Range")) {
    # The grid column is also called "Range"; after the cross join the two
    # columns could not be told apart by name.
    stop("The value column must not be named 'Range'.", call. = FALSE)
  }

  # `[[` looks the column up by name, so no code has to be pasted and parsed.
  value_col <- sdf[[x_col]]
  min_max <- SparkR::collect(
    SparkR::agg(sdf, min = min(value_col), max = max(value_col))
  )
  if (is.na(min_max$min) || is.na(min_max$max)) {
    stop("Column '", x_col, "' has no non-missing values.", call. = FALSE)
  }

  grid <- data.frame(
    Range = seq(min_max$min - range_pad, min_max$max + range_pad,
                by = grid_step)
  )
  grid_sdf <- SparkR::createDataFrame(grid)
  # Open question from the original author: repartitioning `grid_sdf` here
  # (e.g. by grid size) may improve how the cross join is distributed.

  pairs <- SparkR::crossJoin(sdf, grid_sdf)
  kernel <- exp(-(pairs$Range - pairs[[x_col]])^2 / (2 * h^2)) /
    (h * sqrt(2 * pi))
  pairs <- SparkR::withColumn(pairs, "density", kernel)

  # Sum the kernel contributions per grid point, then normalise.
  by_point <- SparkR::groupBy(pairs, pairs$Range)
  densities <- SparkR::agg(by_point, bell_sum = sum(pairs$density))
  densities <- SparkR::withColumn(
    densities, "kernel_density", densities$bell_sum / num_values
  )
  SparkR::arrange(densities, SparkR::asc(densities$Range))
}

#' Build one highcharter density chart per collected density table.
#'
#' @param res A named list of data frames, each with the columns `Range`
#'   (x values) and `kernel_density` (y values). The list names are used in
#'   the chart titles.
#' @return A list of highchart objects, one per element of `res`, in the
#'   same order.
gen_den_plots_from_spark_res <- function(res) {
  # Line and fill share one vertical gradient direction (x fixed, y from 1
  # to 0) and differ only in their colour stops.
  gradient_axis <- list(x1 = 0, y1 = 1, x2 = 0, y2 = 0)
  line_colour <- list(
    linearGradient = gradient_axis,
    stops = list(
      list(0, "transparent"),
      list(0.33, "#0000FF1A"),
      list(0.66, "#0000FF33"),
      list(1, "#ccc")
    )
  )
  fill_colour <- list(
    linearGradient = gradient_axis,
    stops = list(
      list(0, "transparent"),
      list(0.1, "#0000FF1A"),
      list(0.5, "#0000FF33"),
      list(1, "#0000FF80")
    )
  )

  lapply(seq_along(res), function(i) {
    var_name <- names(res)[i]
    rdf <- res[[i]]
    # list_parse() turns the two-column frame into the per-point list that
    # is passed as the series data.
    points <- highcharter::list_parse(
      data.frame(x = rdf$Range, y = rdf$kernel_density)
    )

    hc <- highcharter::hc_series(
      highcharter::highchart(),
      list(
        name = "Density Estimate",
        data = points,
        type = "areaspline",
        marker = list(enabled = FALSE),
        color = line_colour,
        fillColor = fill_colour
      )
    )
    highcharter::hc_title(
      hc,
      text = paste0(
        "Density Plot For: ", snakecase::to_title_case(var_name)
      )
    )
  })
}

#' Arrange a list of charts in a grid that can be viewed in a browser.
#'
#' @param tres_out A list of highchart objects.
#' @param ncol Number of grid columns.
#' @return The grid produced by `highcharter::hw_grid()` (row height 450),
#'   passed through `htmltools::browsable()`.
make_hc_grid <- function(tres_out, ncol = 2) {
  chart_grid <- highcharter::hw_grid(tres_out, rowheight = 450, ncol = ncol)
  htmltools::browsable(chart_grid)
}

用法:

# Keep only the columns Spark reports as numeric or integer. Whitelisting is
# deliberate: an exclusion list lets through every type not named in it
# (for example "Date"), and the kernel arithmetic is only meant for numbers.
tmp_types <- SparkR::coltypes(sdf)
good_cols_idx <- which(tmp_types %in% c("numeric", "integer"))
good_col_names <- SparkR::colnames(sdf)[good_cols_idx]
nrows_sdf <- SparkR::count(sdf)

# `out` is always defined (an empty list when no column qualifies), so the
# plotting step never reads a missing or stale object.
out <- lapply(good_col_names, function(col_name) {
  # Select a single column first, otherwise the cross join becomes too big.
  single_col <- SparkR::select(sdf, col_name)
  SparkR::collect(
    gen_sdf_kernel_density_points(sdf = single_col, num_values = nrows_sdf)
  )
})
out <- stats::setNames(out, good_col_names)

绘图:

# Build one chart per collected density table, then lay the charts out in a
# grid.
tres <- gen_den_plots_from_spark_res(out)
all_plots <- make_hc_grid(tres)

# Evaluating the object at top level prints it, which displays the grid.
all_plots

预期结果:

如果你有想法,我很乐意听听。
最好的,nf

相关问题