我有一个springboot应用程序(app0),它使用springcloudstreamkafka阅读某个主题。
还有另外两个应用程序(app1、app2)会将消息生成到该主题中。
app1使用接口ordersource发布消息:
public interface OrderSource{
String OUTPUT_PAYMENT = Topic.PAYMENT_RESULTS;
@Output(OrderSource.OUTPUT_PAYMENT)
MessageChannel output();
例如:
orderSource.output().send(MessageBuilder.withPayload(order).build(), 500);
在本例中,app0毫无问题地从app1读取消息。
app2使用kafkatemplate发布其消息:
ListenableFuture<SendResult<Integer, String>> delivery = kafkaTemplate.send(Topic.PAYMENT_RESULTS, "{ ... }");
try {
SendResult<Integer, String> result = delivery.get(timeout, TimeUnit.MILLISECONDS);
在这种情况下,我观察到以下例外情况 EmbeddedHeadersMessageConverter
:
java.lang.StringIndexOutOfBoundsException: String index out of range: 152
at java.lang.String.checkBounds(Unknown Source) ~[na:1.8.0_91]
at java.lang.String.<init>(Unknown Source) ~[na:1.8.0_91]
at org.springframework.cloud.stream.binder.EmbeddedHeadersMessageConverter.oldExtractHeaders(EmbeddedHeadersMessageConverter.java:135) ~[spring-cloud-stream-1.1.0.RELEASE.jar:1.1.0.RELEASE]
at org.springframework.cloud.stream.binder.EmbeddedHeadersMessageConverter.extractHeaders(EmbeddedHeadersMessageConverter.java:105) ~[spring-cloud-stream-1.1.0.RELEASE.jar:1.1.0.RELEASE]
显然,它正试图从消息的有效负载中提取头。如何在同时支持两个消息源(kafkatemplate和ordersource)时防止发生此异常。
1条答案
按热度按时间xdyibdwo1#
要与非spring云流应用程序通信,需要配置
headerMode
关于消费者raw
.您还需要对app1的producer执行同样的操作,这样他就不会嵌入头了。
请参见消费者属性和生产者属性。