我尝试使用Apache Camel通过处理器聚合来自2个端点的数据,为此,我首先阅读了相关的Camel文档:
https://camel.apache.org/components/3.20.x/eips/aggregate-eip.html
并定义一个简单的聚合策略来简单地连接2个字段(合并成功后,我将把它转换为我的数据模型)。然而,几乎所有的示例都使用相同的数据源,并且在拆分之后聚合数据等。但我需要从2个端点获取数据,然后汇总这些数据。为了简单起见,我试图从这2个数据源聚合2个字符串字段。之后,我将把聚合的数据传递给Kafka Topic。
几乎所有的例子,只有一种方法阅读数据,例如。from(direct: endpoint)
。但是我们需要2来从端点或处理器读取多个数据吗?
我还尝试使用enrich
方法,如下所示,但我无法使用这种方法聚合这2个数据:
from("timer:myTimer?period=300000") // trigger every 5 minutes
.multicast(GroupedExchangeAggregationStrategy()).parallelProcessing()
.enrich("direct:customerProcessor")
.enrich("direct:orderProcessor")
.to(kafka("customerOrderTopic"))
.end()
字符串
那么,我如何使用aggregate(或者如果您有使用enrich的想法)来实现这一点呢?
这里是另一个例子,但据我所知,它也是先读取数据,然后只对这些数据应用聚合策略。然后将其传递到第二阶段。但我希望从两个源阅读数据,然后传递到聚合策略。我错过了一些要点吗?
https://www.masterspringboot.com/camel/how-to-aggregate-messages-in-camel/的
1条答案
按热度按时间kulphzqa1#
聚合弹性公网IP主要用于聚合来自同一消费者端点的消息。
在您的例子中,由于您需要聚合来自2个不同来源的消息,我宁愿像您一样使用2个Enrich EIP,但第二个EIP具有聚合策略,以指定如何聚合消息。
类似于:
字符串
结果:
型