Apache Camel中的关闭消息

up9lanfz  于 2023-01-20  发布在  Apache
关注(0)|答案(5)|浏览(214)

希望这听起来不可笑,但我怎么能故意 * 丢弃 Camel * 中的消息
到目前为止,我把它们发送到日志组件,但同时我甚至不想记录取款。
Camel中是否存在**/dev/null**端点?

57hvy0tb

57hvy0tb1#

您可以使用消息过滤器eip过滤掉不需要的消息。http://camel.apache.org/message-filter
没有dev/null组件。
此外,还有一个< stop />您可以在路由中使用,当一个消息击中它时,它将停止继续路由。
我们得到的最接近dev/null的方法是路由到一个日志,其中您将logLeve=OFF设置为选项。

ig9co6j1

ig9co6j12#

感谢我的同事(代号:卡亚)...
您可以使用Stub Component作为 Camel 式端点,它等效于/dev/null
例如:

activemq:route?abc=xyz

变成

stub:activemq:route?abc=xyz

虽然我不知道这个组件的内部工作原理(以及是否存在内存泄漏等危险),但它对我很有效,我看不出这样做有什么缺点。

ffscu2ro

ffscu2ro3#

可以使用property component将uri/mock-uri放入配置中

<camelContext ...>
   <propertyPlaceholder id="properties" location="ref:myProperties"/>
</camelContext>

// properties
cool.end=mock:result
# cool.end=result

// route
from("direct:start").to("properties:{{cool.end}}");
vsikbqxv

vsikbqxv4#

我有点晚了,但是您可以在交换上设置一个标志,如果该消息不满足您的条件,则使用该标志只跳过该消息(通过调用stop)。

@Override
public void configure() throws Exception {
  from()
      .process(new Processor() {
        @SuppressWarnings("unchecked")
        @Override
        public void process(Exchange exchange) throws Exception {
          exchange.setProperty("skip", false);
          byte[] messageBytes = exchange.getIn().getBody(byte[].class);
          if (<shouldNotSkip>) {

          } else { //skip
            exchange.setProperty("skip", true);
          }
        }
      }).choice()
          .when(exchangeProperty("skip").isEqualTo(true))
            .stop()
          .otherwise()
            .to();
}
9cbw7uwe

9cbw7uwe5#

我使用的是activemq route,一般情况下需要发送回复,所以交换模式是InOut,当我在路由中配置了一个filter,发现即使它不把消息传递到下一步,也会执行回调(发送回复),就像调用stop()时的行为一样,而且它会把同样的消息发送回回复队列,这是不可取的。
我所做的是有条件地将交换模式更改为InOnly,如果我想过滤掉消息,则停止,这样就不会发送回复。MAIN_ENDPOINT是我定义的包含正常业务逻辑的direct:main端点。

from("activemq:queue:myqueue" + "?replyToSameDestinationAllowed=true")
    .log(LoggingLevel.INFO, "Correlation id is: ${header.JMSCorrelationID}; will ignore if not null")
    .choice()
        .when(simple("${header.JMSCorrelationID} == null"))
            .to(MAIN_ENDPOINT)
        .endChoice()
        .otherwise()
            .setExchangePattern(ExchangePattern.InOnly)
            .stop()
        .endChoice()
    .end();

请注意,此消息也会被使用,并且不再位于队列中。如果您希望将消息保留在队列中(不使用它),您可以只使用stop()filter(),以便回调(发送原始消息的回复)工作,将消息放回队列。
只使用filter()要简单得多:

from("activemq:queue:myqueue" + "?replyToSameDestinationAllowed=true")
    .log(LoggingLevel.INFO, "Correlation id is: ${header.JMSCorrelationID}; will ignore if not null")
    .filter(simple("${header.JMSCorrelationID} == null"))
    .to(MAIN_ENDPOINT);

相关问题