Wrapping a StructField in from_avro()

ejk8hzay, posted on 2021-05-27 in Spark

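I am running a test that creates a DataFrame, encodes it with to_avro(), and then decodes it with from_avro().
The schema of the initial (unencoded) DataFrame contains a StructField instance for the field that I encode as Avro.
My goal is to use the Avro schema derived from that StructField as the schema argument of from_avro().
However, from_avro() only works if I wrap the StructField in an extra level of nesting: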

val keyAvroSchema = SchemaConverters.toAvroType(
    StructType( // ???
        Array( // ???
            keySparkSchema
        )
    )
).toString()

... and then I have to unwrap the decoded column to get at the value:

val decoded = encoded
    .select(
      from_avro($"key", keyAvroSchema).as("wrapped_key")
    )
    .selectExpr("wrapped_key.*") // <== want to avoid this step

My question: is there a way to do this without wrapping the schema?
Full code, run in Jupyter:

import org.apache.spark.sql.avro.SchemaConverters
import org.apache.spark.sql.avro.to_avro
import org.apache.spark.sql.avro.from_avro

import org.apache.spark.sql.types.{DataTypes, StructField, StructType}
val jsons = Seq(
  """{"key": {"id": "8166"}, "value": {"timestamp": 1596789012}}""",
  """{"key": {"id": "8167"}, "value": {"timestamp": 1596789123}}"""
)
val rdd = spark.sparkContext.parallelize(jsons)

val df = spark.read.json(rdd)
jsons = List({"key": {"id": "8166"}, "value": {"timestamp": 1596789012}}, {"key": {"id": "8167"}, "value": {"timestamp": 1596789123}})
rdd = ParallelCollectionRDD[0] at parallelize at <console>:35
df = [key: struct<id: string>, value: struct<timestamp: bigint>]

val encoded = df
  .select(
    to_avro($"key").as("key")
  )
encoded = [key: binary]

val keySparkSchema: StructField = df.schema("key")
keySparkSchema = StructField(key,StructType(StructField(id,StringType,true)),true)

val keyAvroSchema = SchemaConverters.toAvroType(
    StructType( // ???
        Array( // ???
            keySparkSchema
        )
    )
).toString()
keyAvroSchema = {"type":"record","name":"topLevelRecord","fields":[{"name":"key","type":[{"type":"record","name":"key","namespace":"topLevelRecord","fields":[{"name":"id","type":["string","null"]}]},"null"]}]}

val decoded = encoded
    .select(
      from_avro($"key", keyAvroSchema).as("wrapped_key")
    )
    .selectExpr("wrapped_key.*") // <== want to avoid this step
decoded = [key: struct<id: string>]

decoded.printSchema
root
 |-- key: struct (nullable = true)
 |    |-- id: string (nullable = true)
decoded.show()
+------+
|   key|
+------+
|[8166]|
|[8167]|
+------+
decoded.selectExpr("key.*").show
+----+
|  id|
+----+
|8166|
|8167|
+----+
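
To make the wrapping step easier to reason about, here is a minimal sketch that compares the two Avro schemas involved. It assumes a Spark version in which SchemaConverters.toAvroType takes a DataType plus an optional nullable flag, which is why a StructField cannot be passed to it directly. The names keyField, wrappedSchema and bareSchema are illustrative only and do not appear in the code above. The sketch only prints the schemas; whether from_avro() accepts the bare (unwrapped) schema at the top level has not been verified here and may depend on the Spark version.

```scala
import org.apache.avro.Schema
import org.apache.spark.sql.avro.SchemaConverters
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// Same shape as df.schema("key") in the question, rebuilt by hand so the
// snippet does not need a SparkSession (assumption: illustrative stand-in).
val keyField: StructField = StructField(
  "key",
  StructType(Seq(StructField("id", StringType, nullable = true))),
  nullable = true
)

// Wrapped form, as in the question: a one-field StructType around the field.
// This yields a top-level record with a nested "key" field.
val wrappedSchema: Schema =
  SchemaConverters.toAvroType(StructType(Seq(keyField)))

// Bare form: convert only the field's data type and pass its nullability.
// With nullable = true the converter wraps the record in a union with "null".
val bareSchema: Schema =
  SchemaConverters.toAvroType(keyField.dataType, keyField.nullable)

// Pretty-print both schemas for a side-by-side comparison.
println(wrappedSchema.toString(true))
println(bareSchema.toString(true))
```

If the bare schema is tried with from_avro($"key", bareSchema.toString), the decoded column should be checked against the original values, since a decoder schema that does not match the one used for encoding can yield wrong values without raising an error.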
