spark和sparksql:如何模拟窗口函数?

guykilcj  于 2021-06-02  发布在  Hadoop
关注(0)|答案(3)|浏览(496)

说明

给定一个Dataframe df ```
id | date

1 | 2015-09-01
2 | 2015-09-01
1 | 2015-09-03
1 | 2015-09-04
2 | 2015-09-04

我想创建一个运行计数器或索引,
按相同的id和
按组中的日期排序,
因此

id | date | counter

1 | 2015-09-01 | 1
1 | 2015-09-03 | 2
1 | 2015-09-04 | 3
2 | 2015-09-01 | 1
2 | 2015-09-04 | 2

这是我可以通过窗口功能实现的。

val w = Window.partitionBy("id").orderBy("date")
val resultDF = df.select( df("id"), rowNumber().over(w) )

不幸的是,spark 1.4.1不支持常规Dataframe的窗口函数:

org.apache.spark.sql.AnalysisException: Could not resolve window function 'row_number'. Note that, using window functions currently requires a HiveContext;


## 问题

如何在不使用窗口函数的情况下在CurrentSpark 1.4.1上实现上述计算?
spark何时支持常规Dataframe的窗口功能?
谢谢!
bcs8qyzn

bcs8qyzn1#

你可以用RDD做这个。就我个人而言,我发现RDD的api更有意义——我并不总是希望我的数据像Dataframe一样“扁平”。

val df = sqlContext.sql("select 1, '2015-09-01'"
    ).unionAll(sqlContext.sql("select 2, '2015-09-01'")
    ).unionAll(sqlContext.sql("select 1, '2015-09-03'")
    ).unionAll(sqlContext.sql("select 1, '2015-09-04'")
    ).unionAll(sqlContext.sql("select 2, '2015-09-04'"))

// dataframe as an RDD (of Row objects)
df.rdd 
  // grouping by the first column of the row
  .groupBy(r => r(0)) 
  // map each group - an Iterable[Row] - to a list and sort by the second column
  .map(g => g._2.toList.sortBy(row => row(1).toString))     
  .collect()

上述结果如下:

Array[List[org.apache.spark.sql.Row]] = 
Array(
  List([1,2015-09-01], [1,2015-09-03], [1,2015-09-04]), 
  List([2,2015-09-01], [2,2015-09-04]))

如果你想在“组”内的位置,你可以使用 zipWithIndex .

df.rdd.groupBy(r => r(0)).map(g => 
    g._2.toList.sortBy(row => row(1).toString).zipWithIndex).collect()

Array[List[(org.apache.spark.sql.Row, Int)]] = Array(
  List(([1,2015-09-01],0), ([1,2015-09-03],1), ([1,2015-09-04],2)),
  List(([2,2015-09-01],0), ([2,2015-09-04],1)))

你可以把它放回一个简单的列表/数组 Row 对象,但如果您需要在“组”上执行任何操作,这将不是一个好主意。
像这样使用rdd的缺点是,从Dataframe到rdd再到rdd的转换非常繁琐。

hiz5n14c

hiz5n14c2#

你可以用 HiveContext 对于本地 DataFrames 另外,除非你有很好的理由不这样做,否则这可能是个好主意。这是默认值 SQLContext 提供于 spark-shell 以及 pyspark 壳牌(就目前而言) sparkR 似乎用的是普通的 SQLContext )其解析器由sparksql和dataframe指南推荐。

import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.rowNumber

object HiveContextTest {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("Hive Context")
    val sc = new SparkContext(conf)
    val sqlContext = new HiveContext(sc)
    import sqlContext.implicits._

    val df = sc.parallelize(
        ("foo", 1) :: ("foo", 2) :: ("bar", 1) :: ("bar", 2) :: Nil
    ).toDF("k", "v")

    val w = Window.partitionBy($"k").orderBy($"v")
    df.select($"k", $"v", rowNumber.over(w).alias("rn")).show
  }
}
hs1ihplo

hs1ihplo3#

我完全同意,如果您有spark版本(>=)1.5,那么dataframes的窗口函数就是一个不错的选择。但是如果你真的被旧版本(例如1.4.1)困住了,这里有一个黑客的方法来解决这个问题

val df = sc.parallelize((1, "2015-09-01") :: (2, "2015-09-01") :: (1, "2015-09-03") :: (1, "2015-09-04") :: (1, "2015-09-04") :: Nil)
           .toDF("id", "date")

val dfDuplicate = df.selecExpr("id as idDup", "date as dateDup")
val dfWithCounter = df.join(dfDuplicate,$"id"===$"idDup")
                      .where($"date"<=$"dateDup")
                      .groupBy($"id", $"date")
                      .agg($"id", $"date", count($"idDup").as("counter"))
                      .select($"id",$"date",$"counter")

现在如果你这么做了 dfWithCounter.show 您将获得:

+---+----------+-------+                                                        
| id|      date|counter|
+---+----------+-------+
|  1|2015-09-01|      1|
|  1|2015-09-04|      3|
|  1|2015-09-03|      2|
|  2|2015-09-01|      1|
|  2|2015-09-04|      2|
+---+----------+-------+

请注意 date 不是排序,而是 counter 是正确的。您还可以更改 counter 通过改变 <=>=where 声明。

相关问题