Spring Boot + RabbitMQ: How do I convert the object received from convertSendAndReceive()?

9wbgstp7 posted on 2022-11-08 in RabbitMQ

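How do I deserialize a message when using the convertSendAndReceive() method? It gives me a NullPointerException because the class required for deserialization cannot be found; it lives in another package. The packages are marked in the code.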

The listener receives and sends messages normally

package org.dneversky.user;

@EnableRabbit
@Component
public class TestListener {

  private static final Logger logger = LoggerFactory.getLogger(TestListener.class);

  @Autowired
  private RabbitTemplate rabbitTemplate;

  @RabbitListener(queues = RabbitMQConfig.RECEIVE_QUEUE)
  public void doGet(UserReplyMessage message) {
    logger.info("Received message: {}", message);
    UserReplyMessage response = new UserReplyMessage();
    logger.info("Sending message: {}", response);
    rabbitTemplate.convertSendAndReceive(RabbitMQConfig.RPC_EXCHANGE,
        RabbitMQConfig.REPLY_QUEUE, response);
  }
}

Listener configuration

package org.dneversky.user.config;

@Configuration
public class RabbitMQConfig {

  public static final String RECEIVE_QUEUE = "rpc_queue";
  public static final String REPLY_QUEUE = "reply_queue";
  public static final String RPC_EXCHANGE = "rpc_exchange";

  @Bean
  public TopicExchange rpcExchange() {
    return new TopicExchange(RPC_EXCHANGE);
  }

  @Bean
  public Queue receiveQueue() {
    return new Queue(RECEIVE_QUEUE);
  }

  @Bean
  public Queue replyQueue() {
    return new Queue(REPLY_QUEUE);
  }

  @Bean
  public Binding receiveBinding() {
    return BindingBuilder.bind(receiveQueue()).to(rpcExchange()).with(RECEIVE_QUEUE);
  }

  @Bean
  public MessageConverter jsonMessageConverter() {
    return new Jackson2JsonMessageConverter();
  }
}

The sender sends messages normally but cannot deserialize the reply message

package org.dneversky.gateway.servie.impl;

@Service
public class UserServiceImpl {

  private static final Logger logger = LoggerFactory.getLogger(UserServiceImpl.class);

  @Autowired
  private RabbitTemplate rabbitTemplate;

  public UserPrincipal getUserByUsername(String username) {
    UserResponse message = new UserResponse(username);
    logger.info("Sending created message: {}", message);
    UserResponse result = (UserResponse) rabbitTemplate.convertSendAndReceive(
        RabbitMQConfig.RPC_EXCHANGE, RabbitMQConfig.RPC_QUEUE, message);
    logger.info("Getting response: {}", result);

    return null;
  }
}

Sender configuration

package org.dneversky.gateway.config;

@Configuration
public class RabbitMQConfig {

  public static final String RPC_QUEUE = "rpc_queue";
  public static final String REPLY_QUEUE = "reply_queue";
  public static final String RPC_EXCHANGE = "rpc_exchange";

  @Bean
  public Queue rpcQueue() {
    return new Queue(RPC_QUEUE);
  }

  @Bean
  public Queue replyQueue() {
    return new Queue(REPLY_QUEUE);
  }

  @Bean
  public TopicExchange rpcExchange() {
    return new TopicExchange(RPC_EXCHANGE);
  }

  @Bean
  public Binding binding() {
    return BindingBuilder.bind(replyQueue()).to(rpcExchange()).with(REPLY_QUEUE);
  }

  @Bean
  public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
    RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
    rabbitTemplate.setExchange(RPC_EXCHANGE);
    rabbitTemplate.setReplyAddress(REPLY_QUEUE);
    rabbitTemplate.setReplyTimeout(6000);
    rabbitTemplate.setMessageConverter(messageConverter());

    return rabbitTemplate;
  }

  @Bean
  public MessageConverter messageConverter() {
    return new Jackson2JsonMessageConverter();
  }

  @Bean
  public SimpleMessageListenerContainer replyContainer(ConnectionFactory connectionFactory) {
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
    container.setConnectionFactory(connectionFactory);
    container.setQueueNames(REPLY_QUEUE);
    container.setMessageListener(rabbitTemplate(connectionFactory));
    return container;
  }
}

Error log

Servlet.service() for servlet [dispatcherServlet] in context with path [] threw exception [Request processing failed; nested exception is ...: failed to resolve class name. Class not found [org.dneversky.user.model.UserReplyMessage]] with root cause

java.lang.ClassNotFoundException: org.dneversky.user.model.UserReplyMessage

agyaoht7 #1

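By default, the producer sets the __TypeId__ header to the name of the class that was serialized.
The consumer then uses the __TypeId__ header to decide which class to use when converting the JSON into a Java instance.
You are using two different classes to serialize and deserialize the object, so you have to configure the converter accordingly.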
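
One way to configure the converter is to give it an explicit mapping from the type id found in the header to a class that exists locally. The following is only a sketch for the sender (gateway) side, under a few assumptions: the class name ReplyTypeMappingConfig is hypothetical, the nested UserResponse is a stand-in for the gateway's real class (whose package is not shown in the question), and this bean is meant to replace the existing messageConverter() bean rather than sit next to it.

```java
import java.util.HashMap;
import java.util.Map;

import org.springframework.amqp.support.converter.DefaultJackson2JavaTypeMapper;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

// Hypothetical class name; in the question this bean would live in the
// gateway's RabbitMQConfig and replace its current messageConverter() bean.
@Configuration
public class ReplyTypeMappingConfig {

  // Stand-in for the gateway's real UserResponse class (assumption).
  // Replace it with the actual class when adapting this sketch.
  public static class UserResponse {
    public String username;
  }

  @Bean
  public MessageConverter messageConverter() {
    // Map the type id written by the other service to a class that exists here.
    Map<String, Class<?>> idClassMapping = new HashMap<>();
    idClassMapping.put("org.dneversky.user.model.UserReplyMessage", UserResponse.class);

    DefaultJackson2JavaTypeMapper typeMapper = new DefaultJackson2JavaTypeMapper();
    typeMapper.setIdClassMapping(idClassMapping);

    // The converter consults the type mapper when reading the __TypeId__ header.
    Jackson2JsonMessageConverter converter = new Jackson2JsonMessageConverter();
    converter.setJavaTypeMapper(typeMapper);
    return converter;
  }
}
```

With such a mapping in place, a reply whose __TypeId__ header names org.dneversky.user.model.UserReplyMessage should be converted into the local class instead of triggering a class lookup by name. This only helps if the JSON fields of the two classes are compatible.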


ct3nt3jp #2

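I don't see your messageConverter bean being used inside your replyContainer. By default, messages are sent and received as serialized Java objects rather than being converted to human-readable JSON.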

@Bean
 public SimpleRabbitListenerContainerFactory customListenerContainerFactory(ConnectionFactory connectionFactory,
                                                                               MessageConverter jsonMessageConverter) {
   SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
   factory.setConnectionFactory(connectionFactory);
   factory.setMessageConverter(jsonMessageConverter);
   return factory;
}

And for your consumer:

@RabbitListener(queues = RabbitConstants.YOUR_QUEUE_NAME, containerFactory = "customListenerContainerFactory")
public void onMessage(@Valid YourEvent YourEvent){
 //your code
}

5us2dqdw #3

On the listener side, you need to add the following bean to bind the message converter

@Bean
public SimpleRabbitListenerContainerFactory jsaFactory(ConnectionFactory connectionFactory,
                              SimpleRabbitListenerContainerFactoryConfigurer configurer) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        configurer.configure(factory, connectionFactory);
        factory.setMessageConverter(jsonMessageConverter());
        return factory;
}

Also, in the TestListener class, you should replace the following line

@RabbitListener(queues = RabbitMQConfig.RECEIVE_QUEUE)

with this

@RabbitListener(queues = RabbitMQConfig.RECEIVE_QUEUE,containerFactory="jsaFactory")
