kotlin 使用Flux::groupBy的React堆网状物导致永久冻结

cld4siwp  于 2023-04-21  发布在  Kotlin
关注(0)|答案(1)|浏览(124)

我有两个对象类型不同但属性ID相同的列表

list 1 => [{ id: "123, x: "xxx" }] | list 2 => [{ id: "123", y: "yyy" }, {id: "456", y: "yyy"}]

所以我想用groupby方法把这两个列表合并成一个新列表。2这就是我所尝试的。

// both list are not sorted and every id in the list_1 exists in list_2
  // list_1 data class Object1(val id: String, val x: String)
  val flux1 = Flux.fromIterable(list_1)
                  .map { obj -> obj.id to obj }
  // list_2 data class Object2(val id: String, val y: String)
  val flux2 = Flux.fromIteablle(list_2)
                  .map { obj -> obj.id to obj }

  Flux.merge(flux1, flux2)
      .groupBy { (id, obj) -> id }
      .flatMap { gFlux ->
        gFlux
          .map { (id, obj) -> obj }
          .collectList()
          .filter { it.size == 2 }
          .map { (obj1, obj2) -> Object3(obj1, obj2) }
      }
      .collectList()

但是由于列表2的大小很大,它开始永久冻结,我不知道为什么。在像这样分组之前,我通过过滤通量暂时修复了它,因为我必须根据list_1对这些通量进行分组。

Flux.merge(flux1, flux2)
    .filter { (id, obj) -> id in list_1.map { it.id } }

所以我想知道为什么它冻结和什么是正确的解决方案来解决这个问题,或者有任何更好的解决方案分组两个名单的基础上,第一个?

woobm2wo

woobm2wo1#

groupBy不适用于大型组(javadoc):
值得注意的是,当标准产生大量的组时,如果组没有被适当地在下游使用(例如,由于具有设置得太低的maxConcurrency参数的flatMap),则可能导致挂起。
因此,与其使用groupBy,你应该使用不同的方法聚合结果。在你的情况下,它可能是这样的:

Flux.merge(flux1, flux2)
  .reduce(mutableMapOf(), { (map, (id, obj)) ->
      if (!map.containsKey(id) {
         map[id] = mutableListOf()
      }
      map[id].add(obj)
      map
  }
  .flatMap { Flux.fromIterable(it.entries()) }
  .filter { it.value.size == 2 }
  .map { it.value }
  .map { (obj1, obj2) -> Object3(obj1, obj2) }     
  .collectList()

但它不是很有效。因为你把所有的数据都保存在内存中,直到最后一步,这和groupBy的问题是一样的。但是在这种情况下,你不会消耗所有的调度器,所以它工作得更好。如果你在减少时可以删除一些组,比如多于2个元素的组,那会更优化。

相关问题