reduce和count的结果在pyspark中不同

vsikbqxv  于 2021-05-30  发布在  Hadoop
关注(0)|答案(3)|浏览(354)

对于我的spark试用版,我下载了ny taxi csv文件,并将它们合并到一个文件nytaxi.csv中。然后我将其保存在hadoop fs中。我使用7个节点管理器的Spark线。
我正在连接spark over ipython笔记本。
下面是一个示例python脚本,用于计算nytaxi.csv中的行数。

  1. nytaxi=sc.textFile("hdfs://bigdata6:8020/user/baris/nytaxi/nytaxi.csv")
  2. filtered=nytaxi.filter(lambda x:"distance" not in x)
  3. splits = filtered.map(lambda x: float(x.split(",")[9]))
  4. splits.cache()
  5. splits.count()

返回73491693。但是,当我尝试用下面的代码计算行数时,它返回一个大约803000的值。

  1. def plusOne (sum, v):
  2. #print sum, v
  3. return sum + 1;
  4. splits.reduce(plusOne)

我想知道为什么结果不同。谢谢
来自csv的样本行:u'740bd5be61840be4fe3905cc3ebe3e7e,e48b185060fb0ff49be6da43e69e624b,cmt,1,n,2013-10-01 12:44:292013-10-01 12:53:26,1536,1.20,-73.974319,40.741859,-73.99115,40.742424'

sh7euo9m

sh7euo9m1#

这个问题正如丹尼尔所说的那样 reduce 必须是结合的和交换的。以下是来源本身的原因:

  1. val reducePartition: Iterator[T] => Option[T] = iter => {
  2. if (iter.hasNext) {
  3. Some(iter.reduceLeft(cleanF))
  4. } else {
  5. None
  6. }
  7. }

请注意 reduce 在每个分区上完成的是对其迭代器的简单委托 reduceLeft . 这不会造成任何问题,因为这只是价值的积累。

  1. val mergeResult = (index: Int, taskResult: Option[T]) => {
  2. if (taskResult.isDefined) {
  3. jobResult = jobResult match {
  4. case Some(value) => Some(f(value, taskResult.get))
  5. case None => taskResult
  6. }
  7. }
  8. }

但是,分区的合并是个问题。在您的示例中,它是如何分解的(假设4个均匀分割的分区上有40个计数):

  1. A = 10; B = 10; C = 10; D = 10 //Local reductions. Everything ok
  2. A added in = 10 //Still ok
  3. B added in = f(10, 10) = 11 //Because your definition of f is (first + 1)
  4. //This drops the second param of 10
  5. C added in = f(11, 10) = 12 //Again, only adding 1 instead of the actual 10 count

所以,你应该更喜欢 count ,或者照丹尼尔的建议去做 map ,或者你有第三个选择 aggregate ```
rdd.aggregate(0)(_+1, +)

  1. 这将使用0作为计数的种子,在本地将1添加到累加器,然后在合并中将两个累加器添加到一起。
展开查看全部
inn6fuwd

inn6fuwd2#

的文档 RDD.reduce() 说:
使用指定的交换和结合二元运算符减少此rdd的元素。 def plusOne(sum, v): return sum + 1 不是可交换的。它完全忽略其中一个参数。所以你看到的是未定义的行为(我建议考虑为什么函数必须是可交换的。如果你明白这一点,你就更了解spark了!)
解决方法是使用 RDD.count() 相反。但如果你坚持使用 reduce() ,以下是您的操作方法:

  1. def count(rdd):
  2. return rdd.map(lambda x: 1).reduce(lambda a, b: a + b)
pb3s4cty

pb3s4cty3#

这不是完整的答案
因为我不能把我的发现发表评论,所以我把它们写在这里。
我可以用一个更简单的例子来重现你的问题。

  1. data = xrange(1, 10000)
  2. len(data) #output => 9999
  3. xrangeRDD = sc.parallelize(data, 8)
  4. print xrangeRDD.count()
  5. def plusOne (v,sum):
  6. #print sum, v
  7. return v + 1;
  8. a = xrangeRDD.reduce(plusOne)
  9. print a

输出

  1. 9999
  2. 1256

为了 xrangeRDD = sc.parallelize(data, 4) 输出

  1. 9999
  2. 2502

为了 xrangeRDD = sc.parallelize(data, 1) 输出

  1. 9999
  2. 9999

因为我只改变了分区的数量,这也改变了reduce的输出,所以我认为reduce只提供了一个分区的输出,正如这里的模式所建议的那样。
我还在学习Spark的工作原理。所以我不能完全理解为什么会这样。我希望通过这个额外的细节,有人能够解释这背后的原因。

展开查看全部

相关问题