我正在hadoop的Yarn上运行spark。这种转换是如何工作的?collect()是否在转换之前发生?我还需要在每个从属节点上安装python和r,以便转换工作?我很难找到这方面的文件。
eimct9ow1#
toPandas (Pypark)/ as.data.frame (斯巴克)必须在创建本地Dataframe之前收集数据。例如 toPandas 方法如下所示:
toPandas
as.data.frame
def toPandas(self): import pandas as pd return pd.DataFrame.from_records(self.collect(), columns=self.columns)
def toPandas(self):
import pandas as pd
return pd.DataFrame.from_records(self.collect(), columns=self.columns)
您需要在每个节点上安装python,最好是具有所有依赖项的python。斯巴克对应物( as.data.frame )只是 collect .总结这两种情况下的数据 collected 到驱动程序节点并转换为本地数据结构( pandas.DataFrame 以及 base::data.frame 分别在python和r中)。矢量化用户定义函数由于spark2.3.0,pyspark还提供了一组 pandas_udf ( SCALAR , GROUPED_MAP , GROUPED_AGG )对由分区,以防 SCALAR 变体在下列情况下的分组表达式 GROUPED_MAP 以及 GROUPED_AGG .每个区块由一个或多个 pandas.core.series.Series 万一 SCALAR 以及 GROUPED_AGG 变体。单身汉 pandas.core.frame.DataFrame 万一 GROUPED_MAP 变体。类似地,由于spark 2.0.0,sparkr提供了 dapply 以及 gapply 操作的函数 data.frames 分别由分区表达式和分组表达式定义。上述功能:不要向司机收费。除非数据只包含一个分区(即 coalesce(1) )或分组表达式是微不足道的(即。 groupBy(lit(1)) )不存在单节点瓶颈。在相应执行器的内存中加载相应的块。因此,它受到每个执行器上可用的单个块/内存大小的限制。
collect
collected
pandas.DataFrame
base::data.frame
pandas_udf
SCALAR
GROUPED_MAP
GROUPED_AGG
pandas.core.series.Series
pandas.core.frame.DataFrame
dapply
gapply
data.frames
coalesce(1)
groupBy(lit(1))
1条答案
按热度按时间eimct9ow1#
toPandas
(Pypark)/as.data.frame
(斯巴克)必须在创建本地Dataframe之前收集数据。例如
toPandas
方法如下所示:您需要在每个节点上安装python,最好是具有所有依赖项的python。
斯巴克对应物(
as.data.frame
)只是collect
.总结这两种情况下的数据
collected
到驱动程序节点并转换为本地数据结构(pandas.DataFrame
以及base::data.frame
分别在python和r中)。矢量化用户定义函数
由于spark2.3.0,pyspark还提供了一组
pandas_udf
(SCALAR
,GROUPED_MAP
,GROUPED_AGG
)对由分区,以防
SCALAR
变体在下列情况下的分组表达式
GROUPED_MAP
以及GROUPED_AGG
.每个区块由
一个或多个
pandas.core.series.Series
万一SCALAR
以及GROUPED_AGG
变体。单身汉
pandas.core.frame.DataFrame
万一GROUPED_MAP
变体。类似地,由于spark 2.0.0,sparkr提供了
dapply
以及gapply
操作的函数data.frames
分别由分区表达式和分组表达式定义。上述功能:
不要向司机收费。除非数据只包含一个分区(即
coalesce(1)
)或分组表达式是微不足道的(即。groupBy(lit(1))
)不存在单节点瓶颈。在相应执行器的内存中加载相应的块。因此,它受到每个执行器上可用的单个块/内存大小的限制。