如何在spark中对具有不同列数的两个Dataframe执行并集?

qacovj5a  于 2021-07-09  发布在  Spark
关注(0)|答案(22)|浏览(636)

我有两个 DataFrame 学生:

我需要这样的结合:

这个 unionAll 函数不起作用,因为列的编号和名称不同。
我该怎么做?

mklgxw1f

mklgxw1f1#

Pypark公司
来自阿尔贝托的scala版本非常好用。然而,如果你想做一个for循环或者一些变量的动态赋值,你可能会遇到一些问题。解决方案附带pyspark-clean代码:

from pyspark.sql.functions import *

# defining dataframes

df1 = spark.createDataFrame(
    [
        (1, 'foo','ok'), 
        (2, 'pro','ok')
    ],
    ['id', 'txt','check']
)

df2 = spark.createDataFrame(
    [
        (3, 'yep',13,'mo'), 
        (4, 'bro',11,'re')

    ],
    ['id', 'txt','value','more'] 
) 

# retrieving columns

cols1 = df1.columns
cols2 = df2.columns

# getting columns from df1 and df2

total = list(set(cols2) | set(cols1)) 

# defining function for adding nulls (None in case of pyspark)

def addnulls(yourDF): 
  for x in total:
    if not x in yourDF.columns:
      yourDF = yourDF.withColumn(x,lit(None))
  return yourDF

df1 = addnulls(df1)
df2 = addnulls(df2)

# additional sorting for correct unionAll (it concatenates DFs by column number)

df1.select(sorted(df1.columns)).unionAll(df2.select(sorted(df2.columns))).show()

+-----+---+----+---+-----+
|check| id|more|txt|value|
+-----+---+----+---+-----+
|   ok|  1|null|foo| null|
|   ok|  2|null|pro| null|
| null|  3|  mo|yep|   13|
| null|  4|  re|bro|   11|
+-----+---+----+---+-----+
vptzau2j

vptzau2j2#

这是Pypark的解决方案。
它假设 df1 中缺少 df2 ,然后将缺少的字段添加到 df2 具有空值。但是,它也假设如果字段存在于两个Dataframe中,但是字段的类型或可空性不同,那么这两个Dataframe冲突并且不能合并。如果那样的话,我会提出 TypeError .

from pyspark.sql.functions import lit

def harmonize_schemas_and_combine(df_left, df_right):
    left_types = {f.name: f.dataType for f in df_left.schema}
    right_types = {f.name: f.dataType for f in df_right.schema}
    left_fields = set((f.name, f.dataType, f.nullable) for f in df_left.schema)
    right_fields = set((f.name, f.dataType, f.nullable) for f in df_right.schema)

    # First go over left-unique fields
    for l_name, l_type, l_nullable in left_fields.difference(right_fields):
        if l_name in right_types:
            r_type = right_types[l_name]
            if l_type != r_type:
                raise TypeError, "Union failed. Type conflict on field %s. left type %s, right type %s" % (l_name, l_type, r_type)
            else:
                raise TypeError, "Union failed. Nullability conflict on field %s. left nullable %s, right nullable %s"  % (l_name, l_nullable, not(l_nullable))
        df_right = df_right.withColumn(l_name, lit(None).cast(l_type))

    # Now go over right-unique fields
    for r_name, r_type, r_nullable in right_fields.difference(left_fields):
        if r_name in left_types:
            l_type = left_types[r_name]
            if r_type != l_type:
                raise TypeError, "Union failed. Type conflict on field %s. right type %s, left type %s" % (r_name, r_type, l_type)
            else:
                raise TypeError, "Union failed. Nullability conflict on field %s. right nullable %s, left nullable %s" % (r_name, r_nullable, not(r_nullable))
        df_left = df_left.withColumn(r_name, lit(None).cast(r_type))    

    # Make sure columns are in the same order
    df_left = df_left.select(df_right.columns)

    return df_left.union(df_right)
bbuxkriu

bbuxkriu3#

我的java版本:

