jmsactivemq主题慢速使用者

svdrlsy4  于 2021-07-13  发布在  Java
关注(0)|答案(1)|浏览(367)

我使用的是一个持久的主题,其中生产者使用以下策略发布事件:

<bean id="jmsTemplateESB"   class="org.springframework.jms.core.JmsTemplate">
    <property name="connectionFactory"     ref="cachedJmsConnectionFactory" />
    <property name="defaultDestination" ref="activeMQTopic" />
    <!-- Value = javax.jms.DeliveryMode.PERSISTENT -->
    <property name="deliveryMode" value="2" />
    <!-- Value = javax.jms.Session.AUTO_ACKNOWLEDGE -->
    <property name="sessionAcknowledgeMode" value="1" />
    <!-- Needs to be true for the deliveryMode to work -->

    <property name="explicitQosEnabled" value="true" />
    </bean>

我正在为消费者使用以下设置:

public static void listenOnTopic(String topicName, MessageListener listener) 
   throws Exception 
   {
   ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(bindAddress);
   Connection con = factory.createConnection();

   con.setClientID("Consumer");
   Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
   Topic topic = session.createTopic(topicName);
   TopicSubscriber subscriber = session.createDurableSubscriber(topic, listener.getClass().getName());

   subscriber.setMessageListener(listener);

   con.start();
  }

使用下面的侦听器

public class ActiveMQMessageListener implements MessageListener
{
 private static final Logger LOG = LoggerFactory.getLogger(ActiveMQMessageListener.class);

 @Autowired
 @Qualifier("jmsEventOutPutChannel")
 MessageChannel outputChannel;

 @Override
 public void onMessage(Message message) {
    try {   
        BytesMessage bytesMessage= (BytesMessage) message;
        byte[] data = new byte[(int)bytesMessage.getBodyLength()];
        bytesMessage.readBytes(data);
        org.springframework.integration.Message<byte[]> outputMessage = MessageBuilder.withPayload(data).build();
        outputChannel.send(outputMessage);
    } catch (JMSException e) {
        e.printStackTrace();
        LOG.error("Error while retrieving events from ActiveMQ ",e);
    }
 }
}

以及输出通道的跟随Spring设置

<bean id="callerBlockPolicy" class="org.springframework.integration.util.CallerBlocksPolicy">
    <constructor-arg type="long" value="10000"></constructor-arg>
</bean>

<bean id="jmsListnerTaskExecutor"
    class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
    <property name="corePoolSize" value="${CORE_POOL_SIZE}"></property>
    <property name="maxPoolSize" value="${THREAD_POOL_SIZE_JMS_LISTENER}"></property>
    <property name="queueCapacity" value="${QUEUE_SIZE_JMS_LISTENER}"></property>
    <property name="rejectedExecutionHandler" ref="callerBlockPolicy"></property>
    <property name="waitForTasksToCompleteOnShutdown" value="true"></property>
</bean>

<int:channel id="jmsEventOutPutChannel">
    <int:dispatcher task-executor="jmsListnerTaskExecutor" />
</int:channel>

这个消费代码太慢,我们无法从主题中高速检索消息。
实际上,如果没有图片中的“jmseventoutputchannel”,我将获得大约9500qps的速率,但是如果图片中的“jmseventoutputchannel”,我们将获得大约150qps的速率。
有人能告诉我这个代码有什么问题吗?
我的“jmseventoutputchannel”通道代码是否也会影响activemq的使用率?

xtupzzrd

xtupzzrd1#

真正的问题不是你的消费代码,而是在把消息发送到输出通道时出了问题。
关注这里,看看为什么消息要花这么长时间才能写入activemq。首先,我会尝试让它不持久(但仍然持久),看看它的表现是否不同。可能是activemq服务器配置不正确,并且写入后端存储效率低下(可能是kahadb跟不上?)
有没有可能生产者正在为发送的每条消息创建一个连接,而开销会让你丧命?
你可能会发布你的activemqurl,但不知道你添加了哪些参数可能会产生影响。但看到它如此堕落显然是不好的。

相关问题