我有一台Spark 2.0 DataFrame example
,其结构如下:
id, hour, count
id1, 0, 12
id1, 1, 55
..
id1, 23, 44
id2, 0, 12
id2, 1, 89
..
id2, 23, 34
etc.
它为每个id包含24个条目(一天中的每个小时一个条目),并使用orderBy函数按id、小时排序。
我已经创建了一个聚合器groupConcat
:
def groupConcat(separator: String, columnToConcat: Int) = new Aggregator[Row, String, String] with Serializable {
override def zero: String = ""
override def reduce(b: String, a: Row) = b + separator + a.get(columnToConcat)
override def merge(b1: String, b2: String) = b1 + b2
override def finish(b: String) = b.substring(1)
override def bufferEncoder: Encoder[String] = Encoders.STRING
override def outputEncoder: Encoder[String] = Encoders.STRING
}.toColumn
它帮助我将列连接成字符串,以获得最终的 Dataframe :
id, hourly_count
id1, 12:55:..:44
id2, 12:89:..:34
etc.
我的问题是,如果我使用example.orderBy($"id",$"hour").groupBy("id").agg(groupConcat(":",2) as "hourly_count")
,是否可以保证每小时的计数在各自的存储桶中正确排序?
我读到这不一定是RDDS的情况(参见Spark sort by key and then group by to get ordered iterable?),但也许DataFrames不同?
如果没有,我如何才能解决这个问题?
6条答案
按热度按时间dsekswqp1#
正如其他人指出的那样,
groupBy
之后的groupBy
并不维持秩序。您想要做的是使用一个窗口函数,按id分区并按小时排序。您可以在此基础上使用collect_list
,然后取结果列表中的最大值(最大),因为它们是累加的(即第一个小时只有自己在列表中,第二个小时在列表中有2个元素,依此类推)。完整的示例代码:
这让我们置身于DataFrame世界。我还简化了OP使用的UDF代码。
产出:
cmssoen22#
如果您想解决Java中的实现问题(Scala和Python应该类似):
pgccezyw3#
在我的案例中,秩序并不总是得到遵守:有时是这样,但大多数情况下不是。
我的数据框有200个分区在Spark 1.6上运行
为了检查排序,我比较了
和
例如(左:排序数组(Collect_List());右:Collect_List())
左列总是排序的,而右列只由排序的块组成。对于Take()的不同执行,右列中块的顺序是不同的。
w46czmvw4#
顺序可能相同,也可能不同,具体取决于分区数量和数据分布。我们可以使用RDD本身来解决。
例如::
我将下面的样本数据保存在一个文件中,并将其加载到HDFS中。
并执行以下命令:
输出:
也就是说,我们按类型对数据进行分组,然后按价格排序,并使用“~”作为分隔符连接ID。上面的命令可以分解如下:
然后,我们可以使用以下命令获取特定的组
输出:
ogq8wdun5#
不,不一定要维护
groupByKey
中的排序,但在一个节点的内存中重现这是出了名的困难。正如前面所说的,最典型的方式是需要重新分区才能执行groupByKey
。我设法通过在sort
之后手动执行repartition
来重现它。然后,我将结果传递给groupByKey
。vfh0ocws6#
简而言之,答案是肯定的,每小时的计数将保持相同的顺序。
总而言之,在分组之前进行排序是很重要的。此外,排序必须与组+您实际要对其排序的列相同。
下面是一个例子: