从spring boot stomp连接到外部activemq

x33g5p2x  于 2021-09-13  发布在  Java
关注(0)|答案(0)|浏览(473)

spring boot应用程序仅在同一主机上使用stomp和activemq,而不使用容器,配置为:

@Configuration
@EnableWebSocketMessageBroker
public class MQConfig implements WebSocketMessageBrokerConfigurer {

    public final static String ORDER_STREAM = "/stream/newOrder/";

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry
                .addEndpoint("/api/streams")
                .setAllowedOrigins("*")
                .withSockJS();
    }

    @Override
    public void configureMessageBroker(final MessageBrokerRegistry config) {
        config.enableStompBrokerRelay("/stream")
                .setRelayHost("localhost")
                .setRelayPort(61613)
                .setClientLogin(mqClientLogin)
                .setClientPasscode(mqClientPasscode);
        config.setApplicationDestinationPrefixes("/api");
    }
}

和用例:

@Service
@RequiredArgsConstructor
public class StreamServiceImpl implements StreamService {

    private final SimpMessagingTemplate simpMessagingTemplate;

    @Override
    public void distributeNewOrder(@NotNull OrderMessage orderMsg) {
        OrderMessage msg = OrderMessage.builder()
                .type(orderMsg.getType())
                .size(orderMsg.getSize())
                .build();

        simpMessagingTemplate.convertAndSend(format(
                "%s%s", MQConfig.ORDER_STREAM, orderMsg.getStreamId().toString()), msg);
    }

    @Scheduled(fixedDelay = 1000)
    public void test() {
        distributeNewOrder(new OrderMessage(UUID.fromString("68b0439b-82e7-4aa0-adec-cdd4e877f7ba"), "test", new BigDecimal(10)));
    }
}

它的工作和消息传递,但在控制台,我有一个例外:我如何才能摆脱它的错误?

org.springframework.messaging.MessageDeliveryException: Message broker not active. Consider subscribing to receive BrokerAvailabilityEvent's from an ApplicationListener Spring bean.
    at org.springframework.messaging.simp.stomp.StompBrokerRelayMessageHandler.handleMessageInternal(StompBrokerRelayMessageHandler.java:494) ~[spring-messaging-5.3.8.jar:5.3.8]
    at org.springframework.messaging.simp.broker.AbstractBrokerMessageHandler.handleMessage(AbstractBrokerMessageHandler.java:281) ~[spring-messaging-5.3.8.jar:5.3.8]
    at org.springframework.messaging.support.ExecutorSubscribableChannel$SendTask.run(ExecutorSubscribableChannel.java:144) ~[spring-messaging-5.3.8.jar:5.3.8]
    at org.springframework.messaging.support.ExecutorSubscribableChannel.sendInternal(ExecutorSubscribableChannel.java:100) ~[spring-messaging-5.3.8.jar:5.3.8]
    at org.springframework.messaging.support.AbstractMessageChannel.send(AbstractMessageChannel.java:139) ~[spring-messaging-5.3.8.jar:5.3.8]
    at org.springframework.messaging.support.AbstractMessageChannel.send(AbstractMessageChannel.java:125) ~[spring-messaging-5.3.8.jar:5.3.8]
    at org.springframework.messaging.simp.SimpMessagingTemplate.sendInternal(SimpMessagingTemplate.java:187) ~[spring-messaging-5.3.8.jar:5.3.8]
    at org.springframework.messaging.simp.SimpMessagingTemplate.doSend(SimpMessagingTemplate.java:162) ~[spring-messaging-5.3.8.jar:5.3.8]
    at org.springframework.messaging.simp.SimpMessagingTemplate.doSend(SimpMessagingTemplate.java:48) ~[spring-messaging-5.3.8.jar:5.3.8]
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109) ~[spring-messaging-5.3.8.jar:5.3.8]
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.convertAndSend(AbstractMessageSendingTemplate.java:151) ~[spring-messaging-5.3.8.jar:5.3.8]
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.convertAndSend(AbstractMessageSendingTemplate.java:129) ~[spring-messaging-5.3.8.jar:5.3.8]
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.convertAndSend(AbstractMessageSendingTemplate.java:122) ~[spring-messaging-5.3.8.jar:5.3.8]
    at com.tradeshare.service.StreamServiceImpl.distributeNewOrder(StreamServiceImpl.java:32) ~[classes/:na]
    at com.tradeshare.service.StreamServiceImpl.test(StreamServiceImpl.java:38) ~[classes/:na]
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:na]
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:78) ~[na:na]
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
    at java.base/java.lang.reflect.Method.invoke(Method.java:567) ~[na:na]
    at org.springframework.scheduling.support.ScheduledMethodRunnable.run(ScheduledMethodRunnable.java:84) ~[spring-context-5.3.8.jar:5.3.8]
    at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54) ~[spring-context-5.3.8.jar:5.3.8]
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[na:na]
    at java.base/java.util.concurrent.FutureTask.runAndReset$$$capture(FutureTask.java:305) ~[na:na]
    at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java) ~[na:na]
    at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305) ~[na:na]
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130) ~[na:na]
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630) ~[na:na]
    at java.base/java.lang.Thread.run(Thread.java:831) ~[na:na]
