我们有一个应用程序,它使用来自Kafka的消息并对其进行处理。我们使用的是springboot2.2.6.release和springcloudhoxton.sr4。
我试图收到一个简单的信息:
{
"payload": {
"config": {
"credentials": {}
},
"id": "est-00001",
"merchantKey": "test-00001",
"name": "Test",
"version": 7,
"type": "PARTNER",
"vendorNumber": "14"
},
"metadata": {
"timestamp": -1,
"partition": 8,
"key": "test-00001",
"offset": 105,
"topic": "configure",
"headers": []
},
"key": "00001",
"messageType": "dent.set",
"id": "abf75248-6fb0-4b57-a92c-74d4d3143cc0",
"time": "2018-03-16T15:56Z"
}
这就是我用来反序列化消息的模型
package com.commercetools.tuev.marketplace.merchant.model;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonPropertyDescription;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.util.Map;
@JsonInclude(JsonInclude.Include.NON_NULL)
@Data
@AllArgsConstructor
public class MerchantMessage {
public MerchantMessage(){}
@JsonProperty("id")
@JsonPropertyDescription("The message id, usually a random UUID")
private String id;
@JsonProperty("time")
@JsonPropertyDescription("ISO-8501 timestamp of the event")
private String time;
@JsonProperty("key")
@JsonPropertyDescription("The key has to always match the entity's id (product id)")
private String key;
@JsonProperty("messageType")
@JsonPropertyDescription("The Message Type")
private String messageType;
@JsonProperty("payload")
private Map<String, Object> payload;
@JsonProperty("metadata")
private Map<String, Object> metadata;
}
我只得到以下例外:
Caused by: com.fasterxml.jackson.databind.exc.InvalidDefinitionException: Cannot construct instance of `org.springframework.messaging.Message` (no Creators, like default construct, exist): abstract types either need to be mapped to concrete types, have custom deserializer, or contain additional type information
应用程序属性
spring.cloud.stream.bindings.merchantInput.destination=merchantTopic
spring.cloud.stream.bindings.merchantInput.group=consumerGroup
spring.cloud.stream.bindings.merchantInput.consumer.header-mode=none
spring.cloud.stream.bindings.merchantInput.content-type=application/json
spring.cloud.stream.kafka.bindings.merchantInput.consumer.autoCommitOffset=false
spring.cloud.stream.bindings.merchantInput.consumer.partitioned=true
spring.cloud.stream.bindings.merchantInput.consumer.max-attempts=1
功能
@StreamListener(value = MerchantProcessor.INPUT)
public void manage(Flux<Message<MerchantMessage>> message) {
message
.map(Message::getPayload)
.doOnNext(System.out::println)
.subscribe(payload-> System.out.println("Consumed: "+payload));
}
如果我去掉焊剂,一切正常:
@StreamListener(value = MerchantProcessor.INPUT)
public void manage(Message<MerchantMessage> message) {
Mono.just(message)
.map(Message::getPayload)
.doOnNext(System.out::println)
.subscribe(payload -> System.out.println("Consumed: " + payload));
}
1条答案
按热度按时间wtzytmuj1#
这个
@StreamListener
基于注解的编程模型已经被弃用了一段时间,在过去的几年中,我们已经完全迁移到不需要注解的函数式编程模型。您只需将代码更改为
您的输入绑定按约定命名为
message-in-0
. 你也可以在这里获得更多信息。