如何在rabbitmq中处理未知/无效的绑定路由键值?

46qrfjad  于 2021-07-06  发布在  Java
关注(0)|答案(2)|浏览(456)

我想知道在exchange中处理具有未知/无效路由键值的邮件的最佳方法是什么?在我的例子中,我在同一个交换中发送所有消息,并且基于路由密钥,消息被路由到相应的队列。以下是我的配置(我使用的是SpringCloudStream):

spring.cloud.stream.bindings.output.destination: my-exchange
spring.cloud.stream.bindings.output.producer.routingKeyExpression: payload.type

spring.cloud.stream.bindings.input-type1.destination: my-exchange # Exchange
spring.cloud.stream.bindings.input-type1.group: input.type1 # Queue 1
spring.cloud.stream.bindings.input-type2.destination: my-exchange # Exchange
spring.cloud.stream.bindings.input-type2.group: input.type2 # Queue 2

spring.cloud.stream.rabbit.bindings.input-type1.consumer.bindingRoutingKey: FOO
spring.cloud.stream.rabbit.bindings.input-type2.consumer.bindingRoutingKey: BAR

现在我要问的是,如果我用 payload.type='ANY' ? 显然,此消息不会被任何使用者检索到,并将保留在exchange中,但是跟踪这些“未知”消息的最佳方法是什么?我可以用dlq吗?
谢谢!

w80xi6nr

w80xi6nr1#

你可以用 mandatory 你的出版商的财产。

When a published message cannot be routed to any queue, 
and the publisher set the mandatory message property to true, the 
message will be returned to it.
The publisher must have a returned message handler set up in order to 
handle the return (e.g. by logging an error or retrying with a different exchange)
wr98u20j

wr98u20j2#

将留在交易所内,
不;交换不“保存”消息,它们只是路由器。
默认情况下,将丢弃无法导出的消息。
您可以将绑定配置为返回不可输出的消息。
请参阅错误通道。
返回是异步的。
在即将发布的3.1版本中,您可以等待将来确定消息是否成功发送。见出版商确认。
如果消息不可输出,则相关数据 returnedMessage 属性已设置。
框架使用 mandatory 另一个答案中提到的特征。
编辑
举个例子:

spring.rabbitmq.publisher-returns: true

spring.cloud.stream.bindings.output.destination: my-exchange
spring.cloud.stream.rabbit.bindings.output.producer.routing-key-expression: headers['rk']

spring.cloud.stream.bindings.output.producer.error-channel-enabled: true
@SpringBootApplication
@EnableBinding(Source.class)
public class So65134452Application {

    public static void main(String[] args) {
        SpringApplication.run(So65134452Application.class, args);
    }

    @Bean
    public ApplicationRunner runner(MessageChannel output) {
        return args -> {
            output.send(MessageBuilder.withPayload("foo")
                    .setHeader("rk", "invalid")
                    .build());
        };
    }

    @Autowired
    RabbitTemplate template;

    @Bean
    public Queue unroutable() {
        return new Queue("unroutable.messages");
    }

    @ServiceActivator(inputChannel = "errorChannel")
    public void error(Message<?> error) {
        if (error.getPayload() instanceof ReturnedAmqpMessageException) {
            this.template.send(unroutable().getName(),
                    ((ReturnedAmqpMessageException) error.getPayload()).getAmqpMessage());
        }
    }

}

相关问题