我创建了一个多播队列的订阅者。当启动应用程序时,我可以看到多播地址被创建,下面的客户端队列被创建,如果我使用Web界面发布消息到我的主题订阅者监听的地址。
但是我无法使用测试用例发布到队列。当调用jmsTemplate.convertAndSend
时,测试会导致此错误:Received error from remote peer without description [condition = amqp:invalid-field]
不确定此错误是否是因为它试图创建另一个具有相同客户端ID的连接。我如何创建用于发布的jmsTemplate?(我知道这是从测试用例中测试它,但我的完整应用程序应该侦听消息,增强它并再次将其发送到另一个队列。这只是为了模拟我遇到的错误)
我的应用程序代码在下面或GitHub上:
Springboot应用程序
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.jms.annotation.EnableJms;
@SpringBootApplication
@EnableJms
public class MyApplication {
public static void main(final String[] args) {
SpringApplication.run(MyApplication.class, args);
}
}
MyContainerFactory.java
import lombok.Setter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.jms.DefaultJmsListenerContainerFactoryConfigurer;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.jms.ConnectionFactory;
@Component
public class MyContainerFactory extends DefaultJmsListenerContainerFactory {
@Autowired
private ConnectionFactory connectionFactory;
@Autowired
private DefaultJmsListenerContainerFactoryConfigurer configurer;
@Setter
private String containerClientId;
@PostConstruct
public void init() {
configurer.configure(this, connectionFactory);
setPubSubDomain(true);
setClientId(containerClientId);
setSubscriptionDurable(true);
}
}
MyListener.java
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
@Component
@Slf4j
public class MyListener {
@Value("${addresses.multicast_topic_address}")
private String myTopicAddress;
@JmsListener(destination = "${addresses.multicast_topic_address}",
containerFactory = "myContainerFactory",
subscription = "${addresses.multicast_topic_address}")
public void processMsg(final String message) {
log.info("============= Received: " + message);
}
}
application.yml
server:
port: 9015
spring:
jms:
pub-sub-domain: true
template:
delivery-mode: persistent
qos-enabled: true
listener:
acknowledge-mode: client
main:
web-application-type: none
amqphub:
amqp10jms:
remote-url: amqp://localhost:61616
username: admin
password: admin
clientId: my_topic_client
addresses:
multicast_topic_address: topic_address
我的测试
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.jms.core.JmsTemplate;
@Slf4j
@SpringBootTest(classes = MyApplication.class)
public class MyTests {
@Autowired
private JmsTemplate jmsTemplate;
@Value("${addresses.multicast_topic_address}")
private String myTopicAddress;
@Test
void testMe() {
jmsTemplate.convertAndSend(myTopicAddress, "my custom message....");
}
}
完整错误消息:
org.springframework.jms.InvalidClientIDException: Received error from remote peer without description [condition = amqp:invalid-field]; nested exception is javax.jms.InvalidClientIDException: Received error from remote peer without description [condition = amqp:invalid-field]
at org.springframework.jms.support.JmsUtils.convertJmsAccessException(JmsUtils.java:277)
at org.springframework.jms.support.JmsAccessor.convertJmsAccessException(JmsAccessor.java:185)
at org.springframework.jms.core.JmsTemplate.execute(JmsTemplate.java:507)
at org.springframework.jms.core.JmsTemplate.send(JmsTemplate.java:584)
at org.springframework.jms.core.JmsTemplate.convertAndSend(JmsTemplate.java:661)
at nl.ns.inkomend.processor.MyTests.testMe(MyTests.java:23)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:568)
at org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:725)
at org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
at org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
at org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:149)
at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:140)
at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:84)
at org.junit.jupiter.engine.execution.ExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(ExecutableInvoker.java:115)
at org.junit.jupiter.engine.execution.ExecutableInvoker.lambda$invoke$0(ExecutableInvoker.java:105)
at org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
at org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64)
at org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45)
at org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37)
at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:104)
at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:98)
at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$7(TestMethodTestDescriptor.java:214)
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:210)
at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:135)
at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:66)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:151)
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141)
at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139)
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95)
at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:41)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:155)
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141)
at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139)
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95)
at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:41)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:155)
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141)
at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139)
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95)
at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.submit(SameThreadHierarchicalTestExecutorService.java:35)
at org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor.execute(HierarchicalTestExecutor.java:57)
at org.junit.platform.engine.support.hierarchical.HierarchicalTestEngine.execute(HierarchicalTestEngine.java:54)
at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:107)
at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:88)
at org.junit.platform.launcher.core.EngineExecutionOrchestrator.lambda$execute$0(EngineExecutionOrchestrator.java:54)
at org.junit.platform.launcher.core.EngineExecutionOrchestrator.withInterceptedStreams(EngineExecutionOrchestrator.java:67)
at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:52)
at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:114)
at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:86)
at org.junit.platform.launcher.core.DefaultLauncherSession$DelegatingLauncher.execute(DefaultLauncherSession.java:86)
at org.junit.platform.launcher.core.SessionPerRequestLauncher.execute(SessionPerRequestLauncher.java:53)
at com.intellij.junit5.JUnit5IdeaTestRunner.startRunnerWithArgs(JUnit5IdeaTestRunner.java:57)
at com.intellij.rt.junit.IdeaTestRunner$Repeater$1.execute(IdeaTestRunner.java:38)
at com.intellij.rt.execution.junit.TestsRepeater.repeat(TestsRepeater.java:11)
at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:35)
at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:235)
at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:54)
Caused by: javax.jms.InvalidClientIDException: Received error from remote peer without description [condition = amqp:invalid-field]
at org.apache.qpid.jms.provider.exceptions.ProviderInvalidClientIDException.toJMSException(ProviderInvalidClientIDException.java:35)
at org.apache.qpid.jms.provider.exceptions.ProviderInvalidClientIDException.toJMSException(ProviderInvalidClientIDException.java:21)
at org.apache.qpid.jms.exceptions.JmsExceptionSupport.create(JmsExceptionSupport.java:80)
at org.apache.qpid.jms.exceptions.JmsExceptionSupport.create(JmsExceptionSupport.java:112)
at org.apache.qpid.jms.JmsConnection.createResource(JmsConnection.java:698)
at org.apache.qpid.jms.JmsConnection.createResource(JmsConnection.java:682)
at org.apache.qpid.jms.JmsConnection.createJmsConnection(JmsConnection.java:593)
at org.apache.qpid.jms.JmsConnection.connect(JmsConnection.java:180)
at org.apache.qpid.jms.JmsConnectionFactory.createConnection(JmsConnectionFactory.java:213)
at org.apache.qpid.jms.JmsConnectionFactory.createConnection(JmsConnectionFactory.java:200)
at org.springframework.jms.support.JmsAccessor.createConnection(JmsAccessor.java:197)
at org.springframework.jms.core.JmsTemplate.execute(JmsTemplate.java:494)
... 72 more
Caused by: org.apache.qpid.jms.provider.exceptions.ProviderInvalidClientIDException: Received error from remote peer without description [condition = amqp:invalid-field]
at org.apache.qpid.jms.provider.amqp.AmqpSupport.convertToConnectionClosedException(AmqpSupport.java:136)
at org.apache.qpid.jms.provider.amqp.builders.AmqpConnectionBuilder.getOpenAbortExceptionFromRemote(AmqpConnectionBuilder.java:170)
at org.apache.qpid.jms.provider.amqp.builders.AmqpResourceBuilder.handleClosed(AmqpResourceBuilder.java:191)
at org.apache.qpid.jms.provider.amqp.builders.AmqpResourceBuilder.processRemoteClose(AmqpResourceBuilder.java:132)
at org.apache.qpid.jms.provider.amqp.AmqpProvider.processUpdates(AmqpProvider.java:968)
at org.apache.qpid.jms.provider.amqp.AmqpProvider.onData(AmqpProvider.java:878)
at org.apache.qpid.jms.transports.netty.NettyTcpTransport$NettyTcpTransportHandler.channelRead0(NettyTcpTransport.java:563)
at org.apache.qpid.jms.transports.netty.NettyTcpTransport$NettyTcpTransportHandler.channelRead0(NettyTcpTransport.java:556)
at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at java.base/java.lang.Thread.run(Thread.java:833)
1条答案
按热度按时间btqmn9zl1#
发现问题。在我的DefaultJmsListenerContainerFactory类中,我有一个
setClientId
操作,它将客户端ID设置为null,因为没有设置变量。将其更改为实际值会为我排序。