我试图创建一个基本的Kafka项目,它将发送嵌套的JSON对象,但只接收消费者的根级别字段。
@Builder
@Jacksonized
@ToString
public class UserCreate implements Serializable {
@JsonSerialize(using = LocalDateSerializer.class)
@JsonDeserialize(using = LocalDateDeserializer.class)
private final LocalDate createDate;
private final List<Order> orders;
}
然后Order类
@Builder
@Jacksonized
@ToString
public class Order implements Serializable {
private final String name;
}
生产者配置:
@Bean
public ProducerFactory<String, UserCreate> transferProducerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return new DefaultKafkaProducerFactory<>(configProps);
}
@Bean
public KafkaTemplate<String, TransferRequest> transferKafkaTemplate() {
return new KafkaTemplate<>(transferProducerFactory());
}
ConsumerConfig:
public ConsumerFactory<String, UserCreate> transferConsumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group-id");
return new DefaultKafkaConsumerFactory<>(
props,
new StringDeserializer(),
new JsonDeserializer<>(UserCreate.class));
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, UserCreate> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, UserCreate> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(transferConsumerFactory());
return factory;
}
该设置以以下结果结束:
Received message: UserCreate(createDate=2023-10-08, orders=null)
1条答案
按热度按时间sdnqo3pr1#
你的名单是空的;这个问题与消费者没有直接关系。Kafka本身并不关心你的数据是什么格式。它存储字节,所以人们通常使用二进制数据格式而不是JSON,因为它更快,占用代理的存储空间更少。
在任何情况下,也许在发送生产者的值之前先打印它,然后使用
kafka-console-consumer
查看代理返回的值。如果你看到了这个列表,然后担心你的配置器和消费者配置