spark:将数据集的两列合并为一列

zaq34kh6  于 2021-05-27  发布在  Spark
关注(0)|答案(3)|浏览(681)

我有一个ID为2个不同列的表。我有另一个表,其中包含与id关联的对象。我想从表2中筛选出id存在于表1的id1或id2中的id。
表1:

| id1  | id2 |
|  1   |  1  |
|  1   |  1  |
|  1   |  3  |
|  2   |  5  |
|  3   |  1  | 
|  3   |  2  |
|  3   |  3  |

表2:

| id  | obj   |
|  1  |  'A'  |
|  2  |  'B'  |
|  3  |  'C'  |
|  4  |  'D'  | 
|  5  |  'E'  |  
|  6  |  'F'  |
|  7  |  'G'  |

我想的是从表1中创建一个包含唯一ID的列表,该ID将是上面示例中的[1,2,3,5]。
然后根据列表过滤出Dataframe,得到结果。

| id  | obj   |
|  1  |  'A'  |
|  2  |  'B'  |
|  3  |  'C'  |
|  5  |  'E'  |

尽管我对解决方案的可扩展性有顾虑。列表可能很大,在某些情况下甚至可能无法加载到内存中。在这种情况下有什么建议吗?
谢谢。

hfyxw5xn

hfyxw5xn1#

在spark中使用sparksql-note-joins有一整套性能考虑,包括df大小、密钥分配等,所以请您熟悉一下。
一般来说:

table2.as("t2")
  .join(
    table1.as("t1"),
    $"t2.id" === $"t1.id1" || $"t2.id" === $"t1.id2",
    "left"
  )
  .where($"t1.id1".isNull)
  .select("t2.*")
s6fujrry

s6fujrry2#

以下方法可行

import spark.implicits._
      val t1 = Seq((1,1),(1,1),(1,3),(2,5),(3,1),(3,2),(3,3))
      val t2 = Seq((1,"A"),(2,"B"),(3,"C"),(4,"D"),(5,"E"),(6,"F"),(7,"G"))
      val tt1 = sc.parallelize(t1).toDF("id1","id2")
                  .persist(StorageLevel.MEMORY_AND_DISK)
      val tt2 = sc.parallelize(t2).toDF("id", "obj")
                  .persist(StorageLevel.MEMORY_AND_DISK)

      tt1.show()
      tt2.show()

      tt1.createOrReplaceTempView("table1")
      tt2.createOrReplaceTempView("table2")

     val output = sqlContext.sql(
        """
          |SELECT DISTINCT id, obj
          |FROM table1 t1
          |JOIN table2 t2 ON(t1.id1 = t2.id) OR (t1.id2 = id)
          |ORDER BY id
          |""".stripMargin).persist(StorageLevel.MEMORY_AND_DISK)

      output.show()

输出

+---+---+
| id|obj|
+---+---+
|  1|  A|
|  2|  B|
|  3|  C|
|  5|  E|
+---+---+

对于内存问题,您可以将数据持久化到内存和磁盘,但是有更多的选项,您可以选择适合您的特定问题的最佳选项,您可以遵循以下链接:rdd persistence
我也会通过配置来考虑分区的数量:

spark.sql.shuffle.partitions
/*
Configures the number of partitions to use when shuffling data for joins or aggregations.

* /

  val spark = SparkSession
    .builder()
    .appName("MySparkProcess")
    .master("local[*]")
    .config("spark.sql.shuffle.partitions","400") //Change to a more reasonable default number of partitions for our data
    .config("spark.app.id","MySparkProcess") // To silence Metrics warning
    .getOrCreate()

我也会查看此链接以了解进一步的配置:
性能调整
我希望这有帮助。

ui7jx7zq

ui7jx7zq3#

另一种方法:

val id_table = table1.select(explode(array('*)).as("id")).distinct()
val result = table2.join(id_table,"id")
result.show()

输出:

+---+---+
| id|obj|
+---+---+
|  1|'A'|
|  2|'B'|
|  3|'C'|
|  5|'E'|
+---+---+

相关问题