我有一个包含200万条记录的Dataframe。我想看每一张记录作分析。
但当我使用 dataframe.collect()
方法,将驱动程序运行的所有节点的数据带到本地,这将影响并行性的实现。有什么解决办法吗?
我的配置是:
Cloudera:CDH 5.9.1
Cluster Nodes:5 ->each 8GB RAM
Spark:1.6
Scala:10.5
我有一个包含200万条记录的Dataframe。我想看每一张记录作分析。
但当我使用 dataframe.collect()
方法,将驱动程序运行的所有节点的数据带到本地,这将影响并行性的实现。有什么解决办法吗?
我的配置是:
Cloudera:CDH 5.9.1
Cluster Nodes:5 ->each 8GB RAM
Spark:1.6
Scala:10.5
2条答案
按热度按时间tpgth1q71#
.collect()
是一个动作,如您所说,它会将结果作为本地数据集返回给您的驱动程序Row
. 如果结果数据集的大小相对于您的系统配置来说是巨大的,那么这可能是一个瓶颈。也就是说,你的问题缺乏这样一种感觉:你没有提到你想用你读到的数据做什么。如果只是将数据作为一个Dataframe来读取,那么您可以执行类似spark 1.x.x的操作
使用
df
以引用Dataframe。你需要对这个Dataframe做些什么。例如,而不是
collect()
您可以将其保存为csv,路径如下,这将工作没有这个司机头顶你刚才面对。如果这有帮助,请告诉我。
aij0ehis2#
如果只需要读取数据,则必须执行某种操作:
collect
是通常的选择,当你需要驱动程序有权访问它。但是,如果您需要将其存储在其他地方,则可以利用可用于hdfs、jdbc等的并行编写器。如果您需要访问该数据以计算进一步的结果,您可以将数据保留在原来的位置,并使用常用的组合器对其应用函数(
map
,flatMap
,filter
等等)。但是如果您需要将结果保存在本地,则除了收集之外别无选择。当然,这会影响到计算的并行性,但是你必须得到你想要的输出:本地存储,本地操作;分布式存储,分布式操作。