我正在编写一个java程序来与kafka代理进行通信。kafka代理(通过restapi)提供客户机证书、客户机密钥和ca证书。kafkaconsumer示例化成功,但在执行poll()方法时抛出org.apache.kafka.common.errors.sslauthenticationexception,该异常最终可追溯到“找不到请求目标的有效证书路径”
互联网搜索似乎表明java客户机必须使用jks,但有一些参考(包括本文中的so)似乎暗示了一种使用keytool和openssl让这样一个java客户机工作的方法。
我有一个python客户机,它与同一个代理一起工作。用于示例化使用者的属性:
... 剪。。。
cons_props = {
'bootstrap.servers': '%s' % self.kafka_host,
'group.id': 'mygroup', 'session.timeout.ms': 8000,
'default.topic.config': { 'auto.offset.reset': 'smallest' },
'ssl.certificate.location': self.kafka_client_cert,
'ssl.key.location': self.kafka_client_key,
'security.protocol': 'ssl',
'ssl.ca.location': self.kafka_root_cert }
... 剪。。。
利用上面的(工作)python属性以及我读到的关于kafka和java客户机的内容,我的属性是:
... 剪。。。
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.60.100:9093");
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "mygroup");
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
consumerProps.put("security.protocol", "SSL");
consumerProps.put("ssl.certificate.location", getClientCertLocation());
consumerProps.put("ssl.key.location", getClientKeyLocation());
consumerProps.put("ssl.ca.location", getRootCertLocation());
workQueue = (KafkaConsumer<String, String>)
new KafkaConsumer<String, String>(consumerProps);
workQueue.subscribe(Arrays.asList("q_name");
ConsumerRecords<String, String> r;
try {
r = (ConsumerRecords<String, String>)kc.poll(10);
... 剪。。。
如果ssl.*属性是假的,我可能天真地认为javakafkaconsumer会抱怨。但是,subscribe和ctor都运行良好,但抛出了poll方法异常。
如上所述,我似乎必须使用jks,但是1)由于我几乎没有访问代理配置的权限,所以我不确定这是否可能,2)我不知道是否可以将密钥和证书转换为jks,并且,一旦完成,如果代理只“说”pem,它是否可以工作。
这是我向他提出的第一个问题。希望它不会太冗长,同时对其他人有一些适当和帮助(我肯定不是第一个尝试这个?)
暂无答案!
目前还没有任何答案,快来回答吧!