我是个Spark新人
我正在尝试使用Spark Stream阅读Kafka主题。
从Kafka流出的数据的“value”字段是一个json字符串。我想将这个“value”字段转换为一个字符串,并将其更改为一个parquet文件。
我想从value字段中包含的字符串值中获取模式信息。原因是,JSON数据字段继续添加
比如像这样的Kafka数据。
| 关键|值|......这是什么?|
| --|--|--|
| 0 |“{a:1,B:2,c:3}.”|......这是什么?|
| 1 |“{a:1,B:2,c:3,d:4}..”|......这是什么?|
我正在尝试这个代码
source_df = streaming_data.selectExpr("CAST(value AS STRING)").alias("value") \
.select(from_json("value", schema_of_json(streaming_data.select('value').first().getString(0)))).alias("data") \
.select("data.*")
字符串
我得到了错误pyspark.sql.utils.AnalysisException:与流源的连接必须用writeStream.start()执行;
请帮
1条答案
按热度按时间oiopk7p51#
选项1:硬编码模式并在
F.from_json()
中使用它。字符串
备选方案2:如果你想动态地推断模式,你可以使用
foreachbatch
。但是要注意,这是有风险的,破坏模式的改变会使流查询失败。而且不能保证模式会被正确地推断。型