我试图从Databricks上的Kafka主题中读取一些alpha Vantage财务数据作为spark dataframe,当使用消费者方法阅读消息时,我发现数据看起来像:
Received message: {"From_Currency Code": "USD", "From_Currency Name": "United States Dollar", "To_Currency Code": "JPY", "To_Currency Name": "Japanese Yen", "Exchange Rate": "141.63600000", "Last Refreshed": "2023-07-21 15:46:04", "Time Zone": "UTC", "Bid Price": "141.62900000", "Ask Price": "141.63940000"}
字符串
但是当我尝试编写Spark流时,我得到“查询没有结果”的消息
这就是我所尝试的:
df = spark.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", 'localhost:9092') \
.option("subscribe", topic) \
.load()
query = df.writeStream \
.outputMode("append") \
.format("console") \
.queryName("fx") \
.start()
型
我希望在单元格结果中显示数据框行。
1条答案
按热度按时间gdx19jrr1#
这里可能有几个问题:
localhost
是无用的-您需要提供实际的主机名和其他信息.option("startingOffsets", "earliest")
添加到spark.readStream
df.writeStream.format("console")
将数据输出到日志文件,因此它不可见。使用display(df)
查看笔记本中的数据。