在Pyspark中合并2个字符串

8i9zcol2  于 12个月前  发布在  Spark
关注(0)|答案(2)|浏览(189)

在Pyspark中,我有2个框架。第一个框架说df1是从模式创建的空框架。第二个框架df2是从csv文件填充的非空框架。现在我想合并,以便涵盖所有以下场景。
1.如果两个嵌套框包含相同的if列数,则将它们合并。
1.如果第二个框架包含其他列,则删除这些列
1.如果第二个框架包含较小的列,则使用空值填充这些列。
我试着遍历空的框架列的field属性。schema =一些列的模式

for field in scehma.fields:
    if field.name in df2.columns:
       final_df = df1.withColumn(field.name, df2[field.name].cast(field.dataType))

字符串

8mmmxcuj

8mmmxcuj1#

基本上,您有一个包含A、B和C列的输入架构和一个包含A、B和D列的引用架构。您希望输出架构通过删除C列并添加“NULL”列D来匹配此“引用”架构。
假设df_ref是你的参考框架,df是你的csv文件。

from pyspark.sql import functions as F

# Adding missing columns
for col in df_ref.columns: 
    if col not in df.columns:
        df = df.withColumn(col, F.lit(None))

# select only ref's columns
df.select(df_ref.columns)

字符串

xjreopfe

xjreopfe2#

你可以在PySpark中使用join操作来实现所需的合并。下面是一个示例代码片段,涵盖了你提到的所有场景:

from pyspark.sql.functions import lit

# Check if both dataframes have the same number of columns
if len(df1.columns) == len(df2.columns):
    # Merge the dataframes using a join operation
    final_df = df1.join(df2, on=df1.columns, how='inner')

# Check if the 2nd dataframe has additional columns
elif len(df1.columns) < len(df2.columns):
    # Drop the additional columns from the 2nd dataframe
    common_columns = set(df1.columns).intersection(df2.columns)
    final_df = df2.select(*common_columns)

# Check if the 2nd dataframe has fewer columns
elif len(df1.columns) > len(df2.columns):
    # Add null columns to the 2nd dataframe for the missing columns
    missing_columns = set(df1.columns).difference(df2.columns)
    for column in missing_columns:
        df2 = df2.withColumn(column, lit(None))

    final_df = df1.join(df2, on=df1.columns, how='inner')

字符串

相关问题