你好,我有以下配置来使用Spring Batch应用程序生成主题:
Map<String,Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,bootstrapServers);
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,JsonSerializer.class);
configProps.put(ProducerConfig.MAX_BLOCK_MS_CONFIG,10000);
configProps.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG,6000);
configProps.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG,8000);
configProps.put(ProducerConfig.RETRIES_CONFIG,0);
configProps.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
configProps.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
configProps.put(SaslConfigs.SASL_JAAS_CONFIG, String.format(
"%s required username=\"%s\" " + "password=\"%s\";", PlainLoginModule.class.getName(), "UYDUEWWU27LFC2CJ", "yqBcf5BM4rd3X273OsaB6/n7/kuxmRG39+Fr3nVuOdXI/RZ/sM9E6hpqmaAPxCbC"
));
这工作正常,但是当我尝试在SASL JAAS CONFIG中获取用户名和密码的env变量时,就像我在引导服务器中所做的那样:
String username = env.getProperty("spring.bootstrap.kafka.username");
String password = env.getProperty("spring.bootstrap.kafka.password");
configProps.put(SaslConfigs.SASL_JAAS_CONFIG, String.format(
"%s required username=\"%s\" " + "password=\"%s\";", PlainLoginModule.class.getName(), username, password
));
当发送对象到Kafka主题时,它给予我的错误是身份验证失败,即使它说登录成功。我已经验证了每个字符串格式的内容与log相同。这怎么可能?
谢谢
1条答案
按热度按时间yv5phkfx1#
我改了这一行:
configProps.put(SaslConfigs.SASL_JAAS_CONFIG, String.format( "%s required username=\"%s\" " + "password=\"%s\";", PlainLoginModule.class.getName(), username, password ));
对于这个:
configProps.put("sasl.jaas.config",String.format("org.apache.kafka.common.security.plain.PlainLoginModule required username=\"%s\" password=\"%s\";",username, password));