我使用的是一个持久的主题,其中生产者使用以下策略发布事件:
<bean id="jmsTemplateESB" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="cachedJmsConnectionFactory" />
<property name="defaultDestination" ref="activeMQTopic" />
<!-- Value = javax.jms.DeliveryMode.PERSISTENT -->
<property name="deliveryMode" value="2" />
<!-- Value = javax.jms.Session.AUTO_ACKNOWLEDGE -->
<property name="sessionAcknowledgeMode" value="1" />
<!-- Needs to be true for the deliveryMode to work -->
<property name="explicitQosEnabled" value="true" />
</bean>
我正在为消费者使用以下设置:
public static void listenOnTopic(String topicName, MessageListener listener)
throws Exception
{
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(bindAddress);
Connection con = factory.createConnection();
con.setClientID("Consumer");
Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic(topicName);
TopicSubscriber subscriber = session.createDurableSubscriber(topic, listener.getClass().getName());
subscriber.setMessageListener(listener);
con.start();
}
使用下面的侦听器
public class ActiveMQMessageListener implements MessageListener
{
private static final Logger LOG = LoggerFactory.getLogger(ActiveMQMessageListener.class);
@Autowired
@Qualifier("jmsEventOutPutChannel")
MessageChannel outputChannel;
@Override
public void onMessage(Message message) {
try {
BytesMessage bytesMessage= (BytesMessage) message;
byte[] data = new byte[(int)bytesMessage.getBodyLength()];
bytesMessage.readBytes(data);
org.springframework.integration.Message<byte[]> outputMessage = MessageBuilder.withPayload(data).build();
outputChannel.send(outputMessage);
} catch (JMSException e) {
e.printStackTrace();
LOG.error("Error while retrieving events from ActiveMQ ",e);
}
}
}
以及输出通道的跟随Spring设置
<bean id="callerBlockPolicy" class="org.springframework.integration.util.CallerBlocksPolicy">
<constructor-arg type="long" value="10000"></constructor-arg>
</bean>
<bean id="jmsListnerTaskExecutor"
class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
<property name="corePoolSize" value="${CORE_POOL_SIZE}"></property>
<property name="maxPoolSize" value="${THREAD_POOL_SIZE_JMS_LISTENER}"></property>
<property name="queueCapacity" value="${QUEUE_SIZE_JMS_LISTENER}"></property>
<property name="rejectedExecutionHandler" ref="callerBlockPolicy"></property>
<property name="waitForTasksToCompleteOnShutdown" value="true"></property>
</bean>
<int:channel id="jmsEventOutPutChannel">
<int:dispatcher task-executor="jmsListnerTaskExecutor" />
</int:channel>
这个消费代码太慢,我们无法从主题中高速检索消息。
实际上,如果没有图片中的“jmseventoutputchannel”,我将获得大约9500qps的速率,但是如果图片中的“jmseventoutputchannel”,我们将获得大约150qps的速率。
有人能告诉我这个代码有什么问题吗?
我的“jmseventoutputchannel”通道代码是否也会影响activemq的使用率?
1条答案
按热度按时间xtupzzrd1#
真正的问题不是你的消费代码,而是在把消息发送到输出通道时出了问题。
关注这里,看看为什么消息要花这么长时间才能写入activemq。首先,我会尝试让它不持久(但仍然持久),看看它的表现是否不同。可能是activemq服务器配置不正确,并且写入后端存储效率低下(可能是kahadb跟不上?)
有没有可能生产者正在为发送的每条消息创建一个连接,而开销会让你丧命?
你可能会发布你的activemqurl,但不知道你添加了哪些参数可能会产生影响。但看到它如此堕落显然是不好的。