from pyspark.sql.functions import lit

def harmonize_schemas_and_combine(df_left, df_right):
    left_types = {f.name: f.dataType for f in df_left.schema}
    right_types = {f.name: f.dataType for f in df_right.schema}
    left_fields = set((f.name, f.dataType, f.nullable) for f in df_left.schema)
    right_fields = set((f.name, f.dataType, f.nullable) for f in df_right.schema)

    # First go over left-unique fields
    for l_name, l_type, l_nullable in left_fields.difference(right_fields):
        if l_name in right_types:
            # The name exists on both sides, so the difference is the type
            # or the nullability; either way, refuse to union
            r_type = right_types[l_name]
            if l_type != r_type:
                raise TypeError("Union failed. Type conflict on field %s. left type %s, right type %s" % (l_name, l_type, r_type))
            else:
                raise TypeError("Union failed. Nullability conflict on field %s. left nullable %s, right nullable %s" % (l_name, l_nullable, not l_nullable))
        df_right = df_right.withColumn(l_name, lit(None).cast(l_type))

    # Now go over right-unique fields
    for r_name, r_type, r_nullable in right_fields.difference(left_fields):
        if r_name in left_types:
            l_type = left_types[r_name]
            if r_type != l_type:
                raise TypeError("Union failed. Type conflict on field %s. right type %s, left type %s" % (r_name, r_type, l_type))
            else:
                raise TypeError("Union failed. Nullability conflict on field %s. right nullable %s, left nullable %s" % (r_name, r_nullable, not r_nullable))
        df_left = df_left.withColumn(r_name, lit(None).cast(r_type))

    # Make sure columns are in the same order
    df_left = df_left.select(df_right.columns)
    return df_left.union(df_right)
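A minimal usage sketch, assuming an active SparkSession named spark (the frames and column names are hypothetical): each side's unique column is added to the other as a typed null before the positional union.

df_a = spark.createDataFrame([(1, "x")], ["id", "a_only"])
df_b = spark.createDataFrame([(2, 3.0)], ["id", "b_only"])

# b_only is added to df_a (and a_only to df_b) as typed nulls,
# the columns are aligned, and the two frames are unioned
harmonize_schemas_and_combine(df_a, df_b).show()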
list_of_files = ['test1.parquet', 'test2.parquet']

def merged_frames():
    if list_of_files:
        frames = [spark.read.parquet(path) for path in list_of_files]
        if frames:
            result_df = frames[0]
            # Join each remaining frame onto the accumulated result
            for frame in frames[1:]:
                result_df = result_df.join(frame, 'primary_key', how='full')
            display(result_df)
from functools import reduce
from pyspark.sql.functions import lit

def concat(dfs):
    # when the dataframes to combine do not have the same order of columns
    # https://datascience.stackexchange.com/a/27231/15325
    return reduce(lambda df1, df2: df1.union(df2.select(df1.columns)), dfs)
def union_all(dfs):
    # Collect the full set of column names across all frames
    columns = reduce(lambda x, y: set(x).union(set(y)), [i.columns for i in dfs])
    for i in range(len(dfs)):
        d = dfs[i]
        for c in columns:
            if c not in d.columns:
                # Add each missing column as a null literal
                d = d.withColumn(c, lit(None))
        dfs[i] = d
    return concat(dfs)
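A usage sketch for the two helpers above, again with hypothetical frames: union_all pads each frame with null columns, and concat aligns the column order before unioning.

df_a = spark.createDataFrame([(1, "x")], ["id", "a_only"])
df_b = spark.createDataFrame([(2, "y")], ["id", "b_only"])

# Each frame gains the other's missing column as a null before the union
union_all([df_a, df_b]).show()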
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.functions.{col, lit}
import org.apache.spark.sql.types.StructType

def unionPro(DFList: List[DataFrame], spark: SparkSession): DataFrame = {

  /**
   * This function accepts DataFrames with the same or a different schema/column order,
   * with some or no common columns, and creates a unioned DataFrame.
   */

  import spark.implicits._

  val MasterColList: Array[String] = DFList.map(_.columns).reduce((x, y) => x.union(y)).distinct

  def unionExpr(myCols: Seq[String], allCols: Seq[String]): Seq[org.apache.spark.sql.Column] = {
    allCols.toList.map {
      case x if myCols.contains(x) => col(x)
      case x => lit(null).as(x)
    }
  }

  // Create an empty DF with the master schema, ignoring datatype differences in
  // StructFields and treating fields as the same based on name, ignoring case
  val masterSchema = StructType(DFList.map(_.schema.fields).reduce((x, y) => x.union(y)).groupBy(_.name.toUpperCase).map(_._2.head).toArray)
  val masterEmptyDF = spark.createDataFrame(spark.sparkContext.emptyRDD[Row], masterSchema).select(MasterColList.head, MasterColList.tail: _*)

  DFList.map(df => df.select(unionExpr(df.columns, MasterColList): _*)).foldLeft(masterEmptyDF)((x, y) => x.union(y))
}
def unionFrames(dfs: Seq[DataFrame]): DataFrame = {
  dfs match {
    case Nil => spark.emptyDataFrame // assumes a SparkSession named spark in scope; or throw an exception?
    case x :: Nil => x
    case _ =>
      // Preserving column order from left to right, following each DF's column order
      val allColumns = dfs.foldLeft(collection.mutable.ArrayBuffer.empty[String])((a, b) => a ++ b.columns).distinct
      val appendMissingColumns = (df: DataFrame) => {
        val columns = df.columns.toSet
        df.select(allColumns.map(c => if (columns.contains(c)) col(c) else lit(null).as(c)): _*)
      }
      dfs.tail.foldLeft(appendMissingColumns(dfs.head))((a, b) => a.union(appendMissingColumns(b)))
  }
}
def unite(df1: DataFrame, df2: DataFrame): DataFrame = {
  val cols1 = df1.columns.toSet
  val cols2 = df2.columns.toSet
  val total = (cols1 ++ cols2).toSeq.sorted
  // For each frame, keep its own columns and fill the other frame's columns with NULL
  val expr1 = total.map(c => if (cols1.contains(c)) c else "NULL as " + c)
  val expr2 = total.map(c => if (cols2.contains(c)) c else "NULL as " + c)
  df1.selectExpr(expr1: _*).union(
    df2.selectExpr(expr2: _*)
  )
}
from functools import reduce
from pyspark.sql import DataFrame
import pyspark.sql.functions as F

def unionAll(*dfs, fill_by=None):
    # Map lowercase column name -> (dataType, original name) across all frames;
    # the last frame seen wins for the type and the casing
    clmns = {clm.name.lower(): (clm.dataType, clm.name) for df in dfs for clm in df.schema.fields}

    dfs = list(dfs)
    for i, df in enumerate(dfs):
        df_clmns = [clm.lower() for clm in df.columns]
        for clm, (dataType, name) in clmns.items():
            if clm not in df_clmns:
                # Add the missing column, cast to the type seen elsewhere
                dfs[i] = dfs[i].withColumn(name, F.lit(fill_by).cast(dataType))
    return reduce(DataFrame.unionByName, dfs)

unionAll(df1, df2).show()
22 Answers
#1
Pyspark
The Scala version from Alberto works great. However, if you want to make a for-loop or some dynamic assignment of variables you can face some problems. The solution comes with Pyspark - clean code:
#2
Here is a pyspark solution (the harmonize_schemas_and_combine function above). It assumes that if a field in df1 is missing from df2, you add that missing field to df2 with null values. However, it also assumes that if the field exists in both dataframes but the type or nullability of the field is different, then the two dataframes conflict and cannot be combined. In that case I raise a TypeError.
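For example (hypothetical frames where id is inferred as bigint on one side and string on the other), the function above refuses to guess a common type:

df_left = spark.createDataFrame([(1, "x")], ["id", "val"])  # id: bigint
df_right = spark.createDataFrame([("1",)], ["id"])          # id: string

# Raises TypeError: type conflict on field id
harmonize_schemas_and_combine(df_left, df_right)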
#3
My Java version:
#4
In Scala you just have to append all the missing columns as nulls (the unite function above).
Update: both temporary DataFrames will have the same column order, because we are mapping through total in both cases.
#5
I had the same issue, and using a join instead of a union solved my problem. So, for example with Python, instead of this line of code, result = left.union(right), which will fail to execute for a different number of columns, you should use this one. Note that the second argument contains the common columns between the two dataframes. If you don't use it, the result will have duplicate columns, with one of them being null and the other not. Hope it helps.
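The command itself is not preserved here; a minimal sketch of the idea, assuming a hypothetical shared key column id:

# The list argument names the columns common to both frames, so the
# result keeps a single id column instead of a duplicated, half-null pair
result = left.join(right, ['id'], how='full')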
#6
In pyspark:
#7
Union and outer union for pyspark DataFrame concatenation. This works for multiple dataframes with different columns.
#8
Here is my Python version:
And here is sample usage:
#9
Or you can use a full join.
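A sketch of what that can look like over several frames, again assuming a hypothetical shared key column id:

from functools import reduce

# Full outer joins keep every row from every frame, matching on the key
merged = reduce(lambda a, b: a.join(b, ['id'], how='full'), frames)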
#10
Spark 3.1+
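The snippet is missing from this copy, but Spark 3.1 added the allowMissingColumns flag to DataFrame.unionByName, which is presumably what this answer shows:

# Columns absent from either side are filled in with nulls
merged = df1.unionByName(df2, allowMissingColumns=True)
merged.show()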
Test result:
#11
Here is my pyspark version:
#12
Here is the Scala version (there is also a pyspark version of it): merge/union DataFrames with different schemas (column names and order) into a DataFrame with a master common schema (the unionPro function above). It requires a list of DataFrames to union, and same-named columns across all the dataframes should have the same datatype.
Here is a sample test for it:
The output is:
#13
A more generic approach to union DataFrames.
#14
One more:
#15
- Matches columns case-insensitively, and returns the actual column casing
- Supports the existing data types
- The default fill value can be customized
- Pass multiple dataframes at once (e.g. unionAll(df1, df2, df3, ..., df10))
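For instance, a hypothetical call that fills missing columns with 0 instead of null, via the fill_by parameter of the unionAll helper above:

unionAll(df1, df2, df3, fill_by=0).show()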