java Apache Camel:无法聚合来自2个源的数据

m3eecexj  于 2023-08-01  发布在  Java
关注(0)|答案(1)|浏览(110)

我尝试使用Apache Camel通过处理器聚合来自2个端点的数据,为此,我首先阅读了相关的Camel文档:
https://camel.apache.org/components/3.20.x/eips/aggregate-eip.html
并定义一个简单的聚合策略来简单地连接2个字段(合并成功后,我将把它转换为我的数据模型)。然而,几乎所有的示例都使用相同的数据源,并且在拆分之后聚合数据等。但我需要从2个端点获取数据,然后汇总这些数据。为了简单起见,我试图从这2个数据源聚合2个字符串字段。之后,我将把聚合的数据传递给Kafka Topic。
几乎所有的例子,只有一种方法阅读数据,例如。from(direct: endpoint)。但是我们需要2来从端点或处理器读取多个数据吗?
我还尝试使用enrich方法,如下所示,但我无法使用这种方法聚合这2个数据:

from("timer:myTimer?period=300000") // trigger every 5 minutes
    .multicast(GroupedExchangeAggregationStrategy()).parallelProcessing()
    .enrich("direct:customerProcessor")
    .enrich("direct:orderProcessor")
    .to(kafka("customerOrderTopic"))
    .end()

字符串
那么,我如何使用aggregate(或者如果您有使用enrich的想法)来实现这一点呢?
这里是另一个例子,但据我所知,它也是先读取数据,然后只对这些数据应用聚合策略。然后将其传递到第二阶段。但我希望从两个源阅读数据,然后传递到聚合策略。我错过了一些要点吗?
https://www.masterspringboot.com/camel/how-to-aggregate-messages-in-camel/

kulphzqa

kulphzqa1#

聚合弹性公网IP主要用于聚合来自同一消费者端点的消息。
在您的例子中,由于您需要聚合来自2个不同来源的消息,我宁愿像您一样使用2个Enrich EIP,但第二个EIP具有聚合策略,以指定如何聚合消息。
类似于:

from("timer:myTimer?period=1000") 
        .enrich("direct:customerProcessor")
        .enrich("direct:orderProcessor", (oldExchange, newExchange) -> {
                String oldBody = oldExchange.getIn().getBody(String.class);
                String newBody = newExchange.getIn().getBody(String.class);
                oldExchange.getIn().setBody(oldBody + "-" + newBody);
                return oldExchange;
        })
        .to("stream:out");
from("direct:customerProcessor")
        .setBody().constant("My customerProcessor body");
from("direct:orderProcessor")
        .setBody().constant("My orderProcessor body");

字符串
结果:

My customerProcessor body-My orderProcessor body

相关问题