activemq:为什么我的消息会卡在storequeuecursor中?

uqcuzwp8  于 2021-07-06  发布在  Java
关注(0)|答案(0)|浏览(243)

我发现activemq有一个问题,很难精确定位,甚至很难重现,很难提出一个具体的问题。请容忍我。
基本上我有:

producer (prioritized messages) -> queue -> consumer

通常,队列中有几个100k消息,每当优先级较高的消息到达时,都会首先使用它们。
在星号对齐并且写入队列的高优先级消息没有被消费之前,这种方法可以正常工作。至少在我打电话之前 Queue.removeMatchingMessages(String selector) 从队列中删除消息-哪个和多少并不重要。
幸运的是,我找到了一个很好的指示。
在我们的ui中可以看到,我提交了444条优先级(3)高于其他(1)的消息,但它们没有被消费:

用调试器检查队列,我发现 StoreQueueCursor.pendingCount is 444:

如果我再提交72条消息,则挂起计数为516(444+72):

然后我用 Queue.removeMatchingMessages(String selector) , StoreQueueCursor.pendingCount 变为0:

我的444条信息突然被消耗掉了:

所以我现在能问的最好的问题是:
你的目的是什么 StoreQueueCursor 它是如何导致我的信息被消费的呢?或者更确切地说:为什么那些消息没有写入队列并准备好被消费呢?
非常感谢您的帮助。
我在用 org.apache.activemq:activemq-broker:5.15.12 (通过Spring套2.3.1.释放)。

更新

有趣的是,在“快乐案例”中,我所有的高优先级消息都得到了应有的处理 pendingCount 远高于0:

更新#2

在activemq中,如何支持优先级队列?上面写着:
由于消息游标(和客户端)实现了严格的优先级排序,因此,如果消息调度可以从缓存中进行,而不必命中磁盘(即,您的使用者足够快,可以跟上生产者),则可以观察到严格的优先级排序,或者如果您使用的是不必刷新到磁盘的非持久性消息(使用filependingmessagecursor)。但是,一旦遇到消费者速度慢,或者生产者速度快得多的情况,您就会发现缓存将被填满(可能是优先级较低的消息),而优先级较高的消息会被卡在磁盘上,直到它们被调入时才可用。在这种情况下,您可以决定权衡优化的消息调度以实现优先级强制。您可以禁用缓存、消息过期检查,并将使用者预取降低到1,以确保在低优先级消息之前从存储中获取高优先级消息
所以我试着像这样禁用缓存(顺便说一下,我已经 jms.prefetchPolicy.all=0 设置):

PolicyEntry policyEntry = new PolicyEntry();
policyEntry.setQueue(JmsQueueNames.TASK_QUEUE_PREFIX + ".*");
policyEntry.setDeadLetterStrategy(taskDeadLetterStrategy);
policyEntry.setPrioritizedMessages(true);
policyEntry.setUseCache(false);
policyEntry.setExpireMessagesPeriod(0);

现在, useCache 是假的但是 cacheEnabled 是真的:

但同样的行为也可以观察到。
另外,我总是关闭代理持久性,因此我不确定上述情况是否适用:

@Bean
public BrokerService broker(ActiveMQProperties properties, DispatcherProperties dispatcherProperties) throws Exception {
  BrokerService brokerService = new BrokerService();
  brokerService.setPersistent(false);
  brokerService.getProducerSystemUsage().getMemoryUsage().setLimit(dispatcherProperties.getActiveMq().getMemoryLimit());
  brokerService.addConnector(properties.getBrokerUrl());
  brokerService.setPlugins(getPluginsToLoad());
  brokerService.setDestinationPolicy(policyMap());
  return brokerService;
}

来自jmx的信息:

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题