你好,我正试图运行这个代码提取与Kafka实时推特,但是显示错误,在Kafka生产者收到当我运行它。我想不出问题出在哪里。
import json
from kafka import KafkaProducer
import tweepy
import configparser
class TweeterStreamListener(tweepy.StreamListener):
def __init__(self, api):
self.api = api
super(tweepy.StreamListener, self).__init__()
self.producer = KafkaProducer(bootstrap_servers=['localhost:9092'])
def on_status(self, status):
# This method is called whenever new data arrives from live stream.
# We asynchronously push this data to kafka queue
msg = status.text.encode('utf-8')
try:
self.producer.send_messages(b'twitterstream', msg)
except Exception as e:
print(e)
return False
return True
def on_error(self, status_code):
print("Error received in kafka producer")
return True # Don't kill the stream
def on_timeout(self):
return True # Don't kill the stream
谢谢您。
暂无答案!
目前还没有任何答案,快来回答吧!