阅读Spark Streaming from Kafka主题

mspsb9vt  于 2023-08-02  发布在  Apache
关注(0)|答案(1)|浏览(90)

我试图从Databricks上的Kafka主题中读取一些alpha Vantage财务数据作为spark dataframe,当使用消费者方法阅读消息时,我发现数据看起来像:

Received message: {"From_Currency Code": "USD", "From_Currency Name": "United States Dollar", "To_Currency Code": "JPY", "To_Currency Name": "Japanese Yen", "Exchange Rate": "141.63600000", "Last Refreshed": "2023-07-21 15:46:04", "Time Zone": "UTC", "Bid Price": "141.62900000", "Ask Price": "141.63940000"}

字符串
但是当我尝试编写Spark流时,我得到“查询没有结果”的消息
这就是我所尝试的:

df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", 'localhost:9092') \
    .option("subscribe", topic) \
    .load()

query = df.writeStream \
    .outputMode("append") \
    .format("console") \
    .queryName("fx") \
    .start()


我希望在单元格结果中显示数据框行。

gdx19jrr

gdx19jrr1#

这里可能有几个问题:

  • 很可能您没有在群集的每个节点上运行Kafka,因此localhost是无用的-您需要提供实际的主机名和其他信息
  • 如果要查看历史数据,则需要将.option("startingOffsets", "earliest")添加到spark.readStream
  • df.writeStream.format("console")将数据输出到日志文件,因此它不可见。使用display(df)查看笔记本中的数据。

相关问题