joinDataframe

Posted by oiopk7p5 on 2021-07-12 in Spark

I need to join two Spark DataFrames in Scala based on a custom function. Both DataFrames have the same schema.
A sample row of the data in df1:

{
  "F1" : "A",
  "F2" : "B",
  "F3" : "C",
  "F4" : [
               {
                "name" : "N1",
                "unit" : "none",
                "count" : 50.0,
                "sf1" : "val_1",
                "sf2" : "val_2"
              },
              {
                "name" : "N2",
                "unit" : "none",
                "count" : 100.0,
                "sf1" : "val_3",
                "sf2" : "val_4"
              }
         ]
}

A sample row of the data in df2:

{
  "F1" : "A",
  "F2" : "B",
  "F3" : "C",
  "F4" : [
               {
                "name" : "N1",
                "unit" : "none",
                "count" : 80.0,
                "sf1" : "val_5",
                "sf2" : "val_6"
              },
              {
                "name" : "N2",
                "unit" : "none",
                "count" : 90.0,
                "sf1" : "val_7",
                "sf2" : "val_8"
              },
              {
                "name" : "N3",
                "unit" : "none",
                "count" : 99.0,
                "sf1" : "val_9",
                "sf2" : "val_10"
              }
         ]
}

The result of merging these sample rows:

{
  "F1" : "A",
  "F2" : "B",
  "F3" : "C",
  "F4" : [
               {
                "name" : "N1",
                "unit" : "none",
                "count" : 80.0,
                "sf1" : "val_5",
                "sf2" : "val_6"
              },
              {
                "name" : "N2",
                "unit" : "none",
                "count" : 100.0,
                "sf1" : "val_3",
                "sf2" : "val_4"
              },
              {
                "name" : "N3",
                "unit" : "none",
                "count" : 99.0,
                "sf1" : "val_9",
                "sf2" : "val_10"
              }
         ]
}

The result is:
a full outer join based on the "F1", "F2" and "F3" values, plus
a merge of the "F4" arrays that keeps one entry per unique "name" (used as the id), choosing the element with the maximum "count". In the samples above, "N1" is taken from df2 (count 80.0 > 50.0), "N2" from df1 (100.0 > 90.0), and "N3" exists only in df2, so it is kept as-is.
I am not very familiar with Scala and have been struggling with this for over a day. Here is what I have so far:

val df1 = sqlContext.read.parquet("stack_a.parquet")
val df2 = sqlContext.read.parquet("stack_b.parquet")

import org.apache.spark.sql.Row

val df4 = df1.toDF(df1.columns.map(_ + "_A"): _*)
val df5 = df2.toDF(df2.columns.map(_ + "_B"): _*) //use df2's own columns (the schemas are identical, so df1.columns also happened to work)
val df6 = df4.join(df5, df4("F1_A") === df5("F1_B") && df4("F2_A") === df5("F2_B") && df4("F3_A") === df5("F3_B"), "outer")

def joinFunction(r: Row) = {
   //Need the real-deal here!
   //print(r(3)) //--> Any = WrappedArray([..])

   //also considering parsing as json to do the processing, but not sure about the performance impact
   //val parsed = JSON.parseFull(r.json) //then play with parsed

   r.toSeq
}
val finalResult = df6.rdd.map(joinFunction)
finalResult.collect

I plan to add the custom merge logic inside joinFunction, but I am struggling to convert the WrappedArray/Any values into something I can work with. Any input on how to do the conversion, or on a better way to do the join, would be very helpful.
Thanks!
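
For reference, a typed way around the WrappedArray/Any problem is Row.getAs[Seq[Row]]. A minimal sketch (extractF4 is a hypothetical helper; the field name "F4_A" comes from the joined schema above):

import org.apache.spark.sql.Row

//minimal sketch: read an array-of-struct column out of a joined row as Seq[Row]
//instead of Any/WrappedArray; returns an empty Seq when the outer join left it null
def extractF4(r: Row, fieldName: String): Seq[Row] =
  if (r.isNullAt(r.fieldIndex(fieldName))) Seq.empty[Row]
  else r.getAs[Seq[Row]](fieldName)

//each element is itself a Row, so its fields can be read by name, e.g.
//extractF4(row, "F4_A").map(e => (e.getAs[String]("name"), e.getAs[Double]("count")))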

Edit (2021-03-07)

The full outer join actually only needs to be performed on "F1". So, building on @werner's answer, I am doing:

import org.apache.spark.sql.functions.{col, when}

val df1_a = df1.toDF(df1.columns.map(_ + "_A"): _*)
val df2_b = df2.toDF(df2.columns.map(_ + "_B"): _*)

val finalResult = df1_a.join(df2_b, df1_a("F1_A") === df2_b("F1_B"), "full_outer")
  .drop("F1_B")
  .withColumn("F4", joinFunction(col("F4_A"), col("F4_B")))
  .drop("F4_A", "F4_B")
  .withColumn("F2", when(col("F2_A").isNull, col("F2_B")).otherwise(col("F2_A")))
  .drop("F2_A", "F2_B")
  .withColumn("F3", when(col("F3_A").isNull, col("F3_B")).otherwise(col("F3_A")))
  .drop("F3_A", "F3_B")

But I am getting this error. What am I missing..?
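
A likely cause, assuming the error is a NullPointerException: with a full outer join on "F1" alone, a row present in only one DataFrame gets a null "F4_A" or "F4_B", and the a ++ b concatenation inside the UDF then fails. A null-safe variant of the UDF (it reuses the F4 case class from the answer below):

import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.udf

//null-safe variant: substitute an empty Seq for a null array before concatenating
val joinFunction = udf((a: Seq[Row], b: Seq[Row]) =>
  (Option(a).getOrElse(Seq.empty[Row]) ++ Option(b).getOrElse(Seq.empty[Row]))
    .map(r => F4(r.getAs[String]("name"),
      r.getAs[String]("unit"),
      r.getAs[Double]("count"),
      r.getAs[String]("sf1"),
      r.getAs[String]("sf2")))
    //group the elements from both arrays by name
    .groupBy(_.name)
    //take the element with the max count from each group
    .map { case (_, d) => d.maxBy(_.count) }
    .toSeq)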

Answer 1 (by vyswwuz2)

You can implement the merge logic with the help of a UDF:

import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.{col, udf}

//case class to define the schema of the udf's return value
case class F4(name: String, unit: String, count: Double, sf1: String, sf2: String)

val joinFunction = udf((a: Seq[Row], b: Seq[Row]) =>
  (a ++ b).map(r => F4(r.getAs[String]("name"),
    r.getAs[String]("unit"),
    r.getAs[Double]("count"),
    r.getAs[String]("sf1"),
    r.getAs[String]("sf2")))
    //group the elements from both arrays by name
    .groupBy(_.name)
    //take the element with the max count from each group
    .map { case (_, d) => d.maxBy(_.count) } 
    .toSeq)

//join the two dataframes
val finalResult = df1.withColumnRenamed("F4", "F4_A")
  .join(df2.withColumnRenamed("F4", "F4_B"), Seq("F1", "F2", "F3"), "full_outer")
  //call the merge function
  .withColumn("F4", joinFunction(col("F4_A"), col("F4_B")))
  //drop the intermediate columns
  .drop("F4_A", "F4_B")
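
For comparison, the same merge can also be expressed without a UDF by exploding both arrays and keeping, per join key and name, the element with the highest count via a window function. A sketch assuming the F4 arrays are never null or empty (explode_outer would be needed otherwise); exploded, w and merged are illustrative names:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

//flatten the F4 elements of both dataframes into one long table
val exploded = df1.select(col("F1"), col("F2"), col("F3"), explode(col("F4")).as("f4"))
  .unionByName(df2.select(col("F1"), col("F2"), col("F3"), explode(col("F4")).as("f4")))

//rank elements sharing the same key and name by count, highest first
val w = Window.partitionBy(col("F1"), col("F2"), col("F3"), col("f4.name"))
  .orderBy(col("f4.count").desc)

//keep only the winner of each group and fold the winners back into an array
val merged = exploded
  .withColumn("rn", row_number().over(w))
  .filter(col("rn") === 1)
  .groupBy("F1", "F2", "F3")
  .agg(collect_list("f4").as("F4"))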
