spring引导redis streamreceiver等待事件完成

6fe3ivhb  于 2021-06-10  发布在  Redis
关注(0)|答案(0)|浏览(373)

我有一个应用程序,它使用streamreceiver从redis stream消费事件,还有一个控制器,用于停止和启动一个特定的系统,在这个系统中应用程序更新db并进行一些处理。
我的要求是:当系统停止和启动事件发生时,不应该处理redis流事件来保持数据库的一致性。一旦系统启动/停止完成,就必须继续处理事件。我能知道处理这件事最好的方法是什么吗。
我的streamreceiver代码:

Flux<MapRecord<String, String, String>> stream = streamReceiver.receiveAutoAck(consumer,
                StreamOffset.create("events", ReadOffset.lastConsumed()));
        stream.log().doOnNext(data -> {
            try {
    Service.processEventsNotificationMessage(data);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).onErrorContinue((throwable, event) -> {
            log.error("Error while processing the event {}. Cause: {}", event, throwable.getMessage());
            ;
        }).doOnTerminate(() -> {
            log.info("Redis events stream terminated");
        }).subscribe();```

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题