我有一个应用程序,它使用streamreceiver从redis stream消费事件,还有一个控制器,用于停止和启动一个特定的系统,在这个系统中应用程序更新db并进行一些处理。
我的要求是:当系统停止和启动事件发生时,不应该处理redis流事件来保持数据库的一致性。一旦系统启动/停止完成,就必须继续处理事件。我能知道处理这件事最好的方法是什么吗。
我的streamreceiver代码:
Flux<MapRecord<String, String, String>> stream = streamReceiver.receiveAutoAck(consumer,
StreamOffset.create("events", ReadOffset.lastConsumed()));
stream.log().doOnNext(data -> {
try {
Service.processEventsNotificationMessage(data);
} catch (InterruptedException e) {
e.printStackTrace();
}
}).onErrorContinue((throwable, event) -> {
log.error("Error while processing the event {}. Cause: {}", event, throwable.getMessage());
;
}).doOnTerminate(() -> {
log.info("Redis events stream terminated");
}).subscribe();```
暂无答案!
目前还没有任何答案,快来回答吧!