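Our environment consists of 3 JBoss servers (portal, JMS, reconciliation).
- The reconciliation server hosts Camel routes, including a route that consumes from a queue (SLAQueue).
- The JMS server hosts all the queues.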
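Recently we found a bug where some messages in the TaskQueue hosted on the JMS server were not delivered to the MDB on the Portal server. For some reason the messages got stuck, and when we restarted the JMS server the stuck messages were delivered.
To investigate, we enabled TRACE-level logging on "org.apache.activemq.artemis". We noticed a lot of chatter between our Camel JMS component and the JMS server. A sample of the chatter is listed below; log entries like these are written every second.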
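Questions:
- What mechanism does the Camel JMS component use to get messages from the queue? Does the JMS component poll the queue every second (pull), or is it notified when a message arrives on the queue?
- Is this mechanism different from a J2EE message-driven bean? An MDB is notified when a message arrives on the queue.
- Based on the chatter below, I believe it is polling. If so, can the polling window be configured? I have tried the receiveTimeout option, without success.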
Sample of the chatter in the log between the Camel JMS component and the JMS server:
2021-11-17 16:39:24,612 TRACE [org.apache.activemq.artemis.ra] (Camel (camelContextReconciliation) thread #0 - JmsConsumer[SLAQueue]) createdConsumer ActiveMQSession->ClientSessionImpl [name=eebaf2ec-47b3-11ec-ad21-0242ac110003, username=null, closed=false, factory = org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl@413c962e, metaData=(jms-session=,)]@72d15942 consumer=org.apache.activemq.artemis.ra.ActiveMQRAMessageConsumer@1dd2e8b
2021-11-17 16:39:24,612 TRACE [org.apache.activemq.artemis.ra] (Camel (camelContextReconciliation) thread #0 - JmsConsumer[SLAQueue]) addConsumer(org.apache.activemq.artemis.ra.ActiveMQRAMessageConsumer@1dd2e8b)
2021-11-17 16:39:24,612 TRACE [org.apache.activemq.artemis.core.protocol.core.impl.RemotingConnectionImpl] (Thread-206 (ActiveMQ-remoting-threads-ActiveMQServerImpl::serverUUID=b5ba8675-1a62-11ec-b397-0242ac110002-1886035738)) RemotingConnectionID=eeb5e9d6-47b3-11ec-ad21-0242ac110003 handling packet PACKET(SessionConsumerFlowCreditMessage)[type=70, channelID=12, responseAsync=false, requiresResponse=false, correlationID=-1, packetObject=SessionConsumerFlowCreditMessage, consumerID=5086, credits=1048576]
2021-11-17 16:39:24,612 TRACE [org.apache.activemq.artemis.ra] (Camel (camelContextReconciliation) thread #0 - JmsConsumer[SLAQueue]) unlock()
2021-11-17 16:39:24,612 TRACE [org.apache.activemq.artemis.core.remoting.impl.invm.InVMConnection] (Thread-206 (ActiveMQ-remoting-threads-ActiveMQServerImpl::serverUUID=b5ba8675-1a62-11ec-b397-0242ac110002-1886035738)) InVMConnection [serverID=0, id=eeb5e9d6-47b3-11ec-ad21-0242ac110003]::packet sent done
2021-11-17 16:39:24,612 TRACE [org.apache.activemq.artemis.ra] (Camel (camelContextReconciliation) thread #0 - JmsConsumer[SLAQueue]) unlock()
2021-11-17 16:39:24,612 TRACE [org.apache.activemq.artemis.ra] (Camel (camelContextReconciliation) thread #0 - JmsConsumer[SLAQueue]) lock()
2021-11-17 16:39:24,612 TRACE [org.apache.activemq.artemis.ra] (Camel (camelContextReconciliation) thread #0 - JmsConsumer[SLAQueue]) tryLock()
2021-11-17 16:39:24,612 TRACE [org.apache.activemq.artemis.ra] (Camel (camelContextReconciliation) thread #0 - JmsConsumer[SLAQueue]) getUseTryLock()
2021-11-17 16:39:24,612 TRACE [org.apache.activemq.artemis.ra] (Camel (camelContextReconciliation) thread #0 - JmsConsumer[SLAQueue]) getUseTryLock()
2021-11-17 16:39:24,612 TRACE [org.apache.activemq.artemis.ra] (Camel (camelContextReconciliation) thread #0 - JmsConsumer[SLAQueue]) lock()
2021-11-17 16:39:24,612 TRACE [org.apache.activemq.artemis.ra] (Camel (camelContextReconciliation) thread #0 - JmsConsumer[SLAQueue]) receive org.apache.activemq.artemis.ra.ActiveMQRAMessageConsumer@1dd2e8b timeout=120000
2021-11-17 16:39:24,612 TRACE [org.apache.activemq.artemis.ra] (Camel (camelContextReconciliation) thread #0 - JmsConsumer[SLAQueue]) checkState()
2021-11-17 16:39:24,612 TRACE [org.apache.activemq.artemis.core.client.impl.ClientConsumerImpl] (Camel (camelContextReconciliation) thread #0 - JmsConsumer[SLAQueue]) org.apache.activemq.artemis.core.client.impl.ClientConsumerImpl@5e7ed34b{consumerContext=ActiveMQConsumerContext{id=5086}, queueName=jms.queue.SLAQueue}:: receive(120000)
2021-11-17 16:39:24,612 TRACE [org.apache.activemq.artemis.core.client.impl.ClientConsumerImpl] (Camel (camelContextReconciliation) thread #0 - JmsConsumer[SLAQueue]) org.apache.activemq.artemis.core.client.impl.ClientConsumerImpl@5e7ed34b{consumerContext=ActiveMQConsumerContext{id=5086}, queueName=jms.queue.SLAQueue}::receive(120000, false)
2021-11-17 16:39:24,612 TRACE [org.apache.activemq.artemis.core.protocol.core.ServerSessionPacketHandler] (Thread-1288 (ActiveMQ-server-org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl$6@775fa129)) ServerSessionPacketHandler::handlePacket,PACKET(SessionConsumerFlowCreditMessage)[type=70, channelID=12, responseAsync=false, requiresResponse=false, correlationID=-1, packetObject=SessionConsumerFlowCreditMessage, consumerID=5086, credits=1048576]
2021-11-17 16:39:24,612 DEBUG [org.apache.activemq.artemis.core.server.impl.ServerConsumerImpl] (Thread-1288 (ActiveMQ-server-org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl$6@775fa129)) ServerConsumerImpl [id=5086, filter=null, binding=LocalQueueBinding [address=jms.queue.SLAQueue, queue=QueueImpl[name=jms.queue.SLAQueue, postOffice=PostOfficeImpl [server=ActiveMQServerImpl::serverUUID=b5ba8675-1a62-11ec-b397-0242ac110002], temp=false]@3b0e1e9f, filter=null, name=jms.queue.SLAQueue, clusterName=jms.queue.SLAQueueb5ba8675-1a62-11ec-b397-0242ac110002]]::FlowControl::Received 1048576 credits, previous value = 0 currentValue = 1048576
2021-11-17 16:39:24,612 TRACE [org.apache.activemq.artemis.core.server.impl.ServerConsumerImpl] (Thread-1288 (ActiveMQ-server-org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl$6@775fa129)) ServerConsumerImpl [id=5086, filter=null, binding=LocalQueueBinding [address=jms.queue.SLAQueue, queue=QueueImpl[name=jms.queue.SLAQueue, postOffice=PostOfficeImpl [server=ActiveMQServerImpl::serverUUID=b5ba8675-1a62-11ec-b397-0242ac110002], temp=false]@3b0e1e9f, filter=null, name=jms.queue.SLAQueue, clusterName=jms.queue.SLAQueueb5ba8675-1a62-11ec-b397-0242ac110002]]::calling promptDelivery from receiving credits
2021-11-17 16:39:24,612 TRACE [org.apache.activemq.artemis.core.protocol.core.ServerSessionPacketHandler] (Thread-1288 (ActiveMQ-server-org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl$6@775fa129)) ServerSessionPacketHandler::scheduling response::null
2021-11-17 16:39:24,612 TRACE [org.apache.activemq.artemis.core.protocol.core.ServerSessionPacketHandler] (Thread-1288 (ActiveMQ-server-org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl$6@775fa129)) ServerSessionPacketHandler::regular response sent::null
1 Answer
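Yes, the JMS consumer is polling. By default, Camel uses Spring's DefaultMessageListenerContainer under the hood.
The Spring listener container itself uses the JMS API directly.
Spring uses a default timeout of 1 second when pulling messages. I don't know whether you can actually change this with Camel's receiveTimeout option.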
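To make the polling behaviour concrete, here is a minimal sketch of a receive-with-timeout loop. It is not Spring's or Camel's actual code: a java.util.concurrent.BlockingQueue stands in for the JMS consumer, and the names PollingConsumerSketch, pollLoop and Result are made up for illustration. The point it tries to show is that each receive call blocks until a message arrives or the timeout expires, so a waiting message is handed over immediately, while an idle queue produces one empty receive call per timeout window (which would be consistent with seeing a burst of log lines at a fixed interval).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class PollingConsumerSketch {

    /** Outcome of a polling run: the messages handled and the number of receive calls made. */
    static final class Result {
        final List<String> messages = new ArrayList<>();
        int receiveCalls = 0;
    }

    /**
     * Hypothetical stand-in for a listener container's consumer loop.
     * queue.poll(timeout) plays the role of a JMS receive(timeout) call:
     * it returns as soon as a message is available, or null once the timeout expires.
     */
    static Result pollLoop(BlockingQueue<String> queue, long receiveTimeoutMs, int maxCalls) {
        Result result = new Result();
        while (result.receiveCalls < maxCalls) {
            result.receiveCalls++;
            try {
                String message = queue.poll(receiveTimeoutMs, TimeUnit.MILLISECONDS);
                if (message != null) {
                    result.messages.add(message); // a real consumer would invoke the route here
                }
                // null means the timeout elapsed with no message; loop and receive again
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt flag and stop
                break;
            }
        }
        return result;
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        queue.add("m1");
        queue.add("m2");
        // Two calls return a message at once; two calls time out after 50 ms each.
        Result result = pollLoop(queue, 50, 4);
        System.out.println(result.messages + " after " + result.receiveCalls + " receive calls");
        // prints "[m1, m2] after 4 receive calls"
    }
}
```

In this sketch, a longer timeout only reduces how often an idle consumer loops; it does not delay delivery of a message that arrives while a receive call is already waiting.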