2021-07-26 20:17:38.464  INFO 11928 --- [ent-scheduler-3] o.s.m.s.s.StompBrokerRelayMessageHandler : "System" session connected.
2021-07-26 20:17:38.466  INFO 11928 --- [ent-scheduler-3] o.s.m.s.s.StompBrokerRelayMessageHandler : BrokerAvailabilityEvent[available=true, StompBrokerRelay[ReactorNettyTcpClient[reactor.netty.tcp.TcpClientConnect@4156bfbd]]]
2021-07-26 20:17:43.935  WARN 11928 --- [ealth-indicator] o.s.boot.actuate.jms.JmsHealthIndicator  : Connection failed to start within 5 seconds and will be closed.
2021-07-26 20:17:48.944  WARN 11928 --- [)-192.168.0.101] o.s.boot.actuate.jms.JmsHealthIndicator  : JMS health check failed 
javax.jms.JMSException: Wire format negotiation timeout: peer did not send his wire format.
    at org.apache.activemq.util.JMSExceptionSupport.create(JMSExceptionSupport.java:72) ~[activemq-client-5.16.2.jar:5.16.2]
    at org.apache.activemq.ActiveMQConnection.syncSendPacket(ActiveMQConnection.java:1421) ~[activemq-client-5.16.2.jar:5.16.2]
    at org.apache.activemq.ActiveMQConnection.ensureConnectionInfoSent(ActiveMQConnection.java:1486) ~[activemq-client-5.16.2.jar:5.16.2]
    at org.apache.activemq.ActiveMQConnection.start(ActiveMQConnection.java:527) ~[activemq-client-5.16.2.jar:5.16.2]
    at org.springframework.jms.connection.SingleConnectionFactory$SharedConnectionInvocationHandler.localStart(SingleConnectionFactory.java:672) ~[spring-jms-5.3.8.jar:5.3.8]
    at org.springframework.jms.connection.SingleConnectionFactory$SharedConnectionInvocationHandler.invoke(SingleConnectionFactory.java:610) ~[spring-jms-5.3.8.jar:5.3.8]
    at jdk.proxy2/jdk.proxy2.$Proxy108.start(Unknown Source) ~[na:na]
    at org.springframework.boot.actuate.jms.JmsHealthIndicator$MonitoredConnection.start(JmsHealthIndicator.java:81) ~[spring-boot-actuator-2.5.2.jar:2.5.2]
    at org.springframework.boot.actuate.jms.JmsHealthIndicator.doHealthCheck(JmsHealthIndicator.java:53) ~[spring-boot-actuator-2.5.2.jar:2.5.2]
    at org.springframework.boot.actuate.health.AbstractHealthIndicator.health(AbstractHealthIndicator.java:82) ~[spring-boot-actuator-2.5.2.jar:2.5.2]
    at org.springframework.boot.actuate.health.HealthIndicator.getHealth(HealthIndicator.java:37) ~[spring-boot-actuator-2.5.2.jar:2.5.2]
    at org.springframework.boot.actuate.health.HealthEndpoint.getHealth(HealthEndpoint.java:71) ~[spring-boot-actuator-2.5.2.jar:2.5.2]
    at org.springframework.boot.actuate.health.HealthEndpoint.getHealth(HealthEndpoint.java:39) ~[spring-boot-actuator-2.5.2.jar:2.5.2]
    at org.springframework.boot.actuate.health.HealthEndpointSupport.getContribution(HealthEndpointSupport.java:99) ~[spring-boot-actuator-2.5.2.jar:2.5.2]
    at org.springframework.boot.actuate.health.HealthEndpointSupport.getAggregateHealth(HealthEndpointSupport.java:110) ~[spring-boot-actuator-2.5.2.jar:2.5.2]
    at org.springframework.boot.actuate.health.HealthEndpointSupport.getContribution(HealthEndpointSupport.java:96) ~[spring-boot-actuator-2.5.2.jar:2.5.2]
    at org.springframework.boot.actuate.health.HealthEndpointSupport.getHealth(HealthEndpointSupport.java:74) ~[spring-boot-actuator-2.5.2.jar:2.5.2]
    at org.springframework.boot.actuate.health.HealthEndpointSupport.getHealth(HealthEndpointSupport.java:61) ~[spring-boot-actuator-2.5.2.jar:2.5.2]
    at org.springframework.boot.actuate.health.HealthEndpoint.health(HealthEndpoint.java:65) ~[spring-boot-actuator-2.5.2.jar:2.5.2]
    at org.springframework.boot.actuate.health.HealthEndpoint.health(HealthEndpoint.java:55) ~[spring-boot-actuator-2.5.2.jar:2.5.2]
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:na]
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:78) ~[na:na]
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
    at java.base/java.lang.reflect.Method.invoke(Method.java:567) ~[na:na]
    at org.springframework.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:282) ~[spring-core-5.3.8.jar:5.3.8]
    at org.springframework.boot.actuate.endpoint.invoke.reflect.ReflectiveOperationInvoker.invoke(ReflectiveOperationInvoker.java:74) ~[spring-boot-actuator-2.5.2.jar:2.5.2]
    at org.springframework.boot.actuate.endpoint.annotation.AbstractDiscoveredOperation.invoke(AbstractDiscoveredOperation.java:60) ~[spring-boot-actuator-2.5.2.jar:2.5.2]
    at org.springframework.boot.actuate.endpoint.jmx.EndpointMBean.invoke(EndpointMBean.java:121) ~[spring-boot-actuator-2.5.2.jar:2.5.2]
    at org.springframework.boot.actuate.endpoint.jmx.EndpointMBean.invoke(EndpointMBean.java:96) ~[spring-boot-actuator-2.5.2.jar:2.5.2]
    at java.management/com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.invoke(DefaultMBeanServerInterceptor.java:809) ~[na:na]
    at java.management/com.sun.jmx.mbeanserver.JmxMBeanServer.invoke(JmxMBeanServer.java:801) ~[na:na]
    at java.management.rmi/javax.management.remote.rmi.RMIConnectionImpl.doOperation(RMIConnectionImpl.java:1466) ~[na:na]
    at java.management.rmi/javax.management.remote.rmi.RMIConnectionImpl$PrivilegedOperation.run(RMIConnectionImpl.java:1307) ~[na:na]
    at java.management.rmi/javax.management.remote.rmi.RMIConnectionImpl.doPrivilegedOperation(RMIConnectionImpl.java:1399) ~[na:na]
    at java.management.rmi/javax.management.remote.rmi.RMIConnectionImpl.invoke(RMIConnectionImpl.java:827) ~[na:na]
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:na]
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:78) ~[na:na]
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
    at java.base/java.lang.reflect.Method.invoke(Method.java:567) ~[na:na]
    at java.rmi/sun.rmi.server.UnicastServerRef.dispatch(UnicastServerRef.java:357) ~[na:na]
    at java.rmi/sun.rmi.transport.Transport$1.run(Transport.java:200) ~[na:na]
    at java.rmi/sun.rmi.transport.Transport$1.run(Transport.java:197) ~[na:na]
    at java.base/java.security.AccessController.doPrivileged(AccessController.java:691) ~[na:na]
    at java.rmi/sun.rmi.transport.Transport.serviceCall(Transport.java:196) ~[na:na]
    at java.rmi/sun.rmi.transport.tcp.TCPTransport.handleMessages(TCPTransport.java:587) ~[na:na]
    at java.rmi/sun.rmi.transport.tcp.TCPTransport$ConnectionHandler.run0(TCPTransport.java:828) ~[na:na]
    at java.rmi/sun.rmi.transport.tcp.TCPTransport$ConnectionHandler.lambda$run$0(TCPTransport.java:705) ~[na:na]
    at java.base/java.security.AccessController.doPrivileged(AccessController.java:391) ~[na:na]
    at java.rmi/sun.rmi.transport.tcp.TCPTransport$ConnectionHandler.run(TCPTransport.java:704) ~[na:na]
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130) ~[na:na]
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630) ~[na:na]
    at java.base/java.lang.Thread.run(Thread.java:831) ~[na:na]
