我使用下面的代码使用spring数据redis使用者组来使用redis流,但是即使我已经注解掉了acknowledge命令,在服务器重新启动之后,我的消息也不会被重新读取。
我希望,如果我没有确认该消息,那么当服务器被终止并重新启动时,应该重新读取该消息。我错过了什么?
@Bean
@Autowired
public StreamMessageListenerContainer eventStreamPersistenceListenerContainerTwo(RedisConnectionFactory streamRedisConnectionFactory, RedisTemplate streamRedisTemplate) {
StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>> containerOptions = StreamMessageListenerContainer.StreamMessageListenerContainerOptions
.builder().pollTimeout(Duration.ofMillis(100)).build();
StreamMessageListenerContainer<String, MapRecord<String, String, String>> container = StreamMessageListenerContainer.create(streamRedisConnectionFactory,
containerOptions);
container.receive(Consumer.from("my-group", "my-consumer"),
StreamOffset.create("event-stream", ReadOffset.latest()),
message -> {
System.out.println("MessageId: " + message.getId());
System.out.println("Stream: " + message.getStream());
System.out.println("Body: " + message.getValue());
//streamRedisTemplate.opsForStream().acknowledge("my-group", message);
});
container.start();
return container;
}
1条答案
按热度按时间ftf50wuq1#
在阅读了redis关于流是如何工作的文档之后,我想到了以下方法来为消费者自动处理任何未确认但以前已传递的消息:
以及处理消息的方法: