spark流复制网络调用

ozxc1zmp  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(322)

我用Pypark和Kafka接收器来处理推特流。我的应用程序的一个步骤包括调用google自然语言api来获得每条tweet的情绪得分。然而,我看到api在每个处理过的tweet上都会收到几个调用(我在google云控制台中看到了调用的数量)。
另外,如果我打印tweetids(在Map函数中),我会得到3到4次相同的id。在我的应用程序结束时,tweets被发送到kafka的另一个主题,在那里我得到了正确的tweets计数(没有重复的id),所以原则上一切都正常工作,但我不知道如何避免每次tweet调用google api不止一次。
这与spark或kafka中的某些配置参数有关吗?
下面是我的控制台输出示例:

  1. TIME 21:53:36: Google Response for tweet 801181843500466177 DONE!
  2. TIME 21:53:36: Google Response for tweet 801181854766399489 DONE!
  3. TIME 21:53:36: Google Response for tweet 801181844808966144 DONE!
  4. TIME 21:53:37: Google Response for tweet 801181854372012032 DONE!
  5. TIME 21:53:37: Google Response for tweet 801181843500466177 DONE!
  6. TIME 21:53:37: Google Response for tweet 801181854766399489 DONE!
  7. TIME 21:53:37: Google Response for tweet 801181844808966144 DONE!
  8. TIME 21:53:37: Google Response for tweet 801181854372012032 DONE!

但是在Kafka接收器中,我只收到4条经过处理的tweet(这是正确的接收方式,因为它们只有4条独特的tweet)。
执行此操作的代码是:

  1. def sendToKafka(rdd,topic,address):
  2. publish_producer = KafkaProducer(bootstrap_servers=address,\
  3. value_serializer=lambda v: json.dumps(v).encode('utf-8'))
  4. records = rdd.collect()
  5. msg_dict = defaultdict(list)
  6. for rec in records:
  7. msg_dict["results"].append(rec)
  8. publish_producer.send(resultTopic,msg_dict)
  9. publish_producer.close()
  10. kafka_stream = KafkaUtils.createStream(ssc, zookeeperAddress, "spark-consumer-"+myTopic, {myTopic: 1})
  11. dstream_tweets=kafka_stream.map(lambda kafka_rec: get_json(kafka_rec[1]))\
  12. .map(lambda post: add_normalized_text(post))\
  13. .map(lambda post: tagKeywords(post,tokenizer,desired_keywords))\
  14. .filter(lambda post: post["keywords"] == True)\
  15. .map(lambda post: googleNLP.complementTweetFeatures(post,job_id))
  16. dstream_tweets.foreachRDD(lambda rdd: sendToKafka(rdd,resultTopic,PRODUCER_ADDRESS))
z3yyvxxp

z3yyvxxp1#

我已经找到解决办法了!我只需要缓存数据流:

  1. dstream_tweets.cache()

之所以发生多个网络调用,是因为spark在脚本中执行后面的操作之前重新计算了该数据流中的RDD。当我缓存()数据流时,只需要计算一次;由于它保存在内存中,以后的函数可以访问这些信息而无需重新计算(在这种情况下,重新计算需要再次调用api,因此值得付出更多内存使用的代价)。

相关问题