如何使用AMQP 1.0从使用amqp-10-jms-spring-boot-starter的测试用例发布到红帽AMQ 7主题

mgdq6dx1  于 2023-02-04  发布在  Spring
关注(0)|答案(1)|浏览(242)

我创建了一个多播队列的订阅者。当启动应用程序时,我可以看到多播地址被创建,下面的客户端队列被创建,如果我使用Web界面发布消息到我的主题订阅者监听的地址。

但是我无法使用测试用例发布到队列。当调用jmsTemplate.convertAndSend时,测试会导致此错误:Received error from remote peer without description [condition = amqp:invalid-field]
不确定此错误是否是因为它试图创建另一个具有相同客户端ID的连接。我如何创建用于发布的jmsTemplate?(我知道这是从测试用例中测试它,但我的完整应用程序应该侦听消息,增强它并再次将其发送到另一个队列。这只是为了模拟我遇到的错误)
我的应用程序代码在下面或GitHub上:
Springboot应用程序

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.jms.annotation.EnableJms;

@SpringBootApplication
@EnableJms
public class MyApplication {

    public static void main(final String[] args) {
        SpringApplication.run(MyApplication.class, args);
    }

}

MyContainerFactory.java

import lombok.Setter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.jms.DefaultJmsListenerContainerFactoryConfigurer;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import javax.jms.ConnectionFactory;

@Component
public class MyContainerFactory extends DefaultJmsListenerContainerFactory {

    @Autowired
    private ConnectionFactory connectionFactory;

    @Autowired
    private DefaultJmsListenerContainerFactoryConfigurer configurer;

    @Setter
    private String containerClientId;

    @PostConstruct
    public void init() {
        configurer.configure(this, connectionFactory);
        setPubSubDomain(true);
        setClientId(containerClientId);
        setSubscriptionDurable(true);
    }
}

MyListener.java

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class MyListener {

    @Value("${addresses.multicast_topic_address}")
    private String myTopicAddress;

    @JmsListener(destination = "${addresses.multicast_topic_address}",
            containerFactory = "myContainerFactory",
            subscription = "${addresses.multicast_topic_address}")
    public void processMsg(final String message) {
        log.info("============= Received: " + message);
    }
}

application.yml

server:
  port: 9015
spring:
  jms:
    pub-sub-domain: true
    template:
      delivery-mode: persistent
      qos-enabled: true
    listener:
      acknowledge-mode: client
  main:
    web-application-type: none

amqphub:
  amqp10jms:
    remote-url: amqp://localhost:61616
    username: admin
    password: admin
    clientId: my_topic_client

addresses:
  multicast_topic_address: topic_address

我的测试

import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.jms.core.JmsTemplate;

@Slf4j
@SpringBootTest(classes = MyApplication.class)
public class MyTests {

    @Autowired
    private JmsTemplate jmsTemplate;

    @Value("${addresses.multicast_topic_address}")
    private String myTopicAddress;

    @Test
    void testMe() {
        jmsTemplate.convertAndSend(myTopicAddress, "my custom message....");
    }
}

完整错误消息:

org.springframework.jms.InvalidClientIDException: Received error from remote peer without description [condition = amqp:invalid-field]; nested exception is javax.jms.InvalidClientIDException: Received error from remote peer without description [condition = amqp:invalid-field]

    at org.springframework.jms.support.JmsUtils.convertJmsAccessException(JmsUtils.java:277)
    at org.springframework.jms.support.JmsAccessor.convertJmsAccessException(JmsAccessor.java:185)
    at org.springframework.jms.core.JmsTemplate.execute(JmsTemplate.java:507)
    at org.springframework.jms.core.JmsTemplate.send(JmsTemplate.java:584)
    at org.springframework.jms.core.JmsTemplate.convertAndSend(JmsTemplate.java:661)
    at nl.ns.inkomend.processor.MyTests.testMe(MyTests.java:23)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:568)
    at org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:725)
    at org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
    at org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
    at org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:149)
    at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:140)
    at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:84)
    at org.junit.jupiter.engine.execution.ExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(ExecutableInvoker.java:115)
    at org.junit.jupiter.engine.execution.ExecutableInvoker.lambda$invoke$0(ExecutableInvoker.java:105)
    at org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
    at org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64)
    at org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45)
    at org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37)
    at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:104)
    at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:98)
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$7(TestMethodTestDescriptor.java:214)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:210)
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:135)
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:66)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:151)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141)
    at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95)
    at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
    at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:41)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:155)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141)
    at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95)
    at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
    at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:41)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:155)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141)
    at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95)
    at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.submit(SameThreadHierarchicalTestExecutorService.java:35)
    at org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor.execute(HierarchicalTestExecutor.java:57)
    at org.junit.platform.engine.support.hierarchical.HierarchicalTestEngine.execute(HierarchicalTestEngine.java:54)
    at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:107)
    at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:88)
    at org.junit.platform.launcher.core.EngineExecutionOrchestrator.lambda$execute$0(EngineExecutionOrchestrator.java:54)
    at org.junit.platform.launcher.core.EngineExecutionOrchestrator.withInterceptedStreams(EngineExecutionOrchestrator.java:67)
    at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:52)
    at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:114)
    at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:86)
    at org.junit.platform.launcher.core.DefaultLauncherSession$DelegatingLauncher.execute(DefaultLauncherSession.java:86)
    at org.junit.platform.launcher.core.SessionPerRequestLauncher.execute(SessionPerRequestLauncher.java:53)
    at com.intellij.junit5.JUnit5IdeaTestRunner.startRunnerWithArgs(JUnit5IdeaTestRunner.java:57)
    at com.intellij.rt.junit.IdeaTestRunner$Repeater$1.execute(IdeaTestRunner.java:38)
    at com.intellij.rt.execution.junit.TestsRepeater.repeat(TestsRepeater.java:11)
    at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:35)
    at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:235)
    at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:54)
