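Using Flask, I am trying to:

1. get user input
2. filter a data stream based on that input
3. process the filtered data
4. redirect the results to a Kibana dashboard for visualization

In `streamer.py` I have a method, `stream_tweets`, that streams tweets matching the input. When I run `streamer.py` directly with Python, the tweets show up in the terminal and the visualization appears on the Kibana dashboard.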

    class Streamer():
        def __init__(self):
            self.twitter_authenticator = TwitterAuthenticator()

        def stream_tweets(self, hash_tag_list):
            listener = TwitterListener()
            auth = self.twitter_authenticator.authenticate_twitter_app()
            stream = Stream(auth, listener)
            stream.filter(track=hash_tag_list, is_async=True)


    def main():
        tweet_streamer = Streamer()
        hash_tag = ['Arsenal']
        tweet_streamer.stream_tweets(hash_tag)


    if __name__ == '__main__':
        main()

This file calls `listener.py`, which processes the data with the TextBlob library.

    import json
    from tweepy.streaming import StreamListener
    from textblob import TextBlob
    from elasticsearch import Elasticsearch


    class TwitterListener(StreamListener):
        def __init__(self):
            self.es = Elasticsearch()
            self.es.indices.create(index='twitter', ignore=400)

        def on_data(self, data):
            try:
                dict_data = json.loads(data)
                tweet = TextBlob(dict_data["text"])
                print(tweet.sentiment.polarity)
                if (tweet.sentiment.polarity == 0):
                    sentiment = "neutral"
                elif (tweet.sentiment.polarity > 0 and tweet.sentiment.polarity <= 0.3):
                    sentiment = "weak positive"
                elif (tweet.sentiment.polarity > 0.3 and tweet.sentiment.polarity <= 0.6):
                    sentiment = "positive"
                elif (tweet.sentiment.polarity > 0.6 and tweet.sentiment.polarity <= 1):
                    sentiment = "strong positive"
                elif (tweet.sentiment.polarity > -0.3 and tweet.sentiment.polarity <= 0):
                    sentiment = "weak negative"
                elif (tweet.sentiment.polarity > -0.6 and tweet.sentiment.polarity <= -0.3):
                    sentiment = "negative"
                else:  # (tweet.sentiment.polarity > -1 and tweet.sentiment.polarity <= -0.6):
                    sentiment = "strong negative"
                print(sentiment)
                self.es.index(
                    index="twitter",
                    doc_type="test-type",
                    body={
                        "author": dict_data["user"]["screen_name"],
                        "date": dict_data["created_at"],
                        "message": dict_data["text"],
                        "polarity": tweet.sentiment.polarity,
                        "subjectivity": tweet.sentiment.subjectivity,
                        "sentiment": sentiment
                    })
                return True
            except Exception as e:
                print("Data Error:", str(e))
                return False

        def on_error(self, status):
            """Repeated access after rate limit will lock developer account."""
            if status == 420:
                return False
            print(status)

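But when I call the same method from Flask, the result template is rendered, yet tweets stream only for a very short time, after which I get `Data Error: 'text'`.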
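
The `Data Error: 'text'` message has the shape of a `KeyError` raised by `dict_data["text"]` in `on_data`. One possible explanation, assuming the Twitter v1.1 streaming API, is that not every message on the stream is a tweet: limit and delete notices, for example, carry no `text` field. A minimal sketch of a guard follows. `extract_tweet_text` is a hypothetical helper, not part of tweepy, and the sample payloads are made up for illustration.

```python
import json


def extract_tweet_text(raw):
    """Return the tweet text, or None when the message is not a tweet.

    Hypothetical helper: it only checks for a "text" field.
    """
    message = json.loads(raw)
    if not isinstance(message, dict):
        return None
    # Notices such as {"limit": ...} or {"delete": ...} have no "text" key,
    # so .get() returns None instead of raising KeyError.
    return message.get("text")


# Made-up sample payloads for illustration.
tweet = '{"text": "Come on Arsenal", "user": {"screen_name": "fan"}}'
notice = '{"limit": {"track": 42}}'

print(extract_tweet_text(tweet))
print(extract_tweet_text(notice))
```

In `on_data`, a `None` result could be skipped with `return True`, so that a single non-tweet message does not end the stream the way `return False` in the `except` branch appears to.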
Here is the Flask microservice:

    @app.route("/dynamic-analysis/", methods=["GET", "POST"])
    def dynamic_analysis():
        tweet_streamer = Streamer()
        if request.method == "POST":
            hash_tag = request.form['dynamic_hashtag']
            tweet_streamer.stream_tweets(hash_tag)
            return render_template("query-dynamic-analysis.html")  # contains URL to Kibana dashboard
        else:  # request.method == "GET":
            return render_template("dynamic-analysis.html")  # form to get user input

I also tried calling `render_template` with `tweet_streamer.stream_tweets(hash_tag)` as an additional data argument, but I still get the same error.

How can I solve this?
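
One difference between the two call paths may be worth checking: `main()` passes a list (`['Arsenal']`), whereas the Flask view passes `request.form['dynamic_hashtag']`, which is a single string. If the installed tweepy version builds the `track` parameter by comma-joining the terms it is given (an assumption about tweepy 3.x, not verified against this project), a bare string would be split into one-character terms. The sketch below shows that effect with plain Python. `normalize_track` is a hypothetical helper, not part of tweepy or Flask.

```python
def normalize_track(value):
    """Return track terms as a list, wrapping a single string.

    Hypothetical helper for illustration only.
    """
    if isinstance(value, str):
        return [value]
    return list(value)


# Joining a bare string iterates over its characters.
print(",".join("Arsenal"))                   # A,r,s,e,n,a,l

# Joining a one-element list keeps the term intact.
print(",".join(normalize_track("Arsenal")))  # Arsenal
```

With such a helper, the view could call `tweet_streamer.stream_tweets(normalize_track(hash_tag))`. Tracking single letters would match a very large volume of tweets, which might also explain why the stream stops so quickly, though that is a guess.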
Update:

I have also tried running the app with gunicorn and foreman, using the following configuration.

`gunicorn.conf.py`:

    workers = 4
    worker_class = 'sync'
    loglevel = 'debug'
    accesslog = '-'
    errorlog = '-'

`Procfile`:

    flask_dynamic_analysis: gunicorn --bind 127.0.0.1:$PORT --config gunicorn.conf.py twitter_sentiment_analysis:app
    microservices_routes: gunicorn --bind 127.0.0.1:$PORT --config gunicorn.conf.py app.routes:app

`.foreman`:

    formation: flask_dynamic_analysis=2, microservices_routes=2
    port: 3000

But I still get the same error.