如何解析包含嵌套json格式数据的kafka主题?

fxnxkyjh  于 2021-07-14  发布在  Spark
关注(0)|答案(1)|浏览(678)

我正试着读一个Kafka的主题,然后把它流到我的Flume里。为了读取数据,我编写了以下代码。
json中的主题数据:

  1. {
  2. "HiveData": {
  3. "Tablename": "HiveTablename1",
  4. "Rowcount": "3213423",
  5. "lastupdateddate": "2021-02-24 13:04:14"
  6. },
  7. "HbaseData": [
  8. {
  9. "Tablename": "HbaseTablename1",
  10. "Rowcount": "23543",
  11. "lastupdateddate": "2021-02-23 12:03:11"
  12. }
  13. ],
  14. "PostgresData": [
  15. {
  16. "Tablename": "PostgresTablename1",
  17. "Rowcount": "23454345",
  18. "lastupdateddate": "2021-02-23 12:03:11"
  19. }
  20. ]
  21. }

下面是我为解析主题编写的代码:

  1. def streamData(): DataFrame = {
  2. val kafkaDF = spark.readStream.format("kafka")
  3. .option("kafka.bootstrap.servers", "server:port")
  4. .option("subscribe", "topic_name")
  5. .load()
  6. kafkaDF.select(from_json(col("HiveData"), topic_schema).as("HiveData")).selectExpr("HiveData.tablename as table", "HiveData.Rowcount as rowcount", "HiveData.lastupdateddate as lastupdateddate")
  7. kafkaDF
  8. }

但是如果json的格式是:

  1. {"Tablename": "HiveTablename1","Rowcount": "3213423","lastupdateddate": "2021-02-24 13:04:14"}

我想解析json并将hivedata转换成一个单独的dataframe和一个单独的dataframe(对于hba)以及postgresdata。如果json数据在一行中,我编写的代码就可以工作。有没有人能告诉我,如果数据是本问题开头提到的嵌套格式,如何将其解析为多个Dataframe?非常感谢您的帮助。

mrzz3bfm

mrzz3bfm1#

尝试添加
选项(“多行”、“真”)

相关问题