kafka producer在spring批处理中使用多线程编写器向主题发送重复消息

jhdbpxl9  于 2021-07-06  发布在  Java
关注(0)|答案(0)|浏览(232)

我将spring批处理与writer一起使用(如果条件失败,还具有重试逻辑,请尝试再次执行它)作为多线程,如果writer中的条件得到满足,我将尝试向主题发送kafka消息。我使用的是kafka模板,配置中示例化了bean。

  1. @Bean
  2. public Producer<Long, String> producerConfigs() {
  3. Properties props = new Properties();
  4. //bootstrap servers and protocol configs here
  5. props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class);
  6. props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
  7. return new KafkaProducer<>(props);
  8. }

在编写器部分,我在executorservice.submit(…)中使用了线程计数50,我使用以下代码将消息发送到主题

  1. public void send(String message){
  2. LOG.info("sending message='{}' to topic='{}'", message, topic);
  3. kafkaTemplate.send(topic, message).get();
  4. }

我的问题是我得到了关于这个主题的重复日志和重复Kafka信息。

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题