如何在使用apache camel时为Azure服务总线主题消息添加自定义属性或标签/主题?

cunj1qz1  于 2022-11-07  发布在  Apache
关注(0)|答案(1)|浏览(112)

我可以使用apache camel将消息发送到Azure服务总线主题的特定订阅,使用此处的示例https://camel.apache.org/components/3.18.x/azure-servicebus-component.html#_azure_servicebus_producer_operations。但我无法使用我的代码设置属性。在我的代码下面-

from("direct:start")
    .id("producerId")
            .marshal(new JacksonDataFormat(String.class))
    .process(exchange -> {
    exchange.setProperty(ServiceBusConstants.SUBJECT, constant("test"));
    })
    .setProperty("subject", constant("test"))
    .setProperty(ServiceBusConstants.CORRELATION_ID, constant("111111"))
    .setHeader("subject", constant("test"))
    .setHeader("label", constant("test"))
    .setHeader(ServiceBusConstants.SUBJECT, constant("test"))
    .to("azure-servicebus:testTopic?serviceBusType=topic&subscriptionName=testTopic-subscription&producerOperation=sendMessages&connectionString=RAW(Endpoint=sb://blablablablbalabla")
    .log(LoggingLevel.INFO, "Message sent to test topic ${body} and ${headers}")
    .setRouteProperties(propertyDefinitions);

正如你看到上面我已经尝试了一切,如与“setProperty”和“setHeader”不同的方式。我得到下面的响应-

Message sent to test topic "{\"accountId\": \"4471112323123\", \"url\": \"test.com\", \"status\": \"PASS\", \"statusMessage\": \"check if received\"}" and {applicationProperties={label: test}, CamelAzureServiceBusApplicationProperties={Label=test, Subject=test}, CamelAzureServiceBusSubject=test, Content-Type=application/json}

这是我的制作人代码-

Test test = new test(uuid, "test.com", "PASS", "check if received");
ProducerTemplate producerTemplate;
producerTemplate.sendBody(direct:start, test.toString());

我已经通过Azure Portal(ui)发送了一条消息,其属性如下:

如果您看到“subject”是“test”,并且有一个自定义属性“test”,其值为“test”。
我想看到同样的东西,当我用apache Camel 发送它。请帮助。谢谢

afdcj2ne

afdcj2ne1#

今天,这是不可能的最新版本的 Camel ,但它将在下一个版本。
相关票证:https://issues.apache.org/jira/browse/CAMEL-18459
代码:https://github.com/apache/camel/blob/main/components/camel-azure/camel-azure-servicebus/src/main/java/org/apache/camel/component/azure/servicebus/ServiceBusProducer.java
它看起来是这样的:

from("timer:foo1?period=15000&delay=1000")
                .process(new Processor() {
                    @Override
                    public void process(Exchange exchange) throws Exception {
                        exchange.getIn().setBody("{\"message\":\"hello\"}");
                        exchange.getIn().setHeader(ServiceBusConstants.APPLICATION_PROPERTIES, Map.of("prop1", "value1"));
                    }
                })
                .to("azure-servicebus:demo?serviceBusType=topic&connectionString=RAW(connection-string-here");

相关问题