我正在尝试构建集成流,它将防止在传递到amqp代理(rabbitmq)的过程中丢失消息。在broker停止的情况下,我看到一些出乎意料的行为:
失败的邮件正在保存到邮件存储,但保存时间不会太长。这个流不是在等待代理的可用性,它从消息存储中提取消息,即使代理仍然被停止
如果成功重新启动rabbitmq,则消息存储区中的记录(如果仍然存在)不会传递到队列。
请帮我调查一下。代码示例:
@Bean
public MessageChannel messageStoreBackedChannel() {
return new QueueChannel(
new MessageGroupQueue(jdbcChannelMessageStore(), "Group_ID")
);
}
@Bean
public IntegrationFlow someFlow() {
return IntegrationFlows
.from("messageStoreBackedChannel")
.channel("amqpMessageChannel")
.get();
}
@Bean
public IntegrationFlow jmsExtractFlow(EntityManagerFactory entityManagerFactory) {
return IntegrationFlows
.from("amqpMessageChannel")
.handle(message -> System.out.println(message.getPayload()))
.get();
}
@Bean
public MessageChannel amqpMessageChannel() {
return new PollableAmqpChannel("amqpMessageChannel", amqpTemplate);
}
@Bean
public JdbcChannelMessageStore jdbcChannelMessageStore() {
var jdbcChannelMessageStore = new JdbcChannelMessageStore(dataSource);
jdbcChannelMessageStore.setChannelMessageStoreQueryProvider(new PostgresChannelMessageStoreQueryProvider());
return jdbcChannelMessageStore;
}
@Bean(name = PollerMetadata.DEFAULT_POLLER)
public PollerMetadata defaultPoller() {
PollerMetadata pollerMetadata = new PollerMetadata();
pollerMetadata.setTrigger(new PeriodicTrigger(10));
return pollerMetadata;
}
1条答案
按热度按时间dy1byipe1#
请考虑在
.from("messageStoreBackedChannel").channel("amqpMessageChannel")
作为transactional()
.像这样:
所以,无论何时
amqpMessageChannel
失败,事务将回滚,失败的消息将返回到存储区,直到下一次轮询。你当然可以阻止
bridge
当连接到rabbitmq时出现错误。但你怎么确定这种联系又回来了呢?。。