private static Dataset<Row> unionDatasets(Dataset<Row> one, Dataset<Row> another) {
        StructType firstSchema = one.schema();
        List<String> anotherFields = Arrays.asList(another.schema().fieldNames());
        another = balanceDataset(another, firstSchema, anotherFields);
        StructType secondSchema = another.schema();
        List<String> oneFields = Arrays.asList(one.schema().fieldNames());
        one = balanceDataset(one, secondSchema, oneFields);
        return another.unionByName(one);
    }

    private static Dataset<Row> balanceDataset(Dataset<Row> dataset, StructType schema, List<String> fields) {
        for (StructField e : schema.fields()) {
            if (!fields.contains(e.name())) {
                dataset = dataset
                        .withColumn(e.name(),
                                lit(null));
                dataset = dataset.withColumn(e.name(),
                        dataset.col(e.name()).cast(Optional.ofNullable(e.dataType()).orElse(StringType)));
            }
        }
        return dataset;
    }
z5btuh9x

z5btuh9x4#

在scala中,您只需将所有缺少的列追加为 nulls .

import org.apache.spark.sql.functions._

// let df1 and df2 the Dataframes to merge
val df1 = sc.parallelize(List(
  (50, 2),
  (34, 4)
)).toDF("age", "children")

val df2 = sc.parallelize(List(
  (26, true, 60000.00),
  (32, false, 35000.00)
)).toDF("age", "education", "income")

val cols1 = df1.columns.toSet
val cols2 = df2.columns.toSet
val total = cols1 ++ cols2 // union

def expr(myCols: Set[String], allCols: Set[String]) = {
  allCols.toList.map(x => x match {
    case x if myCols.contains(x) => col(x)
    case _ => lit(null).as(x)
  })
}

df1.select(expr(cols1, total):_*).unionAll(df2.select(expr(cols2, total):_*)).show()

+---+--------+---------+-------+
|age|children|education| income|
+---+--------+---------+-------+
| 50|       2|     null|   null|
| 34|       4|     null|   null|
| 26|    null|     true|60000.0|
| 32|    null|    false|35000.0|
+---+--------+---------+-------+

更新

都是暂时的 DataFrames 将具有相同的列顺序,因为我们正在通过 total 在这两种情况下。

df1.select(expr(cols1, total):_*).show()
df2.select(expr(cols2, total):_*).show()

+---+--------+---------+------+
|age|children|education|income|
+---+--------+---------+------+
| 50|       2|     null|  null|
| 34|       4|     null|  null|
+---+--------+---------+------+

+---+--------+---------+-------+
|age|children|education| income|
+---+--------+---------+-------+
| 26|    null|     true|60000.0|
| 32|    null|    false|35000.0|
+---+--------+---------+-------+
g6ll5ycj

g6ll5ycj5#

我也遇到了同样的问题,用join代替union解决了我的问题。因此,例如使用python,而不是这行代码: result = left.union(right) ,对于不同的列数将无法执行,您应使用以下命令:

result = left.join(right, left.columns if (len(left.columns) < len(right.columns)) else right.columns, "outer")

注意,第二个参数包含两个Dataframe之间的公共列。如果不使用它,结果将有重复的列,其中一个为null,另一个不为null。希望有帮助。

blpfk2vs

blpfk2vs6#

在Pypark中:

df = df1.join(df2, ['each', 'shared', 'col'], how='full')
mrphzbgm

mrphzbgm7#

pysparkDataframe连接的并集和外并集。这适用于具有不同列的多个Dataframe。

def union_all(*dfs):
    return reduce(ps.sql.DataFrame.unionAll, dfs)

def outer_union_all(*dfs):

    all_cols = set([])
    for df in dfs:
        all_cols |= set(df.columns) 
    all_cols = list(all_cols)
    print(all_cols)

    def expr(cols, all_cols):

        def append_cols(col):
            if col in cols:
                return col
            else:
                return sqlfunc.lit(None).alias(col)

        cols_ = map(append_cols, all_cols)
        return list(cols_)

    union_df = union_all(*[df.select(expr(df.columns, all_cols)) for df in dfs])
    return union_df
uurity8g

uurity8g8#

这是我的python版本:

from pyspark.sql import SparkSession, HiveContext
from pyspark.sql.functions import lit
from pyspark.sql import Row

