Camel先拆分再聚合异常传播

o7jaxewo  于 2022-11-07  发布在  Apache
关注(0)|答案(1)|浏览(164)

我的项目需要一个先拆分然后聚合的操作,而不是集成的拆分+聚合,但是我不知道如何传播异常以停止进一步的处理。
在下面的示例中,如果在聚合后抛出异常,我将需要能够不生成最后一个日志。

@RunWith(SpringJUnit4ClassRunner.class)
public class AggregationExceptionTest extends CamelTestSupport {
private final Logger LOGGER = LoggerFactory.getLogger(AggregationExceptionTest.class);

    @Override
    protected RouteBuilder createRouteBuilder() throws Exception {
        return new RouteBuilder() {
            @Override
            public void configure() throws Exception {
                from("direct:start")    
                    .split(body()).streaming().stopOnException().parallelProcessing()
                        .aggregate(new AggregationStrategy() {
                            @Override
                            public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
                                if (newExchange.getException() != null) {
                                    LOGGER.info("exception propagated");
                                }
                               return oldExchange==null?newExchange:oldExchange;
                            }
                        }).constant(true)
                        .completionSize(1).completionTimeout(500)
                            .log(LoggingLevel.INFO, LOGGER, "Aggreg ${body}")
                            .throwException(Exception.class, "propagate plz")
                        .end()
                    .end()
                    .process(e -> {
                        LOGGER.info("I don't want to be seen, because of {}", e.getException());
                    });
            }
        };
    }

    @Test
    public void test1() throws InterruptedException {
        template.sendBody("direct:start", Arrays.asList("A", "B", "C", "D"));

        Thread.sleep(5000);
    }
}

抛出的异常在 aggregate 方法中永远不可见。

p1iqtdky

p1iqtdky1#

我不确定这是否可行。聚合器的输出与调用它的路由完全分离。它也在自己的线程中运行,除非你给它提供一个SynchronousExecutorService示例。
您可以尝试使用SynchronousExecutorService选项:

.aggregate(constant(1), new YourAggregator())
    .executorService(new SynchronousExecutorService())

在拆分之前,在头文件中设置一个名为exception的键的Map示例,初始值为null。然后在聚合器输出中,如果发生异常,用异常更新Map键。然后在拆分中,您可以检查异常示例的Map键,并执行任何您想要的操作,例如,拆分器的stopOnException选项。

相关问题