我有一个在生产环境中运行的应用程序,它使用activemq上的一个虚拟主题
@JmsListener(destination = "Consumer.example-consumer.VirtualTopic.${spring.activemq.example.topic}")
public void consumeTopic(String message) {...}
application.properties中的位置
spring.activemq.example.topic=example.topic
另外,我在spring boot-like中配置了活动mq
@Configuration
@EnableJms
public class AmazonMQConfiguration {
@Value("${spring.activemq.broker-url}")
public String brokerURL;
@Value("${spring.activemq.user}")
public String userName;
@Value("${spring.activemq.password}")
public String password;
@Bean
public ActiveMQConnectionFactory activeMQConnectionFactory() {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
factory.setBrokerURL(brokerURL);
factory.setUserName(userName);
factory.setPassword(password);
return factory;
}
@Bean
public JmsTemplate jmsTemplate(){
return new JmsTemplate(activeMQConnectionFactory());
}
@Bean
public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(activeMQConnectionFactory());
factory.setConcurrency("1-1");
return factory;
}
}
但昨天我突然 @JmsListener
停止使用来自虚拟主题的消息,在24小时内,我的使用者队列中有大约82000个挂起的消息。
请帮助我的问题,因为我不知道为什么这个问题发生,无法重新创建它。
下面我将进一步提供我的dependencies pom.xml代码和示例线程转储,当我了解到这个问题时,我使用了这个示例线程转储。
波姆。xml:-
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jersey</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web-services</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<!-- Exclude Spring Boot's Default Logging -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
<exclusions>
<exclusion>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-logging</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- Add Log4j2 Dependency -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-log4j2</artifactId>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-mongodb</artifactId>
</dependency>
<!-- https://mvnrepository.com/artifact/org.projectlombok/lombok -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<scope>provided</scope>
</dependency>
</dependencies>
线程dump:-
2020-12-16 15:02:17
Full thread dump OpenJDK 64-Bit Server VM (25.265-b01 mixed mode):
"Attach Listener" #723 daemon prio=9 os_prio=0 tid=0x00007f77a8007800 nid=0x33b6 waiting on condition [0x0000000000000000]
java.lang.Thread.State: RUNNABLE
"ActiveMQ InactivityMonitor Worker" #55 daemon prio=5 os_prio=0 tid=0x00007f77a00bc000 nid=0x6571 waiting on condition [0x00007f7782ffd000]
java.lang.Thread.State: TIMED_WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x00000000855ef0a8> (a java.util.concurrent.SynchronousQueue$TransferStack)
at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
at java.util.concurrent.SynchronousQueue$TransferStack.awaitFulfill(SynchronousQueue.java:460)
at java.util.concurrent.SynchronousQueue$TransferStack.transfer(SynchronousQueue.java:362)
at java.util.concurrent.SynchronousQueue.poll(SynchronousQueue.java:941)
at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1073)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
"reactor-http-epoll-4" #54 daemon prio=5 os_prio=0 tid=0x00007f77b84fb000 nid=0x655c runnable [0x00007f778c5af000]
java.lang.Thread.State: RUNNABLE
at io.netty.channel.epoll.Native.epollWait(Native Method)
at io.netty.channel.epoll.Native.epollWait(Native.java:148)
at io.netty.channel.epoll.Native.epollWait(Native.java:141)
at io.netty.channel.epoll.EpollEventLoop.epollWaitNoTimerChange(EpollEventLoop.java:290)
at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:347)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.run(Thread.java:748)
"reactor-http-epoll-3" #53 daemon prio=5 os_prio=0 tid=0x00007f77b84f9800 nid=0x655b runnable [0x00007f778c6b0000]
java.lang.Thread.State: RUNNABLE
at io.netty.channel.epoll.Native.epollWait(Native Method)
at io.netty.channel.epoll.Native.epollWait(Native.java:148)
at io.netty.channel.epoll.Native.epollWait(Native.java:141)
at io.netty.channel.epoll.EpollEventLoop.epollWaitNoTimerChange(EpollEventLoop.java:290)
at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:347)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.run(Thread.java:748)
"reactor-http-epoll-2" #52 daemon prio=5 os_prio=0 tid=0x00007f77b84f7800 nid=0x655a runnable [0x00007f778c7b1000]
java.lang.Thread.State: RUNNABLE
at io.netty.channel.epoll.Native.epollWait(Native Method)
at io.netty.channel.epoll.Native.epollWait(Native.java:148)
at io.netty.channel.epoll.Native.epollWait(Native.java:141)
at io.netty.channel.epoll.EpollEventLoop.epollWaitNoTimerChange(EpollEventLoop.java:290)
at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:347)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.run(Thread.java:748)
"reactor-http-epoll-1" #51 daemon prio=5 os_prio=0 tid=0x00007f77b84f6800 nid=0x6559 runnable [0x00007f778c8b2000]
java.lang.Thread.State: RUNNABLE
at io.netty.channel.epoll.Native.epollWait(Native Method)
at io.netty.channel.epoll.Native.epollWait(Native.java:148)
at io.netty.channel.epoll.Native.epollWait(Native.java:141)
at io.netty.channel.epoll.EpollEventLoop.epollWaitNoTimerChange(EpollEventLoop.java:290)
at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:347)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.run(Thread.java:748)
"DestroyJavaVM" #50 prio=5 os_prio=0 tid=0x00007f77dc04c800 nid=0x651e waiting on condition [0x0000000000000000]
java.lang.Thread.State: RUNNABLE
"ActiveMQ Transport: ssl://b-d15b4567-aw8b-4k80-8jd9-343hht45ab97-2.mq.ap-south-1.amazonaws.com/172.158.11.158:61617" #44 prio=5 os_prio=0 tid=0x00007f77a8009800 nid=0x6554 runnable [0x00007f778d2c6000]
java.lang.Thread.State: RUNNABLE
at java.net.SocketInputStream.socketRead0(Native Method)
at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
at java.net.SocketInputStream.read(SocketInputStream.java:171)
at java.net.SocketInputStream.read(SocketInputStream.java:141)
at sun.security.ssl.InputRecord.readFully(InputRecord.java:465)
at sun.security.ssl.InputRecord.read(InputRecord.java:503)
at sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:990)
- locked <0x00000000855731a0> (a java.lang.Object)
at sun.security.ssl.SSLSocketImpl.readDataRecord(SSLSocketImpl.java:948)
at sun.security.ssl.AppInputStream.read(AppInputStream.java:105)
- locked <0x00000000855c6688> (a sun.security.ssl.AppInputStream)
at org.apache.activemq.transport.tcp.TcpBufferedInputStream.fill(TcpBufferedInputStream.java:50)
at org.apache.activemq.transport.tcp.TcpTransport$2.fill(TcpTransport.java:634)
at org.apache.activemq.transport.tcp.TcpBufferedInputStream.read(TcpBufferedInputStream.java:59)
at org.apache.activemq.transport.tcp.TcpTransport$2.read(TcpTransport.java:619)
at java.io.DataInputStream.readInt(DataInputStream.java:387)
at org.apache.activemq.openwire.OpenWireFormat.unmarshal(OpenWireFormat.java:268)
at org.apache.activemq.transport.tcp.TcpTransport.readCommand(TcpTransport.java:240)
at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:232)
at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:215)
at java.lang.Thread.run(Thread.java:748)
"DefaultMessageListenerContainer-1" #42 prio=5 os_prio=0 tid=0x00007f77dd72c800 nid=0x6552 waiting on condition [0x00007f778d4c8000]
java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x000000008a8e04d0> (a java.util.concurrent.CountDownLatch$Sync)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:87)
at reactor.core.publisher.Mono.block(Mono.java:1680)
at com.main.service.SingularConnectionService.sendDataToSingular(SingularConnectionService.java:38)
at com.main.consumer.AmazonMQConsumer.consumeTopic(AmazonMQConsumer.java:62)
at sun.reflect.GeneratedMethodAccessor34.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:171)
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:120)
at org.springframework.jms.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:114)
at org.springframework.jms.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:77)
at org.springframework.jms.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:736)
at org.springframework.jms.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:696)
at org.springframework.jms.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:674)
at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:318)
at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:257)
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1189)
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:1179)
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:1076)
at java.lang.Thread.run(Thread.java:748)
"ActiveMQ InactivityMonitor WriteCheckTimer" #40 daemon prio=5 os_prio=0 tid=0x00007f7798038000 nid=0x6550 in Object.wait() [0x00007f778d6ca000]
java.lang.Thread.State: TIMED_WAITING (on object monitor)
at java.lang.Object.wait(Native Method)
at java.util.TimerThread.mainLoop(Timer.java:552)
- locked <0x000000008556c420> (a java.util.TaskQueue)
at java.util.TimerThread.run(Timer.java:505)
"ActiveMQ Transport: ssl://b-d15b4567-aw8b-4k80-8jd9-343hht45ab97-2.mq.ap-south-1.amazonaws.com/172.158.11.158:61617" #38 prio=5 os_prio=0 tid=0x00007f7794571800 nid=0x654e runnable [0x00007f778d8cc000]
java.lang.Thread.State: RUNNABLE
at java.net.SocketInputStream.socketRead0(Native Method)
at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
at java.net.SocketInputStream.read(SocketInputStream.java:171)
at java.net.SocketInputStream.read(SocketInputStream.java:141)
at sun.security.ssl.InputRecord.readFully(InputRecord.java:465)
at sun.security.ssl.InputRecord.read(InputRecord.java:503)
at sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:990)
- locked <0x00000000855d9288> (a java.lang.Object)
at sun.security.ssl.SSLSocketImpl.readDataRecord(SSLSocketImpl.java:948)
at sun.security.ssl.AppInputStream.read(AppInputStream.java:105)
- locked <0x00000000855da5e8> (a sun.security.ssl.AppInputStream)
at org.apache.activemq.transport.tcp.TcpBufferedInputStream.fill(TcpBufferedInputStream.java:50)
at org.apache.activemq.transport.tcp.TcpTransport$2.fill(TcpTransport.java:634)
at org.apache.activemq.transport.tcp.TcpBufferedInputStream.read(TcpBufferedInputStream.java:59)
at org.apache.activemq.transport.tcp.TcpTransport$2.read(TcpTransport.java:619)
at java.io.DataInputStream.readInt(DataInputStream.java:387)
at org.apache.activemq.openwire.OpenWireFormat.unmarshal(OpenWireFormat.java:268)
at org.apache.activemq.transport.tcp.TcpTransport.readCommand(TcpTransport.java:240)
at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:232)
at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:215)
at java.lang.Thread.run(Thread.java:748)
"ActiveMQ InactivityMonitor ReadCheckTimer" #37 daemon prio=5 os_prio=0 tid=0x00007f7794210800 nid=0x654d in Object.wait() [0x00007f778d9cd000]
java.lang.Thread.State: TIMED_WAITING (on object monitor)
at java.lang.Object.wait(Native Method)
at java.util.TimerThread.mainLoop(Timer.java:552)
- locked <0x00000000855eebe0> (a java.util.TaskQueue)
at java.util.TimerThread.run(Timer.java:505)
"DefaultMessageListenerContainer-1" #36 prio=5 os_prio=0 tid=0x00007f77dd743000 nid=0x654c in Object.wait() [0x00007f778dcce000]
java.lang.Thread.State: TIMED_WAITING (on object monitor)
at java.lang.Object.wait(Native Method)
at org.apache.activemq.FifoMessageDispatchChannel.dequeue(FifoMessageDispatchChannel.java:74)
- locked <0x00000000855eac88> (a java.lang.Object)
at org.apache.activemq.ActiveMQMessageConsumer.dequeue(ActiveMQMessageConsumer.java:486)
at org.apache.activemq.ActiveMQMessageConsumer.receive(ActiveMQMessageConsumer.java:653)
at org.springframework.jms.support.destination.JmsDestinationAccessor.receiveFromConsumer(JmsDestinationAccessor.java:132)
at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveMessage(AbstractPollingMessageListenerContainer.java:418)
at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:303)
at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:257)
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1189)
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:1179)
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:1076)
at java.lang.Thread.run(Thread.java:748)
"http-nio-8082-Acceptor" #34 daemon prio=5 os_prio=0 tid=0x00007f77dd827800 nid=0x654a runnable [0x00007f778ded0000]
java.lang.Thread.State: RUNNABLE
at sun.nio.ch.ServerSocketChannelImpl.accept0(Native Method)
at sun.nio.ch.ServerSocketChannelImpl.accept(ServerSocketChannelImpl.java:419)
at sun.nio.ch.ServerSocketChannelImpl.accept(ServerSocketChannelImpl.java:247)
- locked <0x00000000853bcca0> (a java.lang.Object)
at org.apache.tomcat.util.net.NioEndpoint.serverSocketAccept(NioEndpoint.java:469)
at org.apache.tomcat.util.net.NioEndpoint.serverSocketAccept(NioEndpoint.java:71)
at org.apache.tomcat.util.net.Acceptor.run(Acceptor.java:106)
at java.lang.Thread.run(Thread.java:748)
1条答案
按热度按时间e5nszbig1#
单线程转储没有多大帮助,因为它只提供应用程序正在执行的操作的快照。您真正需要的是一系列线程转储,这样您就可以看到线程随着时间的推移在做什么。但是,由于您只提供了一个单线程转储,我想说您的问题是这个线程:
这是实际使用来自主题的消息的线程。请注意,在该方法中它被阻止:
目前还不清楚这种阻塞会持续多久,但有可能这种阻塞永远不会清除,基本上会阻止您的消费者接收更多的消息。既然你用的是
setConcurrency("1-1")
您只有一个消费者,因此如果它被阻止,则不会再消费更多的消息。您应该调查这个方法在做什么,并确保您调用任何带有超时的阻塞操作,这样线程就不能无限期地被阻塞。