apache spark-多个RDD的交集

mw3dktmi  于 2021-05-27  发布在  Spark
关注(0)|答案(2)|浏览(571)

在apachespark中,可以使用 sparkContext.union() 方法。如果有人想让多个RDD相交,有没有类似的东西?我已经在sparkcontext方法中搜索过了,但找不到任何东西或其他地方。一种解决方案可以是联合rdd,然后检索重复的rdd,但我认为这可能没有那么有效。假设我有以下关于键/值对集合的示例:

  1. val rdd1 = sc.parallelize(Seq((1,1.0),(2,1.0)))
  2. val rdd2 = sc.parallelize(Seq((1,2.0),(3,4.0),(3,1.0)))

我要检索包含以下元素的新集合:

  1. (1,2.0) (1,1.0)

当然,对于多个RDD,而不仅仅是两个。

cygmwpex

cygmwpex1#

尝试:

  1. val rdds = Seq(
  2. sc.parallelize(Seq(1, 3, 5)),
  3. sc.parallelize(Seq(3, 5)),
  4. sc.parallelize(Seq(1, 3))
  5. )
  6. rdds.map(rdd => rdd.map(x => (x, None))).reduce((x, y) => x.join(y).keys.map(x => (x, None))).keys
a7qyws3x

a7qyws3x2#

rdd上有一个交集方法,但它只需要另一个rdd:

  1. def intersection(other: RDD[T]): RDD[T]

让我们用这个来实现你想要的方法。

  1. def intersectRDDs[T](rdds: Seq[RDD[T]]): RDD[T] = {
  2. rdds.reduce { case (left, right) => left.intersection(right)
  3. }

如果您已经了解了spark连接的实现,可以通过将最大的rdd放在第一位来优化执行:

  1. def intersectRDDs[T](rdds: Seq[RDD[T]]): RDD[T] = {
  2. rdds.sortBy(rdd => -1 * rdd.partitions.length)
  3. .reduce { case (left, right) => left.intersection(right)
  4. }

编辑:看起来我误读了您的示例:您的文本看起来像是在搜索rdd.union的反向行为,但您的示例暗示您希望按键相交。我的回答不涉及这个问题。

展开查看全部

相关问题