camel rabbitmq并发使用者在关闭时丢弃消息

js81xvg6  于 2021-07-03  发布在  Java
关注(0)|答案(0)|浏览(295)

我正在连接一个rabbitmq集群,队列中有50条消息等待运行camel的java应用程序处理。我的 Camel 是 rabbitmq://localhost:5672/PrintRequest?autoAck=false&autoDelete=false&automaticRecoveryEnabled=true&concurrentConsumers=5&durable=true&exchangeType=fanout&password=rabbitPass&prefetchCount=5&prefetchEnabled=true&queue=server-1-PrintRequest&threadPoolSize=5&topologyRecoveryEnabled=true&username=rabbitUser 理论上,它应该在5个线程中提取队列中的5条消息并对其进行处理。处理每个事件的时间平均需要几秒钟。
当我在处理事件时优雅地停止应用程序时,我希望看到它将尚未完成处理的事件放回队列(从unacked状态移到ready状态)。但是,应用程序会丢弃这些事件和更多事件,有时甚至会在关机期间丢弃所有50个事件,并出现以下错误:

java.util.concurrent.RejectedExecutionException: Run not allowed as ShutdownStrategy is forcing shutting down, will reject executing exchange: Exchange[]
    at org.apache.camel.processor.CamelInternalProcessor.continueProcessing(CamelInternalProcessor.java:266)
    at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:138)
    at org.apache.camel.impl.engine.DefaultAsyncProcessorAwaitManager.process(DefaultAsyncProcessorAwaitManager.java:77)
    at org.apache.camel.support.AsyncProcessorSupport.process(AsyncProcessorSupport.java:40)
    at org.apache.camel.component.rabbitmq.RabbitConsumer.doHandleDelivery(RabbitConsumer.java:104)
    at org.apache.camel.component.rabbitmq.RabbitConsumer.handleDelivery(RabbitConsumer.java:79)
    at com.rabbitmq.client.impl.ConsumerDispatcher$5.run(ConsumerDispatcher.java:149)
    at com.rabbitmq.client.impl.ConsumerWorkService$WorkPoolRunnable.run(ConsumerWorkService.java:104)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

如果我将并发使用者减少到一个,我将得到“将未处理的消息放回就绪状态”的预期行为,但是我要求这是并发的,以跟上事件进入的速率。
我使用的是camel的3.6.0版本,并使用默认的关闭策略。异常在我启动进程3秒后出现。

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题