spring集成每10秒聚合一次消息

5q4ezhmt  于 2021-07-24  发布在  Java
关注(0)|答案(1)|浏览(394)

在我的一个流中,我每10秒聚合一次,并将这些有效负载写入文件共享。我不太清楚如何使用聚合器。

@Bean
public IntegrationFlow errorHandlingQueueFlow() {
    return IntegrationFlows.from(ERROR_QUEUE_CHANNEL)
            .bridge(e -> e.poller(Pollers.fixedDelay(1000).maxMessagesPerPoll(MAX_MSG_PER_POLL)))                
            .aggregate(a -> a.groupTimeout(10000))// How do i make it collect every 10 seconds.
            .transform(objectToCSVTransformer, "transform")//Converts payload to a CSV
            .handle(smbErrorMessageHandler())// Takes care of writing into Fileshare
            .get();
}

因为这是用于错误处理的,所以只有少数出错的消息会进入这个错误队列通道。所以我想每10秒收集一次,而不是等待一个组的所有消息被接收。当我使用grouptimeout时,每10秒收集的所有消息都会发送到nullchannel。

kyvafyod

kyvafyod1#

默认目的 groupTimeout() 是清理过期的组。如果您想正常地释放它们而不是丢弃它们,您应该考虑使用 sendPartialResultOnExpiry = true . 当然,如果在这些消息中确实有相关详细信息头,那么这一切都是有意义的。否则你需要考虑 correlationStrategy 将这些错误消息分组。
请阅读文档中有关聚合器及其选项的更多信息:https://docs.spring.io/spring-integration/docs/current/reference/html/message-routing.html#aggregator

相关问题