对于我的spark试用版,我下载了ny taxi csv文件,并将它们合并到一个文件nytaxi.csv中。然后我将其保存在hadoop fs中。我使用7个节点管理器的Spark线。
我正在连接spark over ipython笔记本。
下面是一个示例python脚本,用于计算nytaxi.csv中的行数。
nytaxi=sc.textFile("hdfs://bigdata6:8020/user/baris/nytaxi/nytaxi.csv")
filtered=nytaxi.filter(lambda x:"distance" not in x)
splits = filtered.map(lambda x: float(x.split(",")[9]))
splits.cache()
splits.count()
返回73491693。但是,当我尝试用下面的代码计算行数时,它返回一个大约803000的值。
def plusOne (sum, v):
#print sum, v
return sum + 1;
splits.reduce(plusOne)
我想知道为什么结果不同。谢谢
来自csv的样本行:u'740bd5be61840be4fe3905cc3ebe3e7e,e48b185060fb0ff49be6da43e69e624b,cmt,1,n,2013-10-01 12:44:292013-10-01 12:53:26,1536,1.20,-73.974319,40.741859,-73.99115,40.742424'
3条答案
按热度按时间sh7euo9m1#
这个问题正如丹尼尔所说的那样
reduce
必须是结合的和交换的。以下是来源本身的原因:请注意
reduce
在每个分区上完成的是对其迭代器的简单委托reduceLeft
. 这不会造成任何问题,因为这只是价值的积累。但是,分区的合并是个问题。在您的示例中,它是如何分解的(假设4个均匀分割的分区上有40个计数):
所以,你应该更喜欢
count
,或者照丹尼尔的建议去做map
,或者你有第三个选择aggregate
```rdd.aggregate(0)(_+1, +)
inn6fuwd2#
的文档
RDD.reduce()
说:使用指定的交换和结合二元运算符减少此rdd的元素。
def plusOne(sum, v): return sum + 1
不是可交换的。它完全忽略其中一个参数。所以你看到的是未定义的行为(我建议考虑为什么函数必须是可交换的。如果你明白这一点,你就更了解spark了!)解决方法是使用
RDD.count()
相反。但如果你坚持使用reduce()
,以下是您的操作方法:pb3s4cty3#
这不是完整的答案
因为我不能把我的发现发表评论,所以我把它们写在这里。
我可以用一个更简单的例子来重现你的问题。
输出
为了
xrangeRDD = sc.parallelize(data, 4)
输出为了
xrangeRDD = sc.parallelize(data, 1)
输出因为我只改变了分区的数量,这也改变了reduce的输出,所以我认为reduce只提供了一个分区的输出,正如这里的模式所建议的那样。
我还在学习Spark的工作原理。所以我不能完全理解为什么会这样。我希望通过这个额外的细节,有人能够解释这背后的原因。