我将spring批处理与writer一起使用(如果条件失败,还具有重试逻辑,请尝试再次执行它)作为多线程,如果writer中的条件得到满足,我将尝试向主题发送kafka消息。我使用的是kafka模板,配置中示例化了bean。
@Bean
public Producer<Long, String> producerConfigs() {
Properties props = new Properties();
//bootstrap servers and protocol configs here
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return new KafkaProducer<>(props);
}
在编写器部分,我在executorservice.submit(…)中使用了线程计数50,我使用以下代码将消息发送到主题
public void send(String message){
LOG.info("sending message='{}' to topic='{}'", message, topic);
kafkaTemplate.send(topic, message).get();
}
我的问题是我得到了关于这个主题的重复日志和重复Kafka信息。
暂无答案!
目前还没有任何答案,快来回答吧!