reactiveredisoperations.opsforstream()是否支持xclaim/claim

cbeh67ev  于 2021-06-09  发布在  Redis
关注(0)|答案(1)|浏览(465)

为了使用redis流构建可靠的消息队列,我使用springbootstarter数据redis reactive和莴苣依赖来处理来自redis流的消息。虽然我可以通过中提供的api添加、读取、确认和删除消息 ReactiveRedisOperations.opsForStream() 以用户组的形式,我找不到一个api来声明一个挂起的消息,尽管它在下面可用,但它在5分钟内没有得到确认 this.reactiveRedisConnectionFactory .getReactiveConnection() .streamCommands() .xClaim() . 但是我不想有一个样板代码来管理异常、序列化等。有没有一种方法可以使用 ReactiveRedisOperations.opsForStream() https://docs.spring.io/spring-data/redis/docs/current/api/org/springframework/data/redis/core/reactivestreamoperations.html

0h4hbjxa

0h4hbjxa1#

如果没有springdataredis,直接使用莴苣客户机库,我就可以获得挂起的消息,并声明如下所示的消息

public Flux<PendingMessage> getPendingMessages(PollMessage pollMessage, String queueName) {
    Predicate<PendingMessage> poisonMessage = pendingMessage -> (pendingMessage.getTotalDeliveryCount()<=maxRetries);
    Predicate<PendingMessage> nackMessage = pendingMessage -> (pendingMessage.getElapsedTimeSinceLastDelivery().compareTo(Duration.ofMillis(ackTimeout)) > 0 );

    return statefulRedisClusterConnection.reactive()
        .xpending(queueName, pollMessage.getConsumerGroupName(), Range.unbounded(), Limit.from(1000))
        .collectList()
        .map((it) -> ((PendingMessages)PENDING_MESSAGES_CONVERTER
                .apply(it, pollMessage.getConsumerGroupName()))
                .withinRange(org.springframework.data.domain.Range.unbounded()))
            .flatMapMany(Flux::fromIterable)
            .filter(nackMessage)
            .filter(poisonMessage)
            .limitRequest(pollMessage.getBatchSize());
}

为了声明消息,我再次使用了莴苣库中提供的api

public Flux<StreamMessage<String, String>> claimMessage(PendingMessage pendingMessage, String queueName, String groupName, String serviceName) {
    return statefulRedisClusterConnection.reactive()
            .xclaim(queueName, Consumer.from(groupName, serviceName), 0, pendingMessage.getIdAsString());
}

目前,通过spring数据从redis获取挂起的消息有问题,因此我直接使用莴苣库来获取挂起的消息并声明它。
https://jira.spring.io/browse/dataredis-1160

相关问题