Caused by: java.io.IOException: Wire format negotiation timeout: peer did not send his wire format.
    at org.apache.activemq.transport.WireFormatNegotiator.oneway(WireFormatNegotiator.java:99) ~[activemq-client-5.16.2.jar:5.16.2]
    at org.apache.activemq.transport.MutexTransport.oneway(MutexTransport.java:68) ~[activemq-client-5.16.2.jar:5.16.2]
    at org.apache.activemq.transport.ResponseCorrelator.asyncRequest(ResponseCorrelator.java:81) ~[activemq-client-5.16.2.jar:5.16.2]
    at org.apache.activemq.transport.ResponseCorrelator.request(ResponseCorrelator.java:86) ~[activemq-client-5.16.2.jar:5.16.2]
    at org.apache.activemq.ActiveMQConnection.syncSendPacket(ActiveMQConnection.java:1392)> ~[activemq-client-5.16.2.jar:5.16.2]
    ... 50 common frames omitted
2021-07-26 20:18:08.930  INFO 11928 --- [0.1:61613@65232] o.s.j.c.CachingConnectionFactory         : Encountered a JMSException - resetting the underlying JMS Connection
javax.jms.JMSException: Channel was inactive (no connection attempt made) for too (>30000) long: tcp://127.0.0.1:61613
    at org.apache.activemq.util.JMSExceptionSupport.create(JMSExceptionSupport.java:54) ~[activemq-client-5.16.2.jar:5.16.2]
    at org.apache.activemq.ActiveMQConnection.onAsyncException(ActiveMQConnection.java:1960) ~[activemq-client-5.16.2.jar:5.16.2]
    at org.apache.activemq.ActiveMQConnection.onException(ActiveMQConnection.java:1979) ~[activemq-client-5.16.2.jar:5.16.2]
    at org.apache.activemq.transport.TransportFilter.onException(TransportFilter.java:114) ~[activemq-client-5.16.2.jar:5.16.2]
    at org.apache.activemq.transport.ResponseCorrelator.onException(ResponseCorrelator.java:126) ~[activemq-client-5.16.2.jar:5.16.2]
    at org.apache.activemq.transport.TransportFilter.onException(TransportFilter.java:114) ~[activemq-client-5.16.2.jar:5.16.2]
    at org.apache.activemq.transport.TransportFilter.onException(TransportFilter.java:114) ~[activemq-client-5.16.2.jar:5.16.2]
    at org.apache.activemq.transport.WireFormatNegotiator.onException(WireFormatNegotiator.java:173) ~[activemq-client-5.16.2.jar:5.16.2]
    at org.apache.activemq.transport.AbstractInactivityMonitor.onException(AbstractInactivityMonitor.java:345) ~[activemq-client-5.16.2.jar:5.16.2]
    at org.apache.activemq.transport.AbstractInactivityMonitor$1$1.run(AbstractInactivityMonitor.java:92) ~[activemq-client-5.16.2.jar:5.16.2]
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)> ~[na:na]
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630) ~[na:na]
    at java.base/java.lang.Thread.run(Thread.java:831) ~[na:na]
Caused by: org.apache.activemq.transport.InactivityIOException: Channel was inactive (no connection attempt made) for too (>30000) long: tcp://127.0.0.1:61613
    at org.apache.activemq.transport.AbstractInactivityMonitor$1$1.run(AbstractInactivityMonitor.java:93) ~[activemq-client-5.16.2.jar:5.16.2]
    ... 3 common frames omitted

更新1
应用程序属性

spring.activemq.broker-url=tcp://127.0.0.1:61613
spring.activemq.user=admin
spring.activemq.password=admin
spring.activemq.packages.trust-all=true
spring.activemq.pool.max-connections=500

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题