我是个React堆新手。我正在尝试开发以下应用程序逻辑:
阅读Kafka主题的信息 source
.
变换按摩。
将转换后的消息的子集写入新的Kafka主题 target
.
显式确认最初从主题读取的所有消息的读取操作 source
.
我找到的唯一解决方案是重写上面的业务逻辑,如下所示。
阅读Kafka主题的信息 source
.
变换按摩。
立即确认消息未写入主题 target
.
过滤以上所有消息。
把剩下的信息写进新的Kafka主题 target
.
明确确认这些消息的读取操作
实现第二个逻辑的代码如下:
receiver.receive()
.flatMap(this::processMessage)
.map(this::acknowledgeMessagesNotToWriteInKafka)
.filter(this::isMessageToWriteInKafka)
.as(this::sendToKafka)
.doOnNext(r -> r.correlationMetadata().acknowledge());
显然, receiver
类型为 KafkaReceiver
,和方法 sendToKafka
使用 KafkaSender
. 我不喜欢的一件事是我在使用 map
确认一些信息。
有没有更好的解决方案来实现原来的逻辑?
1条答案
按热度按时间z9smfwbn1#
这不完全是您的四个业务逻辑步骤,但我认为它更接近您想要的。
您可以确认不会写入的“丢弃”消息
.doOnDiscard
之后.filter
...注意:您需要使用被丢弃的正确对象类型。我不知道发布者返回的对象是什么类型的
processMessage
发射,但我想你可以得到ReceiverRecord
或者ReceiverOffset
为了承认这一点。或者,您可以组合
filter
/doOnDiscard
变成一个单一的.handle
操作员。。。