我想在我的项目中使用ApacheKafka和SpringCloudStream(3.0)。一般来说,当供应商和消费者注册为@bean时,生产和消费是可以的,没有任何问题,但是如果仅按以下方式应用functionrouter,则在消费者端会发生nullpointerexception。究竟是什么原因?串行化不能正常工作吗?
供应商:
@Bean
fun upperCase(): Supplier<Message<MessageDto>?> = Supplier { createEvent() }
private fun createEvent(): Message<MessageDto>? {
return if (supplierEnabled)
MessageBuilder.withPayload(MessageDto(1, "I send the message!"))
.setHeader("to_process", true)
.build()
else
null
}
消费者:
@Bean
fun first(): Consumer<MessageDto> = Consumer {
log.info("Received(process): {}", it)
}
@Bean
fun second(): Consumer<MessageDto> = Consumer {
log.info("Received(handle): {}", it)
}
生产者应用.yml:
spring:
application:
name: producer
cloud:
stream:
bindings:
upperCase-out-0:
destination: memberChangeTopic
content-type: application/json
producer:
partitionKeyExpression: payload.id
partitionCount: 10
kafka:
binder:
brokers: localhost
defaultBrokerPort: 9092
消费者应用程序.yml:
server:
port: 8081
spring:
application:
name: consumer
cloud:
stream:
bindings:
functionRouter-in-0:
destination: memberChangeTopic
group: consumer-a
function:
routing:
enabled: true
function:
routing-expression: (headers['to_process']!=null && headers['to_process']==true) ? 'first':'second'
错误跟踪:
2021-02-05 09:40:44.947 ERROR 98560 --- [container-0-C-1] o.s.integration.handler.LoggingHandler : org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.cloud.stream.function.FunctionConfiguration$FunctionToDestinationBinder$1@18545e81]; nested exception is java.lang.NullPointerException, failedMessage=GenericMessage [payload=byte[47], headers={kafka_offset=24, scst_nativeHeadersPresent=true, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@1d55aff4, deliveryAttempt=3, kafka_timestampType=CREATE_TIME, to_process=true, kafka_receivedPartitionId=0, contentType=application/json, kafka_receivedTopic=memberChangeTopic, kafka_receivedTimestamp=1612485254857, kafka_groupId=consumer-a, target-protocol=kafka}]
暂无答案!
目前还没有任何答案,快来回答吧!