spring amqp-重新创建已删除队列

eoxn13cs  于 2021-07-13  发布在  Java
关注(0)|答案(1)|浏览(374)

因此,我将spring amqp用于以下配置:

ConnectionFactory factory = new ConnectionFactory();
factory.setAutomaticRecoveryEnabled(false);
CachingConnectionFactory ccf = new CachingConnectionFactory(factory);
ccf.setAddresses(addresses);
...

我正在使用amqp admin手动创建队列。在一些更新的库更新之前,使用此代码一切正常:

private void recreateContainer() {
    // if we are not already in recreation process
    if (this.recreatingContainer.compareAndSet(false, true)) {
      ExecutorService executor = Executors.newSingleThreadExecutor();
      executor.execute(
          () -> {
            int attempt = 0;
            while (this.listenerContainer.isRunning() && attempt < MAX_STOP_WAIT_ATTEMPTS) {
              try {
                attempt++;
                Thread.sleep(100);
              } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
              }
            }
            if (attempt == MAX_STOP_WAIT_ATTEMPTS) {
              logger.error("Container took too long to stop");
            }
            logger.info(String.format("Recreating listener container after %s attempts", attempt));
            startNewContainer();
            this.recreatingContainer.set(false);
          });
    }
  }

  @Override
  public void onApplicationEvent(ListenerContainerConsumerFailedEvent event) {
    Object source = event.getSource();
    Assert.isInstanceOf(
        MessageListenerContainer.class,
        source,
        "source must be instance of MessageListenerContainer");
    MessageListenerContainer listenerContainer = (MessageListenerContainer) source;
    // if fatal and this state model is responsible for this listenerContainer
    if (event.isFatal() && this.listenerContainer == listenerContainer) {
      recreateContainer();
    }
  }

看来这是从 ListenerContainerConsumerFailedEventListenerContainerConsumerTerminatedEvent 现在我的代码不工作了。
如果我更新了异常类型,队列将被重新创建,但我在停止listenercontainer时遇到问题。

ERROR [2021-03-08T16:52:10.291+01:00] lambda$recreateContainer$0: Container took too long to stop
INFO  [2021-03-08T16:52:10.291+01:00] lambda$recreateContainer$0: Recreating listener container after 100 attempts

现在,我得到了10个监听器,而不是5个监听器(因此,似乎监听器在重新创建队列后设法恢复,但在状态队列不存在时,我无法停止容器)。
任何帮助都将不胜感激。

oalqel3c

oalqel3c1#

似乎这段代码和 DirectMessageListenerContainer ```
private void recreateContainer() {
// if we are not already in recreation process
if (this.recreatingContainer.compareAndSet(false, true)) {
this.listenerContainer.stop();
ExecutorService executor = Executors.newSingleThreadExecutor();
executor.execute(
() -> {
int attempt = 0;
while (this.listenerContainer.isRunning() && attempt < MAX_STOP_WAIT_ATTEMPTS) {
try {
attempt++;
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
if (attempt == MAX_STOP_WAIT_ATTEMPTS) {
log.error("Container took too long to stop");
}
log.info(String.format("Recreating listener container after %s attempts", attempt));
startNewContainer();
this.recreatingContainer.set(false);
});
}
}

@Override
public void onApplicationEvent(ListenerContainerConsumerTerminatedEvent event) {
Object source = event.getSource();
Assert.isInstanceOf(
MessageListenerContainer.class,
source,
"source must be instance of MessageListenerContainer");
MessageListenerContainer sourceContainer = (MessageListenerContainer) source;
// if this state model is responsible for this listenerContainer
if (this.listenerContainer == sourceContainer) {
recreateContainer();
}
}

所以,我不得不用 `ListenerContainerConsumerTerminatedEvent` ,删除支票 `event.isFatal` 并手动停止集装箱娱乐 `this.listenerContainer.stop();` .

相关问题