spring SimpleRabbitListenerContainerFactory.setAdviceChain:MethodInterceptor是否可以接受将@RabbitListener参数化的POJO?

c9qzyr3d  于 2022-12-21  发布在  Spring
关注(0)|答案(1)|浏览(194)

版本:

org.springframework.amqp:spring-rabbit:2.4.8
openjdk version "11.0.12" 2021-07-20

可扩展标记语言:

<bean id="myAdvice" class="com.acme.interceptors.TrackNewBarcode"/>
  <bean id="rmqMessageConverter" class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter">
    <constructor-arg ref="myObjectMapper"/>
  </bean>
  <bean id="rabbitListenerContainerFactory" class="org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory">
    <property name="connectionFactory" ref="connectionFactory"/>
    <property name="concurrentConsumers" value="1"/>
    <property name="maxConcurrentConsumers" value="1"/>
    <property name="receiveTimeout" value="316224000000"/>
    <property name="messageConverter" ref="rmqMessageConverter"/>
    <property name="adviceChain" ref="myAdvice"/>
  </bean>

java :

package com.acme.tasks;

import com.acme.io.NewBarcodes;
import javax.validation.constraints.NotNull;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import org.springframework.validation.annotation.Validated;

@Validated
@Component
public class ConsumeBarcodes
{
  private static final Logger logger = LoggerFactory.getLogger(ConsumeBarcodes.class);

  @Value("${rabbitmq.queue}")
  private String queueName;

  public ConsumeBarcodes() {}

  @RabbitListener(queues = "${rabbitmq.queue}", ackMode = "AUTO")
  public void ingestNewBarcodes(@NotNull NewBarcodes newBarcodes)
  {
    logger.debug("RECEIVED message in ingestNewBarcodes from RabbitMQ queue: {}", queueName);
    // XXX Process newBarcodes here.
  }
}

建议:

package com.acme.interceptors;

...

public class TrackNewBarcode implements MethodInterceptor
{
  ...
  @Override
  public Object invoke(MethodInvocation methodInvocation) throws Throwable
  {
    NewBarcodes newBarcodes = (NewBarcodes)methodInvocation.getArguments()[1];
    ...
  }
}

在我看到的@RabbitListener通知的示例中,在invoke()中,参数#1被强制转换为Message,但在我的@RabbitListener中,我依赖于Jackson消息转换器将入站参数转换为POJO NewBarcodes
问题:在我的建议中,我可以将参数#1转换为POJO NewBarcodes吗?

lnvxswe2

lnvxswe21#

简单的答案是否定的。methodInvocation.getArguments()在第一个参数(索引0)中返回Channel,在第二个参数(索引1)中返回Message。将Message强制转换为NewBarcodes将导致异常。
尽管如此,这里仍然有其他方法来编码建议,我在下面详细说明一种方法。
首先,根据Spring AOP文档,您应该在应用程序配置中启用AOP功能。

@Configuration
@EnableAspectJAutoProxy
public class MyWebAppConfig implements WebMvcConfigurer
{
  ...
}

使用@Aspect注解ConsumeBarcodes,并设置@Pointcut@After通知:

@Aspect
@Validated
@Component
public class ConsumeBarcodes
{
  private static final Logger logger = LoggerFactory.getLogger(ConsumeBarcodes.class);

  @Value("${rabbitmq.queue}")
  private String queueName;

  public ConsumeBarcodes() {}

  @RabbitListener(queues = "${rabbitmq.queue}", ackMode = "AUTO")
  public void ingestNewBarcodes(@NotNull NewBarcodes newBarcodes)
  {
    logger.debug("RECEIVED message in ingestNewBarcodes from RabbitMQ queue: {}", queueName);
    // XXX Process newBarcodes here.
  }

  @Pointcut("execution(com.acme.tasks.ConsumeBarcodes.ingestNewBarcodes(..)")
  public void newBarcodesIn() {}

  @After("newBarcodesIn()")
  public void ackNewBarcodesIn(JoinPoint jp)
  {
    NewBarcodes newBarcodes = (NewBarcodes)jp.getArgs()[0];
    ...
  }
}

可以说,这样做有些过头了,因为我们可以简单地将ackNewBarcodesIn()的逻辑放入ingestNewBarcodes()中,而完全放弃通知,不过,我已经确认了这种设置是有效的,并且@After通知按预期触发。

相关问题