如何调查Apache Camel 之路上的数据?

o2rvlv0m  于 2022-11-07  发布在  Apache
关注(0)|答案(1)|浏览(166)

我的项目致力于从一个系统到另一个系统的数据传输。我们使用Apache Camel Routes在JBoss EAP v7服务器之间发送数据。我的问题是,当数据包经过不同的路由时,是否有办法调查数据包的内容?
我们已经尝试过增加日志记录,但我们的文件/控制台只是被淹没。我们还尝试过在服务器上使用Hawtio来查看通过路由传递的消息,但没有成功识别我们的消息在哪里被“卡住”。
任何帮助都是感激不尽的!

x8goxv8g

x8goxv8g1#

您可以使用单元测试在本地测试路由,然后使用adviceWith和weave方法在特定点记录交换的内容。
通过单元测试,您可以在您最喜欢的IDE中轻松地调试您的路由,即使您在Karaf或Red Hat fuse中运行camel。

package com.example;

import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.RoutesBuilder;
import org.apache.camel.builder.AdviceWithRouteBuilder;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.model.dataformat.JsonLibrary;
import org.apache.camel.test.junit4.CamelTestSupport;
import org.junit.Test;

public class ExampleRouteTests extends CamelTestSupport {

    @Test
    public void exampleTest() throws Exception 
    {
        ContractDetails testDetails = new ContractDetails(1512, 1215);
        mockJDBCEndpoints();

        context.getRouteDefinition("exampleRoute")
            .adviceWith(context, new AdviceWithRouteBuilder(){

            @Override
            public void configure() throws Exception {

                replaceFromWith("direct:start");

                weaveByToUri("direct:getDetailsFromAPI")
                    .replace()
                    .to("log:testLogger?showAll=true")
                    .to("mock:api")
                    .setBody(constant(testDetails));

                weaveByToUri("direct:saveToDatabase")
                    .replace()
                    .to("log:testLogger?showAll=true")
                    .to("mock:db");
            }
        });

        MockEndpoint apiMockEndpoint = getMockEndpoint("mock:api");
        apiMockEndpoint.expectedMessageCount(1);

        MockEndpoint dbMockEndpoint = getMockEndpoint("mock:db");
        dbMockEndpoint.expectedMessageCount(1);

        context.start();

        String body = "{\"name\":\"Bob\",\"age\":10}";
        template.sendBody("direct:start", body);

        apiMockEndpoint.assertIsSatisfied();
        dbMockEndpoint.assertIsSatisfied();
    }  

    @Override
    protected RoutesBuilder createRouteBuilder() throws Exception {
        return new RouteBuilder(){

            @Override
            public void configure() throws Exception {

                from("amqp:queue:example")
                    .routeId("exampleRoute")
                    .unmarshal().json(JsonLibrary.Jackson, 
                        Person.class)
                    .to("direct:getDetailsFromAPI")
                    .process(new SomeProcessor())
                    .to("direct:saveToDatabase");

                from("direct:saveToDatabase")
                    .routeId("saveToDatabaseRoute")
                    .to("velocity:sql/insertQueryTemplate.vt")
                    .to("jdbc:exampleDatabase");

                from("direct:getDetailsFromAPI")
                    .removeHeaders("*")
                    .toD("http4:someAPI?name=${body.getName()}")
                    .unmarshal().json(JsonLibrary.Jackson, 
                        ContractDetails.class);
            }
        };
    }

    void mockJDBCEndpoints() throws Exception {

        context.getRouteDefinition("saveToDatabaseRoute")
            .adviceWith(context, new AdviceWithRouteBuilder(){

                @Override
                public void configure() throws Exception {
                    weaveByToUri("jdbc:*")
                        .replace()
                        .to("mock:db");
                }
        });
    }

    @Override
    public boolean isUseAdviceWith() {
        return true;
    }
}

现在,为了解决单元测试中不会出现的问题,您可以使用onException配置一般或路由特定的异常处理,并使用Dead letter channel处理和存储有关失败的交换的信息。或者,您可以只使用stream或文件组件将有关异常和失败的交换的信息保存到一个单独的文件中,以避免日志溢出。

相关问题