如何将kafka主题的嵌套json数据拆分为多个Dataframe?

w8f9ii69  于 2021-07-14  发布在  Spark
关注(0)|答案(0)|浏览(242)

我收到一个Kafka主题在下面的格式。

{
"football":{
    "gameID":"abcd",
    "duration":"90mins",
    "players":"11"
}
"cricket":[{
    "gameID":"abcd",
    "duration":"100mins",
    "players":"11"
    "type":"Multi"
}],
"rugby":[{
    "gameID":"abcd"
    "duration":"110mins",
    "players":"11",
    "countries":"10"
}]
}

我正在尝试将传入的主题拆分为多个Dataframe。例如,足球进入一个Dataframe,板球进入另一个Dataframe,橄榄球进入另一个Dataframe。我在structype中定义了一个嵌套模式来读取传入的json数据,如下所示。

val game_schema = StructType(Array(
    StructField("football", StructType(Array(
    StructField("gameID", StringType, true),
    StructField("duration", StringType, true),
    StructField("players", StringType, true)))),

    StructField("cricket", StructType(Array(
    StructField("gameID", StringType, true),
    StructField("duration", StringType, true),
    StructField("type", StringType, true)))),

    StructField("rugby", StructType(Array(
    StructField("gameID", StringType, true),
    StructField("duration", StringType, true),
    StructField("countries", StringType, true))))
))

这就是我试图阅读kafka主题的方式,它是一个嵌套的json,并将足球数据取出到一个单独的数据框中。

val gamedata = spark.readStream.format("kafka")
      .option("kafka.bootstrap.servers", "serveraddress:9092")
      .option("subscribe", "kafkatopic")
      .option("kafka.security.protocol", "SASL_PLAINTEXT")
      .load()
      .select(from_json(col("value"), game_schema).as("parsed_data"))
      .selectExpr("parsed_data.football.gameID as gameID, parsed_data.football.duration as duration, parsed_data.football.players as players")

但是,当我试图在控制台上显示已解析Dataframe的值时,我看到一条错误消息,如下所示:

org.apache.spark.sql.AnalysisException: cannot resolve 'jsontostructs(`value`)' due to data type mismatch: argument 1 requires string type, however, '`value`' is of binary type.;;

我不明白我在这里犯的错误。我在用柱子 value 其中包含来自主题的数据并对其应用架构。这是解析上述格式的嵌套json的正确方法吗?
有人能帮我改正错误吗?

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题