kafka如何配置两个不同kerberos的jaas

yzuktlbb  于 2021-06-04  发布在  Kafka
关注(0)|答案(0)|浏览(480)

我有一个springboot应用程序,它通过rest接收数据,基于一些业务逻辑,我需要将数据转发到两个不同的kafka集群,它们有自己的kerberos密钥和jaas文件。
我在两个不同的对象示例中编写了两个不同的producer示例,它们具有以下属性。

@Service
public class EventProducer {
    private Logger logger = LoggerFactory.getLogger(EventProducer.class);

    Producer<String, String> kafkaProducer = null;

    @Autowired
    public Producer<String, String> createProducer() {
        if (kafkaProducer == null) {
            Properties props = getKafkaConfig();
         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "Cluste_1_hostaddress:9092");
         props.put(ProducerConfig.CLIENT_ID_CONFIG,"usertest");

         props.put(ProducerConfig.ACKS_CONFIG, "all");
         props.put(ProducerConfig.RETRIES_CONFIG, "3");

         props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
         props.put(ProducerConfig.BATCH_SIZE_CONFIG, 1600);

         System.setProperty("javax.security.auth.useSubjectCredsOnly", "true"); 
         System.setProperty("java.security.auth.login.config", "/home/user/clusrter_1_jaas.conf);   

         props.put("security.protocol", "SASL_PLAINTEXT");
         props.put("kafka.cluster.SecurityProtocol",PLAINTEXTSASL);
         props.put("sasl.kerberos.service.name",  "kafka"); 
         props.put("sasl.kerberos", "sasl.kerberos.service.namekafka");     
         props.put("security.inter.broker.protocol", "SASL_PLAINTEXT");
         props.put("sasl.mechanism.inter.broker.protocol", "PLAIN");
         props.put("sasl.enabled.mechanisms", "PLAIN");
         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

         kafkaProducer = new KafkaProducer<String, String>(props);
        }
        return kafkaProducer;
    }

}

第二生产商

@Service
public class MovementProducer {
    private Logger logger = LoggerFactory.getLogger(MovementProducer.class);

    Producer<String, String> kafkaProducer = null;

    @Autowired
    public Producer<String, String> createProducer() {
        if (kafkaProducer == null) {
            Properties props = getKafkaConfig();
         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "Cluste_2_hostaddress:9092");
         props.put(ProducerConfig.CLIENT_ID_CONFIG,"usertest");

         props.put(ProducerConfig.ACKS_CONFIG, "all");
         props.put(ProducerConfig.RETRIES_CONFIG, "3");

         props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
         props.put(ProducerConfig.BATCH_SIZE_CONFIG, 1600);

         System.setProperty("javax.security.auth.useSubjectCredsOnly", "true"); 
         System.setProperty("java.security.auth.login.config", "/home/user/clusrter_2_jaas.conf);   

         props.put("security.protocol", "SASL_PLAINTEXT");
         props.put("kafka.cluster.SecurityProtocol",PLAINTEXTSASL);
         props.put("sasl.kerberos.service.name",  "kafka"); 
         props.put("sasl.kerberos", "sasl.kerberos.service.namekafka");     
         props.put("security.inter.broker.protocol", "SASL_PLAINTEXT");
         props.put("sasl.mechanism.inter.broker.protocol", "PLAIN");
         props.put("sasl.enabled.mechanisms", "PLAIN");
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

            kafkaProducer = new KafkaProducer<String, String>(props);
        }
        return kafkaProducer;
    }

}

当我以两个服务的形式启动它时,只启用生产者示例,但当我在一个jar中同时启用两个示例时,只有一个生产者工作,而另一个生产者会遇到身份验证问题。
我觉得这是由于system.setproperty(“java.security.auth.login.config”,“”),因为它是全局系统变量,所以当我在单个进程中同时使用这两个变量时,它会覆盖,所以只有一个可以工作。
那么除了启动两个进程之外,还有什么办法解决这个问题呢。我只有一个 Spring 服务,应该能够产生两个不同的Kafka集群。。

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题