Producing and listening to requests with Spring Kafka: the key value arrives wrapped in ""
Producer code
public record ProductProducer(ReplyingKafkaTemplate<String, Object, Object> _replyTemplate)
        implements IProductProducer {

    public ProductViewModel Update(ProductViewModel product, String id)
            throws InterruptedException, ExecutionException, TimeoutException {
        // Send the request to partition 0, using the product id as the record key
        RequestReplyFuture<String, Object, Object> future =
                this._replyTemplate.sendAndReceive(
                        new ProducerRecord<>(ProductTopicConstants.UPDATE_PRODUCT, 0, id, product));
        // Wait for the send to complete and log the record metadata
        LOG.info(future.getSendFuture().get(kafkaConstants.kafkaTimeout, TimeUnit.SECONDS).getRecordMetadata().toString());
        // Wait for the listener's reply
        Product productDb = (Product) future.get(kafkaConstants.kafkaTimeout, TimeUnit.SECONDS).value();
        return new ProductViewModel();
    }
}
Listener
@KafkaListener(id = ProductTopicConstants.UPDATE_PRODUCT, topics = ProductTopicConstants.UPDATE_PRODUCT,
        containerFactory = "addUpdateProductContainerFactory")
@SendTo
public Object UpdateProduct(ConsumerRecord<String, ProductViewModel> productViewModel) {
    String id = productViewModel.key();
    // The {} placeholder is required for the id to appear in the log message
    logger.info("Listening to update product with id: {}", id);
    return new Product();
}
The key value received by the listener is wrapped in double quotes ("").
Consumer factory
@Bean
public ConsumerFactory<String, String> consumerFactoryGetDeleteProduct() {
    return new DefaultKafkaConsumerFactory<>(consumerConfigs(),
            new StringDeserializer(),
            new JsonDeserializer<>(String.class));
}
I have the following configuration
producer:
  key-serializer: org.springframework.kafka.support.serializer.JsonSerializer
  value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
consumer:
  key-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
  value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
@Bean
public Map<String, Object> consumerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapAddress.getKafka().getBootstrapAddress());
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    return props;
}
1 answer
#1 kyvafyod
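My serializer and deserializer were set to the JSON ones; changing them to the String ones did the job. JsonSerializer writes a String key as a JSON string literal, quotes included, so a listener that reads the key with StringDeserializer sees those quotes as part of the key.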
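To illustrate why the quotes appear, here is a minimal, dependency-free sketch. It does not use Spring Kafka at all: `jsonSerialize`, `stringSerialize` and `stringDeserialize` are hypothetical stand-ins that only mimic, in simplified form, what a JSON serializer and a plain string serializer/deserializer do to a String key. The class name `KeyQuotingDemo` and the sample id are likewise assumptions made for the example.

```java
import java.nio.charset.StandardCharsets;

public class KeyQuotingDemo {

    // Hypothetical stand-in for a JSON serializer applied to a String key:
    // a JSON string literal is the text wrapped in double quotes, with
    // embedded backslashes and quotes escaped (simplified, no control chars).
    static byte[] jsonSerialize(String key) {
        String escaped = key.replace("\\", "\\\\").replace("\"", "\\\"");
        return ("\"" + escaped + "\"").getBytes(StandardCharsets.UTF_8);
    }

    // Hypothetical stand-in for a plain string serializer: raw UTF-8 bytes.
    static byte[] stringSerialize(String key) {
        return key.getBytes(StandardCharsets.UTF_8);
    }

    // Hypothetical stand-in for a plain string deserializer: bytes back to text.
    static String stringDeserialize(byte[] data) {
        return new String(data, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        String id = "42";

        // Mismatch: JSON on the producer side, plain string on the consumer side.
        // The quotes written by the JSON step survive as part of the key.
        System.out.println(stringDeserialize(jsonSerialize(id)));   // prints "42" (with quotes)

        // Matching pair: plain string on both sides, the key round-trips unchanged.
        System.out.println(stringDeserialize(stringSerialize(id))); // prints 42
    }
}
```

In terms of the question's setup, the mismatched case corresponds to the JSON key-serializer on the producer paired with the StringDeserializer in the consumer factory, and the matching case corresponds to the fix described in the answer.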