streamlistener似乎无法处理消息类型

wixjitnu  于 2021-07-16  发布在  Java
关注(0)|答案(1)|浏览(364)

我们有一个应用程序,它使用来自Kafka的消息并对其进行处理。我们使用的是springboot2.2.6.release和springcloudhoxton.sr4。
我试图收到一个简单的信息:

{
  "payload": {
    "config": {
      "credentials": {}
    },
    "id": "est-00001",
    "merchantKey": "test-00001",
    "name": "Test",
    "version": 7,
    "type": "PARTNER",
    "vendorNumber": "14"
  },
  "metadata": {
    "timestamp": -1,
    "partition": 8,
    "key": "test-00001",
    "offset": 105,
    "topic": "configure",
    "headers": []
  },
  "key": "00001",
  "messageType": "dent.set",
  "id": "abf75248-6fb0-4b57-a92c-74d4d3143cc0",
  "time": "2018-03-16T15:56Z"
}

这就是我用来反序列化消息的模型

package com.commercetools.tuev.marketplace.merchant.model;

import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonPropertyDescription;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.util.Map;

@JsonInclude(JsonInclude.Include.NON_NULL)
@Data
@AllArgsConstructor
public class MerchantMessage {
    public MerchantMessage(){}

    @JsonProperty("id")
    @JsonPropertyDescription("The message id, usually a random UUID")
    private String id;

    @JsonProperty("time")
    @JsonPropertyDescription("ISO-8501 timestamp of the event")
    private String time;

    @JsonProperty("key")
    @JsonPropertyDescription("The key has to always match the entity's id (product id)")
    private String key;

    @JsonProperty("messageType")
    @JsonPropertyDescription("The Message Type")
    private String messageType;

    @JsonProperty("payload")
    private Map<String, Object> payload;

    @JsonProperty("metadata")
    private Map<String, Object> metadata;
}

我只得到以下例外:

Caused by: com.fasterxml.jackson.databind.exc.InvalidDefinitionException: Cannot construct instance of `org.springframework.messaging.Message` (no Creators, like default construct, exist): abstract types either need to be mapped to concrete types, have custom deserializer, or contain additional type information

应用程序属性

spring.cloud.stream.bindings.merchantInput.destination=merchantTopic
spring.cloud.stream.bindings.merchantInput.group=consumerGroup
spring.cloud.stream.bindings.merchantInput.consumer.header-mode=none
spring.cloud.stream.bindings.merchantInput.content-type=application/json
spring.cloud.stream.kafka.bindings.merchantInput.consumer.autoCommitOffset=false
spring.cloud.stream.bindings.merchantInput.consumer.partitioned=true
spring.cloud.stream.bindings.merchantInput.consumer.max-attempts=1

功能

@StreamListener(value = MerchantProcessor.INPUT)
    public void manage(Flux<Message<MerchantMessage>> message) {
        message
                .map(Message::getPayload)
                .doOnNext(System.out::println)
                .subscribe(payload-> System.out.println("Consumed: "+payload));
    }

如果我去掉焊剂,一切正常:

@StreamListener(value = MerchantProcessor.INPUT)
    public void manage(Message<MerchantMessage> message) {
        Mono.just(message)
                .map(Message::getPayload)
                .doOnNext(System.out::println)
                .subscribe(payload -> System.out.println("Consumed: " + payload));
    }
wtzytmuj

wtzytmuj1#

这个 @StreamListener 基于注解的编程模型已经被弃用了一段时间,在过去的几年中,我们已经完全迁移到不需要注解的函数式编程模型。
您只需将代码更改为

@Bean
Consumer<Flux<Message<MerchantMessage>> message> {
   return flux -> flux
                .map(Message::getPayload)
                .doOnNext(System.out::println)
                .subscribe(payload-> System.out.println("Consumed: "+payload));
}

您的输入绑定按约定命名为 message-in-0 . 你也可以在这里获得更多信息。

相关问题