如何在动态 Camel Kafka端点中使用占位符?

nom7f22z  于 2022-11-07  发布在  Apache
关注(0)|答案(1)|浏览(122)

我正在使用动态端点向Kafka发送消息。

private static final String OUTPUT_TOPIC = "OutputTopic";
   ...

   private void setOutputTopic(Exchange exchange) {
       //some other code
       exchange.setProperty(OUTPUT_TOPIC,"MyKafkaTopic");
   }
   ...
   @Override
   public void configure() {    
        from("direct:someEndpoint")
                .process(this::setOutputTopic)
                ...
                .toD("kafka:${exchangeProperty." + OUTPUT_TOPIC + "}");             
  }

在Camel版本3.4.x中,一切正常。迁移到Camel版本3.14.x后,由于KafkaSendDynamicAware中的优化,一些测试开始失败,KafkaSendDynamicAware创建了带有URI的kafka端点,该URI包含未解析的占位符,Camel测试工具无法像以前那样工作:

@MockEndpointsAndSkip("kafka:MyKafkaTopic")
...

@EndpointInject("mock:kafka:MyKafkaTopic")
private MockEndpoint mockedMyKafkaTopic;

在这种情况下,你能建议如何测试动态Kafka终点的正确方法吗?

4nkexdtk

4nkexdtk1#

您应该能够使用属性占位符语法{{some.property}}来使用动态主题名称:

@Override
public void configure() {    
     from("direct:someEndpoint")
             .process(this::setOutputTopic)
             // ...
             .toD("kafka:{{" + OUTPUT_TOPIC + "}}");             
}

相关问题