def customUnion(df1, df2):
    cols1 = df1.columns
    cols2 = df2.columns
    total_cols = sorted(cols1 + list(set(cols2) - set(cols1)))
    def expr(mycols, allcols):
        def processCols(colname):
            if colname in mycols:
                return colname
            else:
                return lit(None).alias(colname)
        cols = map(processCols, allcols)
        return list(cols)
    appended = df1.select(expr(cols1, total_cols)).union(df2.select(expr(cols2, total_cols)))
    return appended

以下是示例用法:

data = [
    Row(zip_code=58542, dma='MIN'),
    Row(zip_code=58701, dma='MIN'),
    Row(zip_code=57632, dma='MIN'),
    Row(zip_code=58734, dma='MIN')
]

firstDF = spark.createDataFrame(data)

data = [
    Row(zip_code='534', name='MIN'),
    Row(zip_code='353', name='MIN'),
    Row(zip_code='134', name='MIN'),
    Row(zip_code='245', name='MIN')
]

secondDF = spark.createDataFrame(data)

customUnion(firstDF,secondDF).show()
jslywgbw

jslywgbw9#

或者你可以使用完全连接。

list_of_files = ['test1.parquet', 'test2.parquet']

def merged_frames():
  if list_of_files:
    frames = [spark.read.parquet(df.path) for df in list_of_files]
    if frames:
      df = frames[0]
      if frames[1]:
        var = 1
        for element in range(len(frames)-1):
          result_df = df.join(frames[var], 'primary_key', how='full')
          var += 1
    display(result_df)
yi0zb3m4

yi0zb3m410#

Spark3.1+

df = df1.unionByName(df2, allowMissingColumns=True)

测试结果:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

data1=[
(1 , '2016-08-29', 1 , 2, 3),
(2 , '2016-08-29', 1 , 2, 3),
(3 , '2016-08-29', 1 , 2, 3)]
df1 = spark.createDataFrame(data1, ['code' , 'date' , 'A' , 'B', 'C'])
data2=[
(5 , '2016-08-29', 1, 2, 3, 4),
(6 , '2016-08-29', 1, 2, 3, 4),
(7 , '2016-08-29', 1, 2, 3, 4)]
df2 = spark.createDataFrame(data2, ['code' , 'date' , 'B', 'C', 'D', 'E'])

df = df1.unionByName(df2, allowMissingColumns=True)
df.show()

# +----+----------+----+---+---+----+----+

# |code|      date|   A|  B|  C|   D|   E|

# +----+----------+----+---+---+----+----+

# |   1|2016-08-29|   1|  2|  3|null|null|

# |   2|2016-08-29|   1|  2|  3|null|null|

# |   3|2016-08-29|   1|  2|  3|null|null|

# |   5|2016-08-29|null|  1|  2|   3|   4|

# |   6|2016-08-29|null|  1|  2|   3|   4|

# |   7|2016-08-29|null|  1|  2|   3|   4|

# +----+----------+----+---+---+----+----+
1szpjjfi

1szpjjfi11#

这是我的Pypark版本:

from functools import reduce
from pyspark.sql.functions import lit

def concat(dfs):
    # when the dataframes to combine do not have the same order of columns
    # https://datascience.stackexchange.com/a/27231/15325
    return reduce(lambda df1, df2: df1.union(df2.select(df1.columns)), dfs) 

def union_all(dfs):
    columns = reduce(lambda x, y : set(x).union(set(y)), [ i.columns for i in dfs ]  )

    for i in range(len(dfs)):
        d = dfs[i]
        for c in columns:
            if c not in d.columns:
                d = d.withColumn(c, lit(None))
        dfs[i] = d

    return concat(dfs)
1wnzp6jl

1wnzp6jl12#

这是scala的版本,也是pyspark的版本spark-将具有不同模式(列名和序列)的Dataframe合并/联合到具有主公共模式的Dataframe-
需要Dataframe列表才能联合。。所有dataframe中提供的相同命名列应具有相同的数据类型。。

