如何使用sparkr的as.dataframe()将大型r data.frames加载到spark中?

yjghlzjz  于 2021-06-02  发布在  Hadoop
关注(0)|答案(2)|浏览(318)

我的目标是将大型r data.frame加载到spark中。Dataframe的大小是5密耳。行和7列的各种类型。一旦加载到r中,这个Dataframe将占用大约200mb的内存。但是,当我尝试使用 as.DataFrame() 函数,r会话被永远占用,它已经运行了1个小时,我不得不取消操作。
详情如下:
我正在创建以下数据集,以便在此示例中使用:

n=5e6 # set sample size

d <- data.frame(
    v1=base::sample(1:9,n,replace=TRUE), 
    v2=base::sample(1000:9000,n,replace=TRUE), 
    v3=seq(as.Date("2016-01-01"), as.Date("2016-12-31"), by = "day")[base::sample(1:365,n,replace=TRUE)],
    v4=LETTERS[base::sample(1:length(LETTERS),n,replace=TRUE)], 
    v5=base::sample(1000:9000,n,replace=TRUE)/1000, 
    v6=seq(ISOdate(2016,1,1), ISOdate(2018,1,1), "sec")[base::sample(1:63158401,n,replace=TRUE)],
    v7=c(TRUE,FALSE)[base::sample(1:2,n,replace=TRUE)]
)

上面创建了一个示例data.frame
大小,约200mb:

paste0("size: ", round(as.numeric(object.size(d))/1000000,1)," mb")

接下来,我创建一个spark会话:

Sys.setenv(SPARK_HOME='C:\\soft\\spark-2.0.0-bin-hadoop2.7',HADOOP_HOME='C:\\soft\\hadoop')
.libPaths(c(file.path(Sys.getenv('SPARK_HOME'), 'R', 'lib'),.libPaths()))
Sys.setenv('SPARKR_SUBMIT_ARGS'='"sparkr-shell"')

library(SparkR)
library(rJava)
sparkR.session(enableHiveSupport = FALSE,master = "local[*]", sparkConfig = list(spark.driver.memory = "1g",spark.sql.warehouse.dir="C:\\soft\\hadoop\\bin"))

现在,我正在尝试将上面创建的data.frame加载到spark中:

d_sd <- as.DataFrame(d)

上面的命令需要很长时间才能运行。
我做错什么了吗?它是否与原始r data.frame中列的class()有关?我是否应该采取另一种方法将大型数据集从r加载到spark中?如果是,请随时提出建议。
先谢谢你。
附言:
我能够使用此方法快速转换和操作spark中的小数据集。
以下是我运行的r会话和操作系统的一些背景信息:
r版本3.2.5(2016-04-14)平台:x86\u 64-w64-mingw32/x64(64位),运行环境:windows 7 x64(内部版本7601)service pack 1
我正在Windows7 professional(64位)下运行microsoft版本的r(revolution),内存为8GB。处理器:i5-2520m@2.50ghz
编辑2016-09-19:
谢谢你,zeydy ortiz和mohit bansal。根据你的回答,我试过以下方法,但我仍然面临同样的问题:

Sys.setenv(SPARK_HOME='C:\\soft\\spark-2.0.0-bin-hadoop2.7',HADOOP_HOME='C:\\soft\\hadoop')
.libPaths(c(file.path(Sys.getenv('SPARK_HOME'), 'R', 'lib'),.libPaths()))
Sys.setenv('SPARKR_SUBMIT_ARGS'='"sparkr-shell"')

library(SparkR)
library(rJava)
sparkR.session(enableHiveSupport = FALSE,master = "local[*]", sparkConfig = list(spark.driver.memory = "1g",spark.sql.warehouse.dir="C:\\soft\\hadoop\\bin"))

n=5e6 # set sample size

d_sd <- createDataFrame(sqlContext,data=data.frame(
        v1=base::sample(1:9,n,replace=TRUE), 
        v2=base::sample(1000:9000,n,replace=TRUE), 
        v3=seq(as.Date("2016-01-01"), as.Date("2016-12-31"), by = "day")[base::sample(1:365,n,replace=TRUE)],
        v4=LETTERS[base::sample(1:length(LETTERS),n,replace=TRUE)], 
        v5=base::sample(1000:9000,n,replace=TRUE)/1000, 
        v6=seq(ISOdate(2016,1,1), ISOdate(2018,1,1), "sec")[base::sample(1:63158401,n,replace=TRUE)],
        v7=c(TRUE,FALSE)[base::sample(1:2,n,replace=TRUE)]
    ))

将r df转换为spark df的命令运行了几个小时。不得不取消。请帮忙。
编辑2016-12-14:
使用spark 1.6.1和r 3.2.0尝试上述操作。我最近使用spark2.0.2(最新版本)和r3.2.5尝试了这个方法,但遇到了相同的问题。
任何帮助都将不胜感激。

6ju8rftf

6ju8rftf1#

这与内存限制有关,为什么必须首先创建基本Dataframe并将其转换为sparkDataframe?
您可以将这两个步骤合并为一个步骤并获得结果:

Sys.setenv(SPARK_HOME='C:\\soft\\spark-2.0.0-bin-hadoop2.7',HADOOP_HOME='C:\\soft\\hadoop')
.libPaths(c(file.path(Sys.getenv('SPARK_HOME'), 'R', 'lib'),.libPaths()))
Sys.setenv('SPARKR_SUBMIT_ARGS'='"sparkr-shell"')

library(SparkR)
library(rJava)
sparkR.session(enableHiveSupport = FALSE,master = "local[*]", sparkConfig = list(spark.driver.memory = "1g",spark.sql.warehouse.dir="C:\\soft\\hadoop\\bin"))

然后可以加载sdf:

n=5e6 # set sample size

d_sd <- as.DataFrame(data.frame(
    v1=base::sample(1:9,n,replace=TRUE), 
    v2=base::sample(1000:9000,n,replace=TRUE), 
    v3=seq(as.Date("2016-01-01"), as.Date("2016-12-31"), by = "day")[base::sample(1:365,n,replace=TRUE)],
    v4=LETTERS[base::sample(1:length(LETTERS),n,replace=TRUE)], 
    v5=base::sample(1000:9000,n,replace=TRUE)/1000, 
    v6=seq(ISOdate(2016,1,1), ISOdate(2018,1,1), "sec")[base::sample(1:63158401,n,replace=TRUE)],
    v7=c(TRUE,FALSE)[base::sample(1:2,n,replace=TRUE)]
))

您还可以参考类似的问题:如何最好地处理将大型本地Dataframe转换为sparkrDataframe的问题?

t0ybt7op

t0ybt7op2#

在spark 2.0.0中,使用 createDataFrame(d)

相关问题