embeddedheadersmessageconverter-java.lang.stringindexoutofboundsexception:中的字符串索引超出范围

nkoocmlb  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(371)

我有一个springboot应用程序(app0),它使用springcloudstreamkafka阅读某个主题。
还有另外两个应用程序(app1、app2)会将消息生成到该主题中。
app1使用接口ordersource发布消息:

public interface OrderSource{ 

    String OUTPUT_PAYMENT = Topic.PAYMENT_RESULTS;

    @Output(OrderSource.OUTPUT_PAYMENT)
    MessageChannel output();

例如:

orderSource.output().send(MessageBuilder.withPayload(order).build(), 500);

在本例中,app0毫无问题地从app1读取消息。
app2使用kafkatemplate发布其消息:

ListenableFuture<SendResult<Integer, String>> delivery = kafkaTemplate.send(Topic.PAYMENT_RESULTS, "{ ... }");
try {
    SendResult<Integer, String> result = delivery.get(timeout, TimeUnit.MILLISECONDS);

在这种情况下,我观察到以下例外情况 EmbeddedHeadersMessageConverter :

java.lang.StringIndexOutOfBoundsException: String index out of range: 152
    at java.lang.String.checkBounds(Unknown Source) ~[na:1.8.0_91]
    at java.lang.String.<init>(Unknown Source) ~[na:1.8.0_91]
    at org.springframework.cloud.stream.binder.EmbeddedHeadersMessageConverter.oldExtractHeaders(EmbeddedHeadersMessageConverter.java:135) ~[spring-cloud-stream-1.1.0.RELEASE.jar:1.1.0.RELEASE]
    at org.springframework.cloud.stream.binder.EmbeddedHeadersMessageConverter.extractHeaders(EmbeddedHeadersMessageConverter.java:105) ~[spring-cloud-stream-1.1.0.RELEASE.jar:1.1.0.RELEASE]

显然,它正试图从消息的有效负载中提取头。如何在同时支持两个消息源(kafkatemplate和ordersource)时防止发生此异常。

xdyibdwo

xdyibdwo1#

要与非spring云流应用程序通信,需要配置 headerMode 关于消费者 raw .
您还需要对app1的producer执行同样的操作,这样他就不会嵌入头了。
请参见消费者属性和生产者属性。

相关问题