Apache Camel聚合完成不工作

sy5wg1nm  于 2022-11-07  发布在  Apache
关注(0)|答案(1)|浏览(185)

我已经配置了一个路由来从交换机中提取一些数据并聚合它们;以下是简单总结:

@Component
@RequiredArgsConstructor
public class FingerprintHistoryRouteBuilder extends RouteBuilder {

    private final FingerprintHistoryService fingerprintHistoryService;

    @Override
    public void configure() throws Exception {
        from("seda:httpFingerprint")
                .aggregate( (AggregationStrategy) (oldExchange, newExchange) -> {
                    final FingerprintHistory newFingerprint = extract(newExchange);
                    if (oldExchange == null) {
                        List<FingerprintHistory> fingerprintHistories = new ArrayList<>();
                        fingerprintHistories.add(newFingerprint);
                        newExchange.getMessage().setBody(fingerprintHistories);
                        return newExchange;
                    }

                    final Message oldMessage = oldExchange.getMessage();
                    final List<FingerprintHistory> fingerprintHistories = (List<FingerprintHistory>) oldMessage.getBody(List.class);
                    fingerprintHistories.add(newFingerprint);

                    return oldExchange;
                })
                .constant(true)
                .completionSize(aggregateCount)
                .completionInterval(aggregateDuration.toMillis())
                .to("direct:processFingerprint")
                .end();

        from("direct:processFingerprint")
                .process(exchange -> {
                    List<FingerprintHistory> fingerprintHistories = exchange.getMessage().getBody(List.class);
                    fingerprintHistoryService.saveAll(fingerprintHistories);
                });
strong text
    }

}

问题是聚合完成永远不起作用,例如,这是我的测试的一个示例:

@SpringBootTest
class FingerprintHistoryRouteBuilderTest {

    @Autowired
    ProducerTemplate producerTemplate;

    @Autowired
    FingerprintHistoryRouteBuilder fingerprintHistoryRouteBuilder;

    @Autowired
    CamelContext camelContext;

    @MockBean
    FingerprintHistoryService historyService;

    @Test
    void api_whenAggregate() {
        UserSearchActivity activity = ActivityFactory.buildSampleSearchActivity("127.0.0.1", "salam", "finger");
        Exchange exchange = buildExchange();
        exchange.getMessage().setBody(activity);

(指纹历史路由构建器,“聚合计数”,1);(指纹历史路由构建器,“聚合持续时间”,纳米的持续时间(1));(指纹历史路由生成器。指纹历史结束点,交换);验证(历史服务)。保存所有(Mockito。任何());}

Exchange buildExchange() {
        DefaultExchange defaultExchange = new DefaultExchange(camelContext);
        defaultExchange.setMessage(new DefaultMessage(camelContext));
        return defaultExchange;
    }

}

结果如下:
通缉但不援引:().saveAll();

6yjfywim

6yjfywim1#

我构建了这个simplified example,并且测试通过了,所以看起来您对聚合的使用可能是正确的。
您是否考虑过Mockito.verify()调用是在交换完成路由之前发生的?您可以通过删除verify调用并向FINGERPRINT_PROCESS_AGGREGATION路由添加.log()语句来对此进行测试。如果您在执行过程中看到日志输出,则您知道交换正在按预期进行路由。如果是这种情况,那么您的verify()调用需要能够等待交换完成路由。我不怎么使用mockito,但是看起来您可以这样做:

Mockito.verify(historyService, timeout(10000)).saveAll(Mockito.any());

相关问题