Camel Spring remoting with ActiveMQ Artemis Proton/Qpid客户端-挂起发送消息

mfuanj7w  于 2023-10-15  发布在  Spring
关注(0)|答案(1)|浏览(103)
  • 当我使用CamelSpring远程配置发送一些消息时,生产者和消费者都运行在不同的JVM中。
  • 使用Apache ActiveMQ Artemis 2.14.0版本
  • camel(2.20.0),qpid(0.54.0),pooled-jms(1.1.1)的版本

我正在使用LoadMessageSupport类推送消息,我看到 Camel 路由被调用,并在调试日志消息下面。
我注意到ActiveMQ Artemis控制台中启用了一个生产者会话。
任何线索,如何调试这个或什么可能导致这个问题。
有一些Netty相关的调试错误,我安全地忽略了。

  1. ...
  2. DEBUG [main] (DefaultManagementAgent.java:470) - Registered MBean with ObjectName: org.apache.camel:context=camel,type=components,name="bean"
  3. DEBUG [main] (DefaultComponent.java:266) - Cannot resolve property placeholders on component: org.apache.camel.component.bean.BeanComponent@cda0432 as PropertiesComponent is not in use
  4. DEBUG [main] (AbstractAutowireCapableBeanFactory.java:448) - Creating instance of bean 'org.apache.camel.component.jackson.converter.JacksonTypeConverters'
  5. DEBUG [main] (AbstractAutowireCapableBeanFactory.java:484) - Finished creating instance of bean 'org.apache.camel.component.jackson.converter.JacksonTypeConverters'
  6. INFO [main] (CamelLogger.java:159) - ID-local-vm-1624040900482-0-1 >>> (processMessage) from(direct://proxy-msg-handler) --> log[Log message on incoming message with body] <<< Pattern:InOnly, Headers:{breadcrumbId=ID-local-vm-1624040900482-0-1}, BodyType:org.apache.camel.component.bean.BeanInvocation, Body:BeanInvocation public abstract void com.myexample.MessageHandler.processMessage(com.myexample.MessageType,java.lang.String) with [ITEM_DESCRIPTION, {"info": " my name"}]]
  7. DEBUG [main] (CamelLogger.java:153) - Log message on incoming message with body
  8. INFO [main] (CamelLogger.java:159) - ID-local-vm-1624040900482-0-1 >>> (SubmitNotificationEvent) log[Log message on incoming message with body] --> amqpcomponent://queue:message.queue <<< Pattern:InOnly, Headers:{breadcrumbId=ID-local-vm-1624040900482-0-1}, BodyType:org.apache.camel.component.bean.BeanInvocation, Body:BeanInvocation public abstract void com.myexample.MessageHandler.processMessage(com.myexample.MessageType,java.lang.String) with [ITEM_DESCRIPTION, {"info": " my name"}]]
  9. DEBUG [main] (SendProcessor.java:147) - >>>> service-event-queue://queue:message.queue Exchange[ID-local-vm-1624040900482-0-1]
  10. DEBUG [main] (InternalLoggerFactory.java:45) - Using SLF4J as the default logging framework
  11. ...
  12. DEBUG [AmqpProvider :(1):[amqp://localhost:5672]] (AmqpConnectionBuilder.java:84) - AmqpConnection { ID:6d0c8673-6a92-401d-a239-12ec696fc9d3:1 } is now open:
  13. INFO [AmqpProvider :(1):[amqp://localhost:5672]] (JmsConnection.java:1339) - Connection ID:6d0c8673-6a92-401d-a239-12ec696fc9d3:1 connected to server: amqp://localhost:5672
  14. DEBUG [main] (JmsTemplate.java:492) - Executing callback on JMS Session: JmsPoolSession { org.apache.qpid.jms.JmsSession@7fd26ad8 }
  15. DEBUG [AmqpProvider :(1):[amqp://localhost:5672]] (AmqpProducerBuilder.java:68) - Creating AmqpFixedProducer for: null
  16. DEBUG [main] (JmsConfiguration.java:622) - Sending JMS message to: message.queue with message: JmsObjectMessageFacade

在启用TRACE级别后,注意到下面的消息

  1. DEBUG [main] (JmsConfiguration.java:622) - Sending JMS message to: message.queue with message: JmsObjectMessageFacade { org.apache.qpid.jms.provider.amqp.message.AmqpJmsObjectMessageFacade@36cc9385 }
  2. TRACE [AmqpProvider :(1):[amqp://localhost:5672]] (AmqpFixedProducer.java:100) - Holding Message send until credit is available.
  3. TRACE [AmqpProvider :(1):[amqp://localhost:5672]] (AmqpProvider.java:1625) - IdleTimeoutCheck rescheduling with delay: 15000
  4. TRACE [AmqpProvider :(1):[amqp://localhost:5672]] (NettyTcpTransport.java:560) - New incoming data read: PooledUnsafeDirectByteBuf(ridx: 0, widx: 8, cap: 65536)
  5. TRACE [AmqpProvider :(1):[amqp://localhost:5672]] (AmqpProtocolTracer.java:49) - [1673389762:0] RECV: Empty Frame
  6. TRACE [AmqpProvider :(1):[amqp://localhost:5672]] (AmqpProtocolTracer.java:54) - [1673389762:0] SENT: Empty Frame
  7. TRACE [AmqpProvider :(1):[amqp://localhost:5672]] (NettyTcpTransport.java:259) - Attempted write of buffer: PooledUnsafeDirectByteBuf(ridx: 0, widx: 8, cap: 8/8)
  8. TRACE [AmqpProvider :(1):[amqp://localhost:5672]] (NettyTcpTransport.java:273) - Attempted flush of pending writes
  9. TRACE [AmqpProvider :(1):[amqp://localhost:5672]] (AmqpProvider.java:1625) - IdleTimeoutCheck rescheduling with delay: 15000
  10. TRACE [AmqpProvider :(1):[amqp://localhost:5672]] (NettyTcpTransport.java:560) - New incoming data read: PooledUnsafeDirectByteBuf(ridx: 0, widx: 8, cap: 65536)
  11. TRACE [AmqpProvider :(1):[amqp://localhost:5672]] (AmqpProtocolTracer.java:49) - [1673389762:0] RECV: Empty Frame
  12. TRACE [AmqpProvider :(1):[amqp://localhost:5672]] (AmqpProtocolTracer.java:54) - [1673389762:0] SENT: Empty Frame
  13. TRACE [AmqpProvider :(1):[amqp://localhost:5672]] (NettyTcpTransport.java:259) - Attempted write of buffer: PooledUnsafeDirectByteBuf(ridx: 0, widx: 8, cap: 8/8)
  14. TRACE [AmqpProvider :(1):[amqp://localhost:5672]] (NettyTcpTransport.java:273) - Attempted flush of pending writes
  15. TRACE [AmqpProvider :(1):[amqp://localhost:5672]] (AmqpProvider.java:1625) - IdleTimeoutCheck rescheduling with delay: 15000
  • 下面是我用来从java类发送消息的上下文xml。
  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:camel="http://camel.apache.org/schema/spring"
  3. xmlns:util="http://www.springframework.org/schema/util"
  4. xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
  5. http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
  6. http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd
  7. http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd
  8. http://www.springframework.org/schema/util ttp://www.springframework.org/schema/util/spring-util.xsd">
  9. <bean id="jmsConnectionFactory" class="org.apache.qpid.jms.JmsConnectionFactory">
  10. <property name="remoteURI" value="amqp://localhost:5672?amqp.traceFrames=true"/>
  11. </bean>
  12. <bean id="jpcf" class="org.messaginghub.pooled.jms.JmsPoolConnectionFactory" init-method="start" destroy-method="stop" >
  13. <property name="maxConnections" value="3" />
  14. <property name="connectionFactory" ref="jmsConnectionFactory" />
  15. </bean>
  16. <bean id="jmsConfig" class="org.apache.camel.component.jms.JmsConfiguration">
  17. <property name="connectionFactory" ref="jpcf" />
  18. <property name="concurrentConsumers" value="3" />
  19. </bean>
  20. <bean id="amqpcomponent" class="org.apache.camel.component.amqp.AMQPComponent">
  21. <property name="configuration" ref="jmsConfig" />
  22. </bean>
  23. <!-- Camel Spring Remoting Interface -->
  24. <camel:proxy id="proxyObject" binding="false" serviceUrl="direct:proxy-msg-handler" serviceInterface="com.myexample.MessageHandler"/>
  25. <!-- Bean that initialize the Spring Remoting for handling message -->
  26. <bean id="BeanProxy" class="com.myexample.MessageProducer">
  27. <property name="messageHandler" ref="proxyObject"/>
  28. </bean>
  29. <camelContext id="camel" xmlns="http://camel.apache.org/schema/spring" autoStartup="true" trace="true">
  30. <camel:route autoStartup="true" id="processMessage">
  31. <camel:from uri="direct:proxy-msg-handler"/>
  32. <camel:log message="Log incoming message" logName="Incoming" loggingLevel="DEBUG"/>
  33. <camel:inOnly uri="amqpcomponent:queue:message.queue"/>
  34. </camel:route>
  35. </camelContext>
  36. </beans>
  • java类运行上下文,用于调用远程spring bean方法。
  • 使用下面的java类将消息推送到ActiveMQ Artemis队列
  1. package com.myexample;
  2. public class LoadMessageSupport {
  3. public static void main(String ...strings) {
  4. ApplicationContext appContext =null;
  5. try {
  6. appContext = new ClassPathXmlApplicationContext("file:/paht/to/context/message-handler-context.xml");
  7. MessageProducer messageProducer = appContext.getBean(MessageProducer.class);
  8. message = "{ \"itemDesc\" : \"test description\" }" ;
  9. System.out.println(message);
  10. messageProducer.sendMessage(MessageType.ITEM_DESC, message);
  11. // enum messagetype already defined within project
  12. //System.exit(0);
  13. }catch(Exception exe) {
  14. System.out.println("Something wrong... ");
  15. exe.printStackTrace();
  16. }finally {
  17. if(camelContext!=null) {
  18. System.out.println("camel context stopped...");
  19. camelContext.stop();
  20. }
  21. }
  22. }
  23. }
  • 讯息接收者类别
  1. @InOnly
  2. public interface MessageHandler{
  3. public processMessage(MessageType type, Order order);
  4. public processMessage(MessageType type, String message); // trying to invoke this message
  5. }
  • 生产者类
  1. public class MessageProducer{
  2. // using the proxy object within the producer object
  3. // this will invoke the spring bean using remote (rmi)
  4. private MessageHandler messageHandler;
  5. protected MessageHandler getMessageHandler() {
  6. return this.messageHandler;
  7. }
  8. public void setMessageHandler(MessageHandler messageHandler) {
  9. this.messageHandler = messageHandler;
  10. }
  11. //constructor
  12. public MessageProducer() {}
  13. public void sendMessage(MessageType type, Order order ){
  14. getMessageHandler().processMessage(type,order);
  15. ​}
  16. public void sendMessage(MessageType type, String message ){
  17. getMessageHandler().processMessage(type,message);
  18. ​}
  • 消息接收器
  1. public class MessageReceiver implements MessageHandler {
  2. @Handler
  3. public void processMessage(MessageType type, Order order){
  4. System.out.println(" received type and ORDER info ...");
  5. // invoke methods for logical processing
  6. }
  7. @Handler
  8. public void processMessage(MessageType type, String message){
  9. System.out.println(" received type and MESSAGE info for procesing...");
  10. // invoke methods for logical processing
  11. }
  12. }
gev0vcfq

gev0vcfq1#

当我之前尝试时,似乎我的VM没有足够的内存。
free -h表示只剩下500MB。
在重新启动VM后,消息现在被发送到Artemis borker。

相关问题