Caused by: javax.jms.InvalidClientIDException: Received error from remote peer without description [condition = amqp:invalid-field]
    at org.apache.qpid.jms.provider.exceptions.ProviderInvalidClientIDException.toJMSException(ProviderInvalidClientIDException.java:35)
    at org.apache.qpid.jms.provider.exceptions.ProviderInvalidClientIDException.toJMSException(ProviderInvalidClientIDException.java:21)
    at org.apache.qpid.jms.exceptions.JmsExceptionSupport.create(JmsExceptionSupport.java:80)
    at org.apache.qpid.jms.exceptions.JmsExceptionSupport.create(JmsExceptionSupport.java:112)
    at org.apache.qpid.jms.JmsConnection.createResource(JmsConnection.java:698)
    at org.apache.qpid.jms.JmsConnection.createResource(JmsConnection.java:682)
    at org.apache.qpid.jms.JmsConnection.createJmsConnection(JmsConnection.java:593)
    at org.apache.qpid.jms.JmsConnection.connect(JmsConnection.java:180)
    at org.apache.qpid.jms.JmsConnectionFactory.createConnection(JmsConnectionFactory.java:213)
    at org.apache.qpid.jms.JmsConnectionFactory.createConnection(JmsConnectionFactory.java:200)
    at org.springframework.jms.support.JmsAccessor.createConnection(JmsAccessor.java:197)
    at org.springframework.jms.core.JmsTemplate.execute(JmsTemplate.java:494)
    ... 72 more
Caused by: org.apache.qpid.jms.provider.exceptions.ProviderInvalidClientIDException: Received error from remote peer without description [condition = amqp:invalid-field]
    at org.apache.qpid.jms.provider.amqp.AmqpSupport.convertToConnectionClosedException(AmqpSupport.java:136)
    at org.apache.qpid.jms.provider.amqp.builders.AmqpConnectionBuilder.getOpenAbortExceptionFromRemote(AmqpConnectionBuilder.java:170)
    at org.apache.qpid.jms.provider.amqp.builders.AmqpResourceBuilder.handleClosed(AmqpResourceBuilder.java:191)
    at org.apache.qpid.jms.provider.amqp.builders.AmqpResourceBuilder.processRemoteClose(AmqpResourceBuilder.java:132)
    at org.apache.qpid.jms.provider.amqp.AmqpProvider.processUpdates(AmqpProvider.java:968)
    at org.apache.qpid.jms.provider.amqp.AmqpProvider.onData(AmqpProvider.java:878)
    at org.apache.qpid.jms.transports.netty.NettyTcpTransport$NettyTcpTransportHandler.channelRead0(NettyTcpTransport.java:563)
    at org.apache.qpid.jms.transports.netty.NettyTcpTransport$NettyTcpTransportHandler.channelRead0(NettyTcpTransport.java:556)
    at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562)
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at java.base/java.lang.Thread.run(Thread.java:833)
btqmn9zl

btqmn9zl1#

发现问题。在我的DefaultJmsListenerContainerFactory类中,我有一个setClientId操作,它将客户端ID设置为null,因为没有设置变量。将其更改为实际值会为我排序。

@Component
public class MyContainerFactory extends DefaultJmsListenerContainerFactory {

    @Autowired
    private ConnectionFactory connectionFactory;

    @Autowired
    private DefaultJmsListenerContainerFactoryConfigurer configurer;

    @PostConstruct
    public void init() {
        configurer.configure(this, connectionFactory);
        setPubSubDomain(true);
        setClientId("my_topic_client");  //This was not set 
        setSubscriptionDurable(true);
    }
}

相关问题