spring数据redis streams,不知道我未确认的消息发生了什么?

0qx6xfy6  于 2021-06-09  发布在  Redis
关注(0)|答案(1)|浏览(866)

我使用下面的代码使用spring数据redis使用者组来使用redis流,但是即使我已经注解掉了acknowledge命令,在服务器重新启动之后,我的消息也不会被重新读取。
我希望,如果我没有确认该消息,那么当服务器被终止并重新启动时,应该重新读取该消息。我错过了什么?

@Bean
@Autowired
public StreamMessageListenerContainer eventStreamPersistenceListenerContainerTwo(RedisConnectionFactory streamRedisConnectionFactory, RedisTemplate streamRedisTemplate) {

        StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>> containerOptions = StreamMessageListenerContainer.StreamMessageListenerContainerOptions
                        .builder().pollTimeout(Duration.ofMillis(100)).build();

        StreamMessageListenerContainer<String, MapRecord<String, String, String>> container = StreamMessageListenerContainer.create(streamRedisConnectionFactory,
                        containerOptions);

        container.receive(Consumer.from("my-group", "my-consumer"),
                        StreamOffset.create("event-stream", ReadOffset.latest()),
                        message -> {
                                System.out.println("MessageId: " + message.getId());
                                System.out.println("Stream: " + message.getStream());
                                System.out.println("Body: " + message.getValue());
                                //streamRedisTemplate.opsForStream().acknowledge("my-group", message);
                        });

        container.start();

        return container;
}
ftf50wuq

ftf50wuq1#

在阅读了redis关于流是如何工作的文档之后,我想到了以下方法来为消费者自动处理任何未确认但以前已传递的消息:

// Check for any previously unacknowledged messages that were delivered to this consumer.
log.info("STREAM - Checking for previously unacknowledged messages for " + this.getClass().getSimpleName() + " event stream listener.");
String offset = "0";
while ((offset = processUnacknowledgedMessage(offset)) != null) {
        log.info("STREAM - Finished processing one unacknowledged message for " + this.getClass().getSimpleName() + " event stream listener: " + offset);
}
log.info("STREAM - Finished checking for previously unacknowledged messages for " + this.getClass().getSimpleName() + " event stream listener.");

以及处理消息的方法:

/**
 * Processes, and acknowledges the next previously delivered message, beginning
 * at the given message id offset.
 *
 * @param offset The last read message id offset.
 * @return The message that was just processed, or null if there are no more messages.
 */
public String processUnacknowledgedMessage(String offset) {
        List<MapRecord> messages = streamRedisTemplate.opsForStream().read(Consumer.from(groupName(), consumerName()),
                        StreamReadOptions.empty().noack().count(1),
                        StreamOffset.create(streamKey(), ReadOffset.from(offset)));
        String lastMessageId = null;
        for (MapRecord message : messages) {
                if (log.isDebugEnabled()) log.debug(String.format("STREAM - Processing event(%s) from stream(%s) during startup: %s", message.getId(), message.getStream(), message.getValue()));
                processRecord(message);
                if (log.isDebugEnabled()) log.debug(String.format("STREAM - Finished processing event(%s) from stream(%s) during startup.", message.getId(), message.getStream()));
                streamRedisTemplate.opsForStream().acknowledge(groupName(), message);
                lastMessageId = message.getId().getValue();
        }
        return lastMessageId;
}

相关问题