spring Sping Boot 中的多个Kafka生产者

xpcnnkqh  于 2024-01-05  发布在  Spring
关注(0)|答案(1)|浏览(163)

我有一个用例,它需要我有多个Kafka Producer(基于配置)。也就是说,如果我的配置有3个想要接收数据的租户,我想启动3个生产者(所有三个写入3个不同的集群)。
我尝试将我的Kafka配置设置为:

  1. @Bean
  2. @Primary
  3. public Map<String, KafkaTemplate<String, String>> kafkaTemplates() {
  4. log.info("setting up kafka templates");
  5. final String ids = configuration.getIds();
  6. Map<String, KafkaTemplate<String, String>> kafkaTemplates = new HashMap<>();
  7. for (String id : ids.split(",")) {
  8. kafkaTemplates.put(id, kafkaTemplate(id));
  9. }
  10. return kafkaTemplates;
  11. }
  12. private KafkaTemplate<String, String> kafkaTemplate(String id) {
  13. log.info("setting up kafka template");
  14. try {
  15. Properties producerProperties = new Properties();
  16. try (InputStream kins = Files.newInputStream(new File("/opt/user-secrets/", id + ".properties").toPath())) {
  17. producerProperties.load(kins);
  18. } catch (IOException e) {
  19. throw new RuntimeException(e);
  20. }
  21. Map props = producerProperties;
  22. Map<String, Object> props1 = (Map<String, Object>) props;
  23. ProducerFactory<String, String> producerFactory = new DefaultKafkaProducerFactory<>(props1);
  24. return new KafkaTemplate<>(producerFactory);
  25. } catch (Exception e) {
  26. log.error("e {}", e.getMessage(), e);
  27. throw new RuntimeException(e);
  28. }
  29. }

字符串
当我尝试在Producer服务中访问KafkaTemplate时,这导致了kafkaTemplates.get(id)上的NPE:

  1. @Service
  2. @Slf4j
  3. @Getter
  4. @Setter
  5. public class KafkaProducerService {
  6. private final Map<String, KafkaTemplate<String, String>> kafkaTemplates;
  7. private Schema avroSchema;
  8. public KafkaProducerService(Map<String, KafkaTemplate<String, String>> kafkaTemplates) {
  9. this.kafkaTemplates = kafkaTemplates;
  10. }
  11. public void produce(String id, String topic, String key, VehicleHeartbeat message) {
  12. GenericRecord record = generateAvroRecord(message);
  13. log.info("generic record: {}", record);
  14. kafkaTemplates.get(id).send(topic, key, record.toString());
  15. }


我该如何处理Spring的Kafka?

vulvrdjw

vulvrdjw1#

可能与其他Map bean冲突。
你可以尝试在kafkaTemplates方法上替换@Bean,如下所示:

  1. @Bean("myKafkaTemplatesMap")
  2. @Primary
  3. public Map<String, KafkaTemplate<String, String>> kafkaTemplates() {
  4. ...
  5. }

字符串
然后在服务构造函数中,你用相同的名字来限定它,这样你就可以确保注入正确的bean:

  1. @Service
  2. public class KafkaProducerService {
  3. public KafkaProducerService(@Qualifier("myKafkaTemplatesMap") Map<String, KafkaTemplate<String, String>> kafkaTemplates) {
  4. this.kafkaTemplates = kafkaTemplates;
  5. }
  6. }

展开查看全部

相关问题