使用python和kafka的spark结构化流

nom7f22z  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(461)

我在尝试为kafka启动readstream时遇到以下错误,我的kafka已启动并运行,我对它进行了多次测试以确保它正在处理。Kafka主题也被创建。
'''

kafka_df = spark.readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "localhost:9092") \
        .option("subscribe", "mytopic") \
        .option("startingOffsets", "earliest") \
        .load()

'''
traceback(最近一次调用last):文件“c:/users//pycharmprojects/sparkstreaming/pysparkkafkastreaming.py”,第18行,kafka\u df=spark.readstream
文件“c:\users\appdata\local\programs\python38-32\lib\site packages\pyspark\sql\streaming.py”,第420行,在load return self.\u df(self.\u jreader.load())文件“c:\users\appdata\local\programs\python38-32\lib\site packages\py4j\java\u gateway.py”,第1304行,call return \u value=get \u return \u value(文件“c:\users\appdata\local\programs\python38-32\lib\site packages\pyspark\sql\utils.py”,第134行,deco raise \u from(converted)file“,第3行,raise \u from pyspark.sql.utils.analysisexception:找不到数据源:kafka。请按照“结构化流媒体+Kafka集成指南”的部署部分部署应用程序。;

kuhbmx9i

kuhbmx9i1#

你需要导入kafka依赖项来运行这个!对于pyspark,您可以下载jar并放入spark/jars目录,或者在sparksession inital config中导入依赖项。请遵循Kafka结构化流媒体文档
我希望我能帮上忙,任何你能问我的问题,谢谢!

相关问题