ConsumerRecord key is wrapped in double quotes ("") in Spring for Apache Kafka

pxy2qtax  posted on 2021-06-04  in Kafka

I am using Spring Kafka to produce and listen to requests, and the record key arrives wrapped in double quotes.

Producer code

    public record ProductProducer(ReplyingKafkaTemplate<String, Object, Object> _replyTemplate)
            implements IProductProducer {

        public ProductViewModel Update(ProductViewModel product, String id)
                throws InterruptedException, ExecutionException, TimeoutException {
            // Send the request to partition 0, using the product id as the record key.
            RequestReplyFuture<String, Object, Object> future = this._replyTemplate.sendAndReceive(
                    new ProducerRecord<>(ProductTopicConstants.UPDATE_PRODUCT, 0, id, product));
            // Wait for the send to complete and log the record metadata.
            LOG.info(future.getSendFuture().get(kafkaConstants.kafkaTimeout, TimeUnit.SECONDS).getRecordMetadata().toString());
            // Wait for the listener's reply.
            Product productDb = (Product) future.get(kafkaConstants.kafkaTimeout, TimeUnit.SECONDS).value();
            return new ProductViewModel();
        }
    }

Listener

    @KafkaListener(id = ProductTopicConstants.UPDATE_PRODUCT, topics = ProductTopicConstants.UPDATE_PRODUCT,
            containerFactory = "addUpdateProductContainerFactory")
    @SendTo
    public Object UpdateProduct(ConsumerRecord<String, ProductViewModel> productViewModel) {
        String id = productViewModel.key();
        // Assuming an SLF4J-style logger: the {} placeholder is needed for the id to be printed.
        logger.info("Listening to update product with id : {}", id);
        return new Product();
    }

The key value received by the listener is enclosed in double quotes ("").

Consumer factory

    @Bean
    public ConsumerFactory<String, String> consumerFactoryGetDeleteProduct() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs(),
                new StringDeserializer(),
                new JsonDeserializer<>(String.class));
    }

I have the following configuration

    producer:
      key-serializer: org.springframework.kafka.support.serializer.JsonSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
    consumer:
      key-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapAddress.getKafka().getBootstrapAddress());
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }

kyvafyod  #1

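The problem was in my key serializer and deserializer: the key was being written with the JSON serializer. A JSON serializer writes a String as a JSON string literal, quotes included, and a plain string deserializer on the consumer side returns those bytes unchanged. Switching the key serializer and deserializer from JSON to String did the job: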

    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
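
To see why the quotes show up, here is a minimal stand-alone sketch. It does not use any Kafka or Jackson classes: `QuotedKeyDemo` and its methods `jsonEncode`, `stringEncode` and `stringDecode` are hypothetical stand-ins that only imitate what a JSON serializer and a plain string serializer/deserializer do with a String key (escaping of control characters is left out for brevity).

```java
import java.nio.charset.StandardCharsets;

// Stand-alone illustration only; no Kafka or Jackson classes are involved.
class QuotedKeyDemo {

    // Stand-in for a JSON serializer applied to a String key: a JSON string
    // literal is the text wrapped in double quotes, with embedded
    // backslashes and quotes escaped.
    static byte[] jsonEncode(String key) {
        String escaped = key.replace("\\", "\\\\").replace("\"", "\\\"");
        return ("\"" + escaped + "\"").getBytes(StandardCharsets.UTF_8);
    }

    // Stand-in for a plain string serializer: the raw UTF-8 bytes of the text.
    static byte[] stringEncode(String key) {
        return key.getBytes(StandardCharsets.UTF_8);
    }

    // Stand-in for a plain string deserializer: bytes back to text,
    // with no JSON parsing, so any quotes in the payload are kept.
    static String stringDecode(byte[] data) {
        return new String(data, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        String id = "42";
        // JSON on the producer side, plain string on the consumer side:
        // the quotes survive the round trip.
        System.out.println(stringDecode(jsonEncode(id)));   // prints "42" (with quotes)
        // Plain string on both sides: the key comes back unchanged.
        System.out.println(stringDecode(stringEncode(id))); // prints 42
    }
}
```

This mirrors the setup in the question, where the producer's key serializer is a JSON serializer while the consumer factory reads the key with `StringDeserializer`. Keeping both sides on the same format for the key avoids the mismatch.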
