有没有加速随机样本和连接的例子?

xnifntxz  于 2021-07-09  发布在  Spark
关注(0)|答案(0)|浏览(339)

我正在寻求帮助,试图了解如何使 sparklyr 在对建模数据集进行采样的日常工作中,工作更智能、速度更快。
在hive中使用结构化数据,我启用了相同的执行器和ram配置,这样我就可以在不到15分钟的时间内完成pyspark中的相同流程。。。但与 sparklyr ,大约需要90分钟。
这个过程是我需要从一个巨大的数据集采样到一个成千上万的数据集。。。假设我选择bernoulli目标样本的大小为10000个true和20000个false,暂时忽略这些表的bucketing或分区。
使用SparkyR,从hive查询,我就是这样完成的:

  1. referenceData<-"datamart.archivedFile" # About 200 million rows and 1000 columns
  2. nonTargetCriteria<-"Score>0 AND MonthsSinceActivity>12" # Drops to about 80% of the file
  3. targetCriteria<-"MonthsSinceActivity>0 AND MonthsSinceActivity<4" # Drops to about 1% of the file
  4. WHERE.NT<-paste("WHERE ",nonTargetCriteria)
  5. WHERE.T<-paste("WHERE ",targetCriteria)
  6. nT<- 10000 # sample size of target behavior (let's say response is coded as target=1)
  7. nNT<- 20000 # sample size of non-targets (let's say non-response is coded as target=0)
  8. join_vars <- c('id','address')
  9. names(join_vars) <- c('id','address')
  10. ## First I count my responders available
  11. n.targetSDF <- hive_context(sc) %>% invoke('sql', paste("select count(1) AS N, ",paste("SUM(CASE WHEN ",targetCriteria," THEN 1 ELSE 0 END) nTar"),
  12. " from ",targetData)) %>% sdf_collect()
  13. ## Then I count the full available population in the reference data set subject to criteria
  14. n.referenceSDF <- hive_context(sc) %>% invoke('sql', paste("select count(1) AS N, ",paste("SUM(CASE WHEN ",nonTargetCriteria," THEN 1 ELSE 0 END) nNonTar"),
  15. " from ",referenceData)) %>% sdf_collect()
  16. # Now I pull the target=1 group and join them to the earlier archived data
  17. targetSDF.S <- hive_context(sc) %>% invoke('sql', paste("select ",paste0(join_vars,collapse=",")," from ",targetData,WHERE.T)) %>%
  18. sdf_sample(fraction = 2*nT/n.targetSDF[2]) ## Bring in the count from above to define the fraction.
  19. join.S <- tbl(sc,referenceData) %>% inner_join(targetSDF.S,by=join_vars) %>% mutate(target=1)
  20. # Now I initialize the response=0 data
  21. referenceSDF.S <- hive_context(sc) %>% invoke('sql', paste("select *, 0 target from ",referenceData,WHERE.NT))
  22. # Bring in the count from above to define the fraction.
  23. N.NT<-n.referenceSDF[2]
  24. # Union the two samples and collect into R.
  25. df <- sdf_bind_rows(join.S,referenceSDF.S %>% sdf_sample(fraction = 1.2*nNT/N.NT)) %>% sdf_collect()

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题