我的项目需要一个先拆分然后聚合的操作,而不是集成的拆分+聚合,但是我不知道如何传播异常以停止进一步的处理。
在下面的示例中,如果在聚合后抛出异常,我将需要能够不生成最后一个日志。
@RunWith(SpringJUnit4ClassRunner.class)
public class AggregationExceptionTest extends CamelTestSupport {
private final Logger LOGGER = LoggerFactory.getLogger(AggregationExceptionTest.class);
@Override
protected RouteBuilder createRouteBuilder() throws Exception {
return new RouteBuilder() {
@Override
public void configure() throws Exception {
from("direct:start")
.split(body()).streaming().stopOnException().parallelProcessing()
.aggregate(new AggregationStrategy() {
@Override
public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
if (newExchange.getException() != null) {
LOGGER.info("exception propagated");
}
return oldExchange==null?newExchange:oldExchange;
}
}).constant(true)
.completionSize(1).completionTimeout(500)
.log(LoggingLevel.INFO, LOGGER, "Aggreg ${body}")
.throwException(Exception.class, "propagate plz")
.end()
.end()
.process(e -> {
LOGGER.info("I don't want to be seen, because of {}", e.getException());
});
}
};
}
@Test
public void test1() throws InterruptedException {
template.sendBody("direct:start", Arrays.asList("A", "B", "C", "D"));
Thread.sleep(5000);
}
}
抛出的异常在 aggregate 方法中永远不可见。
1条答案
按热度按时间p1iqtdky1#
我不确定这是否可行。聚合器的输出与调用它的路由完全分离。它也在自己的线程中运行,除非你给它提供一个
SynchronousExecutorService
示例。您可以尝试使用
SynchronousExecutorService
选项:在拆分之前,在头文件中设置一个名为
exception
的键的Map
示例,初始值为null。然后在聚合器输出中,如果发生异常,用异常更新Map键。然后在拆分中,您可以检查异常示例的Map键,并执行任何您想要的操作,例如,拆分器的stopOnException
选项。