java 缓冲流中的唯一值,直到它不再是唯一的-使用groupby内部的缓存

htzpubme  于 2023-05-05  发布在  Java
关注(0)|答案(1)|浏览(145)

我有一个Kafka的主题。所以我必须处理大的无限热源。为了简单起见,我将用整数通量来吸收它。
我想把这个通量变成一个无序的值的通量,它只包含通量不唯一的值。
例如1, 2, 3, 2, 1, 1, 4 -> 2, 2, 1, 1, 1
下面是一个有效的尝试:

Flux<Integer> flux = Flux.just(1, 2, 3, 4, 1, 1, 1, 2, 4);

    flux.groupBy(Function.identity())
        .flatMap(
                it -> {
                    Flux<Integer> shared = it.cache();
                    return Flux.concat(shared.buffer(2)
                            .take(1)
                            .map(buff -> buff.stream().findFirst().get()), shared.skip(1));
                }
        )
        .doOnNext(System.out::println)
        .blockLast();

但是我不喜欢flatMap中使用Flux.cache,因为我担心内存的使用。
我也不确定它将如何与一个热源一起工作,就好像我使用publish()变成一个热通量,什么也没有发生。
我之前已经解释过了。

fdbelqdn

fdbelqdn1#

根据我对要求的理解,你想

  • 过滤重复项
  • 从实时流。

要过滤重复项,决定什么是重复项至关重要(快速且安全!)。其余的可以以如下的React方式处理:

package com.example.demo;

import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

import org.reactivestreams.Publisher;

import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;

@Slf4j
public class ReactiveDemo {

    // here we "simulate" very fast "indexer"
    static final Set<Integer> uniques = new HashSet<>();
    // to recognize past dups, we need:
    static final Set<Integer> dupes = new HashSet<>();
    // for "memory" these datastructures are efficient, in "real world"/on large scale/distributed, we might have other ..

    public static void main(String[] args) {
        Flux<Integer> tstInbound = Flux.just(1, 2, 3, 4, 1, 1, 1, 2, 4);
        // for endless simulation, replace with:
        // Flux.from(source);// kill with Ctrl+C (SIG), sry

        // subscribe/publish/do something, DONT block!
        tstInbound.subscribe((nxt) -> {
            // here is the main job/business logic
            // do it as good, fast, ... as possible
            if (uniques.contains(nxt)) {// we have a (1st time) duplication

                uniques.remove(nxt);
                dupes.add(nxt);

                // we did our job (identifying, handling, memorizing a dup), bye:
                downStream.accept(nxt); // down stream can be slow, we "fire (& forget)"
            } else if (dupes.contains(nxt)) {// we have a (repeated) duplicate
                // bye:
                downStream.accept(nxt); // ... we don't halt here
            } else { // first time encounter
                // don't send to donwstream, but safe "uniqe"
                uniques.add(nxt);
            }
        });
        // nxt level: caching/buffering/throtelling/draining :)
    }

    // smaller KEY_SPACE -> more and faster duplicates
    static final Integer KEY_SPACE = Integer.MAX_VALUE / 32 / 8182;
    // (only) test data generator
    static final Random RND = new Random(0xCAFEBABE); // same seed-> same data!
    // this our (test) publisher:
    static Publisher<? extends Integer> source = (s) -> {
        while (true) { // kill with CTRL+C, sowwy! :)
            try {
                TimeUnit.MILLISECONDS.sleep(10); // <- little simulate
                s.onNext(RND.nextInt(KEY_SPACE)); // <- publisher api (1.)
            } catch (InterruptedException e) {
                log.warn("Interrupted upstream");
                s.onError(e); // <- publisher api (2.)
            }
            s.onComplete(); // <- publisher api (3.)
        }
    };

    // our down stream, just logs:
    static Consumer<Integer> downStream = (i) -> {
        try {
            TimeUnit.SECONDS.sleep(1); // <- more simulate
            log.info("{}", i);
        } catch (InterruptedException e) {
            log.warn("Interrupted downstream");
        }
    };
}

对于Flux.just(1, 2, 3, 4, 1, 1, 1, 2, 4);,我们得到:

xx:x1:51.028 [main] INFO com.example.demo.ReactiveDemo -- 1
xx:x1:52.042 [main] INFO com.example.demo.ReactiveDemo -- 1
xx:x1:53.055 [main] INFO com.example.demo.ReactiveDemo -- 1
xx:x1:54.062 [main] INFO com.example.demo.ReactiveDemo -- 2
xx:x1:55.073 [main] INFO com.example.demo.ReactiveDemo -- 4

模拟(重新)产生(使用KEY_SPACE0xCAFEBABE

xx:x0:11.756 [main] INFO com.example.demo.ReactiveDemo -- 6751
xx:x0:12.941 [main] INFO com.example.demo.ReactiveDemo -- 3948
xx:x0:13.998 [main] INFO com.example.demo.ReactiveDemo -- 838
xx:x0:15.216 [main] INFO com.example.demo.ReactiveDemo -- 3294
xx:x0:16.407 [main] INFO com.example.demo.ReactiveDemo -- 8139
# .. runs "infinitely" :)

相关问题