Streaming tweets to the console with Flask

Posted by lyr7nygr on 2021-06-13 in ElasticSearch

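Using Flask, I am trying to:

1. get user input
2. filter the data stream based on that input
3. process the filtered data
4. redirect the results to a Kibana dashboard for visualization

In `streamer.py`, I have a method `stream_tweets` that streams tweets based on the input.
When I run `streamer.py` with Python, I get the tweets in the terminal and can see the visualization on the Kibana dashboard.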

    from tweepy import Stream

    from listener import TwitterListener
    # TwitterAuthenticator is defined elsewhere in the project (not shown in the question).


    class Streamer:
        def __init__(self):
            self.twitter_authenticator = TwitterAuthenticator()

        def stream_tweets(self, hash_tag_list):
            """Open a filtered stream for the given list of track terms."""
            listener = TwitterListener()
            auth = self.twitter_authenticator.authenticate_twitter_app()
            stream = Stream(auth, listener)
            # is_async=True runs the stream in a background thread.
            stream.filter(track=hash_tag_list, is_async=True)


    def main():
        tweet_streamer = Streamer()
        hash_tag = ['Arsenal']
        tweet_streamer.stream_tweets(hash_tag)


    if __name__ == '__main__':
        main()

This file uses `listener.py`, which processes the data with the TextBlob library.

    import json

    from tweepy.streaming import StreamListener
    from textblob import TextBlob
    from elasticsearch import Elasticsearch


    class TwitterListener(StreamListener):
        def __init__(self):
            super().__init__()
            self.es = Elasticsearch()
            # ignore=400 suppresses the error raised when the index already exists.
            self.es.indices.create(index='twitter', ignore=400)

        def on_data(self, data):
            try:
                dict_data = json.loads(data)
                tweet = TextBlob(dict_data["text"])
                polarity = tweet.sentiment.polarity
                print(polarity)
                if polarity == 0:
                    sentiment = "neutral"
                elif 0 < polarity <= 0.3:
                    sentiment = "weak positive"
                elif 0.3 < polarity <= 0.6:
                    sentiment = "positive"
                elif 0.6 < polarity <= 1:
                    sentiment = "strong positive"
                elif -0.3 < polarity <= 0:
                    sentiment = "weak negative"
                elif -0.6 < polarity <= -0.3:
                    sentiment = "negative"
                else:  # -1 <= polarity <= -0.6
                    sentiment = "strong negative"
                print(sentiment)
                self.es.index(
                    index="twitter",
                    doc_type="test-type",
                    body={
                        "author": dict_data["user"]["screen_name"],
                        "date": dict_data["created_at"],
                        "message": dict_data["text"],
                        "polarity": polarity,
                        "subjectivity": tweet.sentiment.subjectivity,
                        "sentiment": sentiment,
                    },
                )
                return True
            except Exception as e:
                print("Data Error:", str(e))
                return False

        def on_error(self, status):
            """Repeated access after rate limit will lock developer account."""
            if status == 420:
                return False
            print(status)
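
The threshold chain in `on_data` can be checked without a live stream by pulling it into a pure function. The following is a minimal sketch under that idea; `classify_polarity` is a hypothetical helper name (not part of tweepy or TextBlob), and it assumes the polarity value lies in [-1, 1], which is the range TextBlob documents.

```python
def classify_polarity(polarity: float) -> str:
    """Map a polarity in [-1, 1] to the labels used by the listener."""
    if polarity == 0:
        return "neutral"
    if 0 < polarity <= 0.3:
        return "weak positive"
    if 0.3 < polarity <= 0.6:
        return "positive"
    if polarity > 0.6:
        return "strong positive"
    if -0.3 < polarity < 0:
        return "weak negative"
    if -0.6 < polarity <= -0.3:
        return "negative"
    # Everything at or below -0.6 falls through to the last label.
    return "strong negative"


if __name__ == "__main__":
    for value in (-1.0, -0.5, -0.1, 0.0, 0.2, 0.5, 0.9):
        print(value, classify_polarity(value))
```

Keeping the labels in one function makes the boundary cases (for example exactly 0.3 or exactly -0.6) easy to assert on separately from the Twitter and Elasticsearch calls.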

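But when I call the same method from Flask, the result template is rendered, yet tweets stream only for a very short time, after which I get `Data Error: 'text'`.
Here is the Flask microservice: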

    from flask import render_template, request

    # `app` (the Flask instance) and `Streamer` are defined elsewhere in the project.


    @app.route("/dynamic-analysis/", methods=["GET", "POST"])
    def dynamic_analysis():
        tweet_streamer = Streamer()
        if request.method == "POST":
            # request.form[...] is a single str here, whereas main() passes a list.
            hash_tag = request.form['dynamic_hashtag']
            tweet_streamer.stream_tweets(hash_tag)
            return render_template("query-dynamic-analysis.html")  # contains URL to Kibana dashboard
        else:  # request.method == "GET"
            return render_template("dynamic-analysis.html")  # form to get user input
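
One difference between the two call paths may be worth checking: `main()` passes a list (`['Arsenal']`), while the route passes the raw form value, which is a single string. If the installed tweepy version builds the `track` parameter by joining the sequence with commas (an assumption about tweepy 3.x, not verified against this setup), a string would be split into single characters and the stream would match far more than intended. A minimal sketch of normalizing the input first; `parse_track_terms` is a hypothetical helper, and splitting on commas is an illustrative choice:

```python
def parse_track_terms(raw: str) -> list:
    """Turn a form value such as 'Arsenal, Chelsea' into ['Arsenal', 'Chelsea']."""
    terms = [term.strip() for term in raw.split(",")]
    # Drop empty entries left by stray commas or blank input.
    return [term for term in terms if term]


if __name__ == "__main__":
    # Joining a plain string iterates over its characters.
    print(",".join("Arsenal"))                     # A,r,s,e,n,a,l
    # Joining the normalized list keeps each term whole.
    print(",".join(parse_track_terms("Arsenal")))  # Arsenal
    print(parse_track_terms(" Arsenal , Chelsea ,, "))
```

In the route, this would mean calling `tweet_streamer.stream_tweets(parse_track_terms(request.form['dynamic_hashtag']))`.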

I also tried passing `tweet_streamer.stream_tweets(hash_tag)` to `render_template` as an extra data argument, but I still get the same error.
How can I solve this?
Update:
I also tried using gunicorn and foreman with the following configuration. `gunicorn.conf.py`:

    workers = 4
    worker_class = 'sync'
    loglevel = 'debug'
    accesslog = '-'
    errorlog = '-'

`Procfile`:

    flask_dynamic_analysis: gunicorn --bind 127.0.0.1:$PORT --config gunicorn.conf.py twitter_sentiment_analysis:app
    microservices_routes: gunicorn --bind 127.0.0.1:$PORT --config gunicorn.conf.py app.routes:app

`.foreman`:

    formation: flask_dynamic_analysis=2, microservices_routes=2
    port: 3000

But I still get the same error.
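
The `Data Error: 'text'` output is what `str(e)` produces for a `KeyError` on `dict_data["text"]`, so at least one message on the stream had no `text` field. A possible reading, stated as an assumption, is that the stream interleaves non-tweet messages (such as limit notices) with tweets; because `on_data` returns `False` from its `except` branch, the first such message would also end the stream. A minimal sketch of a check that lets the listener skip those messages instead; `extract_tweet_fields` is a hypothetical helper, and the sample payloads are invented for illustration:

```python
import json
from typing import Optional


def extract_tweet_fields(raw: str) -> Optional[dict]:
    """Return the fields the listener indexes, or None for non-tweet messages."""
    message = json.loads(raw)
    # Anything that is not a JSON object with "text" and "user" is not a tweet.
    if not isinstance(message, dict):
        return None
    if "text" not in message or "user" not in message:
        return None
    return {
        "author": message["user"]["screen_name"],
        "date": message.get("created_at"),
        "message": message["text"],
    }


if __name__ == "__main__":
    # Invented sample payloads: one tweet-like message and one notice.
    tweet = json.dumps({
        "text": "Great match",
        "created_at": "Sun Jun 13 10:00:00 +0000 2021",
        "user": {"screen_name": "example_user"},
    })
    notice = json.dumps({"limit": {"track": 42}})
    print(extract_tweet_fields(tweet))
    print(extract_tweet_fields(notice))
```

Inside `on_data`, a `None` result would lead to `return True` (keep streaming) before any sentiment analysis or indexing takes place.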
