我有以下动态创建的Spark Dataframe :
val sf1 = StructField("name", StringType, nullable = true)
val sf2 = StructField("sector", StringType, nullable = true)
val sf3 = StructField("age", IntegerType, nullable = true)
val fields = List(sf1,sf2,sf3)
val schema = StructType(fields)
val row1 = Row("Andy","aaa",20)
val row2 = Row("Berta","bbb",30)
val row3 = Row("Joe","ccc",40)
val data = Seq(row1,row2,row3)
val df = spark.createDataFrame(spark.sparkContext.parallelize(data), schema)
df.createOrReplaceTempView("people")
val sqlDF = spark.sql("SELECT * FROM people")
现在,我需要迭代sqlDF
中的每一行和每一列以打印每一列,这是我的尝试:
sqlDF.foreach { row =>
row.foreach { col => println(col) }
}
row
是Row
类型,但不可迭代,这就是此代码在row.foreach
中抛出编译错误的原因。如何迭代Row
中的每一列?
9条答案
按热度按时间dvtswwa31#
假设您有一个如下所示的
Dataframe
要循环您的Dataframe并从Dataframe提取元素,您可以选择以下方法之一。
方法1-使用Foreach的循环
无法使用
foreach
循环直接循环 Dataframe 。为此,首先必须使用case class
定义DataFrame的模式,然后必须将该模式指定给DataFrame。请看下面的结果:
使用RDD的方法2-循环
在您的Dataframe上使用
rdd.collect
。row
变量将包含rdd
行类型的Dataframe的每一行。要从一行中获取每个元素,请使用row.mkString(",")
,它将以逗号分隔值的形式包含每行的值。使用split
函数(内置函数),可以使用索引访问rdd
行的每个列值。请注意,这种方法有两个缺点。
1.如果列值中有
,
,则数据会被错误拆分到相邻的列。2.
rdd.collect
是一个action
,它将所有数据返回到驱动程序的内存中,其中驱动程序的内存可能不是那么大,无法容纳数据,最终导致应用程序失败。我建议使用方法1。
方法3-使用WHERE和SELECT
您可以直接使用
where
和select
,它们将在内部循环并查找数据。由于它不应引发Index Out Out Bound异常,因此使用IF条件方法4-使用临时表
您可以将dataframe注册为temptable,它将存储在Spark的内存中。然后,您可以像使用其他数据库一样使用SELECT查询来查询数据,然后收集并保存在变量中
z9zf31ra2#
您可以使用
toSeq
将Row
转换为Seq
。一旦转到Seq
,就可以像往常一样使用foreach
、map
或任何需要的东西迭代它输出:
wztqucjr3#
您应该在
Row
上使用mkString
:但请注意,这将打印在Executors JVM的内部,因此通常看不到输出(除非您使用master=local)
hmae6n7t4#
sqlDF.foreach
对我不起作用,但@Sarath Avanavu Answer中的方法1起作用了,但它有时也会玩弄唱片的顺序。我又找到了一种有效的方法
huwehgph5#
您应该遍历分区,这允许Spark并行处理数据,并且可以在分区内的每一行上执行Foreach。
如果需要,您可以将分区中的数据进一步分组为批
xe55xuns6#
这对我来说很好
kx5bkwkv7#
简单地收集结果,然后应用Foreach
df.collect().foreach(println)
sxpgvts38#
我的解决方案使用for,因为它是我需要的:
解决方案1:
解决方案2:
fnx2tebb9#
假设ResultDF为Dataframe。