将sparkDataframe转换为pandas/rDataframe的要求

lndjwyie  于 2021-05-30  发布在  Hadoop
关注(0)|答案(1)|浏览(465)

我正在hadoop的Yarn上运行spark。这种转换是如何工作的?collect()是否在转换之前发生?
我还需要在每个从属节点上安装python和r,以便转换工作?我很难找到这方面的文件。

eimct9ow

eimct9ow1#

toPandas (Pypark)/ as.data.frame (斯巴克)
必须在创建本地Dataframe之前收集数据。例如 toPandas 方法如下所示:

def toPandas(self):
    import pandas as pd
    return pd.DataFrame.from_records(self.collect(), columns=self.columns)

您需要在每个节点上安装python,最好是具有所有依赖项的python。
斯巴克对应物( as.data.frame )只是 collect .
总结这两种情况下的数据 collected 到驱动程序节点并转换为本地数据结构( pandas.DataFrame 以及 base::data.frame 分别在python和r中)。
矢量化用户定义函数
由于spark2.3.0,pyspark还提供了一组 pandas_udf ( SCALAR , GROUPED_MAP , GROUPED_AGG )对由
分区,以防 SCALAR 变体
在下列情况下的分组表达式 GROUPED_MAP 以及 GROUPED_AGG .
每个区块由
一个或多个 pandas.core.series.Series 万一 SCALAR 以及 GROUPED_AGG 变体。
单身汉 pandas.core.frame.DataFrame 万一 GROUPED_MAP 变体。
类似地,由于spark 2.0.0,sparkr提供了 dapply 以及 gapply 操作的函数 data.frames 分别由分区表达式和分组表达式定义。
上述功能:
不要向司机收费。除非数据只包含一个分区(即 coalesce(1) )或分组表达式是微不足道的(即。 groupBy(lit(1)) )不存在单节点瓶颈。
在相应执行器的内存中加载相应的块。因此,它受到每个执行器上可用的单个块/内存大小的限制。

相关问题