我读到过归约函数必须是交换性和结合性的。我应该如何编写一个函数来求平均值,使其符合这一要求?如果我应用以下函数来计算RDD的平均值,它将不会正确地计算平均值。有人能解释一下我的功能出了什么问题吗?我猜它需要两个元素,比如1,2,然后对它们应用函数,比如(1+2)/2。然后将结果与下一个元素3相加,并除以2,依此类推。
(1+2)/2
val rdd = sc.parallelize(1 to 100) rdd.reduce((_ + _) / 2)
bihw5rsg1#
Rdd.create((+)/2)上述用于计算平均值的reduce方法有几个问题:1.placeholder语法不能作为reduce((acc, x) => (acc + x) / 2)的缩写1.由于您的RDD类型为整型,rdd.reduce((acc, x) => (acc + x) / 2)将在每次迭代中生成integer division(计算平均值肯定是错误的)1.reduce方法不会生成列表的平均值。例如:
reduce
placeholder
reduce((acc, x) => (acc + x) / 2)
rdd.reduce((acc, x) => (acc + x) / 2)
integer division
List[Double](1, 2, 3).reduce((a, x) => (a + x) / 2) --> (1.0 + 2.0) / 2 = 1.5 --> (1.5 + 3.0) / 2 = 2.25 Result: 2.25
鉴于:
Average of List[Double](1, 2, 3) = 2.0
我应该如何编写一个[Reduce]函数来找出平均值,使其符合此要求?我不确定reduce是否适合直接计算列表的平均值。您当然可以使用reduce(_ + _)对列表求和,然后将总和除以其大小,如下所示:
reduce(_ + _)
rdd.reduce(_ + _) / rdd.count.toDouble
但是,您可以简单地使用RDD的内置函数mean:
mean
rdd.mean
mf98qq942#
您还可以使用PairRDD来跟踪所有元素的总和以及元素的计数。
PairRDD
val pair = sc.parallelize(1 to 100) .map(x => (x, 1)) .reduce((x, y) => (x._1 + y._1, x._2 + y._2)) val mean = pair._1 / pair._2
tf7tbtn23#
检查一下这个Val lt=sc.parallize((list(2,4,5,7,2)Lt.sum/lt.count
3条答案
按热度按时间bihw5rsg1#
Rdd.create((+)/2)
上述用于计算平均值的
reduce
方法有几个问题:1.
placeholder
语法不能作为reduce((acc, x) => (acc + x) / 2)
的缩写1.由于您的RDD类型为整型,
rdd.reduce((acc, x) => (acc + x) / 2)
将在每次迭代中生成integer division
(计算平均值肯定是错误的)1.
reduce
方法不会生成列表的平均值。例如:鉴于:
我应该如何编写一个[Reduce]函数来找出平均值,使其符合此要求?
我不确定
reduce
是否适合直接计算列表的平均值。您当然可以使用reduce(_ + _)
对列表求和,然后将总和除以其大小,如下所示:但是,您可以简单地使用RDD的内置函数
mean
:mf98qq942#
您还可以使用
PairRDD
来跟踪所有元素的总和以及元素的计数。tf7tbtn23#
检查一下这个
Val lt=sc.parallize((list(2,4,5,7,2)
Lt.sum/lt.count