我有一个Kafka的主题。所以我必须处理大的无限热源。为了简单起见,我将用整数通量来吸收它。
我想把这个通量变成一个无序的值的通量,它只包含通量不唯一的值。
例如1, 2, 3, 2, 1, 1, 4 -> 2, 2, 1, 1, 1
下面是一个有效的尝试:
Flux<Integer> flux = Flux.just(1, 2, 3, 4, 1, 1, 1, 2, 4);
flux.groupBy(Function.identity())
.flatMap(
it -> {
Flux<Integer> shared = it.cache();
return Flux.concat(shared.buffer(2)
.take(1)
.map(buff -> buff.stream().findFirst().get()), shared.skip(1));
}
)
.doOnNext(System.out::println)
.blockLast();
但是我不喜欢flatMap中使用Flux.cache,因为我担心内存的使用。
我也不确定它将如何与一个热源一起工作,就好像我使用publish()变成一个热通量,什么也没有发生。
我之前已经解释过了。
1条答案
按热度按时间fdbelqdn1#
根据我对要求的理解,你想
要过滤重复项,决定什么是重复项至关重要(快速且安全!)。其余的可以以如下的React方式处理:
对于
Flux.just(1, 2, 3, 4, 1, 1, 1, 2, 4);
,我们得到:模拟(重新)产生(使用
KEY_SPACE
和0xCAFEBABE
)