Camel -处理Split内部的异常并继续

luaexgnf  于 2022-11-07  发布在  Apache
关注(0)|答案(2)|浏览(157)

在我的 Camel 路线中,我有一个简单的分叉,就像这样

.split(body(), eventListAggregationStrategy).parallelProcessing()
                    .process(rawEventTransformationProcessor)
                .end()

如果.process(rawEventTransformationProcessor)内部发生异常,我希望处理它们并将其发送到错误队列。但是,我所有的尝试都失败了,整个路由停止。我尝试了多种方法(handled、continue、shareUnitOfWork)等onException。

onException(RawEventTransformationException.class)
                .to("log:RawEventTransformationException?showAll=true&multiline=true&level=ERROR")
                .handled(true)
                .process(exchange -> {
                    RawEventTransformationException cause = exchange.getProperty(Exchange.EXCEPTION_CAUGHT, RawEventTransformationException.class);
                    exchange.getIn().setBody(cause.getFailedEvent());
                })
                .marshal().json(JsonLibrary.Jackson, RawEvent.class)
                .to("rabbitmq://errors?queue=transformed-data-fails&routingKey=transformed-data-fails&autoDelete=false");

如果我有一个额外的direct路由,当一个异常发生时,我会从处理器内部触发这个路由,那么什么会起作用呢?

catch (RawEventTransformationException e1) {
            producerTemplate.sendBody(e1);
            exchange.getIn().setBody(List.of());
            throw e1;
        }

捕获此异常并让其余异常继续执行的最佳实践方法是什么?

rur96b6h

rur96b6h1#

使用continued(true)的路由级异常处理至少对我来说是有效的。所有的消息拆分消息都得到了处理和异常处理。请查看Dead Letter channel以了解有关将失败的交换发送到错误队列的信息。
您还可以在拆分块内调用direct route,并在其中使用路由级异常处理,以将异常处理限制为仅拆分消息。

public class ExampleTest extends CamelTestSupport {

    @Test
    public void exampleTest() {

        List<String> body = Arrays.asList(new String[]{"a", "b", "exception", "c", "d"});
        template.sendBody("direct:start", body);
    }

    @Override
    protected RoutesBuilder createRouteBuilder() throws Exception {

        return new RouteBuilder() {

            @Override
            public void configure() throws Exception {

                from("direct:start")
                    .onException(Exception.class)
                        .log(LoggingLevel.ERROR, "Exception handled: ${exception.message}")
                        .continued(true)
                    .end()
                    .log(LoggingLevel.INFO, "Start")
                    .split(body()).parallelProcessing()
                        .choice()
                            .when(body().isNotEqualTo("exception"))
                                .log(LoggingLevel.INFO, "${body}")
                            .otherwise()
                                .process(exchange -> {
                                    throw new Exception("Example exception");
                                })
                        .end()
                    .end()
                    .log(LoggingLevel.INFO, "Done");
            }
        };
    }
}

记录档:

INFO  [log]         org.apache.camel.spi.CamelLogger : Start
INFO  [log]         org.apache.camel.spi.CamelLogger : b
INFO  [log]         org.apache.camel.spi.CamelLogger : a
INFO  [log]         org.apache.camel.spi.CamelLogger : c
INFO  [log]         org.apache.camel.spi.CamelLogger : d
ERROR [log]         org.apache.camel.spi.CamelLogger : Exception handled: Example exception
INFO  [log]         org.apache.camel.spi.CamelLogger : Done
mrwjdhj3

mrwjdhj32#

您可以让EventListAggregationStrategy清除它返回的交换上的异常:

@Override
public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
    ...
    newExchange.setException(null);
    return newExchange;
}

相关问题