kafka producer在spring批处理中使用多线程编写器向主题发送重复消息

jhdbpxl9  于 2021-07-06  发布在  Java
关注(0)|答案(0)|浏览(192)

我将spring批处理与writer一起使用(如果条件失败,还具有重试逻辑,请尝试再次执行它)作为多线程,如果writer中的条件得到满足,我将尝试向主题发送kafka消息。我使用的是kafka模板,配置中示例化了bean。

@Bean
    public Producer<Long, String> producerConfigs() {
        Properties props =  new Properties();
        //bootstrap servers and protocol configs here        
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new KafkaProducer<>(props);
    }

在编写器部分,我在executorservice.submit(…)中使用了线程计数50,我使用以下代码将消息发送到主题

public void send(String message){
        LOG.info("sending message='{}' to topic='{}'", message, topic);
        kafkaTemplate.send(topic, message).get();
    }

我的问题是我得到了关于这个主题的重复日志和重复Kafka信息。

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题