如何在spak/scala中不使用collect方法从Dataframe读取数据

zbdgwd5y  于 2021-06-01  发布在  Hadoop
关注(0)|答案(2)|浏览(796)

我有一个包含200万条记录的Dataframe。我想看每一张记录作分析。
但当我使用 dataframe.collect() 方法,将驱动程序运行的所有节点的数据带到本地,这将影响并行性的实现。有什么解决办法吗?
我的配置是:

Cloudera:CDH 5.9.1
Cluster Nodes:5 ->each 8GB RAM
Spark:1.6
Scala:10.5
tpgth1q7

tpgth1q71#

.collect() 是一个动作,如您所说,它会将结果作为本地数据集返回给您的驱动程序 Row . 如果结果数据集的大小相对于您的系统配置来说是巨大的,那么这可能是一个瓶颈。
也就是说,你的问题缺乏这样一种感觉:你没有提到你想用你读到的数据做什么。如果只是将数据作为一个Dataframe来读取,那么您可以执行类似spark 1.x.x的操作

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.sql.SQLContext

val conf = new SparkConf().setAppName("test").setMaster("local[2]")
val sc = new SparkContext(conf)

val sqlContext = new SQLContext(sc)  

import sqlContext.implicits._

val df = sqlContext.read.csv("file:///path/to/input/")

使用 df 以引用Dataframe。
你需要对这个Dataframe做些什么。例如,而不是 collect() 您可以将其保存为csv,路径如下,

df.write.csv("file:///path/to/output")

这将工作没有这个司机头顶你刚才面对。如果这有帮助,请告诉我。

aij0ehis

aij0ehis2#

如果只需要读取数据,则必须执行某种操作: collect 是通常的选择,当你需要驱动程序有权访问它。但是,如果您需要将其存储在其他地方,则可以利用可用于hdfs、jdbc等的并行编写器。
如果您需要访问该数据以计算进一步的结果,您可以将数据保留在原来的位置,并使用常用的组合器对其应用函数( map , flatMap , filter 等等)。
但是如果您需要将结果保存在本地,则除了收集之外别无选择。当然,这会影响到计算的并行性,但是你必须得到你想要的输出:本地存储,本地操作;分布式存储,分布式操作。

相关问题