Error sending messages to the DLQ with Spring Cloud Stream and Kafka

66bbxpm5 posted on 2021-06-07 in Kafka

pom.xml

<dependencies>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-stream</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-stream-kafka</artifactId>
    </dependency>
</dependencies>
<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-dependencies</artifactId>
            <version>Edgware.SR3</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>

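import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;

@Component
public class QueueConsumer {

    /** The Constant LOG. */
    public static final Logger LOG = LoggerFactory.getLogger(QueueConsumer.class);

    /** The processor. */
    @Autowired
    private IMessageProcessor processor;

    /**
     * Consume.
     *
     * @param message the message
     */
    @StreamListener(value = OrderEventSink.ORDER_EVENT)
    public void consume(Message<String> message) {
        try {
            processor.process(message);
        } catch (MessageProcessingFailedException e) {
            LOG.error("Error Code " + e.getCode().getCode() + " " + e.getCode().getDescription(), e);
            // Rethrow so the binder can route the failed message to the DLQ
            throw e;
        }
    }
}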

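I am using Spring Cloud Stream to read messages from a Kafka topic. Messages are read from the topic and processed; if processing fails, the message should be sent to the configured dead-letter queue (DLQ), but instead I get the error below.
The exception is thrown while headers are being extracted from the message. What is the best way to fix this?
The Kafka version is 1.0 and the Kafka client is 2.11-1.0.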

application.properties

spring.cloud.stream.bindings.orderEvent.destination=orderEvents
spring.cloud.stream.bindings.orderEvent.content-type=application/json
spring.cloud.stream.bindings.orderEvent.group=orderEvents-consumer
spring.cloud.stream.bindings.orderEvent.consumer.back-off-multiplier=5
spring.cloud.stream.bindings.orderEvent.consumer.back-off-initial-interval=60000
spring.cloud.stream.bindings.orderEvent.consumer.max-attempts=1
spring.cloud.stream.bindings.orderEvent.consumer.headerMode=raw
spring.cloud.stream.bindings.kafka.binder.brokers=localhost
spring.cloud.stream.bindings.kafka.binder.defaultBrokerPort=9092
spring.cloud.stream.bindings.kafka.binder.zkNodes=localhost
spring.cloud.stream.bindings.kafka.binder.defaultZkPort=2181
spring.cloud.stream.kafka.bindings.orderEvent.consumer.enableDlq=true
spring.cloud.stream.kafka.bindings.orderEvent.consumer.dlqName=dead-queue
spring.cloud.stream.kafka.bindings.orderEvent.consumer.dlqProducerProperties.configuration.key.serializer=org.apache.kafka.common.serialization.StringSerializer
spring.cloud.stream.kafka.bindings.orderEvent.consumer.dlqProducerProperties.configuration.value.serializer=org.apache.kafka.common.serialization.StringSerializer

org.springframework.messaging.MessageDeliveryException: failed to send Message to channel 'scm-orderEvents.scm-orderEvents-consumer.errors'; nested exception is java.lang.RuntimeException: java.lang.StringIndexOutOfBoundsException: String index out of range: 4297
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:451) ~[spring-integration-core-4.3.12.RELEASE.jar:4.3.12.RELEASE]
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:375) ~[spring-integration-core-4.3.12.RELEASE.jar:4.3.12.RELEASE]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115) ~[spring-messaging-4.3.13.RELEASE.jar:4.3.13.RELEASE]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45) ~[spring-messaging-4.3.13.RELEASE.jar:4.3.13.RELEASE]
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:105) ~[spring-messaging-4.3.13.RELEASE.jar:4.3.13.RELEASE]
    at org.springframework.integration.endpoint.MessageProducerSupport.sendErrorMessageIfNecessary(MessageProducerSupport.java:207) ~[spring-integration-core-4.3.12.RELEASE.jar:4.3.12.RELEASE]
    at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:191) ~[spring-integration-core-4.3.12.RELEASE.jar:4.3.12.RELEASE]
    at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.access$200(KafkaMessageDrivenChannelAdapter.java:63) ~[spring-integration-kafka-2.1.2.RELEASE.jar:na]
    at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:372) ~[spring-integration-kafka-2.1.2.RELEASE.jar:na]
    at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:352) ~[spring-integration-kafka-2.1.2.RELEASE.jar:na]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:794) [spring-kafka-1.1.6.RELEASE.jar:na]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:738) [spring-kafka-1.1.6.RELEASE.jar:na]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.access$2200(KafkaMessageListenerContainer.java:245) [spring-kafka-1.1.6.RELEASE.jar:na]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer$ListenerInvoker.run(KafkaMessageListenerContainer.java:1031) [spring-kafka-1.1.6.RELEASE.jar:na]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0]
    at java.lang.Thread.run(Thread.java:748) [na:1.8.0]
Caused by: java.lang.RuntimeException: java.lang.StringIndexOutOfBoundsException: String index out of range: 4297
    at org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder$4.handleMessage(KafkaMessageChannelBinder.java:380) ~[spring-cloud-stream-binder-kafka-1.3.2.RELEASE.jar:1.3.2.RELEASE]
    at org.springframework.integration.dispatcher.BroadcastingDispatcher.invokeHandler(BroadcastingDispatcher.java:236) ~[spring-integration-core-4.3.12.RELEASE.jar:4.3.12.RELEASE]
    at org.springframework.integration.dispatcher.BroadcastingDispatcher.dispatch(BroadcastingDispatcher.java:185) ~[spring-integration-core-4.3.12.RELEASE.jar:4.3.12.RELEASE]
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:89) ~[spring-integration-core-4.3.12.RELEASE.jar:4.3.12.RELEASE]
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:425) ~[spring-integration-core-4.3.12.RELEASE.jar:4.3.12.RELEASE]
    ... 16 common frames omitted
Caused by: java.lang.StringIndexOutOfBoundsException: String index out of range: 4297
    at java.lang.String.checkBounds(String.java:385) ~[na:1.8.0]
    at java.lang.String.<init>(String.java:425) ~[na:1.8.0]
    at org.springframework.cloud.stream.binder.EmbeddedHeaderUtils.oldExtractHeaders(EmbeddedHeaderUtils.java:154) ~[spring-cloud-stream-1.3.2.RELEASE.jar:1.3.2.RELEASE]
    at org.springframework.cloud.stream.binder.EmbeddedHeaderUtils.extractHeaders(EmbeddedHeaderUtils.java:115) ~[spring-cloud-stream-1.3.2.RELEASE.jar:1.3.2.RELEASE]
    at org.springframework.cloud.stream.binder.EmbeddedHeaderUtils.extractHeaders(EmbeddedHeaderUtils.java:107) ~[spring-cloud-stream-1.3.2.RELEASE.jar:1.3.2.RELEASE]
    at org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder$4.handleMessage(KafkaMessageChannelBinder.java:368) ~[spring-cloud-stream-binder-kafka-1.3.2.RELEASE.jar:1.3.2.RELEASE]
    ... 20 common frames omitted

vdzxcuhz #1

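This is a bug in version 1.3.2 of the Kafka binder; it is fixed on master (1.3.3.BUILD-SNAPSHOT).
That said, the best solution is to use Spring Boot 2.0.1 and Spring Cloud Stream Elmhurst.RELEASE (brought in by Spring Cloud Finchley, which is currently at milestone M9).
Those versions support Kafka 1.0 natively.
You can also move to the kafka11 binder artifact (1.3.0), which is compatible with Spring Cloud Stream 1.3.x, as discussed on the wiki.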
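
To illustrate why the trace ends in `String.<init>` under `EmbeddedHeaderUtils.oldExtractHeaders`, here is a minimal sketch of what can happen when a length-prefixed header parser is pointed at a payload that carries no embedded headers (as with `headerMode=raw`). The byte layout, the class `RawPayloadHeaderSketch`, and both method names are hypothetical and simplified for illustration; this is not the binder's actual format or code, and the assumption that this is the mechanism behind the 1.3.2 bug is mine, based only on the stack trace.

```java
import java.nio.charset.StandardCharsets;

/**
 * Hypothetical, simplified layout (NOT the binder's real format):
 *   byte 0 : number of headers
 *   byte 1 : length of the first header name
 *   byte 2+: the first header name
 */
public class RawPayloadHeaderSketch {

    /** Reads the first header name, trusting the length prefix found in the bytes. */
    static String firstHeaderName(byte[] payload) {
        int nameLength = payload[1] & 0xff;
        // If nameLength runs past the end of the array, the String constructor
        // throws StringIndexOutOfBoundsException, as in the trace above.
        return new String(payload, 2, nameLength, StandardCharsets.UTF_8);
    }

    /** Builds a payload that really does carry one embedded header name. */
    static byte[] withEmbeddedHeader(String name) {
        byte[] nameBytes = name.getBytes(StandardCharsets.UTF_8);
        byte[] out = new byte[2 + nameBytes.length];
        out[0] = 1;                       // one header
        out[1] = (byte) nameBytes.length; // length prefix
        System.arraycopy(nameBytes, 0, out, 2, nameBytes.length);
        return out;
    }

    public static void main(String[] args) {
        // A payload written in the expected layout parses correctly.
        System.out.println(firstHeaderName(withEmbeddedHeader("contentType")));

        // A raw JSON payload has no length prefix: the '"' byte (34) is misread
        // as a name length that is longer than the whole payload.
        byte[] raw = "{\"id\":1}".getBytes(StandardCharsets.UTF_8);
        try {
            firstHeaderName(raw);
        } catch (StringIndexOutOfBoundsException e) {
            System.out.println("raw payload misread as embedded headers: " + e);
        }
    }
}
```

If that reading is right, no consumer-side property would avoid the failure on 1.3.2, which is consistent with the advice above to change binder versions.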
