如何在两个不同的spark rdd中获取所有不同的记录

pjngdqdw  于 2021-05-29  发布在  Spark
关注(0)|答案(3)|浏览(417)

对于spark和rdd来说是非常新的,所以我希望我能很好地解释我想要的东西,以便有人能够理解和帮助:)
我有两个非常大的数据集,比如300万行50列,存储在hadoop hdfs中。我想做的是将这两个都读入rdd,以便它使用并行性&我想返回第三个rdd,其中包含所有不匹配的记录(来自任一rdd)。
下面希望有助于显示我要做什么。。。只是想以最快最有效的方式找到所有不同的记录。。。
数据的顺序不一定相同—rdd1的第1行可能是rdd2的第4行。
非常感谢!!

所以。。。这似乎是做什么,我想它,但它似乎远容易被正确。。。

%spark

import org.apache.spark.sql.DataFrame
import org.apache.spark.rdd.RDD
import sqlContext.implicits._
import org.apache.spark.sql._

//create the tab1 rdd.
val rdd1 = sqlContext.sql("select * FROM table1").withColumn("source",lit("tab1"))

//create the tab2 rdd.
val rdd2 = sqlContext.sql("select * FROM table2").withColumn("source",lit("tab2"))

//create the rdd of all misaligned records between table1 and the table2.
val rdd3 = rdd1.except(rdd2).unionAll(rdd2.except(rdd1))

//rdd3.printSchema()    
//val rdd3 = rdd1.except(rdd2)

//drop the temporary table that was used to create a hive compatible table from the last run.
sqlContext.dropTempTable("table3")

//register the new temporary table.
rdd3.toDF().registerTempTable("table3")

//drop the old compare table.
sqlContext.sql("drop table if exists data_base.compare_table")

//create the new version of the s_asset compare table.
sqlContext.sql("create table data_base.compare_table as select * from table3")

这是我到目前为止完成的最后一段代码,它似乎在做这项工作-不确定在整个数据集上的性能,我会祈祷。。。
非常感谢所有花时间帮助这个可怜的人:)
p、 如果有人有一个更高性能的解决方案,我很乐意听到!或者如果你能看到一些问题,这可能意味着它将返回错误的结果。

e4eetjau

e4eetjau1#

两者都可以用“full\ u outer”连接,然后应用过滤器,其中字段值在两者中进行比较:

val filterCondition = cols
  .map(c => (col(s"l.$c") =!= col(s"r.$c") || col(s"l.$c").isNull || col(s"r.$c").isNull))
  .reduce((acc, c) => acc || c)

df1.alias("l")
  .join(df2.alias("r"), $"l.rowid" === $"r.rowid", "full_outer")
  .where(filterCondition)

输出:

+--------+--------+--------+-----------+------+--------+--------+--------+-----------+------+
|rowid   |name    |status  |lastupdated|source|rowid   |name    |status  |lastupdated|source|
+--------+--------+--------+-----------+------+--------+--------+--------+-----------+------+
|1-za23f1|product2|inactive|31-12-2019 |rdd1  |1-za23f1|product2|active  |31-12-2019 |rdd2  |
|1-za23f2|product3|inactive|01-01-2020 |rdd1  |1-za23f2|product3|active  |01-01-2020 |rdd2  |
|1-za23f3|product4|inactive|02-01-2020 |rdd1  |1-za23f3|product1|inactive|02-01-2020 |rdd2  |
+--------+--------+--------+-----------+------+--------+--------+--------+-----------+------+
83qze16e

83qze16e2#

将两个Dataframe加载为 df1 , df2 添加 source 默认值为的列 rdd1 以及 rdd2 分别地
工会 df1 以及 df2 分组依据 "rowid", "name", "status", "lastupdated" 把它的来源收集起来
筛选具有单个源的所有行

import org.apache.spark.sql.functions._

object OuterJoin {

  def main(args: Array[String]): Unit = {

    val spark = Constant.getSparkSess

    import spark.implicits._

    val cols = Array("rowid", "name", "status", "lastupdated")

    val df1 = List(
      ("1-za23f0", "product1", "active", "30-12-2019"),
      ("1-za23f1", "product2", "inactive", "31-12-2019"),
      ("1-za23f2", "product3", "inactive", "01-01-2020"),
      ("1-za23f3", "product4", "inactive", "02-01-2020"),
      ("1-za23f4", "product5", "inactive", "03-01-2020"))
      .toDF(cols:_ *)
      .withColumn("source",lit("rdd1"))

    val df2 = List(
      ("1-za23f0", "product1", "active", "30-12-2019"),
      ("1-za23f1", "product2", "active", "31-12-2019"),
      ("1-za23f2", "product3", "active", "01-01-2020"),
      ("1-za23f3", "product1", "inactive", "02-01-2020"),
      ("1-za23f4", "product5", "inactive", "03-01-2020"))
      .toDF(cols:_ *)
        .withColumn("source",lit("rdd2"))

    df1.union(df2)
      .groupBy(cols.map(col):_ *)
      .agg(collect_set("source").as("sources"))
      .filter(size(col("sources")) === 1)
      .withColumn("from_rdd", explode(col("sources") ))
      .drop("sources")
      .show()
  }

}
1wnzp6jl

1wnzp6jl3#

您可以将数据读入dataframes而不是rdd,然后使用union和groupby来实现结果

相关问题