PySpark:从DataSchema获取列数

3phpmpom  于 2023-10-15  发布在  Spark
关注(0)|答案(1)|浏览(108)

我正在从JSON文件中加载一个预定义的模式,用于我摄取到Azure数据湖中的特定数据集。包含模式的JSON文件也存储在数据湖上。

varSchema = 'abfss://landing@[hidden].dfs.core.windows.net/'+parSourceSystemName+'/'+parDatasetName+'.json'

rdd = spark.sparkContext.wholeTextFiles(varSchema)
text = rdd.collect()[0][1]
dict = json.loads(str(text))
dataSchema = StructType.fromJson(dict)

我想获取这个模式变量中的number字段,这样我就可以将它与从着陆容器中的文件加载的子框架的number列进行比较,以确定新着陆数据中是否有模式更改。
如果Schema声明应该有20个字段,但着陆数据文件包含21个-我会知道源系统添加了一个新字段。

qzwqbdag

qzwqbdag1#

varSchema = 'abfss://landing@[hidden].dfs.core.windows.net/'+parSourceSystemName+'/'+parDatasetName+'.json'
rdd = spark.sparkContext.wholeTextFiles(varSchema)
text = rdd.collect()[0][1]
schema_dict = json.loads(text)
dataSchema = StructType.fromJson(schema_dict)

使用模式创建一个空DataFrame

empty_DF = spark.createDataFrame([], dataSchema)

将实际数据加载到另一个DF

landing_data_DF = spark.read.format("csv").load("abfss://landing@[hidden].dfs.core.windows.net/path/to/datafile.csv")

获取schema中的字段数和着陆数据DF中的列数,并比较它们:(我假设您需要打印语句)

num_fields_in_schema = len(dataSchema)

num_columns_in_data = len(landing_data_DF.columns)

# I used print statements but you can do other methods according to your preference

if num_fields_in_schema == num_columns_in_data:
    print("No schema change.")
elif num_fields_in_schema < num_columns_in_data:
    print("The source system added new fields.")
else:
    print("The source system removed fields.")

相关问题