React堆通量的java条件逻辑

x0fgdtte  于 2021-06-06  发布在  Kafka
关注(0)|答案(1)|浏览(326)

我是个React堆新手。我正在尝试开发以下应用程序逻辑:
阅读Kafka主题的信息 source .
变换按摩。
将转换后的消息的子集写入新的Kafka主题 target .
显式确认最初从主题读取的所有消息的读取操作 source .
我找到的唯一解决方案是重写上面的业务逻辑,如下所示。
阅读Kafka主题的信息 source .
变换按摩。
立即确认消息未写入主题 target .
过滤以上所有消息。
把剩下的信息写进新的Kafka主题 target .
明确确认这些消息的读取操作
实现第二个逻辑的代码如下:

receiver.receive()
        .flatMap(this::processMessage)
        .map(this::acknowledgeMessagesNotToWriteInKafka)
        .filter(this::isMessageToWriteInKafka)
        .as(this::sendToKafka)
        .doOnNext(r -> r.correlationMetadata().acknowledge());

显然, receiver 类型为 KafkaReceiver ,和方法 sendToKafka 使用 KafkaSender . 我不喜欢的一件事是我在使用 map 确认一些信息。
有没有更好的解决方案来实现原来的逻辑?

z9smfwbn

z9smfwbn1#

这不完全是您的四个业务逻辑步骤,但我认为它更接近您想要的。
您可以确认不会写入的“丢弃”消息 .doOnDiscard 之后 .filter ...

receiver.receive()
        .flatMap(this::processMessage)
        .filter(this::isMessageToWriteInKafka)
        .doOnDiscard(ReceiverRecord.class, record -> record.receiverOffset().acknowledge())
        .as(this::sendToKafka)
        .doOnNext(r -> r.correlationMetadata().acknowledge());

注意:您需要使用被丢弃的正确对象类型。我不知道发布者返回的对象是什么类型的 processMessage 发射,但我想你可以得到 ReceiverRecord 或者 ReceiverOffset 为了承认这一点。
或者,您可以组合 filter / doOnDiscard 变成一个单一的 .handle 操作员。。。

receiver.receive()
        .flatMap(this::processMessage)
        .handle((m, sink) -> {
            if (isMessageToWriteInKafka(m)) {
                sink.next(m);
            } else {
                m.getReceiverRecord().getReceiverOffset().acknowledge();
            }
        })
        .as(this::sendToKafka)
        .doOnNext(r -> r.correlationMetadata().acknowledge());

相关问题