Spark union所有多个 Dataframe

6yoyoihd  于 2023-02-19  发布在  Apache
关注(0)|答案(4)|浏览(191)

对于一组 Dataframe

val df1 = sc.parallelize(1 to 4).map(i => (i,i*10)).toDF("id","x")
val df2 = sc.parallelize(1 to 4).map(i => (i,i*100)).toDF("id","y")
val df3 = sc.parallelize(1 to 4).map(i => (i,i*1000)).toDF("id","z")

把他们联合起来我愿意

df1.unionAll(df2).unionAll(df3)

对于任意数量的 Dataframe (例如来自

Seq(df1, df2, df3)
tgabmvqs

tgabmvqs1#

对于pyspark,您可以执行以下操作:

from functools import reduce
from pyspark.sql import DataFrame

dfs = [df1,df2,df3]
df = reduce(DataFrame.unionAll, dfs)

** Dataframe 中的列顺序应该相同才能正常工作,这也毫无意义。如果列顺序不正确,这可能会给予意外结果!!**

如果使用的是pyspark 2.3或更高版本,则可以使用unionByName,这样就不必重新排序列。

kninwzqo

kninwzqo2#

最简单的解决方案是reduceunion(Spark中的unionAll〈2.0):

val dfs = Seq(df1, df2, df3)
dfs.reduce(_ union _)

这是相对简洁的,不应该从堆外存储移动数据,但扩展每个并集的世系需要非线性时间来执行计划分析。如果您试图合并大量的DataFrames,会出现什么问题?
您也可以转换为RDDs并使用SparkContext.union

dfs match {
  case h :: Nil => Some(h)
  case h :: _   => Some(h.sqlContext.createDataFrame(
                     h.sqlContext.sparkContext.union(dfs.map(_.rdd)),
                     h.schema
                   ))
  case Nil  => None
}

它保持谱系短分析成本低,但在其他方面比直接合并DataFrames效率低。

pxiryf3j

pxiryf3j3#

引擎盖下的Spark使并集表达式变平。所以当并集是线性完成时,它需要更长的时间。
最好的解决方案是spark拥有一个支持多个DataFrame的联合函数。
但是下面的代码可能会在某种程度上加快多个DataFrame(或DataSet)的联合。

def union[T : ClassTag](datasets : TraversableOnce[Dataset[T]]) : Dataset[T] = {
      binaryReduce[Dataset[T]](datasets, _.union(_))
  }
  def binaryReduce[T : ClassTag](ts : TraversableOnce[T], op: (T, T) => T) : T = {
      if (ts.isEmpty) {
         throw new IllegalArgumentException
      }
      var array = ts toArray
      var size = array.size
      while(size > 1) {
         val newSize = (size + 1) / 2
         for (i <- 0 until newSize) {
             val index = i*2
             val index2 = index + 1
             if (index2 >= size) {
                array(i) = array(index)  // last remaining
             } else {
                array(i) = op(array(index), array(index2))
             }
         }
         size = newSize
     }
     array(0)
 }
0lvr5msh

0lvr5msh4#

您可以通过将reduce与lambda一起使用来添加类似于allowMissingColumns的参数

from functools import reduce
from pyspark.sql import DataFrame

dfs = [df1, df2]
df = reduce(lambda x, y: x.unionByName(y, allowMissingColumns=True), dfs)

相关问题