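The following exception occurs when processing in a @StreamListener fails and the Spring Cloud Stream Kafka binder tries to re-route the message to the DLQ. I am using Spring Cloud Edgware.SR5.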
org.springframework.messaging.MessageDeliveryException: failed to send Message to channel 'my.message.destination.my.message.group.errors'; nested exception is java.lang.ClassCastException: java.lang.String cannot be cast to [B
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:451) ~[spring-integration-core-4.3.17.RELEASE.jar:4.3.17.RELEASE]
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:375) ~[spring-integration-core-4.3.17.RELEASE.jar:4.3.17.RELEASE]
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115) ~[spring-messaging-4.3.19.RELEASE.jar:4.3.19.RELEASE]
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45) ~[spring-messaging-4.3.19.RELEASE.jar:4.3.19.RELEASE]
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:105) ~[spring-messaging-4.3.19.RELEASE.jar:4.3.19.RELEASE]
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:95) ~[spring-messaging-4.3.19.RELEASE.jar:4.3.19.RELEASE]
at org.springframework.integration.support.ErrorMessagePublisher.publish(ErrorMessagePublisher.java:155) ~[spring-integration-core-4.3.17.RELEASE.jar:4.3.17.RELEASE]
at org.springframework.integration.handler.advice.ErrorMessageSendingRecoverer.recover(ErrorMessageSendingRecoverer.java:83) ~[spring-integration-core-4.3.17.RELEASE.jar:4.3.17.RELEASE]
at org.springframework.retry.support.RetryTemplate.handleRetryExhausted(RetryTemplate.java:512) ~[spring-retry-1.2.2.RELEASE.jar:?]
at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:351) ~[spring-retry-1.2.2.RELEASE.jar:?]
at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:180) ~[spring-retry-1.2.2.RELEASE.jar:?]
at org.springframework.kafka.listener.adapter.RetryingAcknowledgingMessageListenerAdapter.onMessage(RetryingAcknowledgingMessageListenerAdapter.java:73) ~[spring-kafka-1.1.8.RELEASE.jar:?]
at org.springframework.kafka.listener.adapter.RetryingAcknowledgingMessageListenerAdapter.onMessage(RetryingAcknowledgingMessageListenerAdapter.java:39) ~[spring-kafka-1.1.8.RELEASE.jar:?]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:792) [spring-kafka-1.1.8.RELEASE.jar:?]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:736) [spring-kafka-1.1.8.RELEASE.jar:?]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.access$2100(KafkaMessageListenerContainer.java:246) [spring-kafka-1.1.8.RELEASE.jar:?]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer$ListenerInvoker.run(KafkaMessageListenerContainer.java:1025) [spring-kafka-1.1.8.RELEASE.jar:?]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_192]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_192]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_192]
Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to [B
at org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder$4.handleMessage(KafkaMessageChannelBinder.java:360) ~[spring-cloud-stream-binder-kafka-1.3.3.RELEASE.jar:1.3.3.RELEASE]
at org.springframework.integration.dispatcher.BroadcastingDispatcher.invokeHandler(BroadcastingDispatcher.java:236) ~[spring-integration-core-4.3.17.RELEASE.jar:4.3.17.RELEASE]
at org.springframework.integration.dispatcher.BroadcastingDispatcher.dispatch(BroadcastingDispatcher.java:185) ~[spring-integration-core-4.3.17.RELEASE.jar:4.3.17.RELEASE]
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:89) ~[spring-integration-core-4.3.17.RELEASE.jar:4.3.17.RELEASE]
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:425) ~[spring-integration-core-4.3.17.RELEASE.jar:4.3.17.RELEASE]
... 19 more
I tried producing messages with kafka-console-producer and found that this only happens when the message has a Kafka key.
Here are the relevant code snippets:
MyMessageConsumer.java:
@StreamListener(MyMessageSink.MY_MESSAGE_INPUT)
@Transactional
public void consumeMyMessage(@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String myMessageId, @Payload MyMessage myMessage) {
    // Always fail, so that retries are exhausted and the message is routed to the DLQ.
    if (true) {
        throw new RuntimeException("MockRuntimeException");
    }
}
application.properties (consumer):
spring.kafka.producer.keySerializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.consumer.keyDeserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.cloud.stream.bindings.my-message-in.destination=my.message.destination
spring.cloud.stream.bindings.my-message-in.group=my.message.group
spring.cloud.stream.bindings.my-message-in.content-type=application/json
spring.cloud.stream.bindings.my-message-in.consumer.headerMode=raw
spring.cloud.stream.bindings.my-message-in.consumer.partitioned=true
spring.cloud.stream.kafka.bindings.my-message-in.consumer.enableDlq=true
spring.cloud.stream.kafka.bindings.my-message-in.consumer.dlqName=my.message.destination.dlq
spring.cloud.stream.kafka.bindings.my-message-in.consumer.dlqProducerProperties.configuration.key.serializer=org.apache.kafka.common.serialization.StringSerializer
spring.cloud.stream.kafka.bindings.my-message-in.consumer.dlqProducerProperties.configuration.value.serializer=org.apache.kafka.common.serialization.StringSerializer
spring.cloud.stream.kafka.bindings.my-message-in.consumer.maxAttempts=3
application.properties (producer):
spring.kafka.producer.keySerializer=org.apache.kafka.common.serialization.StringSerializer
spring.cloud.stream.bindings.my-message-out.destination=my.message.destination
spring.cloud.stream.bindings.my-message-out.content-type=application/json
spring.cloud.stream.bindings.my-message-out.producer.headerMode=raw
spring.cloud.stream.bindings.my-message-out.producer.partitionKeyExtractorClass=com.example.message.TransactionKeyExtractor
spring.cloud.stream.bindings.my-message-out.producer.partitionCount=80
Is there a way to make DLQ re-routing work with message keys?
1 Answer
Dead-lettering in 1.3.x only supports Spring Cloud Stream's default key/value types (byte[]/byte[]). Try upgrading to a newer version.
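
If upgrading is not possible right away, one option that may be worth trying is to leave the key as byte[] (that is, not set StringDeserializer for the consumer key) and decode it inside the listener instead. The sketch below is only an illustration of that idea in plain Java, not binder code: keyAsBytes mimics the cast that the stack trace points to ("[B" is the JVM's name for byte[]), and decodeKey is a hypothetical helper. The class name, method names, sample key, and the UTF-8 encoding are all assumptions.

```java
import java.nio.charset.StandardCharsets;

public class DlqKeyTypeSketch {

    // Assumption: mimics the cast implied by the stack trace, where the
    // 1.3.x DLQ handler treats the record key as byte[].
    public static byte[] keyAsBytes(Object recordKey) {
        // Throws ClassCastException when the key was deserialized to a String.
        return (byte[]) recordKey;
    }

    // Hypothetical helper: decode a raw key inside the listener,
    // assuming the producer wrote it as UTF-8 text.
    public static String decodeKey(byte[] rawKey) {
        return rawKey == null ? null : new String(rawKey, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // A byte[] key passes the cast and can still be read as a String.
        byte[] rawKey = "order-42".getBytes(StandardCharsets.UTF_8);
        System.out.println(decodeKey(keyAsBytes(rawKey)));

        // A String key (as produced by StringDeserializer) fails the cast.
        try {
            keyAsBytes("order-42");
        } catch (ClassCastException e) {
            System.out.println("ClassCastException: " + e.getMessage());
        }
    }
}
```

In the listener from the question, this would correspond to declaring the KafkaHeaders.RECEIVED_MESSAGE_KEY parameter as byte[] and calling something like decodeKey on it. This has not been verified against Edgware.SR5, and the dlqProducerProperties serializer settings may need the same treatment.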