我发现activemq有一个问题,很难精确定位,甚至很难重现,很难提出一个具体的问题。请容忍我。
基本上我有:
producer (prioritized messages) -> queue -> consumer
通常,队列中有几个100k消息,每当优先级较高的消息到达时,都会首先使用它们。
在星号对齐并且写入队列的高优先级消息没有被消费之前,这种方法可以正常工作。至少在我打电话之前 Queue.removeMatchingMessages(String selector)
从队列中删除消息-哪个和多少并不重要。
幸运的是,我找到了一个很好的指示。
在我们的ui中可以看到,我提交了444条优先级(3)高于其他(1)的消息,但它们没有被消费:
用调试器检查队列,我发现 StoreQueueCursor.pendingCount
is 444:
如果我再提交72条消息,则挂起计数为516(444+72):
然后我用 Queue.removeMatchingMessages(String selector)
, StoreQueueCursor.pendingCount
变为0:
我的444条信息突然被消耗掉了:
所以我现在能问的最好的问题是:
你的目的是什么 StoreQueueCursor
它是如何导致我的信息被消费的呢?或者更确切地说:为什么那些消息没有写入队列并准备好被消费呢?
非常感谢您的帮助。
我在用 org.apache.activemq:activemq-broker:5.15.12
(通过Spring套2.3.1.释放)。
更新
有趣的是,在“快乐案例”中,我所有的高优先级消息都得到了应有的处理 pendingCount
远高于0:
更新#2
在activemq中,如何支持优先级队列?上面写着:
由于消息游标(和客户端)实现了严格的优先级排序,因此,如果消息调度可以从缓存中进行,而不必命中磁盘(即,您的使用者足够快,可以跟上生产者),则可以观察到严格的优先级排序,或者如果您使用的是不必刷新到磁盘的非持久性消息(使用filependingmessagecursor)。但是,一旦遇到消费者速度慢,或者生产者速度快得多的情况,您就会发现缓存将被填满(可能是优先级较低的消息),而优先级较高的消息会被卡在磁盘上,直到它们被调入时才可用。在这种情况下,您可以决定权衡优化的消息调度以实现优先级强制。您可以禁用缓存、消息过期检查,并将使用者预取降低到1,以确保在低优先级消息之前从存储中获取高优先级消息
所以我试着像这样禁用缓存(顺便说一下,我已经 jms.prefetchPolicy.all=0
设置):
PolicyEntry policyEntry = new PolicyEntry();
policyEntry.setQueue(JmsQueueNames.TASK_QUEUE_PREFIX + ".*");
policyEntry.setDeadLetterStrategy(taskDeadLetterStrategy);
policyEntry.setPrioritizedMessages(true);
policyEntry.setUseCache(false);
policyEntry.setExpireMessagesPeriod(0);
现在, useCache
是假的但是 cacheEnabled
是真的:
但同样的行为也可以观察到。
另外,我总是关闭代理持久性,因此我不确定上述情况是否适用:
@Bean
public BrokerService broker(ActiveMQProperties properties, DispatcherProperties dispatcherProperties) throws Exception {
BrokerService brokerService = new BrokerService();
brokerService.setPersistent(false);
brokerService.getProducerSystemUsage().getMemoryUsage().setLimit(dispatcherProperties.getActiveMq().getMemoryLimit());
brokerService.addConnector(properties.getBrokerUrl());
brokerService.setPlugins(getPluginsToLoad());
brokerService.setDestinationPolicy(policyMap());
return brokerService;
}
来自jmx的信息:
暂无答案!
目前还没有任何答案,快来回答吧!