在我的一个流中,我每10秒聚合一次,并将这些有效负载写入文件共享。我不太清楚如何使用聚合器。
@Bean
public IntegrationFlow errorHandlingQueueFlow() {
return IntegrationFlows.from(ERROR_QUEUE_CHANNEL)
.bridge(e -> e.poller(Pollers.fixedDelay(1000).maxMessagesPerPoll(MAX_MSG_PER_POLL)))
.aggregate(a -> a.groupTimeout(10000))// How do i make it collect every 10 seconds.
.transform(objectToCSVTransformer, "transform")//Converts payload to a CSV
.handle(smbErrorMessageHandler())// Takes care of writing into Fileshare
.get();
}
因为这是用于错误处理的,所以只有少数出错的消息会进入这个错误队列通道。所以我想每10秒收集一次,而不是等待一个组的所有消息被接收。当我使用grouptimeout时,每10秒收集的所有消息都会发送到nullchannel。
1条答案
按热度按时间kyvafyod1#
默认目的
groupTimeout()
是清理过期的组。如果您想正常地释放它们而不是丢弃它们,您应该考虑使用sendPartialResultOnExpiry = true
. 当然,如果在这些消息中确实有相关详细信息头,那么这一切都是有意义的。否则你需要考虑correlationStrategy
将这些错误消息分组。请阅读文档中有关聚合器及其选项的更多信息:https://docs.spring.io/spring-integration/docs/current/reference/html/message-routing.html#aggregator