无法在spark中查看来自kafka流的消息

e1xvtsh3  于 2021-06-07  发布在  Kafka
关注(0)|答案(2)|浏览(483)

我刚开始测试 Kafka StreamSpark 使用 Pyspark 图书馆。
我一直在运行整个安装程序 Jupyter Notebook . 我正试图从 Twitter Streaming .
twitter流代码:

  1. import json
  2. import tweepy
  3. from uuid import uuid4
  4. import time
  5. from kafka import KafkaConsumer
  6. from kafka import KafkaProducer
  7. auth = tweepy.OAuthHandler("key", "key")
  8. auth.set_access_token("token", "token")
  9. api = tweepy.API(auth, wait_on_rate_limit=True, retry_count=3, retry_delay=5,
  10. retry_errors=set([401, 404, 500, 503]))
  11. class CustomStreamListener(tweepy.StreamListener):
  12. def __init__(self, api):
  13. self.api = api
  14. super(tweepy.StreamListener, self).__init__()
  15. def on_data(self, tweet):
  16. print tweet
  17. # Kafka Producer to send data to twitter topic
  18. producer.send('twitter', json.dumps(tweet))
  19. def on_error(self, status_code):
  20. print status_code
  21. return True # Don't kill the stream
  22. def on_timeout(self):
  23. print 'on_timeout'
  24. return True # Don't kill the stream
  25. producer = KafkaProducer(bootstrap_servers='localhost:9092')
  26. sapi = tweepy.streaming.Stream(auth, CustomStreamListener(api))
  27. sapi.filter(track=["#party"])

Spark流代码

  1. from pyspark import SparkContext
  2. from pyspark.streaming import StreamingContext
  3. from pyspark.streaming.kafka import KafkaUtils
  4. sc = SparkContext(appName="PythonSparkStreamingKafka_RM_01").getOrCreate()
  5. sc.setLogLevel("WARN")
  6. streaming_context = StreamingContext(sc, 10)
  7. kafkaStream = KafkaUtils.createStream(streaming_context, 'localhost:2181', 'spark-streaming', {'twitter': 1})
  8. parsed = kafkaStream.map(lambda v: v)
  9. parsed.count().map(lambda x:'Tweets in this batch: %s' % x).pprint()
  10. streaming_context.start()
  11. streaming_context.awaitTermination()

打印输出:

时间:2017-09-30 11:21:00

时间:2017-09-30 11:21:10

时间:2017-09-30 11:21:20

我做错了什么?

bnl4lu3b

bnl4lu3b1#

您还可以使用一些gui工具,比如kafdrop。它在调试Kafka消息时非常有用。您不仅可以查看消息队列,还可以查看分区及其偏移量等。
这是一个很好的工具,您应该能够轻松地部署它。
以下是链接:https://github.com/homeadvisor/kafdrop

bjp0bcyl

bjp0bcyl2#

您可以使用以下两个步骤调试应用程序。
1) 使用示例使用者(如kafkawordcount)测试是否有数据(kafka主题是否有消息)
kafka附带了一个命令行客户机,它将从文件或标准输入中获取输入,并将其作为消息发送到kafka集群。默认情况下,每行将作为单独的消息发送。
运行producer,然后在控制台中键入一些消息以发送到服务器。

  1. kafka-console-producer.sh \
  2. --broker-list <brokeer list> \
  3. --topic <topic name> \
  4. --property parse.key=true \
  5. --property key.separator=, \
  6. --new-producer

例子:

  1. > bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

如果你看到打印消息,那么你有Kafka的消息,如果没有,那么你的生产者是不工作的
2) 打开日志记录

  1. Logger.getLogger("org").setLevel(Level.WARNING);
  2. Logger.getLogger("akka").setLevel(Level.WARNING);
  3. Logger.getLogger("kafka").setLevel(Level.WARNING);
展开查看全部

相关问题