PySpark: dynamic union of DataFrames with different columns

umuewwlo | asked 2021-07-14 in Spark

Consider the arrays shown below. I have three sets of arrays:

Array 1:

  C1 C2 C3
  1  2  3
  9  5  6

Array 2:

  C2 C3 C4
  11 12 13
  10 15 16

Array 3:

  C1  C4
  111 112
  110 115

I need the output shown below. On input I may get values for any of C1, …, C4, but after the union each row must carry the correct values, and where a value does not exist it should be zero.

Expected output:

  C1  C2  C3  C4
  1   2   3   0
  9   5   6   0
  0   11  12  13
  0   10  15  16
  111 0   0   112
  110 0   0   115

I have already written the PySpark code below, but I hardcoded the new columns and their fill values. I need to turn this into something like method overloading so that I can use this script generically, as an automated script (see the sketch after the overloading example below). I need to use only Python/PySpark, not pandas.

  import pyspark
  from pyspark import SparkContext
  from pyspark.sql.functions import lit
  from pyspark.sql import SparkSession

  sqlContext = pyspark.SQLContext(pyspark.SparkContext())
  df01 = sqlContext.createDataFrame([(1, 2, 3), (9, 5, 6)], ("C1", "C2", "C3"))
  df02 = sqlContext.createDataFrame([(11, 12, 13), (10, 15, 16)], ("C2", "C3", "C4"))
  df03 = sqlContext.createDataFrame([(111, 112), (110, 115)], ("C1", "C4"))

  # Hardcoded: each frame gets its missing columns added with value 0
  df01_add = df01.withColumn("C4", lit(0)).select("c1", "c2", "c3", "c4")
  df02_add = df02.withColumn("C1", lit(0)).select("c1", "c2", "c3", "c4")
  df03_add = df03.withColumn("C2", lit(0)).withColumn("C3", lit(0)).select("c1", "c2", "c3", "c4")

  df_uni = df01_add.union(df02_add).union(df03_add)
  df_uni.show()

Method overloading example:

  class Student:
      def __init__(self, m1, m2):
          self.m1 = m1
          self.m2 = m2

      def sum(self, c1=None, c2=None, c3=None, c4=None):
          s = 0
          if c1 is not None and c2 is not None and c3 is not None:
              s = c1 + c2 + c3
          elif c1 is not None and c2 is not None:
              s = c1 + c2
          else:
              s = c1
          return s

  s1 = Student(0, 0)  # instantiate first; s1 was undefined in the original snippet
  print(s1.sum(55, 65, 23))
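
For reference, the hardcoding above can be collapsed into one generic helper. Below is a minimal sketch (the name union_with_fill and its fill parameter are hypothetical, not from the question): it collects every column name that appears in any frame, pads each frame with a literal fill value for whatever it is missing, and unions the results. Because every frame is first projected onto the same sorted column list, the positional union lines the columns up correctly.

  from functools import reduce
  from pyspark.sql.functions import lit

  def union_with_fill(frames, fill=0):
      # All column names appearing in any frame, in sorted order
      all_cols = sorted({c for df in frames for c in df.columns})
      # Pad each frame: keep its own columns, substitute lit(fill) for missing ones
      padded = [
          df.select([(df[c] if c in df.columns else lit(fill)).alias(c) for c in all_cols])
          for df in frames
      ]
      # Union everything pairwise
      return reduce(lambda a, b: a.union(b), padded)

  union_with_fill([df01, df02, df03]).show()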

6qqygrtg1#

There may well be better ways to do this, but perhaps the approach below will be useful to someone in the future.

  from pyspark.sql import SparkSession
  from pyspark.sql.functions import lit

  spark = SparkSession.builder \
      .appName("DynamicFrame") \
      .getOrCreate()

  df01 = spark.createDataFrame([(1, 2, 3), (9, 5, 6)], ("C1", "C2", "C3"))
  df02 = spark.createDataFrame([(11, 12, 13), (10, 15, 16)], ("C2", "C3", "C4"))
  df03 = spark.createDataFrame([(111, 112), (110, 115)], ("C1", "C4"))
  dataframes = [df01, df02, df03]

  # Create a list of all the column names and sort them
  cols = set()
  for df in dataframes:
      for x in df.columns:
          cols.add(x)
  cols = sorted(cols)

  # Create a dictionary with all the dataframes
  dfs = {}
  for i, d in enumerate(dataframes):
      new_name = 'df' + str(i)  # New name for the key; the dataframe is the value
      dfs[new_name] = d
      # Loop through all column names. Add the missing columns to the dataframe (with value 0)
      for x in cols:
          if x not in d.columns:
              dfs[new_name] = dfs[new_name].withColumn(x, lit(0))
      dfs[new_name] = dfs[new_name].select(cols)  # Use 'select' to get the columns sorted

  # Now put it all together with a loop (union)
  result = dfs['df0']  # Take the first dataframe, add the others to it
  dfs_to_add = list(dfs.keys())  # List of the keys (a list copy, so it can be mutated)
  dfs_to_add.remove('df0')  # Remove the first one, because it is already in the result
  for x in dfs_to_add:
      result = result.union(dfs[x])
  result.show()

Output:

  +---+---+---+---+
  | C1| C2| C3| C4|
  +---+---+---+---+
  |  1|  2|  3|  0|
  |  9|  5|  6|  0|
  |  0| 11| 12| 13|
  |  0| 10| 15| 16|
  |111|  0|  0|112|
  |110|  0|  0|115|
  +---+---+---+---+
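
As a side note, on Spark 3.1+ the same result is available without any manual padding: unionByName with allowMissingColumns=True aligns columns by name and fills absent ones with null, after which fillna(0) turns those nulls into the zeros the question asks for. A short sketch, assuming the same df01/df02/df03 as above:

  # Spark 3.1+: unionByName aligns columns by name and, with
  # allowMissingColumns=True, fills absent columns with null.
  result = (df01.unionByName(df02, allowMissingColumns=True)
                .unionByName(df03, allowMissingColumns=True)
                .fillna(0))  # replace the nulls with 0
  result.show()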

i5desfxk2#

A Scala version of this is here:
https://stackoverflow.com/a/60702657/9445912
on the question:
"Spark - Merge / union DataFrames with different schemas (column names and sequence) to a DataFrame with master common schema"


xzlaal3s3#

I would try:

  df = df1.join(df2, ['each', 'shared', 'col'], how='full')
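
A caveat worth stating (my reading, not the answerer's): a full outer join only stacks rows the way the question expects when the shared-column values never coincide across frames; rows with matching keys would be merged into one instead. Applied to the question's data, a sketch might look like:

  # Full outer join on the shared columns: rows whose keys do not match
  # simply stack, with nulls in the columns the other frame lacks.
  shared_12 = [c for c in df01.columns if c in df02.columns]   # ['C2', 'C3']
  merged = df01.join(df02, shared_12, how='full')

  shared_3 = [c for c in merged.columns if c in df03.columns]  # ['C1', 'C4']
  result = (merged.join(df03, shared_3, how='full')
                  .select(sorted(merged.columns))  # restore C1..C4 order
                  .fillna(0))
  result.show()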
