Column-by-column comparison of two Spark DataFrames in Scala (Spark core)

rmbxnbpk asked on 2021-07-13 in Spark

I need to compare two DataFrames column by column, for any N columns.
As an example, take the sample below with 5 rows and 3 columns, where EMPID is the primary key.
How can this comparison be done in Spark core?
Input DataFrame 1:

|EMPID |Dept     |  Salary
--------------------------
|1     |HR       |   100
|2     |IT       |   200
|3     |Finance  |   250
|4     |Accounts |   200
|5     |IT       |   150

Input DataFrame 2:

|EMPID |Dept       |Salary
------------------------------
|1     |HR         | 100
|2     |IT         | 200
|3     |FIN        | 250
|4     |Accounts   | 150
|5     |IT         | 150

Expected result DataFrame:

|EMPID |Dept      |Dept      |status |Salary |Salary |status
-------------------------------------------------------------
|1     |HR        |HR        | TRUE  | 100   | 100   | TRUE
|2     |IT        |IT        | TRUE  | 200   | 200   | TRUE
|3     |Finance   |FIN       | FALSE | 250   | 250   | TRUE
|4     |Accounts  |Accounts  | TRUE  | 200   | 150   | FALSE
|5     |IT        |IT        | TRUE  | 150   | 150   | TRUE
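
For reference, the two input DataFrames can be constructed like this (a minimal sketch, assuming a SparkSession named spark with its implicits imported; the names df1 and df2 are illustrative):

import spark.implicits._

// Illustrative sample data matching the two input tables above
val df1 = Seq((1,"HR",100),(2,"IT",200),(3,"Finance",250),(4,"Accounts",200),(5,"IT",150))
  .toDF("EMPID","Dept","Salary")
val df2 = Seq((1,"HR",100),(2,"IT",200),(3,"FIN",250),(4,"Accounts",150),(5,"IT",150))
  .toDF("EMPID","Dept","Salary")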

lhcgjxsq1#

You can use a join and then iterate over df1.columns to select the desired output columns:

val df_final = df1.alias("df1")
  .join(df2.alias("df2"), "EMPID")
  .select(
      Seq(col("EMPID")) ++
      df1.columns.filter(_ != "EMPID")
        .flatMap(c =>
          Seq(
            col(s"df1.$c").as(s"df1_$c"),
            col(s"df2.$c").as(s"df2_$c"),
            (col(s"df1.$c") === col(s"df2.$c")).as(s"status_$c")
          )
      ): _*
)

df_final.show

//+-----+--------+--------+-----------+----------+----------+-------------+
//|EMPID|df1_Dept|df2_Dept|status_Dept|df1_Salary|df2_Salary|status_Salary|
//+-----+--------+--------+-----------+----------+----------+-------------+
//|    1|      HR|      HR|       true|       100|       100|         true|
//|    2|      IT|      IT|       true|       200|       200|         true|
//|    3| Finance|     FIN|      false|       250|       250|         true|
//|    4|Accounts|Accounts|       true|       200|       150|        false|
//|    5|      IT|      IT|       true|       150|       150|         true|
//+-----+--------+--------+-----------+----------+----------+-------------+
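
If you only need the rows where at least one column differs, you can filter on the generated status columns. A sketch building on df_final above (it assumes the status_<col> naming produced by the select):

// Rows where any compared column differs between df1 and df2
val statusCols = df1.columns.filter(_ != "EMPID").map(c => col(s"status_$c"))
val mismatches = df_final.filter(!statusCols.reduce(_ && _))
mismatches.show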

fnvucqvd2#

You can perform a join on EMPID and compare the resulting columns:

val result = df1.alias("df1").join(
    df2.alias("df2"), "EMPID"
).select(
    $"EMPID",
    $"df1.Dept", $"df2.Dept",
    ($"df1.Dept" === $"df2.Dept").as("status"),
    $"df1.Salary", $"df2.Salary",
    ($"df1.Salary" === $"df2.Salary").as("status")
)

result.show
+-----+--------+--------+------+------+------+------+
|EMPID|    Dept|    Dept|status|Salary|Salary|status|
+-----+--------+--------+------+------+------+------+
|    1|      HR|      HR|  true|   100|   100|  true|
|    2|      IT|      IT|  true|   200|   200|  true|
|    3| Finance|     FIN| false|   250|   250|  true|
|    4|Accounts|Accounts|  true|   200|   150| false|
|    5|      IT|      IT|  true|   150|   150|  true|
+-----+--------+--------+------+------+------+------+

Note that you will probably want to rename these columns, since duplicate column names cannot be queried afterwards.
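
For example, you could alias the columns in the select so every output name is unique (a sketch of the same join with renamed columns; the _1/_2 suffixes are just illustrative):

val result = df1.alias("df1").join(
    df2.alias("df2"), "EMPID"
).select(
    $"EMPID",
    $"df1.Dept".as("Dept_1"), $"df2.Dept".as("Dept_2"),
    ($"df1.Dept" === $"df2.Dept").as("Dept_status"),
    $"df1.Salary".as("Salary_1"), $"df2.Salary".as("Salary_2"),
    ($"df1.Salary" === $"df2.Salary").as("Salary_status")
)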


vlju58qv3#

You can also do it the following way:

//Source data
val df = Seq((1,"HR",100),(2,"IT",200),(3,"Finance",250),(4,"Accounts",200),(5,"IT",150)).toDF("EMPID","Dept","Salary")
val df1 = Seq((1,"HR",100),(2,"IT",200),(3,"Fin",250),(4,"Accounts",150),(5,"IT",150)).toDF("EMPID","Dept","Salary")

//Join on EMPID, add the comparison columns, then select the output layout
val finalDF = df.as("d").join(df1.as("d1"), Seq("EMPID"), "inner")
  .withColumn("DeptStatus", $"d.Dept" === $"d1.Dept")
  .withColumn("SalaryStatus", $"d.Salary" === $"d1.Salary")
  .selectExpr("EMPID", "d.Dept", "d1.Dept", "DeptStatus as Status",
              "d.Salary", "d1.Salary", "SalaryStatus as Status")

display(finalDF)
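
Note that display is a Databricks notebook helper; in a plain spark-shell or spark-submit job you can print the same table with finalDF.show(false).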

You will see output like the expected result shown in the question.
