java—如何知道消息是否已确认/未确认?

wsxa1bj1  于 2021-07-12  发布在  Java
关注(0)|答案(1)|浏览(524)

我正在尝试使用rabbitmq和spring boot来知道消息何时被接受(ack)或不接受(nack)。
我想将消息发送到队列(通过exchange),并检查队列是否已接受该消息。实际上,我想发送到两个不同的队列,但这并不重要,我假设它是否对其中一个有效,对另一个也有效。
所以我试过用 CorrelationData :

  1. public boolean sendMessage(...) {
  2. CorrelationData cd = new CorrelationData();
  3. this.rabbitTemplate.convertAndSend(exchange, routingKey, message, cd);
  4. try {
  5. return cd.getFuture().get(3, TimeUnit.SECONDS).isAck();
  6. } catch (InterruptedException | ExecutionException | TimeoutException e ) {
  7. e.printStackTrace();
  8. return false;
  9. }
  10. }

线路 cd.getFuture().get(3, TimeUnit.SECONDS).isAck() 应该得到 false is值尚未 ack 我想是排队吧。但这永远是真的,即使 routingKey 不存在。
所以我假设这段代码正在检查消息是否已发送到 exchange 以及 exchange 说“是的,我收到了消息,它还没有被路由,但我已经收到了”。
所以,我在rabbit/spring文档中寻找了其他方法,但我没有找到方法。
再解释一下,我想说的是:
我收到了一条消息。必须将此消息发送到其他队列/exchange,但在其他两个队列确认此消息之前,不能将其从当前队列中删除(即已确认) ack .
我有手动确认,作为一个小伪代码,我有:

  1. @RabbitListener(queues = {queue})
  2. public void receiveMessageFromDirect(Message message, Channel channel,
  3. @Header(AmqpHeaders.DELIVERY_TAG) long tag){
  4. boolean sendQueue1 = sendMessage(...);
  5. boolean sendQueue2 = sendMessage(...);
  6. if(sendQueue1 && sendQueue2){
  7. //both messages has been readed; now I can ack this message
  8. channel.basicAck(tag, false);
  9. }else{
  10. //nacked; I can't remove the message util both queue ack the message
  11. channel.basicNack(tag,false,true);
  12. }

我测试了这个结构,即使队列不存在,也测试了值 sendQueue1 以及 sendQueue2 都是真的。

1sbrub3j

1sbrub3j1#

确认为真;即使是无法表达的信息(我不完全清楚为什么)。
您需要启用返回的消息(并在 CorrelationData 未来完成后- correlationData.getReturnedMessage() ). 如果不为null,则消息无法路由到任何队列。
只有在代理程序中存在错误,或者使用队列时,才能获得nacks x-max-length 和溢出行为 reject-publish .

相关问题