基于scala的高效分布式公共元素集合合并算法

6qqygrtg  于 2021-06-25  发布在  Flink
关注(0)|答案(3)|浏览(404)

我正在flink上开发minhash lsh的分布式实现,作为最后一步,我需要合并一些集群,这些集群被标识为它们之间相似的元素集。
所以我有一个分布式集合作为输入,我需要一个算法来有效地将集合与公共元素合并。考虑到flink的计算模型,该算法可能是迭代的,不一定像map-reduce那样。
举个例子:
{{1{1,2}},{2,{2,3}},{3,{4,5},{4{1,27}}}} 结果应该是 {1,2,3,27},{4,5} 因为集合#1、#2和#4至少有一个共同元素。

5hcedyr0

5hcedyr01#

只是一个想法,也许有更好的方法,但是这个怎么样:
在Map步骤中,为每个集合中的每个元素发出一个键值对,如下所示: element -> other elements 在reduce步骤中,收集其他元素并丢弃重复项
重复此操作,直到数据结构停止更改
在第一次迭代之后,您的数据将如下所示:

1 -> 2, 27
2 -> 1,3
3 -> 2
4 -> 5
5 -> 4
27 -> 1

第二次之后:

1 -> 2, 3, 27
2 -> 1, 3, 27
3 -> 1, 2
4 -> 5
5 -> 4
27 -> 1, 2

最后在第三次之后:

1 -> 2, 3, 27
2 -> 1, 3, 27
3 -> 1, 2, 27
4 -> 5
5 -> 4
27 -> 1, 2, 3

我目前没有办法弄清楚什么时候改变停止了。
为了只获得每个结果的一个副本,您可以删除所有键大于其他元素的地方。

vmpqdwk3

vmpqdwk32#

如果你有 N 周围设置 M 每一个元素,天真的方式(测试每一个元素的每一个集合)是 O(N^2 * M^2) 如果复制是罕见的。如果你真的只有 R << N*M 不过,不同的元素并没有那么糟糕:一旦发现了某些东西,就可以停止测试,而这些东西在不存在之后就会发生 N*M 比较,但仅限于 R ,所以你只能说“只有” O(N*N*R) . 但是你不必测试每一个集合,如果集合实际上只存在于 L 小组,因为一旦你击中正确的小组,你就会停下来。所以更像是 O(N*L*R) + O(N*M) (第二个术语实际上是在找到要添加到的正确组后将元素添加到组中)。
如果你把每个元素Map到它包含的集合列表,你可以在 O(N*M) 时间——然后您可以遍历每个元素的集合树,最多大约访问一次每个不同的元素(即。 R 每一个访问每一组提到它(这是关于 N*M/R )添加所有元素(但只添加一次!),总的来说 O(N*M) 如果你小心不要多次添加同一个集合(你需要一个 Package 器来 Package 这些集合,这样你就可以知道你是否已经访问过它们。)这样更快,但是如果 L*R 很小,你可能不在乎。
在scala中,从元素到树的Map的核心类似于

case class W(s: Set[Int]) { var visited: Boolean = false }
def tree(ss: Seq[Set[Int]]) = {
  var m = new collection.mutable.HashMap[Int, List[W]]
  ss.foreach{ s =>
    s.foreach{i =>
      m(i) = W(s) :: m.getOrElse(i, Nil)
    }
  }
}

遍历这些组更为复杂,但基本思想是保留所看到的元素的Map,如果碰到其中一个元素,则不继续遍历,并且在合并元素时通过在w Package 器中设置标志来跟踪是否遍历了一个集。

0aydgbwb

0aydgbwb3#

这里有一个想法:作为flink的一部分,gelly有一个互联组件查找器。制作一个图,其中每个集合元素有一个节点,边以最简单的方式连接每个集合的元素,例如{a,b,c,d,…}加上[a,b],[a,c],[a,d],[a。现在找到连接的组件。它们的节点给出了您要查找的集合。
如果您担心从集合到图形再到图形的转换对性能的影响(尽管这种担心是过早的优化;你应该尝试一下),这将是足够简单的重新实现gelly的代币推送方案集。这是如何工作的。您的示例中已经有了标记:集合编号。让我们来看看你的例子。e、 g.s[1]={1,2}。设r是一个逆多重Map,它把每个集合元素带到它所属的集合集合中。e、 在你的例子中,g.r[2]={1,2}。设t[i]是通过可传递的非空交集“links”从集合i可到达的元素。然后计算:

T[i] = S[i] for all i // with no links at all, a set reaches its own elements
loop
  for all i, Tnew[i] = \union_{ x \in T[i] } S[R[x]]  // add new reachables
  exit if Tnew == T
  T = Tnew
end loop

完成后,map t的不同值就是您想要的答案。最大迭代次数应该是log | u |,其中u是集合元素的宇宙。

相关问题