spring boot应用程序仅在同一主机上使用stomp和activemq,而不使用容器,配置为:
@Configuration
@EnableWebSocketMessageBroker
public class MQConfig implements WebSocketMessageBrokerConfigurer {
public final static String ORDER_STREAM = "/stream/newOrder/";
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry
.addEndpoint("/api/streams")
.setAllowedOrigins("*")
.withSockJS();
}
@Override
public void configureMessageBroker(final MessageBrokerRegistry config) {
config.enableStompBrokerRelay("/stream")
.setRelayHost("localhost")
.setRelayPort(61613)
.setClientLogin(mqClientLogin)
.setClientPasscode(mqClientPasscode);
config.setApplicationDestinationPrefixes("/api");
}
}
和用例:
@Service
@RequiredArgsConstructor
public class StreamServiceImpl implements StreamService {
private final SimpMessagingTemplate simpMessagingTemplate;
@Override
public void distributeNewOrder(@NotNull OrderMessage orderMsg) {
OrderMessage msg = OrderMessage.builder()
.type(orderMsg.getType())
.size(orderMsg.getSize())
.build();
simpMessagingTemplate.convertAndSend(format(
"%s%s", MQConfig.ORDER_STREAM, orderMsg.getStreamId().toString()), msg);
}
@Scheduled(fixedDelay = 1000)
public void test() {
distributeNewOrder(new OrderMessage(UUID.fromString("68b0439b-82e7-4aa0-adec-cdd4e877f7ba"), "test", new BigDecimal(10)));
}
}
它的工作和消息传递,但在控制台,我有一个例外:我如何才能摆脱它的错误?
org.springframework.messaging.MessageDeliveryException: Message broker not active. Consider subscribing to receive BrokerAvailabilityEvent's from an ApplicationListener Spring bean.
at org.springframework.messaging.simp.stomp.StompBrokerRelayMessageHandler.handleMessageInternal(StompBrokerRelayMessageHandler.java:494) ~[spring-messaging-5.3.8.jar:5.3.8]
at org.springframework.messaging.simp.broker.AbstractBrokerMessageHandler.handleMessage(AbstractBrokerMessageHandler.java:281) ~[spring-messaging-5.3.8.jar:5.3.8]
at org.springframework.messaging.support.ExecutorSubscribableChannel$SendTask.run(ExecutorSubscribableChannel.java:144) ~[spring-messaging-5.3.8.jar:5.3.8]
at org.springframework.messaging.support.ExecutorSubscribableChannel.sendInternal(ExecutorSubscribableChannel.java:100) ~[spring-messaging-5.3.8.jar:5.3.8]
at org.springframework.messaging.support.AbstractMessageChannel.send(AbstractMessageChannel.java:139) ~[spring-messaging-5.3.8.jar:5.3.8]
at org.springframework.messaging.support.AbstractMessageChannel.send(AbstractMessageChannel.java:125) ~[spring-messaging-5.3.8.jar:5.3.8]
at org.springframework.messaging.simp.SimpMessagingTemplate.sendInternal(SimpMessagingTemplate.java:187) ~[spring-messaging-5.3.8.jar:5.3.8]
at org.springframework.messaging.simp.SimpMessagingTemplate.doSend(SimpMessagingTemplate.java:162) ~[spring-messaging-5.3.8.jar:5.3.8]
at org.springframework.messaging.simp.SimpMessagingTemplate.doSend(SimpMessagingTemplate.java:48) ~[spring-messaging-5.3.8.jar:5.3.8]
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109) ~[spring-messaging-5.3.8.jar:5.3.8]
at org.springframework.messaging.core.AbstractMessageSendingTemplate.convertAndSend(AbstractMessageSendingTemplate.java:151) ~[spring-messaging-5.3.8.jar:5.3.8]
at org.springframework.messaging.core.AbstractMessageSendingTemplate.convertAndSend(AbstractMessageSendingTemplate.java:129) ~[spring-messaging-5.3.8.jar:5.3.8]
at org.springframework.messaging.core.AbstractMessageSendingTemplate.convertAndSend(AbstractMessageSendingTemplate.java:122) ~[spring-messaging-5.3.8.jar:5.3.8]
at com.tradeshare.service.StreamServiceImpl.distributeNewOrder(StreamServiceImpl.java:32) ~[classes/:na]
at com.tradeshare.service.StreamServiceImpl.test(StreamServiceImpl.java:38) ~[classes/:na]
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:na]
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:78) ~[na:na]
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
at java.base/java.lang.reflect.Method.invoke(Method.java:567) ~[na:na]
at org.springframework.scheduling.support.ScheduledMethodRunnable.run(ScheduledMethodRunnable.java:84) ~[spring-context-5.3.8.jar:5.3.8]
at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54) ~[spring-context-5.3.8.jar:5.3.8]
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[na:na]
at java.base/java.util.concurrent.FutureTask.runAndReset$$$capture(FutureTask.java:305) ~[na:na]
at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java) ~[na:na]
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305) ~[na:na]
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130) ~[na:na]
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630) ~[na:na]
at java.base/java.lang.Thread.run(Thread.java:831) ~[na:na]
2021-07-26 20:17:38.464 INFO 11928 --- [ent-scheduler-3] o.s.m.s.s.StompBrokerRelayMessageHandler : "System" session connected.
2021-07-26 20:17:38.466 INFO 11928 --- [ent-scheduler-3] o.s.m.s.s.StompBrokerRelayMessageHandler : BrokerAvailabilityEvent[available=true, StompBrokerRelay[ReactorNettyTcpClient[reactor.netty.tcp.TcpClientConnect@4156bfbd]]]
2021-07-26 20:17:43.935 WARN 11928 --- [ealth-indicator] o.s.boot.actuate.jms.JmsHealthIndicator : Connection failed to start within 5 seconds and will be closed.
2021-07-26 20:17:48.944 WARN 11928 --- [)-192.168.0.101] o.s.boot.actuate.jms.JmsHealthIndicator : JMS health check failed
javax.jms.JMSException: Wire format negotiation timeout: peer did not send his wire format.
at org.apache.activemq.util.JMSExceptionSupport.create(JMSExceptionSupport.java:72) ~[activemq-client-5.16.2.jar:5.16.2]
at org.apache.activemq.ActiveMQConnection.syncSendPacket(ActiveMQConnection.java:1421) ~[activemq-client-5.16.2.jar:5.16.2]
at org.apache.activemq.ActiveMQConnection.ensureConnectionInfoSent(ActiveMQConnection.java:1486) ~[activemq-client-5.16.2.jar:5.16.2]
at org.apache.activemq.ActiveMQConnection.start(ActiveMQConnection.java:527) ~[activemq-client-5.16.2.jar:5.16.2]
at org.springframework.jms.connection.SingleConnectionFactory$SharedConnectionInvocationHandler.localStart(SingleConnectionFactory.java:672) ~[spring-jms-5.3.8.jar:5.3.8]
at org.springframework.jms.connection.SingleConnectionFactory$SharedConnectionInvocationHandler.invoke(SingleConnectionFactory.java:610) ~[spring-jms-5.3.8.jar:5.3.8]
at jdk.proxy2/jdk.proxy2.$Proxy108.start(Unknown Source) ~[na:na]
at org.springframework.boot.actuate.jms.JmsHealthIndicator$MonitoredConnection.start(JmsHealthIndicator.java:81) ~[spring-boot-actuator-2.5.2.jar:2.5.2]
at org.springframework.boot.actuate.jms.JmsHealthIndicator.doHealthCheck(JmsHealthIndicator.java:53) ~[spring-boot-actuator-2.5.2.jar:2.5.2]
at org.springframework.boot.actuate.health.AbstractHealthIndicator.health(AbstractHealthIndicator.java:82) ~[spring-boot-actuator-2.5.2.jar:2.5.2]
at org.springframework.boot.actuate.health.HealthIndicator.getHealth(HealthIndicator.java:37) ~[spring-boot-actuator-2.5.2.jar:2.5.2]
at org.springframework.boot.actuate.health.HealthEndpoint.getHealth(HealthEndpoint.java:71) ~[spring-boot-actuator-2.5.2.jar:2.5.2]
at org.springframework.boot.actuate.health.HealthEndpoint.getHealth(HealthEndpoint.java:39) ~[spring-boot-actuator-2.5.2.jar:2.5.2]
at org.springframework.boot.actuate.health.HealthEndpointSupport.getContribution(HealthEndpointSupport.java:99) ~[spring-boot-actuator-2.5.2.jar:2.5.2]
at org.springframework.boot.actuate.health.HealthEndpointSupport.getAggregateHealth(HealthEndpointSupport.java:110) ~[spring-boot-actuator-2.5.2.jar:2.5.2]
at org.springframework.boot.actuate.health.HealthEndpointSupport.getContribution(HealthEndpointSupport.java:96) ~[spring-boot-actuator-2.5.2.jar:2.5.2]
at org.springframework.boot.actuate.health.HealthEndpointSupport.getHealth(HealthEndpointSupport.java:74) ~[spring-boot-actuator-2.5.2.jar:2.5.2]
at org.springframework.boot.actuate.health.HealthEndpointSupport.getHealth(HealthEndpointSupport.java:61) ~[spring-boot-actuator-2.5.2.jar:2.5.2]
at org.springframework.boot.actuate.health.HealthEndpoint.health(HealthEndpoint.java:65) ~[spring-boot-actuator-2.5.2.jar:2.5.2]
at org.springframework.boot.actuate.health.HealthEndpoint.health(HealthEndpoint.java:55) ~[spring-boot-actuator-2.5.2.jar:2.5.2]
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:na]
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:78) ~[na:na]
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
at java.base/java.lang.reflect.Method.invoke(Method.java:567) ~[na:na]
at org.springframework.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:282) ~[spring-core-5.3.8.jar:5.3.8]
at org.springframework.boot.actuate.endpoint.invoke.reflect.ReflectiveOperationInvoker.invoke(ReflectiveOperationInvoker.java:74) ~[spring-boot-actuator-2.5.2.jar:2.5.2]
at org.springframework.boot.actuate.endpoint.annotation.AbstractDiscoveredOperation.invoke(AbstractDiscoveredOperation.java:60) ~[spring-boot-actuator-2.5.2.jar:2.5.2]
at org.springframework.boot.actuate.endpoint.jmx.EndpointMBean.invoke(EndpointMBean.java:121) ~[spring-boot-actuator-2.5.2.jar:2.5.2]
at org.springframework.boot.actuate.endpoint.jmx.EndpointMBean.invoke(EndpointMBean.java:96) ~[spring-boot-actuator-2.5.2.jar:2.5.2]
at java.management/com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.invoke(DefaultMBeanServerInterceptor.java:809) ~[na:na]
at java.management/com.sun.jmx.mbeanserver.JmxMBeanServer.invoke(JmxMBeanServer.java:801) ~[na:na]
at java.management.rmi/javax.management.remote.rmi.RMIConnectionImpl.doOperation(RMIConnectionImpl.java:1466) ~[na:na]
at java.management.rmi/javax.management.remote.rmi.RMIConnectionImpl$PrivilegedOperation.run(RMIConnectionImpl.java:1307) ~[na:na]
at java.management.rmi/javax.management.remote.rmi.RMIConnectionImpl.doPrivilegedOperation(RMIConnectionImpl.java:1399) ~[na:na]
at java.management.rmi/javax.management.remote.rmi.RMIConnectionImpl.invoke(RMIConnectionImpl.java:827) ~[na:na]
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:na]
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:78) ~[na:na]
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
at java.base/java.lang.reflect.Method.invoke(Method.java:567) ~[na:na]
at java.rmi/sun.rmi.server.UnicastServerRef.dispatch(UnicastServerRef.java:357) ~[na:na]
at java.rmi/sun.rmi.transport.Transport$1.run(Transport.java:200) ~[na:na]
at java.rmi/sun.rmi.transport.Transport$1.run(Transport.java:197) ~[na:na]
at java.base/java.security.AccessController.doPrivileged(AccessController.java:691) ~[na:na]
at java.rmi/sun.rmi.transport.Transport.serviceCall(Transport.java:196) ~[na:na]
at java.rmi/sun.rmi.transport.tcp.TCPTransport.handleMessages(TCPTransport.java:587) ~[na:na]
at java.rmi/sun.rmi.transport.tcp.TCPTransport$ConnectionHandler.run0(TCPTransport.java:828) ~[na:na]
at java.rmi/sun.rmi.transport.tcp.TCPTransport$ConnectionHandler.lambda$run$0(TCPTransport.java:705) ~[na:na]
at java.base/java.security.AccessController.doPrivileged(AccessController.java:391) ~[na:na]
at java.rmi/sun.rmi.transport.tcp.TCPTransport$ConnectionHandler.run(TCPTransport.java:704) ~[na:na]
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130) ~[na:na]
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630) ~[na:na]
at java.base/java.lang.Thread.run(Thread.java:831) ~[na:na]
Caused by: java.io.IOException: Wire format negotiation timeout: peer did not send his wire format.
at org.apache.activemq.transport.WireFormatNegotiator.oneway(WireFormatNegotiator.java:99) ~[activemq-client-5.16.2.jar:5.16.2]
at org.apache.activemq.transport.MutexTransport.oneway(MutexTransport.java:68) ~[activemq-client-5.16.2.jar:5.16.2]
at org.apache.activemq.transport.ResponseCorrelator.asyncRequest(ResponseCorrelator.java:81) ~[activemq-client-5.16.2.jar:5.16.2]
at org.apache.activemq.transport.ResponseCorrelator.request(ResponseCorrelator.java:86) ~[activemq-client-5.16.2.jar:5.16.2]
at org.apache.activemq.ActiveMQConnection.syncSendPacket(ActiveMQConnection.java:1392)> ~[activemq-client-5.16.2.jar:5.16.2]
... 50 common frames omitted
2021-07-26 20:18:08.930 INFO 11928 --- [0.1:61613@65232] o.s.j.c.CachingConnectionFactory : Encountered a JMSException - resetting the underlying JMS Connection
javax.jms.JMSException: Channel was inactive (no connection attempt made) for too (>30000) long: tcp://127.0.0.1:61613
at org.apache.activemq.util.JMSExceptionSupport.create(JMSExceptionSupport.java:54) ~[activemq-client-5.16.2.jar:5.16.2]
at org.apache.activemq.ActiveMQConnection.onAsyncException(ActiveMQConnection.java:1960) ~[activemq-client-5.16.2.jar:5.16.2]
at org.apache.activemq.ActiveMQConnection.onException(ActiveMQConnection.java:1979) ~[activemq-client-5.16.2.jar:5.16.2]
at org.apache.activemq.transport.TransportFilter.onException(TransportFilter.java:114) ~[activemq-client-5.16.2.jar:5.16.2]
at org.apache.activemq.transport.ResponseCorrelator.onException(ResponseCorrelator.java:126) ~[activemq-client-5.16.2.jar:5.16.2]
at org.apache.activemq.transport.TransportFilter.onException(TransportFilter.java:114) ~[activemq-client-5.16.2.jar:5.16.2]
at org.apache.activemq.transport.TransportFilter.onException(TransportFilter.java:114) ~[activemq-client-5.16.2.jar:5.16.2]
at org.apache.activemq.transport.WireFormatNegotiator.onException(WireFormatNegotiator.java:173) ~[activemq-client-5.16.2.jar:5.16.2]
at org.apache.activemq.transport.AbstractInactivityMonitor.onException(AbstractInactivityMonitor.java:345) ~[activemq-client-5.16.2.jar:5.16.2]
at org.apache.activemq.transport.AbstractInactivityMonitor$1$1.run(AbstractInactivityMonitor.java:92) ~[activemq-client-5.16.2.jar:5.16.2]
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)> ~[na:na]
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630) ~[na:na]
at java.base/java.lang.Thread.run(Thread.java:831) ~[na:na]
Caused by: org.apache.activemq.transport.InactivityIOException: Channel was inactive (no connection attempt made) for too (>30000) long: tcp://127.0.0.1:61613
at org.apache.activemq.transport.AbstractInactivityMonitor$1$1.run(AbstractInactivityMonitor.java:93) ~[activemq-client-5.16.2.jar:5.16.2]
... 3 common frames omitted
更新1
应用程序属性
spring.activemq.broker-url=tcp://127.0.0.1:61613
spring.activemq.user=admin
spring.activemq.password=admin
spring.activemq.packages.trust-all=true
spring.activemq.pool.max-connections=500
暂无答案!
目前还没有任何答案,快来回答吧!