我收到一个Kafka主题在下面的格式。
{
"football":{
"gameID":"abcd",
"duration":"90mins",
"players":"11"
}
"cricket":[{
"gameID":"abcd",
"duration":"100mins",
"players":"11"
"type":"Multi"
}],
"rugby":[{
"gameID":"abcd"
"duration":"110mins",
"players":"11",
"countries":"10"
}]
}
我正在尝试将传入的主题拆分为多个Dataframe。例如,足球进入一个Dataframe,板球进入另一个Dataframe,橄榄球进入另一个Dataframe。我在structype中定义了一个嵌套模式来读取传入的json数据,如下所示。
val game_schema = StructType(Array(
StructField("football", StructType(Array(
StructField("gameID", StringType, true),
StructField("duration", StringType, true),
StructField("players", StringType, true)))),
StructField("cricket", StructType(Array(
StructField("gameID", StringType, true),
StructField("duration", StringType, true),
StructField("type", StringType, true)))),
StructField("rugby", StructType(Array(
StructField("gameID", StringType, true),
StructField("duration", StringType, true),
StructField("countries", StringType, true))))
))
这就是我试图阅读kafka主题的方式,它是一个嵌套的json,并将足球数据取出到一个单独的数据框中。
val gamedata = spark.readStream.format("kafka")
.option("kafka.bootstrap.servers", "serveraddress:9092")
.option("subscribe", "kafkatopic")
.option("kafka.security.protocol", "SASL_PLAINTEXT")
.load()
.select(from_json(col("value"), game_schema).as("parsed_data"))
.selectExpr("parsed_data.football.gameID as gameID, parsed_data.football.duration as duration, parsed_data.football.players as players")
但是,当我试图在控制台上显示已解析Dataframe的值时,我看到一条错误消息,如下所示:
org.apache.spark.sql.AnalysisException: cannot resolve 'jsontostructs(`value`)' due to data type mismatch: argument 1 requires string type, however, '`value`' is of binary type.;;
我不明白我在这里犯的错误。我在用柱子 value
其中包含来自主题的数据并对其应用架构。这是解析上述格式的嵌套json的正确方法吗?
有人能帮我改正错误吗?
暂无答案!
目前还没有任何答案,快来回答吧!