Flattening Avro files with Scala

xuo3flqw  posted on 2021-07-13  in Spark

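I receive Avro files from Event Hubs. Each Avro file has a different schema.
I need to flatten these Avro files into a structured (tabular) form, and this has to happen dynamically.
The Avro files contain binary, map, and struct data.
Is there a way to flatten the Avro files?
Schema: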

  root
   |-- SequenceNumber: long (nullable = true)
   |-- Offset: string (nullable = true)
   |-- EnqueuedTimeUtc: string (nullable = true)
   |-- SystemProperties: map (nullable = true)
   |    |-- key: string
   |    |-- value: struct (valueContainsNull = true)
   |    |    |-- member0: long (nullable = true)
   |    |    |-- member1: double (nullable = true)
   |    |    |-- member2: string (nullable = true)
   |    |    |-- member3: binary (nullable = true)
   |-- Properties: map (nullable = true)
   |    |-- key: string
   |    |-- value: struct (valueContainsNull = true)
   |    |    |-- member0: long (nullable = true)
   |    |    |-- member1: double (nullable = true)
   |    |    |-- member2: string (nullable = true)
   |    |    |-- member3: binary (nullable = true)
   |-- Body: binary (nullable = true)

I need the output to look like this:

  SequenceNumber, offset, EnqueuedTimeUtc, systemproperties_member0, systemproperties_member1, systemproperties_member2, systemproperties_member3, Properties_member0, Properties_member1, Properties_member2, Properties_member3, Body_Application.Body_Level ....
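One possible way to get the `<map>_member0 ... <map>_member3` columns is to explode each map column into one row per key and then promote the members of the value struct to top-level columns. The sketch below assumes Spark 2.4 or later with the Avro data source available; `explodeMapOfStructs` is a hypothetical helper name, not an existing Spark API, and the extra `<map>_key` column is an addition that is not in the desired output above.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, explode_outer}
import org.apache.spark.sql.types.{MapType, StructType}

// Hypothetical helper: turns one map<string, struct> column into
// <map>_key plus one <map>_<member> column per struct member.
def explodeMapOfStructs(df: DataFrame, mapCol: String): DataFrame =
  df.schema(mapCol).dataType match {
    case MapType(_, valueType: StructType, _) =>
      val keyCol = s"${mapCol}_key"
      val valCol = s"${mapCol}_value"
      // explode_outer keeps rows whose map is null or empty
      val exploded = df
        .select(col("*"), explode_outer(col(mapCol)).as(Seq(keyCol, valCol)))
        .drop(mapCol)
      // Promote each struct member to a top-level column
      valueType.fieldNames
        .foldLeft(exploded) { (acc, f) =>
          acc.withColumn(s"${mapCol}_$f", col(valCol).getField(f))
        }
        .drop(valCol)
    case _ => df // not a map of structs: leave the DataFrame untouched
  }
```

Because the column type is read from `df.schema` at run time, the same helper can be applied to `SystemProperties` and `Properties` without hard-coding the member names. Note that exploding both maps on the same DataFrame multiplies the rows (one row per combination of keys), so it may be preferable to explode each map separately and join on `SequenceNumber`.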

The binary Body has the schema shown below, which I obtained with this code:

  import org.apache.spark.sql.functions.col

  // Read the Avro file, cast the binary Body to a string, and parse it as JSON
  val readavro = spark.read.format("avro").load("<path of avro file>")
  val convertbinary = readavro.select(col("Body").cast("String")).rdd.map(x=>x(0).toString())
  val readjson = spark.read.json(convertbinary)
  readjson.printSchema()

   |-- Application: string (nullable = true)
   |-- Level: string (nullable = true)
   |-- Message1: string (nullable = true)
   |-- Properties: struct (nullable = true)
   |    |-- AuditTimestamp: string (nullable = true)
   |    |-- Component: string (nullable = true)
   |    |-- SubscriptionName: string (nullable = true)
   |    |-- TopicName: string (nullable = true)
   |-- Request: string (nullable = true)
   |-- CorrelationId: string (nullable = true)
   |-- Method: string (nullable = true)
   |-- tUrl: string (nullable = true)
   |-- Code: string (nullable = true)
   |-- Response: string (nullable = true)
   |-- CorrelationId: string (nullable = true)
   |-- Session: string (nullable = true)
   |-- Timestamp: string (nullable = true)
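Reading the Body through a separate `spark.read.json` call yields a DataFrame that is no longer attached to `SequenceNumber`, `Offset`, and the other envelope columns. A possible alternative, sketched here under the assumption that every Body in a file is UTF-8 JSON, is to infer the JSON schema once and then parse the Body in place with `from_json`. `withParsedBody` is a hypothetical helper name.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, from_json}

// Hypothetical helper: replaces the binary body column with a struct
// whose schema is inferred from the JSON payloads in that column.
def withParsedBody(df: DataFrame, bodyCol: String = "Body"): DataFrame = {
  import df.sparkSession.implicits._
  // Cast binary -> string; skip null bodies while inferring the schema
  val bodyJson = df
    .filter(col(bodyCol).isNotNull)
    .select(col(bodyCol).cast("string"))
    .as[String]
  val bodySchema = df.sparkSession.read.json(bodyJson).schema
  // Parse each body in place so it stays on the same row as the envelope
  df.withColumn(bodyCol, from_json(col(bodyCol).cast("string"), bodySchema))
}
```

With this, `withParsedBody(readavro)` would keep one row per event, with `Body` as a struct column. Schema inference costs an extra pass over the data, which may matter for large captures.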

I need to convert the entire Avro record into a DataFrame dynamically.
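For the struct part of the problem, one common approach is to walk the DataFrame schema recursively and select every leaf field under an underscore-joined name, so nothing about the schema needs to be known in advance. This is only a sketch: `flattenColumns` and `flattenStructs` are hypothetical names, and map and array columns are passed through unchanged because they need an explode step first.

```scala
import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.StructType

// Collect one Column per leaf field, named parent_child_..._leaf
def flattenColumns(schema: StructType, prefix: Seq[String] = Nil): Seq[Column] =
  schema.fields.toSeq.flatMap { f =>
    val path = prefix :+ f.name
    f.dataType match {
      // Recurse into nested structs
      case st: StructType => flattenColumns(st, path)
      // Leaf (or map/array): select by its backtick-quoted dotted path
      case _ =>
        Seq(col(path.map(p => s"`$p`").mkString(".")).as(path.mkString("_")))
    }
  }

// Hypothetical helper: flattens all nested structs of a DataFrame
def flattenStructs(df: DataFrame): DataFrame =
  df.select(flattenColumns(df.schema): _*)
```

Applied to `readjson` from the question, the nested field `Properties.AuditTimestamp` would come out as a top-level column named `Properties_AuditTimestamp`, while top-level string columns keep their names.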
