如何使用分布式方法、IPython和Spark求出RDD
整数的中位数?RDD
大约有700,000个元素,因此太大而无法收集并求出中位数。
这个问题和这个问题很相似,但是,这个问题的答案是使用Scala,我不知道。
How can I calculate exact median with Apache Spark?
利用Scala答案的思路,我尝试用Python编写类似的答案。
我知道我首先要对RDD
进行排序。我不知道如何排序。我看到了sortBy
(根据给定的keyfunc
对RDD进行排序)和sortByKey
(对RDD
进行排序,假设它由(key,value)对组成。)方法。我认为这两个方法都使用key value,并且我的RDD
只有整数元素。
1.首先,我想做myrdd.sortBy(lambda x: x)
?
1.接下来,我将找到rdd的长度(rdd.count()
)。
1.最后,我想找到一个或两个元素在中心的rdd。我需要帮助这个方法太。
编辑:
我有一个想法。也许我可以索引我的RDD
,然后key = index和value = element。然后我可以尝试按值排序?我不知道这是否可行,因为只有一个sortByKey
方法。
8条答案
按热度按时间fcy6dtqo1#
正在进行的工作
SPARK-30569-* 添加调用percentage_approx的DSL函数 *
Spark2.0+:
您可以使用
approxQuantile
方法来实现Greenwald-Khanna algorithm:其中最后一个参数是相对误差。数值越小,结果越准确,计算成本越高。
从Spark 2.2(SPARK-14352)开始,它支持多列估计:
以及
底层方法也可用于使用
approx_percentile
函数的SQL聚合(全局和分组):Spark〈2.0
正如我在评论中提到的,这很可能不值得大惊小怪。如果数据相对较小,就像你的情况一样,那么只需在本地收集并计算中值:
在我几年前的电脑上大约需要0. 01秒,内存大约5. 5MB。
如果数据量很大,排序就会成为一个限制因素,所以与其得到一个精确的值,不如在本地采样、收集和计算,但是如果你真的想使用Spark,下面的代码应该可以做到(如果我没有搞砸任何事情的话):
还有一些测试:
最后让我们定义中位数:
到目前为止一切顺利,但在没有任何网络通信的本地模式下需要4. 66 s。可能有办法改进这一点,但为什么还要麻烦呢?
如果您使用
HiveContext
,您也可以使用配置单元UDF。具有连续值:
在
percentile_approx
中,您可以传递一个额外的参数来确定要使用的记录数。fae0ux8s2#
下面是我使用窗口函数的方法(使用pyspark 2.2.0)。
然后调用addMedian方法计算col2的中值:
最后,如果需要,可以按分组。
fnatzsnv3#
如果你只需要一个RDD方法而不想转移到DF,那么就添加一个解决方案。
如果你输入百分位数为50,你应该得到你所需要的中位数。让我知道,如果有任何角落的情况没有考虑。
w9apscun4#
有两种方法可以使用,一种是使用approxQuantile方法,另一种是使用percentile_approx方法,但是当记录数为偶数时,这两种方法都可能无法给予准确的结果。
dtcbnfnu5#
我写了一个函数,该函数将 Dataframe 作为输入,并返回一个 Dataframe ,该 Dataframe 将中值作为分区上的输出,order_col是我们要计算中值的列,part_col是我们要计算中值的级别:
bqujaahr6#
对于精确的中值计算,您可以使用以下函数并将其与PySpark DataFrame API配合使用:
像这样应用它:
km0tfn4u7#
我们可以使用以下代码计算spark中的中位数和分位数:
例如,在以下 Dataframe
[1,2,3,4,5]
中查找中值:误差越小,结果越准确。
des4xlb08#
从3.4+版本(以及3.3.1中已有的版本)开始,中值函数可直接访问https://github.com/apache/spark/blob/e170a2eb236a376b036730b5d63371e753f1d947/python/pyspark/sql/functions.py#L633
我猜如果这个版本最终发布的话,相应的文档会被添加进来。