不要在spark(python)中写无行或空行

lh80um4z  于 2021-06-02  发布在  Hadoop
关注(0)|答案(2)|浏览(274)

我是spark的新手,但我在hadoop方面有一些经验。我正在尝试修改hadoop流媒体中使用的python代码,它可以过滤掉json格式的tweet。
通常,我的函数有一个条件,如果条件为真,则打印到stdout tweet,否则不打印任何内容。

def filter(tweet):
   if criteria(tweet) is True:
      print json.dumps(tweet)

这样,最终的输出文件将只包含我想要的tweets。
然而,当我尝试使用spark时,我不得不改变 print 带有 return 如果情况属实,我会回复tweet None 否则。

def filter(tweet):
       if criteria(tweet) is True:
          return json.dumps(tweet)

尝试将结果保存到磁盘时出现问题。使用 saveAsTextFile pyspark的方法,它不仅保存了我想要的tweet,还保存了 None 当条件为假时返回。
我怎样才能避免写作 None 所以我只保存想要的tweet?
非常感谢。
乔治

rsaldnfx

rsaldnfx1#

如果在Map中使用函数,它不会减少元素的数量。要过滤元素,必须使用 filter 测试元素是否 None 在你之后 map .

w7t8yxp5

w7t8yxp52#

非常优雅的解决方案,避免了链接 filter 以及 map ,是用来 flatMap :

def filter(tweet):
    return [json.dumps(tweet)] if criteria(tweet) is True else []

some_rdd.flatMap(filter)

相关问题