如何使用spark比较两个表的列?

wfveoks0  于 2021-05-27  发布在  Hadoop
关注(0)|答案(1)|浏览(819)

我试图通过读取Dataframe来比较两个表()。对于这些表中的每个公共列,使用主键(如order\u id)与其他列(如order\u date、order\u name、order\u event)串联。
我使用的scala代码

val primary_key=order_id
for (i <- commonColumnsList){
      val column_name = i
      val tempDataFrameForNew = newDataFrame.selectExpr(s"concat($primaryKey,$i) as concatenated")
      val tempDataFrameOld = oldDataFrame.selectExpr(s"concat($primaryKey,$i) as concatenated")

      //Get those records which aren common in both old/new tables
      matchCountCalculated = tempDataFrameForNew.intersect(tempDataFrameOld)
      //Get those records which aren't common in both old/new tables
      nonMatchCountCalculated = tempDataFrameOld.unionAll(tempDataFrameForNew).except(matchCountCalculated)

      //Total Null/Non-Null Counts in both old and new tables.
      nullsCountInNewDataFrame = newDataFrame.select(s"$i").filter(x => x.isNullAt(0)).count().toInt
      nullsCountInOldDataFrame = oldDataFrame.select(s"$i").filter(x => x.isNullAt(0)).count().toInt
      nonNullsCountInNewDataFrame = newDFCount - nullsCountInNewDataFrame
      nonNullsCountInOldDataFrame = oldDFCount - nullsCountInOldDataFrame

      //Put the result for a given column in a Seq variable, later convert it to Dataframe.
      tempSeq = tempSeq :+ Row(column_name, matchCountCalculated.toString, nonMatchCountCalculated.toString, (nullsCountInNewDataFrame - nullsCountInOldDataFrame).toString,
       (nonNullsCountInNewDataFrame - nonNullsCountInOldDataFrame).toString)
}
     // Final Step: Create DataFrame using Seq and some Schema.
     spark.createDataFrame(spark.sparkContext.parallelize(tempSeq), schema)

上面的代码对于中等数据集来说运行良好,但是随着新旧表中列和记录数量的增加,执行时间也在增加。任何建议都很感激。先谢谢你。

j5fpnvbx

j5fpnvbx1#

您可以执行以下操作:
1外部连接priamary键上的新旧Dataframe joined_df = df_old.join(df_new, primary_key, "outer") 2如果可能的话,把它藏起来。这会节省你很多时间
三。现在可以使用spark函数对列进行迭代和比较( .isNull 对于不匹配, == 用于匹配等)

for (col <- df_new.columns){
  val matchCount = df_joined.filter(df_new[col].isNotNull && df_old[col].isNotNull).count()
  val nonMatchCount = ...
}

这应该要快得多,尤其是当您可以缓存Dataframe时。如果不能,那么最好将加入的df保存到磁盘,以避免每次洗牌

相关问题