Spark pivot within a group without aggregation

yks3o0rb · posted 2021-07-14 in Spark

I have this DataFrame:
  +---+----+----+----+----+----+----+----+----+----+
  | id|name|q1_w|q1_x|q1_y|q1_z|q2_w|q2_x|q2_y|q2_z|
  +---+----+----+----+----+----+----+----+----+----+
  |  1|AAAA|val1|val2|val3|val4|valw|valx|valy|valz|
  |  2|BBBB|del1|del2|del3|del4|delw|delx|dely|delz|
  |  3|CCCC|sol1|sol2|sol3|sol4|null|null|null|null|
  +---+----+----+----+----+----+----+----+----+----+
I am trying to transform it into this DataFrame:
  +---+----+----+----+----+----+
  | id|name|   w|   x|   y|   z|
  +---+----+----+----+----+----+
  |  1|AAAA|val1|val2|val3|val4|
  |  1|AAAA|valw|valx|valy|valz|
  |  2|BBBB|del1|del2|del3|del4|
  |  2|BBBB|delw|delx|dely|delz|
  |  3|CCCC|sol1|sol2|sol3|sol4|
  +---+----+----+----+----+----+
What DataFrame transformation could solve this WITHOUT converting to an RDD?


vzgqcmou 1#

If the set of columns in the source DataFrame stays the same, you can simply do two separate transformations — select the relevant columns and rename them — and then union the two DataFrames to get the required output.

  // source data creation
  val df = Seq(
    (1, "AAAA", "val1", "val2", "val3", "val4", "valw", "valx", "valy", "valz"),
    (2, "BBBB", "del1", "del2", "del3", "del4", "delw", "delx", "dely", "delz"),
    (3, "CCCC", "sol1", "sol2", "sol3", "sol4", null, null, null, null)
  ).toDF("id", "name", "q1_w", "q1_x", "q1_y", "q1_z", "q2_w", "q2_x", "q2_y", "q2_z")

  // first dataframe: select the q1_* columns and rename them
  val df1 = df.select("id", "name", "q1_w", "q1_x", "q1_y", "q1_z")
    .filter($"q1_w".isNotNull).filter($"q1_x".isNotNull)
    .filter($"q1_y".isNotNull).filter($"q1_z".isNotNull)
    .withColumnRenamed("q1_w", "w").withColumnRenamed("q1_x", "x")
    .withColumnRenamed("q1_y", "y").withColumnRenamed("q1_z", "z")

  // second dataframe: select the q2_* columns and rename them
  val df2 = df.select("id", "name", "q2_w", "q2_x", "q2_y", "q2_z")
    .filter($"q2_w".isNotNull).filter($"q2_x".isNotNull)
    .filter($"q2_y".isNotNull).filter($"q2_z".isNotNull)
    .withColumnRenamed("q2_w", "w").withColumnRenamed("q2_x", "x")
    .withColumnRenamed("q2_y", "y").withColumnRenamed("q2_z", "z")

  // union the two dataframes to get the required output
  val finaldf = df1.union(df2)

The resulting output matches the required DataFrame from the question (the original answer showed it as a screenshot here).
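The select/rename/union steps above can be imitated in plain Python to see the shape change without a Spark session (a sketch only: plain dicts stand in for DataFrame rows, with the column names and values taken from the question):

```python
# Pure-Python sketch of the select -> rename -> filter-nulls -> union idea.
rows = [
    {"id": 1, "name": "AAAA", "q1_w": "val1", "q1_x": "val2", "q1_y": "val3", "q1_z": "val4",
     "q2_w": "valw", "q2_x": "valx", "q2_y": "valy", "q2_z": "valz"},
    {"id": 2, "name": "BBBB", "q1_w": "del1", "q1_x": "del2", "q1_y": "del3", "q1_z": "del4",
     "q2_w": "delw", "q2_x": "delx", "q2_y": "dely", "q2_z": "delz"},
    {"id": 3, "name": "CCCC", "q1_w": "sol1", "q1_x": "sol2", "q1_y": "sol3", "q1_z": "sol4",
     "q2_w": None, "q2_x": None, "q2_y": None, "q2_z": None},
]

def project(prefix):
    # "select" id/name plus the prefixed columns, "rename" them to w/x/y/z,
    # and drop rows where any of the four values is null (the filter step)
    out = []
    for r in rows:
        vals = {c: r[f"{prefix}_{c}"] for c in "wxyz"}
        if all(v is not None for v in vals.values()):
            out.append({"id": r["id"], "name": r["name"], **vals})
    return out

final = project("q1") + project("q2")   # the "union" step
for r in final:
    print(r)
```

The row for id 3 contributes only its q1 values, because its q2 columns are all null and the filter drops that projection, matching the desired output.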


mdfafbf1 2#

You can start with a melt (from here), then split on `_` and pivot (note that pivot can be somewhat expensive):

  from pyspark.sql import functions as F

  id_vars = ['id', 'name']
  value_vars = [i for i in df.columns if i not in id_vars]
  value_name = "Val"
  var_name = 'Var'
  _vars_and_vals = F.array(*(
      F.struct(F.lit(c).alias(var_name), F.col(c).alias(value_name))
      for c in value_vars))
  # Add to the DataFrame and explode
  df1 = df.withColumn("_vars_and_vals", F.explode(_vars_and_vals))
  cols = ['id', 'name'] + [
      F.col("_vars_and_vals")[x].alias(x) for x in [var_name, value_name]]
  split_var = F.split("Var", "_")
  out = (df1.select(*cols).withColumn("NewVar", split_var[1])
         .groupby(id_vars + [split_var[0].alias("q")]).pivot("NewVar").agg(F.first("Val")))
  out.show()
  +---+----+---+----+----+----+----+
  | id|name|  q|   w|   x|   y|   z|
  +---+----+---+----+----+----+----+
  |  1|AAAA| q1|val1|val2|val3|val4|
  |  1|AAAA| q2|valw|valx|valy|valz|
  |  2|BBBB| q1|del1|del2|del3|del4|
  |  2|BBBB| q2|delw|delx|dely|delz|
  |  3|CCCC| q1|sol1|sol2|sol3|sol4|
  |  3|CCCC| q2|null|null|null|null|
  +---+----+---+----+----+----+----+
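To see what the melt/split/pivot pipeline is doing step by step, the same logic can be sketched in plain Python (no Spark; one sample row from the question, so this illustrates the reshape only, not the distributed execution):

```python
# Pure-Python sketch of melt -> split Var on "_" -> pivot on the suffix.
rows = [
    {"id": 1, "name": "AAAA", "q1_w": "val1", "q1_x": "val2", "q1_y": "val3", "q1_z": "val4",
     "q2_w": "valw", "q2_x": "valx", "q2_y": "valy", "q2_z": "valz"},
]
id_vars = ["id", "name"]

# "melt"/explode: one (Var, Val) record per non-id column
long_rows = [
    {**{k: r[k] for k in id_vars}, "Var": c, "Val": r[c]}
    for r in rows for c in r if c not in id_vars
]

# split Var on "_" into the group key (q1/q2) and the new column name (w/x/y/z),
# then "pivot": collect the w/x/y/z values per (id, name, q) group
pivoted = {}
for r in long_rows:
    q, new_var = r["Var"].split("_")
    key = (r["id"], r["name"], q)
    pivoted.setdefault(key, {})[new_var] = r["Val"]

for (id_, name, q), vals in sorted(pivoted.items()):
    print(id_, name, q, vals)
```

Each input row fans out into one output row per `q1`/`q2` prefix, which is exactly the doubling of rows visible in `out.show()` above.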
