scala Spark DataFrame:GroupBy After Order By是否保持该顺序?

whitzsjs  于 2022-11-09  发布在  Scala
关注(0)|答案(6)|浏览(308)

我有一台Spark 2.0 DataFrame example,其结构如下:

id, hour, count
id1, 0, 12
id1, 1, 55
..
id1, 23, 44
id2, 0, 12
id2, 1, 89
..
id2, 23, 34
etc.

它为每个id包含24个条目(一天中的每个小时一个条目),并使用orderBy函数按id、小时排序。
我已经创建了一个聚合器groupConcat

def groupConcat(separator: String, columnToConcat: Int) = new Aggregator[Row, String, String] with Serializable {
    override def zero: String = ""

    override def reduce(b: String, a: Row) = b + separator + a.get(columnToConcat)

    override def merge(b1: String, b2: String) = b1 + b2

    override def finish(b: String) = b.substring(1)

    override def bufferEncoder: Encoder[String] = Encoders.STRING

    override def outputEncoder: Encoder[String] = Encoders.STRING
  }.toColumn

它帮助我将列连接成字符串,以获得最终的 Dataframe :

id, hourly_count
id1, 12:55:..:44
id2, 12:89:..:34
etc.

我的问题是,如果我使用example.orderBy($"id",$"hour").groupBy("id").agg(groupConcat(":",2) as "hourly_count"),是否可以保证每小时的计数在各自的存储桶中正确排序?
我读到这不一定是RDDS的情况(参见Spark sort by key and then group by to get ordered iterable?),但也许DataFrames不同?
如果没有,我如何才能解决这个问题?

dsekswqp

dsekswqp1#

正如其他人指出的那样,groupBy之后的groupBy并不维持秩序。您想要做的是使用一个窗口函数,按id分区并按小时排序。您可以在此基础上使用collect_list,然后取结果列表中的最大值(最大),因为它们是累加的(即第一个小时只有自己在列表中,第二个小时在列表中有2个元素,依此类推)。
完整的示例代码:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
import spark.implicits._

val data = Seq(
    ( "id1", 0, 12),
    ("id1", 1, 55),
    ("id1", 23, 44),
    ("id2", 0, 12),
    ("id2", 1, 89),
    ("id2", 23, 34)
).toDF("id", "hour", "count")

val mergeList = udf{(strings: Seq[String]) => strings.mkString(":")}

data.withColumn(
    "collected",
    collect_list($"count").over(
        Window.partitionBy("id").orderBy("hour")
    )
)
.groupBy("id")
.agg(max($"collected").as("collected"))
.withColumn("hourly_count", mergeList($"collected"))
.select("id", "hourly_count")
.show

这让我们置身于DataFrame世界。我还简化了OP使用的UDF代码。
产出:

+---+------------+
| id|hourly_count|
+---+------------+
|id1|    12:55:44|
|id2|    12:89:34|
+---+------------+
cmssoen2

cmssoen22#

如果您想解决Java中的实现问题(Scala和Python应该类似):

example.orderBy("hour")
    .groupBy("id")
    .agg(functions.sort_array(
      functions.collect_list( 
        functions.struct(dataRow.col("hour"),
                         dataRow.col("count"))),false)
    .as("hourly_count"));
pgccezyw

pgccezyw3#

在我的案例中,秩序并不总是得到遵守:有时是这样,但大多数情况下不是。
我的数据框有200个分区在Spark 1.6上运行

df_group_sort = data.orderBy(times).groupBy(group_key).agg(
                                                  F.sort_array(F.collect_list(times)),
                                                  F.collect_list(times)
                                                           )

为了检查排序,我比较了

F.sort_array(F.collect_list(times))

F.collect_list(times)

例如(左:排序数组(Collect_List());右:Collect_List())

