我是spark的新手,但我在hadoop方面有一些经验。我正在尝试修改hadoop流媒体中使用的python代码,它可以过滤掉json格式的tweet。
通常,我的函数有一个条件,如果条件为真,则打印到stdout tweet,否则不打印任何内容。
def filter(tweet):
if criteria(tweet) is True:
print json.dumps(tweet)
这样,最终的输出文件将只包含我想要的tweets。
然而,当我尝试使用spark时,我不得不改变 print
带有 return
如果情况属实,我会回复tweet None
否则。
def filter(tweet):
if criteria(tweet) is True:
return json.dumps(tweet)
尝试将结果保存到磁盘时出现问题。使用 saveAsTextFile
pyspark的方法,它不仅保存了我想要的tweet,还保存了 None
当条件为假时返回。
我怎样才能避免写作 None
所以我只保存想要的tweet?
非常感谢。
乔治
2条答案
按热度按时间rsaldnfx1#
如果在Map中使用函数,它不会减少元素的数量。要过滤元素,必须使用
filter
测试元素是否None
在你之后map
.w7t8yxp52#
非常优雅的解决方案,避免了链接
filter
以及map
,是用来flatMap
: