我想从来自kafka的json数据推断出一个安全的模式。
df = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "localhost:9092").option("subscribe", "input").option("auto.offset.reset", "latest").load()
jsonDF = df.selectExpr("CAST(value AS STRING) jsonData")
stackoverflow有几种解决方案:
link说,要将一个小批量保存到一个文件中,请推断模式,然后将该模式用于流式Dataframe。尽管这个问题已经有一年了。
链接使用 schema_of_json
以及 lit
,尽管我无法让它与流式dfs一起工作。
我知道推断模式可能很危险,但是我正在开发的spark应用程序有多个具有不同模式的源(多列)。有没有一种方法可以基于json数据中的列创建一个模式并强制转换它们 String
以防止数据丢失。
暂无答案!
目前还没有任何答案,快来回答吧!