def unionPro(DFList: List[DataFrame], spark: org.apache.spark.sql.SparkSession): DataFrame = {

    /**
     * This Function Accepts DataFrame with same or Different Schema/Column Order.With some or none common columns
     * Creates a Unioned DataFrame
     */

    import spark.implicits._

    val MasterColList: Array[String] = DFList.map(_.columns).reduce((x, y) => (x.union(y))).distinct

    def unionExpr(myCols: Seq[String], allCols: Seq[String]): Seq[org.apache.spark.sql.Column] = {
      allCols.toList.map(x => x match {
        case x if myCols.contains(x) => col(x)
        case _                       => lit(null).as(x)
      })
    }

    // Create EmptyDF , ignoring different Datatype in StructField and treating them same based on Name ignoring cases

    val masterSchema = StructType(DFList.map(_.schema.fields).reduce((x, y) => (x.union(y))).groupBy(_.name.toUpperCase).map(_._2.head).toArray)

    val masterEmptyDF = spark.createDataFrame(spark.sparkContext.emptyRDD[Row], masterSchema).select(MasterColList.head, MasterColList.tail: _*)

    DFList.map(df => df.select(unionExpr(df.columns, MasterColList): _*)).foldLeft(masterEmptyDF)((x, y) => x.union(y))

  }

这是它的样品测试-

val aDF = Seq(("A", 1), ("B", 2)).toDF("Name", "ID")
    val bDF = Seq(("C", 1, "D1"), ("D", 2, "D2")).toDF("Name", "Sal", "Deptt")
    unionPro(List(aDF, bDF), spark).show

输出为-

+----+----+----+-----+
|Name|  ID| Sal|Deptt|
+----+----+----+-----+
|   A|   1|null| null|
|   B|   2|null| null|
|   C|null|   1|   D1|
|   D|null|   2|   D2|
+----+----+----+-----+
plicqrtu

plicqrtu13#

一个更通用的方法来联合 DataFrame .

def unionFrames(dfs: Seq[DataFrame]): DataFrame = {
    dfs match {
      case Nil => session.emptyDataFrame // or throw an exception?
      case x :: Nil => x
      case _ =>
        //Preserving Column order from left to right DF's column order
        val allColumns = dfs.foldLeft(collection.mutable.ArrayBuffer.empty[String])((a, b) => a ++ b.columns).distinct

        val appendMissingColumns = (df: DataFrame) => {
          val columns = df.columns.toSet
          df.select(allColumns.map(c => if (columns.contains(c)) col(c) else lit(null).as(c)): _*)
        }

        dfs.tail.foldLeft(appendMissingColumns(dfs.head))((a, b) => a.union(appendMissingColumns(b)))
    }
5sxhfpxr

5sxhfpxr14#

还有一个:

def unite(df1: DataFrame, df2: DataFrame): DataFrame = {
    val cols1 = df1.columns.toSet
    val cols2 = df2.columns.toSet
    val total = (cols1 ++ cols2).toSeq.sorted
    val expr1 = total.map(c => {
      if (cols1.contains(c)) c else "NULL as " + c
    })
    val expr2 = total.map(c => {
      if (cols2.contains(c)) c else "NULL as " + c
    })
    df1.selectExpr(expr1:_*).union(
      df2.selectExpr(expr2:_*)
    )
}
vatpfxk5

vatpfxk515#

from functools import reduce
from pyspark.sql import DataFrame
import pyspark.sql.functions as F

def unionAll(*dfs, fill_by=None):
    clmns = {clm.name.lower(): (clm.dataType, clm.name) for df in dfs for clm in df.schema.fields}

    dfs = list(dfs)
    for i, df in enumerate(dfs):
        df_clmns = [clm.lower() for clm in df.columns]
        for clm, (dataType, name) in clmns.items():
            if clm not in df_clmns:
                # Add the missing column
                dfs[i] = dfs[i].withColumn(name, F.lit(fill_by).cast(dataType))
    return reduce(DataFrame.unionByName, dfs)
unionAll(df1, df2).show()

示例列中的大小写
将返回实际的列大小写
支持现有数据类型
默认值可以自定义
一次传递多个Dataframe(例如unionall(df1,df2,df3,…,df10))

相关问题