我正在运行一个测试:先创建一个 DataFrame,用 to_avro() 对其中的列进行编码,然后再用 from_avro() 解码。
初始(未编码)DataFrame 的模式中包含一个 StructField,它对应我用 Avro 编码的那个字段。
我的目标是把从这个 StructField 提取出的 Avro 模式作为 from_avro() 的模式参数。
但是,只有当我把这个 StructField 包装进一层额外的嵌套结构之后,from_avro() 才能正常工作:
// Build the Avro schema (as a JSON string) that is later passed to from_avro().
// keySparkSchema is a StructField, so it is wrapped in a one-field StructType
// before being handed to toAvroType; the resulting Avro schema is therefore a
// record with a single field named "key" rather than the key struct itself.
// NOTE(review): to_avro() presumably encodes only the column's own data type, so
// toAvroType(keySparkSchema.dataType, keySparkSchema.nullable) may remove the
// need for this wrapper — confirm against the SchemaConverters API.
val keyAvroSchema = SchemaConverters.toAvroType(
StructType( // ???
Array( // ???
keySparkSchema
)
)
).toString()
……然后还需要把解码后的列展开,才能取到里面的值:
// Decode the Avro bytes in "key" using the wrapped schema. Because that schema
// has an extra record level, the decoded column (aliased "wrapped_key") is a
// struct containing "key", and it has to be expanded with "wrapped_key.*".
val decoded = encoded
.select(
from_avro($"key", keyAvroSchema).as("wrapped_key")
)
.selectExpr("wrapped_key.*") // <== want to avoid this step
我的问题是:有没有办法不对模式做这层额外的包装(wrap)?
jupyter中的完整代码:
import org.apache.spark.sql.avro.SchemaConverters
import org.apache.spark.sql.avro.to_avro
import org.apache.spark.sql.avro.from_avro
import org.apache.spark.sql.types.{DataTypes, StructField, StructType}
// Two sample JSON records, each with a nested "key" object and a nested "value" object.
val jsons = Seq(
"""{"key": {"id": "8166"}, "value": {"timestamp": 1596789012}}""",
"""{"key": {"id": "8167"}, "value": {"timestamp": 1596789123}}"""
)
// Distribute the JSON strings as an RDD and let Spark infer the schema from them;
// the inferred schema is [key: struct<id: string>, value: struct<timestamp: bigint>].
val rdd = spark.sparkContext.parallelize(jsons)
val df = spark.read.json(rdd)
jsons = List({"key": {"id": "8166"}, "value": {"timestamp": 1596789012}}, {"key": {"id": "8167"}, "value": {"timestamp": 1596789123}})
rdd = ParallelCollectionRDD[0] at parallelize at <console>:35
df = [key: struct<id: string>, value: struct<timestamp: bigint>]
[key: struct<id: string>, value: struct<timestamp: bigint>]
// Encode the "key" struct column to Avro; the result is a single binary column,
// still named "key".
val encoded = df
.select(
to_avro($"key").as("key")
)
encoded = [key: binary]
[key: binary]
// Look up the "key" field in the DataFrame schema. This is the StructField itself
// (name, data type and nullability), not just the inner struct type.
val keySparkSchema: StructField = df.schema("key")
keySparkSchema = StructField(key,StructType(StructField(id,StringType,true)),true)
StructField(key,StructType(StructField(id,StringType,true)),true)
// Convert the Spark field to an Avro schema JSON string for from_avro().
// The StructField is wrapped in a one-field StructType first, so the generated
// Avro schema is a "topLevelRecord" record whose only field "key" is a union of
// the nested key record and null — one level deeper than the key struct itself.
// NOTE(review): to_avro() presumably encodes only the column's own data type, so
// toAvroType(keySparkSchema.dataType, keySparkSchema.nullable) may yield a schema
// that decodes straight to struct<id: string> without this wrapper — confirm
// against the SchemaConverters API.
val keyAvroSchema = SchemaConverters.toAvroType(
StructType( // ???
Array( // ???
keySparkSchema
)
)
).toString()
keyAvroSchema = {"type":"record","name":"topLevelRecord","fields":[{"name":"key","type":[{"type":"record","name":"key","namespace":"topLevelRecord","fields":[{"name":"id","type":["string","null"]}]},"null"]}]}
{"type":"record","name":"topLevelRecord","fields":[{"name":"key","type":[{"type":"record","name":"key","namespace":"topLevelRecord","fields":[{"name":"id","type":["string","null"]}]},"null"]}]}
// Decode the Avro bytes with the wrapped schema. The decoded column is aliased
// "wrapped_key" and holds a struct with a single "key" member, so it is expanded
// with "wrapped_key.*" to get back to [key: struct<id: string>].
val decoded = encoded
.select(
from_avro($"key", keyAvroSchema).as("wrapped_key")
)
.selectExpr("wrapped_key.*") // <== want to avoid this step
decoded = [key: struct<id: string>]
[key: struct<id: string>]
decoded.printSchema
root
|-- key: struct (nullable = true)
| |-- id: string (nullable = true)
decoded.show()
+------+
| key|
+------+
|[8166]|
|[8167]|
+------+
decoded.selectExpr("key.*").show
+----+
| id|
+----+
|8166|
|8167|
+----+
暂无答案!
目前还没有任何答案,快来回答吧!