2016-12-19 08:20:27.172000 2016-12-19 09:57:03.764000
2016-12-19 08:20:30.163000 2016-12-19 09:57:06.763000
2016-12-19 08:20:33.158000 2016-12-19 09:57:09.763000
2016-12-19 08:20:36.158000 2016-12-19 09:57:12.763000
2016-12-19 08:22:27.090000 2016-12-19 09:57:18.762000
2016-12-19 08:22:30.089000 2016-12-19 09:57:33.766000
2016-12-19 08:22:57.088000 2016-12-19 09:57:39.811000
2016-12-19 08:23:03.085000 2016-12-19 09:57:45.770000
2016-12-19 08:23:06.086000 2016-12-19 09:57:57.809000
2016-12-19 08:23:12.085000 2016-12-19 09:59:56.333000
2016-12-19 08:23:15.086000 2016-12-19 10:00:11.329000
2016-12-19 08:23:18.087000 2016-12-19 10:00:14.331000
2016-12-19 08:23:21.085000 2016-12-19 10:00:17.329000
2016-12-19 08:23:24.085000 2016-12-19 10:00:20.326000

左列总是排序的,而右列只由排序的块组成。对于Take()的不同执行,右列中块的顺序是不同的。

w46czmvw

w46czmvw4#

顺序可能相同,也可能不同,具体取决于分区数量和数据分布。我们可以使用RDD本身来解决。
例如::
我将下面的样本数据保存在一个文件中,并将其加载到HDFS中。

1,type1,300
2,type1,100
3,type2,400
4,type2,500
5,type1,400
6,type3,560
7,type2,200
8,type3,800

并执行以下命令:

sc.textFile("/spark_test/test.txt").map(x=>x.split(",")).filter(x=>x.length==3).groupBy(_(1)).mapValues(x=>x.toList.sortBy(_(2)).map(_(0)).mkString("~")).collect()

输出:

Array[(String, String)] = Array((type3,6~8), (type1,2~1~5), (type2,7~3~4))

也就是说,我们按类型对数据进行分组,然后按价格排序,并使用“~”作为分隔符连接ID。上面的命令可以分解如下:

val validData=sc.textFile("/spark_test/test.txt").map(x=>x.split(",")).filter(x=>x.length==3)

val groupedData=validData.groupBy(_(1))  //group data rdds

val sortedJoinedData=groupedData.mapValues(x=>{
   val list=x.toList
   val sortedList=list.sortBy(_(2))
   val idOnlyList=sortedList.map(_(0))
   idOnlyList.mkString("~")
}
)
sortedJoinedData.collect()

然后,我们可以使用以下命令获取特定的组

sortedJoinedData.filter(_._1=="type1").collect()

输出:

Array[(String, String)] = Array((type1,2~1~5))
ogq8wdun

ogq8wdun5#

不,不一定要维护groupByKey中的排序,但在一个节点的内存中重现这是出了名的困难。正如前面所说的,最典型的方式是需要重新分区才能执行groupByKey。我设法通过在sort之后手动执行repartition来重现它。然后,我将结果传递给groupByKey

case class Numbered(num:Int, group:Int, otherData:Int)

// configure spark with "spark.sql.shuffle.partitions" = 2 or some other small number 

val v =
  (1 to 100000)
    // Make waaay more groups then partitions. I added an extra integer just to mess with the sort hash computation (i.e. so it won't be monotonic, not sure if needed)
    .map(Numbered(_, Random.nextInt(300), Random.nextInt(1000000))).toDS()
    // Be sure they are stored in a small number of partitions
    .repartition(2)
    .sort($"num")
    // Repartition again with a waaay bigger number then there are groups so that when things need to be merged you can get them out of order.
    .repartition(200)
    .groupByKey(_.group)
    .mapGroups {
      case (g, nums) =>
        nums             // all you need is .sortBy(_.num) here to fix the problem          
          .map(_.num)
          .mkString("~")
    }
    .collect()

// Walk through the concatenated strings. If any number ahead 
// is smaller than the number before it, you know that something
// is out of order.
v.zipWithIndex.map { case (r, i) =>
  r.split("~").map(_.toInt).foldLeft(0) { case (prev, next) =>
    if (next < prev) {
      println(s"***Next: ${next} less then ${prev} for dataset ${i + 1}***")
    }
    next
  }
}
vfh0ocws

vfh0ocws6#

简而言之,答案是肯定的,每小时的计数将保持相同的顺序。
总而言之,在分组之前进行排序是很重要的。此外,排序必须与组+您实际要对其排序的列相同。
下面是一个例子:

employees
    .sort("company_id", "department_id", "employee_role")
    .groupBy("company_id", "department_id")
    .agg(Aggregators.groupConcat(":", 2) as "count_per_role")

相关问题