scala 如何从DataFrame获取最后一行?

aelbi1ox  于 2022-11-09  发布在  Scala
关注(0)|答案(7)|浏览(310)

我有一个DataFrame,DataFrame有两列‘Value’和‘Timestamp’,‘timestmp’是有序的,我想获得DataFrame的最后一行,我应该怎么办?
这是我的意见:

+-----+---------+
|value|timestamp|
+-----+---------+
|    1|        1|
|    4|        2|
|    3|        3|
|    2|        4|
|    5|        5|
|    7|        6|
|    3|        7|
|    5|        8|
|    4|        9|
|   18|       10|
+-----+---------+

这是我的代码:

val arr = Array((1,1),(4,2),(3,3),(2,4),(5,5),(7,6),(3,7),(5,8),(4,9),(18,10))
    var df=m_sparkCtx.parallelize(arr).toDF("value","timestamp")

这是我预期的结果:

+-----+---------+
|value|timestamp|
+-----+---------+
|   18|       10|
+-----+---------+
kmbjn2e3

kmbjn2e31#

试试这个,它对我很管用。

df.orderBy($"value".desc).show(1)
nmpmafwu

nmpmafwu2#

我将简单地使用这样的查询-按降序对表进行排序-从该顺序中获取第一个值

df.createOrReplaceTempView("table_df")
query_latest_rec = """SELECT * FROM table_df ORDER BY value DESC limit 1"""
latest_rec = self.sqlContext.sql(query_latest_rec)
latest_rec.show()
uoifb46i

uoifb46i3#

我只需reduce

df.reduce { (x, y) => 
  if (x.getAs[Int]("timestamp") > y.getAs[Int]("timestamp")) x else y 
}
siotufzp

siotufzp4#

最有效的方法是reduce您的DataFrame。这为您提供了可以转换回DataFrame的单行,但由于它只包含1条记录,这没有多大意义。

sparkContext.parallelize(
  Seq(
  df.reduce {
    (a, b) => if (a.getAs[Int]("timestamp") > b.getAs[Int]("timestamp")) a else b 
   } match {case Row(value:Int,timestamp:Int) => (value,timestamp)}
  )
)
.toDF("value","timestamp")
.show

+-----+---------+
|value|timestamp|
+-----+---------+
|   18|       10|
+-----+---------+

效率较低(因为它需要重新洗牌),尽管此解决方案较短:

df
.where($"timestamp" === df.groupBy().agg(max($"timestamp")).map(_.getInt(0)).collect.head)
alen0pnh

alen0pnh5#

如果您的时间戳列是唯一的并且按递增顺序排列,则有以下方法可以获取最后一行

println(df.sort($"timestamp", $"timestamp".desc).first())

// Output [1,1]

df.sort($"timestamp", $"timestamp".desc).take(1).foreach(println)

// Output [1,1]

df.where($"timestamp" === df.count()).show

产出:

+-----+---------+
|value|timestamp|
+-----+---------+
|   18|       10|
+-----+---------+

如果不是,则使用该索引创建一个新列并选择最后一个索引,如下所示

val df1 = spark.sqlContext.createDataFrame(
    df.rdd.zipWithIndex.map {
  case (row, index) => Row.fromSeq(row.toSeq :+ index)
},
StructType(df.schema.fields :+ StructField("index", LongType, false)))

df1.where($"timestamp" === df.count()).drop("index").show

产出:

+-----+---------+
|value|timestamp|
+-----+---------+
|   18|       10|
+-----+---------+
imzjd6km

imzjd6km6#

Java:

Dataset<Row> sortDF = inputDF.orderBy(org.apache.spark.sql.functions.col(config.getIncrementingColumn()).desc());
Row row = sortDF.first()
gzjq41n4

gzjq41n47#

您还可以使用函数desc:Column desc(String columnName)

df.orderBy(desc("value")).show(1)

它产生的结果与

df.orderBy($"value".desc).show(1)

相关问题