NullPointerException when applying function routing in Spring Cloud Stream

Posted by tvokkenx on 2021-07-24 in Java

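I want to use Apache Kafka with Spring Cloud Stream (3.0) in my project. When the supplier and the consumers are simply registered as @Bean, producing and consuming work without any problems. However, as soon as I apply functionRouter as shown below, a NullPointerException occurs on the consumer side. What is the cause? Is the serialization not working properly?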
Supplier:

@Bean
fun upperCase(): Supplier<Message<MessageDto>?> = Supplier { createEvent() }

private fun createEvent(): Message<MessageDto>? {
    return if (supplierEnabled)
        MessageBuilder.withPayload(MessageDto(1, "I send the message!"))
            .setHeader("to_process", true)
            .build()
    else
        null
}

Consumers:

@Bean
fun first(): Consumer<MessageDto> = Consumer {
    log.info("Received(process): {}", it)
}

@Bean
fun second(): Consumer<MessageDto> = Consumer {
    log.info("Received(handle): {}", it)
}

Producer application.yml:

spring:
  application:
    name: producer
  cloud:
    stream:
      bindings:
        upperCase-out-0:
          destination: memberChangeTopic
          content-type: application/json
          producer:
            partitionKeyExpression: payload.id
            partitionCount: 10
      kafka:
        binder:
          brokers: localhost
          defaultBrokerPort: 9092

Consumer application.yml:

server:
  port: 8081
spring:
  application:
    name: consumer
  cloud:
    stream:
      bindings:
        functionRouter-in-0:
          destination: memberChangeTopic
          group: consumer-a
      function:
        routing:
          enabled: true
    function:
      routing-expression: (headers['to_process']!=null && headers['to_process']==true) ? 'first':'second'
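
The routing-expression in the consumer configuration is meant to send a message to 'first' when the to_process header is present and true, and to 'second' otherwise. A minimal plain-Kotlin sketch of that decision follows. It does not use Spring or SpEL, `route` is a hypothetical helper name, and it assumes the header arrives as a Boolean; how SpEL would compare a header that arrives as some other type is not modelled here.

```kotlin
// Plain-Kotlin stand-in for the SpEL routing expression; no Spring involved.
// `route` is a hypothetical helper, not part of Spring Cloud Function.
fun route(headers: Map<String, Any?>): String {
    val flag = headers["to_process"]
    // Mirrors: (headers['to_process']!=null && headers['to_process']==true) ? 'first':'second'
    return if (flag != null && flag == true) "first" else "second"
}

fun main() {
    println(route(mapOf("to_process" to true)))   // header present and true
    println(route(mapOf("to_process" to false)))  // header present but false
    println(route(emptyMap()))                    // header missing
}
```

The failed message in the error trace carries to_process=true, so under this reading it would be expected to reach the first consumer.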

Error trace:

2021-02-05 09:40:44.947 ERROR 98560 --- [container-0-C-1] o.s.integration.handler.LoggingHandler   : org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.cloud.stream.function.FunctionConfiguration$FunctionToDestinationBinder$1@18545e81]; nested exception is java.lang.NullPointerException, failedMessage=GenericMessage [payload=byte[47], headers={kafka_offset=24, scst_nativeHeadersPresent=true, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@1d55aff4, deliveryAttempt=3, kafka_timestampType=CREATE_TIME, to_process=true, kafka_receivedPartitionId=0, contentType=application/json, kafka_receivedTopic=memberChangeTopic, kafka_receivedTimestamp=1612485254857, kafka_groupId=consumer-a, target-protocol=kafka}]
