使用redis流和spring数据获取挂起的消息

mnemlml8  于 2021-06-10  发布在  Redis
关注(0)|答案(1)|浏览(679)

我在spring boot应用程序中使用redis流。在调度程序中,我经常希望获取所有挂起的消息,检查它们已经处理了多长时间,并在必要时重新触发它们。
我的问题是现在我可以获得挂起的消息,但我不知道如何获得有效负载。
我的第一种方法是 pending 以及 range 操作。这里的缺点是totaldeliverycount没有随时间增加 range -所以我不能用range方法

  1. val pendingMessages = stringRedisTemplate.opsForStream<String, Any>().pending(redisStreamName, Consumer.from(redisConsumerGroup, instanceName))
  2. return pendingMessages.filter { pendingMessage ->
  3. if (pendingMessage.totalDeliveryCount < maxDeliveryAttempts && pendingMessage.elapsedTimeSinceLastDelivery > Duration.ofMillis(pendingTimeout.toLong())) {
  4. return@filter true
  5. } else {
  6. ...
  7. return@filter false
  8. }
  9. }.map { //map from PendingMessage::class to a MapRecord with the content
  10. val map = stringRedisTemplate.opsForStream().range(redisStreamName, Range.just(it.idAsString)) // does not increase totalDeliveryCount !!!
  11. if (map != null && map.size > 0) {
  12. return@map map[0]
  13. } else {
  14. return@map null
  15. }
  16. }.filterNotNull().toList()

我的第二种方法是 pending 以及 read 操作。对于读取操作,我可以用当前id指定一个偏移量。问题是,我只得到比指定id高的id。

  1. val pendingMessages = stringRedisTemplate.opsForStream().pending(redisStreamName, Consumer.from(redisConsumerGroup, instanceName))
  2. return pendingMessages.filter { pendingMessage ->
  3. if (pendingMessage.totalDeliveryCount < maxDeliveryAttempts && pendingMessage.elapsedTimeSinceLastDelivery > Duration.ofMillis(pendingTimeout.toLong())) {
  4. return@filter true
  5. } else {
  6. ...
  7. return@filter false
  8. }
  9. }.map { //map from PendingMessage::class to a MapRecord with the content
  10. val map = stringRedisTemplate.opsForStream<String, Any>()
  11. .read(it.consumer, StreamReadOptions.empty().count(1),
  12. StreamOffset.create(redisStreamName, ReadOffset.from(it.id)))
  13. if (map != null && map.size > 0 && map[0].id.value == it.idAsString) { // map[0].id.value == it.idAsString does not match
  14. return@map map[0]
  15. } else {
  16. return@map null
  17. }
  18. }.filterNotNull().toList()

所以当我使用 ReadOffset.from('1234-0') 我不明白你的意思 1234-0 但那消息之后的一切。有没有办法得到确切的信息,同时也尊重 totalDeliveryCount 以及 elapsedTimeSinceLastDelivery 统计的?
我使用的是springdataredis2.3.1.release

cgyqldqp

cgyqldqp1#

我现在正在使用以下解决方法,这对于大多数情况应该是好的:

  1. return if (id.sequence > 0) {
  2. "${id.timestamp}-${id.sequence - 1}"
  3. } else {
  4. "${id.timestamp - 1}-99999"
  5. }

它依赖于这样一个事实,即每毫秒插入的消息不超过99999条。

相关问题