我正在使用动态端点向Kafka发送消息。
private static final String OUTPUT_TOPIC = "OutputTopic";
...
private void setOutputTopic(Exchange exchange) {
//some other code
exchange.setProperty(OUTPUT_TOPIC,"MyKafkaTopic");
}
...
@Override
public void configure() {
from("direct:someEndpoint")
.process(this::setOutputTopic)
...
.toD("kafka:${exchangeProperty." + OUTPUT_TOPIC + "}");
}
在Camel版本3.4.x中,一切正常。迁移到Camel版本3.14.x后,由于KafkaSendDynamicAware中的优化,一些测试开始失败,KafkaSendDynamicAware创建了带有URI的kafka端点,该URI包含未解析的占位符,Camel测试工具无法像以前那样工作:
@MockEndpointsAndSkip("kafka:MyKafkaTopic")
...
@EndpointInject("mock:kafka:MyKafkaTopic")
private MockEndpoint mockedMyKafkaTopic;
在这种情况下,你能建议如何测试动态Kafka终点的正确方法吗?
1条答案
按热度按时间4nkexdtk1#
您应该能够使用属性占位符语法
{{some.property}}
来使用动态主题名称: