Flattening Avro files with Scala

Posted by xuo3flqw on 2021-07-13 in Spark

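I am receiving some Avro files from Event Hubs. Each Avro file has a different schema.
I need to flatten the Avro files into a structured format, and this has to happen dynamically.
The Avro files contain binary, map, and struct types.
Is there a way to flatten (explode) the Avro files?
Schema: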

root
 |-- SequenceNumber: long (nullable = true)
 |-- Offset: string (nullable = true)
 |-- EnqueuedTimeUtc: string (nullable = true)
 |-- SystemProperties: map (nullable = true)
 |    |-- key: string
 |    |-- value: struct (valueContainsNull = true)
 |    |    |-- member0: long (nullable = true)
 |    |    |-- member1: double (nullable = true)
 |    |    |-- member2: string (nullable = true)
 |    |    |-- member3: binary (nullable = true)
 |-- Properties: map (nullable = true)
 |    |-- key: string
 |    |-- value: struct (valueContainsNull = true)
 |    |    |-- member0: long (nullable = true)
 |    |    |-- member1: double (nullable = true)
 |    |    |-- member2: string (nullable = true)
 |    |    |-- member3: binary (nullable = true)
 |-- Body: binary (nullable = true)

I need the output to look like this:

SequenceNumber, offset, EnqueuedTimeUtc, systemproperties_member0,systemproperties_member1,systemproperties_member2,systemproperties_member3,Properties_member0,Properties_member1,Properties_member2,Properties_member3,Body_Application.Body_Level ....
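A minimal sketch of one dynamic approach, assuming Spark 2.4+ (built-in `avro` format) and the Event Hubs schema shown above. It explodes every top-level map column into one row per entry (a `<name>_key` column plus the value struct), then walks the schema recursively and selects each nested struct field as a top-level column named `parent_child`, which yields names such as `SystemProperties_member0`. `AvroFlatten` and its methods are hypothetical helper names, not a Spark API. Since nothing is hard-coded, it should adapt to files with different schemas; note, however, that exploding two maps in the same record produces one row per key combination, and binary fields such as `member3` and `Body` are passed through unchanged.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, explode_outer}
import org.apache.spark.sql.types.{MapType, StructType}

// Hypothetical helper object; names are illustrative only.
object AvroFlatten {

  /** Returns (quotedPath, flatName) for every non-struct field, recursing into structs.
    * Example: Properties.member0 -> ("`Properties`.`member0`", "Properties_member0"). */
  def leafPaths(schema: StructType, prefix: Seq[String] = Nil): Seq[(String, String)] =
    schema.fields.toSeq.flatMap { f =>
      val path = prefix :+ f.name
      f.dataType match {
        case s: StructType => leafPaths(s, path)
        case _             => Seq((path.map(n => s"`$n`").mkString("."), path.mkString("_")))
      }
    }

  /** Selects every leaf field as a top-level column named parent_child. */
  def flattenStructs(df: DataFrame): DataFrame =
    df.select(leafPaths(df.schema).map { case (path, name) => col(path).alias(name) }: _*)

  /** Explodes each top-level map column into <name>_key and <name> (the value),
    * one row per map entry; rows whose map is null or empty are kept with nulls. */
  def explodeMaps(df: DataFrame): DataFrame = {
    val mapCols = df.schema.fields.collect { case f if f.dataType.isInstanceOf[MapType] => f.name }
    mapCols.foldLeft(df) { (acc, name) =>
      val others   = acc.columns.filter(_ != name).map(c => col(s"`$c`"))
      val exploded = explode_outer(col(s"`$name`")).as(Seq(s"${name}_key", name))
      acc.select((others :+ exploded): _*) // only one generator per select, hence the fold
    }
  }

  /** Maps first, then structs, so map values like SystemProperties.member0 become columns. */
  def flatten(df: DataFrame): DataFrame = flattenStructs(explodeMaps(df))
}
```

Usage might look like `AvroFlatten.flatten(spark.read.format("avro").load("<path of avro file>"))`. Maps nested inside structs are not exploded by this sketch; only top-level map columns are.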

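The binary Body column has the following schema, which I obtained with this code:

import org.apache.spark.sql.functions.col
import spark.implicits._ // needed for .as[String]
val readavro = spark.read.format("avro").load("<path of avro file>")
val convertbinary = readavro.select(col("Body").cast("string")).as[String] // binary -> UTF-8 string
val readjson = spark.read.json(convertbinary) // infers the JSON schema shown below
readjson.printSchema()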

 |-- Application: string (nullable = true)
 |-- Level: string (nullable = true)
 |-- Message1: string (nullable = true)
 |-- Properties: struct (nullable = true)
 |    |-- AuditTimestamp: string (nullable = true)
 |    |-- Component: string (nullable = true)
 |    |-- SubscriptionName: string (nullable = true)
 |    |-- TopicName: string (nullable = true)
 |-- Request: string (nullable = true)
 |-- CorrelationId: string (nullable = true)
 |-- Method: string (nullable = true)
 |-- tUrl: string (nullable = true)
 |-- Code: string (nullable = true)
 |-- Response: string (nullable = true)
 |-- CorrelationId: string (nullable = true)
 |-- Session: string (nullable = true)
 |-- Timestamp: string (nullable = true)
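One way to bring Body into the same DataFrame as the other columns, sketched under the assumption that every non-null Body is a UTF-8 JSON object like the one above: infer the JSON schema once from the decoded strings, then use `from_json` to replace the binary column with a struct column. That struct can then be flattened like any other struct field (for example into `Body_Application`, `Body_Level`). `BodyParser.parseJsonBody` is a hypothetical helper name; `spark.read.json(Dataset[String])` and `from_json` are standard Spark SQL functions (Spark 2.2+).

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, from_json}

// Hypothetical helper object; names are illustrative only.
object BodyParser {

  /** Replaces the binary JSON column `bodyCol` with a parsed struct column.
    * The JSON schema is inferred from the data, so it follows whatever each input contains. */
  def parseJsonBody(spark: SparkSession, df: DataFrame, bodyCol: String = "Body"): DataFrame = {
    import spark.implicits._

    // Null bodies are skipped for inference; casting binary to string decodes it as UTF-8.
    val bodyStrings = df
      .filter(col(bodyCol).isNotNull)
      .select(col(bodyCol).cast("string"))
      .as[String]

    // Schema inference scans the data once more; pass a fixed StructType instead if it is known.
    val bodySchema = spark.read.json(bodyStrings).schema

    // All other columns (SequenceNumber, maps, ...) are kept; Body becomes a struct.
    df.withColumn(bodyCol, from_json(col(bodyCol).cast("string"), bodySchema))
  }
}
```

Usage might look like `BodyParser.parseJsonBody(spark, spark.read.format("avro").load("<path of avro file>"))`. Because the schema is inferred on each call, processing each file separately gives each file its own Body schema.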

I need to convert the entire Avro record into a DataFrame dynamically.
