scala 迭代Spark Dataframe 中的行和列

nhn9ugyo  于 2022-11-09  发布在  Scala
关注(0)|答案(9)|浏览(364)

我有以下动态创建的Spark Dataframe :

val sf1 = StructField("name", StringType, nullable = true)
val sf2 = StructField("sector", StringType, nullable = true)
val sf3 = StructField("age", IntegerType, nullable = true)

val fields = List(sf1,sf2,sf3)
val schema = StructType(fields)

val row1 = Row("Andy","aaa",20)
val row2 = Row("Berta","bbb",30)
val row3 = Row("Joe","ccc",40)

val data = Seq(row1,row2,row3)

val df = spark.createDataFrame(spark.sparkContext.parallelize(data), schema)

df.createOrReplaceTempView("people")
val sqlDF = spark.sql("SELECT * FROM people")

现在,我需要迭代sqlDF中的每一行和每一列以打印每一列,这是我的尝试:

sqlDF.foreach { row =>
  row.foreach { col => println(col) }
}

rowRow类型,但不可迭代,这就是此代码在row.foreach中抛出编译错误的原因。如何迭代Row中的每一列?

dvtswwa3

dvtswwa31#

假设您有一个如下所示的Dataframe

+-----+------+---+
| name|sector|age|
+-----+------+---+
| Andy|   aaa| 20|
|Berta|   bbb| 30|
|  Joe|   ccc| 40|
+-----+------+---+

要循环您的Dataframe并从Dataframe提取元素,您可以选择以下方法之一。

方法1-使用Foreach的循环

无法使用foreach循环直接循环 Dataframe 。为此,首先必须使用case class定义DataFrame的模式,然后必须将该模式指定给DataFrame。

import spark.implicits._
import org.apache.spark.sql._
case class cls_Employee(name:String, sector:String, age:Int)
val df = Seq(cls_Employee("Andy","aaa", 20), cls_Employee("Berta","bbb", 30), cls_Employee("Joe","ccc", 40)).toDF()
df.as[cls_Employee].take(df.count.toInt).foreach(t => println(s"name=${t.name},sector=${t.sector},age=${t.age}"))

请看下面的结果:

使用RDD的方法2-循环

在您的Dataframe上使用rdd.collectrow变量将包含rdd行类型的Dataframe的每一行。要从一行中获取每个元素,请使用row.mkString(","),它将以逗号分隔值的形式包含每行的值。使用split函数(内置函数),可以使用索引访问rdd行的每个列值。

for (row <- df.rdd.collect)
{   
    var name = row.mkString(",").split(",")(0)
    var sector = row.mkString(",").split(",")(1)
    var age = row.mkString(",").split(",")(2)   
}

请注意,这种方法有两个缺点。
1.如果列值中有,,则数据会被错误拆分到相邻的列。
2.rdd.collect是一个action,它将所有数据返回到驱动程序的内存中,其中驱动程序的内存可能不是那么大,无法容纳数据,最终导致应用程序失败。
我建议使用方法1

方法3-使用WHERE和SELECT

您可以直接使用whereselect,它们将在内部循环并查找数据。由于它不应引发Index Out Out Bound异常,因此使用IF条件

if(df.where($"name" === "Andy").select(col("name")).collect().length >= 1)
    name = df.where($"name" === "Andy").select(col("name")).collect()(0).get(0).toString

方法4-使用临时表

您可以将dataframe注册为temptable,它将存储在Spark的内存中。然后,您可以像使用其他数据库一样使用SELECT查询来查询数据,然后收集并保存在变量中

df.registerTempTable("student")
name = sqlContext.sql("select name from student where name='Andy'").collect()(0).toString().replace("[","").replace("]","")
z9zf31ra

z9zf31ra2#

您可以使用toSeqRow转换为Seq。一旦转到Seq,就可以像往常一样使用foreachmap或任何需要的东西迭代它

sqlDF.foreach { row => 
           row.toSeq.foreach{col => println(col) }
    }

输出:

Berta
bbb
30
Joe
Andy
aaa
20
ccc
40
wztqucjr

wztqucjr3#

您应该在Row上使用mkString

sqlDF.foreach { row =>
  println(row.mkString(",")) 
}

但请注意,这将打印在Executors JVM的内部,因此通常看不到输出(除非您使用master=local)

hmae6n7t

hmae6n7t4#

sqlDF.foreach对我不起作用,但@Sarath Avanavu Answer中的方法1起作用了,但它有时也会玩弄唱片的顺序。
我又找到了一种有效的方法

df.collect().foreach { row =>
   println(row.mkString(","))
}
huwehgph

huwehgph5#

您应该遍历分区,这允许Spark并行处理数据,并且可以在分区内的每一行上执行Foreach。
如果需要,您可以将分区中的数据进一步分组为批

sqlDF.foreachPartition { partitionedRows: Iterator[Model1] =>     
  if (partitionedRows.take(1).nonEmpty) {
       partitionedRows.grouped(numberOfRowsPerBatch).foreach { batch =>
        batch.foreach { row => 
        .....
xe55xuns

xe55xuns6#

这对我来说很好

sqlDF.collect().foreach(row => row.toSeq.foreach(col => println(col)))
kx5bkwkv

kx5bkwkv7#

简单地收集结果,然后应用Foreach
df.collect().foreach(println)

sxpgvts3

sxpgvts38#

我的解决方案使用for,因为它是我需要的:
解决方案1:

case class campos_tablas(name:String, sector:String, age:Int)

for (row <- df.as[campos_tablas].take(df.count.toInt)) 
{ 
     print(row.name.toString)

}

解决方案2:

for (row <- df.take(df.count.toInt))
{ 
   print(row(0).toString)
}
fnx2tebb

fnx2tebb9#

假设ResultDF为Dataframe。

val resultDF = // DataFrame //
var itr = 0
val resultRow = resultDF.count
val resultSet = resultDF.collectAsList
var load_id = 0
var load_dt = ""
var load_hr = 0

while ( itr < resultRow ){
    col1 = resultSet.get(itr).getInt(0)
    col2 = resultSet.get(itr).getString(1) // if column is having String value
    col3 = resultSet.get(itr).getLong(2) // if column is having Long value

    // Write other logic for your code //

    itr = itr + 1
}

相关问题