Is there a way in PySpark to copy the schema of one DataFrame to another?

vulvrdjw · posted 2023-04-19 in Spark

I have a Spark DataFrame (df1) with a specific schema, and I have another DataFrame with the same columns but a different schema. I know how to do this column by column, but since I have a large set of columns it would be very verbose. To keep the schema consistent across DataFrames, I'd like to know whether I can apply one DataFrame's schema to another, or create a function that does the job.

df1
root
 |-- A: date (nullable = true)
 |-- B: integer (nullable = true)
 |-- C: string (nullable = true)

df2
root
 |-- A: string (nullable = true)
 |-- B: string (nullable = true)
 |-- C: string (nullable = true)

I want to copy df1's schema and apply it to df2. I'd appreciate any comments and direction.
I tried the approach below on a single column; since I have a large number of columns, doing it this way would be quite verbose.

df2 = df2.withColumn("B", df2["B"].cast('int'))

5fjcxozz1#

Yes, this can be done dynamically using **dataframe.schema.fields**:
df2.select(*[(col(x.name).cast(x.dataType)) for x in df1.schema.fields])

Example:

from pyspark.sql.functions import *
df1 = spark.createDataFrame([('2022-02-02',2,'a')],['A','B','C']).withColumn("A",to_date(col("A")))
print("df1 Schema")
df1.printSchema()
#df1 Schema
#root
# |-- A: date (nullable = true)
# |-- B: long (nullable = true)
# |-- C: string (nullable = true)

df2 = spark.createDataFrame([('2022-02-02','2','a')],['A','B','C'])
print("df2 Schema")
df2.printSchema()
#df2 Schema
#root
# |-- A: string (nullable = true)
# |-- B: string (nullable = true)
# |-- C: string (nullable = true)
#

#cast df2's columns to df1's types by reading df1's schema in a select clause
df3 = df2.select(*[(col(x.name).cast(x.dataType)) for x in df1.schema.fields])
df3.show(10,False)
print("df3 Schema")
df3.printSchema()

#+----------+---+---+
#|A         |B  |C  |
#+----------+---+---+
#|2022-02-02|2  |a  |
#+----------+---+---+

#df3 Schema
#root
# |-- A: date (nullable = true)
# |-- B: long (nullable = true)
# |-- C: string (nullable = true)

In this example, **df1** is defined with date, long, and string types.

**df2** is defined with all string columns.
**df3** is built from df2 as the source data, with df1's schema applied.
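
If the two DataFrames don't share exactly the same column set, the same select/cast idea can be wrapped in a small helper that casts only the columns present in the target schema and passes the rest through unchanged. A minimal sketch, assuming the df1/df2 defined above (the helper name conform_to_schema is mine, not from the answer):

from pyspark.sql.functions import col

def conform_to_schema(df, schema):
    # target column name -> target data type
    target_types = {f.name: f.dataType for f in schema.fields}
    # cast columns known to the target schema; keep any extra columns as-is
    return df.select(*[
        col(c).cast(target_types[c]) if c in target_types else col(c)
        for c in df.columns
    ])

df3 = conform_to_schema(df2, df1.schema)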


xqkwcwgp2#

Try this:

Input DataFrames:

from pyspark.sql.functions import *
from pyspark.sql.types import *

from datetime import datetime

data1 = [("2022-01-01", 1, "A"),
         ("2022-01-02", 2, "B"),
         ("2022-01-03", 3, "C")
        ]

data1 = [(datetime.strptime(date_str, "%Y-%m-%d"), b, c) for date_str, b, c in data1]

schema1 = StructType([StructField("A", DateType(), True),
                      StructField("B", IntegerType(), True),
                      StructField("C", StringType(), True)
                     ]
                    )

df1 = spark.createDataFrame(data1, schema=schema1)

df1.printSchema()

data2 = [("2022-01-04", "4", "D"),
         ("2022-01-05", "5", "E"),
         ("2022-01-06", "6", "F")
        ]
schema2 = StructType([StructField("A", StringType(), True),
                      StructField("B", StringType(), True),
                      StructField("C", StringType(), True)
                     ]
                    )
df2 = spark.createDataFrame(data2, schema=schema2)

df2.printSchema()
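# note: re-creating the DataFrame with df1.schema only re-declares the column
# types; it does not cast the underlying values, so with string row values an
# action such as show() would fail Spark's type verification at this point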
df2 = spark.createDataFrame(data=df2.rdd, schema=df1.schema)
df2.printSchema()

root
 |-- A: date (nullable = true)
 |-- B: integer (nullable = true)
 |-- C: string (nullable = true)

Alternatively, if you need a more generic solution, create a method:

def apply_schema(df1, df2):
    schema1 = df1.schema
    
    schema2 = df2.schema
    
    data_types = {field.name: field.dataType for field in schema1.fields}
    
    for field in schema2.fields:
        column_name = field.name
        
        if column_name in data_types:
            column_type = data_types[column_name]
            df2 = df2.withColumn(column_name, df2[column_name].cast(column_type))
    
    return df2
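
A note on the design: each withColumn call adds another projection to the query plan, so for very wide DataFrames a single select containing all the casts (as in the first answer) generally keeps the plan smaller; the withColumn loop shown here is just easier to read.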

And use this method to impose df1's schema on df2:

df2 = apply_schema(df1, df2)

print("Schema of df1:")
df1.printSchema()

print("Schema of df2:")
df2.printSchema()

df2.show()

Schema of df1:
root
 |-- A: date (nullable = true)
 |-- B: integer (nullable = true)
 |-- C: string (nullable = true)

Schema of df2:
root
 |-- A: date (nullable = true)
 |-- B: integer (nullable = true)
 |-- C: string (nullable = true)

+----------+---+---+
|         A|  B|  C|
+----------+---+---+
|2022-01-04|  4|  D|
|2022-01-05|  5|  E|
|2022-01-06|  6|  F|
+----------+---+---+
