Kafka发送嵌套json对象

3hvapo4f  于 2023-10-15  发布在  Apache
关注(0)|答案(1)|浏览(106)

我试图创建一个基本的Kafka项目,它将发送嵌套的JSON对象,但只接收消费者的根级别字段。

@Builder
@Jacksonized
@ToString
public class UserCreate implements Serializable {

    @JsonSerialize(using = LocalDateSerializer.class)
    @JsonDeserialize(using = LocalDateDeserializer.class)
    private final LocalDate createDate;

    private final List<Order> orders;

}

然后Order类

@Builder
@Jacksonized
@ToString
public class Order implements Serializable {
   private final String name;
}

生产者配置:

@Bean
public ProducerFactory<String, UserCreate> transferProducerFactory() {
    Map<String, Object> configProps = new HashMap<>();
    configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
    configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
    return new DefaultKafkaProducerFactory<>(configProps);
}

@Bean
public KafkaTemplate<String, TransferRequest> transferKafkaTemplate() {
    return new KafkaTemplate<>(transferProducerFactory());
}

ConsumerConfig:

public ConsumerFactory<String, UserCreate> transferConsumerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group-id");
    return new DefaultKafkaConsumerFactory<>(
        props,
        new StringDeserializer(),
        new JsonDeserializer<>(UserCreate.class));
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, UserCreate> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, UserCreate> factory =
        new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(transferConsumerFactory());
    return factory;
}

该设置以以下结果结束:

Received message: UserCreate(createDate=2023-10-08, orders=null)
sdnqo3pr

sdnqo3pr1#

你的名单是空的;这个问题与消费者没有直接关系。Kafka本身并不关心你的数据是什么格式。它存储字节,所以人们通常使用二进制数据格式而不是JSON,因为它更快,占用代理的存储空间更少。
在任何情况下,也许在发送生产者的值之前先打印它,然后使用kafka-console-consumer查看代理返回的值。如果你看到了这个列表,然后担心你的配置器和消费者配置

相关问题