我目前正在与aws胶水和pyspark。我正在尝试手动创建spark模式,并将其应用于dataframe,以解决某些列的一些问题。
我当前的问题是结构类型的列。我的数据在通过glue读取的json中只提供了6个字段,但将来可能有8个字段。如果我用我目前收到的6个字段构建模式,它可以正常工作,但是如果我用我应该得到的8个字段构建模式,我会得到以下错误:valueerror:field name\u struct:length of object(6)与length of fields(8)不匹配。
应用更新的模式的最佳方法是什么?我认为我应该对struct列应用一个函数,并用空值创建缺少的键,以便在我的数据中具有预期的8个键。你知道怎么做吗?如果这样做,我认为对于所有struct类型的列,我应该解析模式,比较列中的数据(asdict()?)和模式之间的键,最后添加缺少的键。我觉得有一个更好的方法来处理spark或者一些我还没有找到的参数来在创建Dataframe时自动修复它?
import findspark
findspark.init()
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import StringType, IntegerType, StructType, StructField
spark = SparkSession.builder.master("local[1]") \
.appName('test_schema') \
.getOrCreate()
data = [
(32, ("James", "Smith"), ("street", 42)),
(18, ("Nina", "Smith"), ("street 2", 12))
]
schema = StructType([
StructField('age', IntegerType(), True),
StructField('name_struct', StructType([
StructField('first_name', StringType(), True),
StructField('last_name', StringType(), True)
])),
StructField('address_struct', StructType([
StructField('street', StringType(), True),
StructField('number', IntegerType(), True)
]))
])
df = spark.createDataFrame(data=data, schema=schema)
df.printSchema()
df.show()
schema2 = StructType([
StructField('age', IntegerType(), True),
StructField('name_struct', StructType([
StructField('first_name', StringType(), True),
StructField('last_name', StringType(), True),
StructField('middle_name', StringType(), True)
])),
StructField('address_struct', StructType([
StructField('street', StringType(), True),
StructField('number', IntegerType(), True),
StructField('zip_code', IntegerType(), True)
]))
])
# fix data here?
df2 = spark.createDataFrame(data=data, schema=schema2)
# error here: ValueError: field name_struct: Length of object (2) does not match with length of fields (3)
df2.printSchema()
df2.show()
谢谢!
暂无答案!
目前还没有任何答案,快来回答吧!