I'm new to Kafka and Logstash. I want to use Logstash as a producer, with SSL.
Here is my Logstash config:
input {
http {
port => 5044
codec => json
}
}
output {
kafka {
#bootstrap_servers => ["localhost:9093"]
bootstrap_servers => ["kafka broker's IP:9093"]
topic_id => "test"
codec => "json"
ssl_truststore_location => "/etc/logstash/conf.d/test/kafka.client.truststore.jks"
ssl_truststore_password => "passwd"
security_protocol => "SSL"
#ssl_keystore_location => "/etc/logstash/conf.d/test/kafka.client.keystore.jks"
#ssl_keystore_password => "passwd"
#ssl_key_password => "passwd"
}
}
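If the broker ever enables `ssl.client.auth=required` (it is commented out in the server properties below), the commented keystore options in the output block need to be filled in as well. A hedged sketch of what that mutual-TLS output block would look like, reusing the poster's paths and passwords ("kafka-broker:9093" is a placeholder for the real broker address):

```
output {
  kafka {
    # the host here must match a name/IP in the broker certificate's SAN
    bootstrap_servers => ["kafka-broker:9093"]
    topic_id => "test"
    codec => "json"
    security_protocol => "SSL"
    ssl_truststore_location => "/etc/logstash/conf.d/test/kafka.client.truststore.jks"
    ssl_truststore_password => "passwd"
    # needed only when the broker sets ssl.client.auth=required
    ssl_keystore_location => "/etc/logstash/conf.d/test/kafka.client.keystore.jks"
    ssl_keystore_password => "passwd"
    ssl_key_password => "passwd"
  }
}
```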
Here are the Kafka server properties:
listeners=PLAINTEXT://:9092, SSL://kafka broker's ip:9093
advertised.listeners=PLAINTEXT://kafka broker's ip:9092,SSL://kafka broker's ip:9093
security.inter.broker.protocol=SSL
# ssl.client.auth=required
ssl.keystore.location=/etc/logstash/conf.d/test/kafka.server.keystore.jks
ssl.keystore.password=dlffpr
ssl.key.password=dlffpr
ssl.truststore.location=/etc/logstash/conf.d/test/kafka.server.truststore.jks
ssl.truststore.password=dlffpr
ssl.endpoint.identification.algorithm=
Below is how I made the SSL keys; I followed Azure's guide. When making kafka.server.keystore.jks, I didn't know what to put for "CN=". There is no domain name; I only know the hostname (of the Kafka manager machine) and the IP address. I tried adding -ext "SAN=DNS:hostname,IP:kafka broker's ip".
# each broker
keytool -keystore kafka.server.keystore.jks -alias asd -genkey -keyalg RSA -validity 365 -storepass "dlffpr" -keypass "dlffpr" -dname "CN=kafka broker's ip" -storetype pkcs12
keytool -keystore kafka.server.keystore.jks -alias asd -certreq -file cert-file -storepass "dlffpr" -keypass "dlffpr"
# manager
# Generate the CA certificate and key files
openssl req -new -newkey rsa:2048 -days 365 -x509 -subj "/CN=asdasdasd" -keyout ca-key -out ca-cert -nodes
# broker
# scp root@kafka broker's ip:/etc/logstash/conf.d/test/ca-cert .
# scp root@kafka broker's ip:/etc/logstash/conf.d/test/ca-key .
# CA cert-signed
openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file -out cert-signed -days 365 -CAcreateserial -passin pass:"dlffpr"
# KEY STORE, truststore add ca-cert
keytool -keystore kafka.server.truststore.jks -alias CARoot -import -file ca-cert -storepass "dlffpr" -keypass "dlffpr" -noprompt
keytool -keystore kafka.server.keystore.jks -alias CARoot -import -file ca-cert -storepass "dlffpr" -keypass "dlffpr" -noprompt
# keystore add cert signed
keytool -keystore kafka.server.keystore.jks -alias asd -import -file cert-signed -storepass "dlffpr" -keypass "dlffpr" -noprompt
# manager(client)
# client keystore
keytool -keystore kafka.client.keystore.jks -alias localhost -genkey -keyalg RSA -validity 365 -storepass "dlffpr" -keypass "dlffpr" -dname "CN=localhost" -storetype pkcs12
# client cert request
keytool -keystore kafka.client.keystore.jks -alias localhost -certreq -file client-cert-sign-request -storepass "dlffpr" -keypass "dlffpr"
# CA cert
openssl x509 -req -CA ca-cert -CAkey ca-key -in ./client-cert-sign-request -out client-cert-signed -days 365 -CAcreateserial -passin pass:dlffpr
# keystore, truststore
keytool -keystore kafka.client.truststore.jks -alias CARoot -import -file ca-cert -storepass "dlffpr" -keypass "dlffpr" -noprompt
keytool -keystore kafka.client.keystore.jks -alias CARoot -import -file ca-cert -storepass "dlffpr" -keypass "dlffpr" -noprompt
# keystore
keytool -keystore kafka.client.keystore.jks -alias localhost -import -file client-cert-signed -storepass "dlffpr" -keypass "dlffpr" -noprompt
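A likely culprit in the signing steps above: `openssl x509 -req` drops any extensions present in the CSR, so even a SAN added via keytool's -ext never reaches the signed certificate unless it is re-stated at signing time with -extfile. A minimal openssl-only sketch of the fix (placeholder names "demo-ca", "kafka-broker", and 192.168.1.6; requires OpenSSL 1.1.1+ for -addext):

```shell
# Demonstrates that SANs must be re-supplied when signing with openssl x509.
set -e
workdir=/tmp/kafka-ssl-demo
rm -rf "$workdir" && mkdir -p "$workdir" && cd "$workdir"

# CA, same shape as the manager step above
openssl req -new -newkey rsa:2048 -days 365 -x509 -subj "/CN=demo-ca" \
  -keyout ca-key -out ca-cert -nodes

# Broker CSR that carries a SAN
openssl req -new -newkey rsa:2048 -subj "/CN=kafka-broker" \
  -addext "subjectAltName=DNS:kafka-broker,IP:192.168.1.6" \
  -keyout broker-key -out broker-csr -nodes

# Re-state the SAN at signing time; without -extfile it is silently dropped
printf 'subjectAltName=DNS:kafka-broker,IP:192.168.1.6\n' > san.ext
openssl x509 -req -CA ca-cert -CAkey ca-key -in broker-csr \
  -out broker-cert -days 365 -CAcreateserial -extfile san.ext

# Confirm the SAN survived signing
openssl x509 -in broker-cert -noout -text | grep -A1 "Subject Alternative Name"
```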
When I start Logstash and Kafka, I get errors like these. Logstash error:
[org.apache.kafka.common.network.Selector][main] [Producer clientId=producer-1] Failed authentication with /192.168.1.6 (SSL handshake failed)
[2020-06-26T16:56:59,131][ERROR][org.apache.kafka.clients.NetworkClient][main] [Producer clientId=producer-1] Connection to node -1 (/kafka broker's ip:9093) failed authentication due to: SSL handshake failed
[2020-06-26T16:56:59,131][WARN ][org.apache.kafka.clients.NetworkClient][main] [Producer clientId=producer-1] Bootstrap broker kafka broker's ip:9093 (id: -1 rack: null) disconnected
Kafka error:
INFO [SocketServer brokerId=0] Failed authentication with /kafka client's ip (SSL handshake failed) (org.apache.kafka.common.network.Selector)
It works when I set Kafka's server properties like below and create the key with "CN=localhost", but Logstash and Kafka are not on the same machine. Please give me any advice. Thanks.
listeners=PLAINTEXT://:9092, SSL://localhost:9093
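That "works with localhost" behaviour fits a hostname-verification failure: recent Kafka Java clients check that the broker's certificate matches the host they connected to, so CN=localhost only validates when the client connects to localhost. The clean fix is putting the broker's real hostname/IP in the certificate's SAN; the quick but weaker workaround is disabling verification on the client. A hedged sketch of the workaround, assuming the installed Logstash kafka output plugin version supports this option (check your plugin's docs):

```
output {
  kafka {
    # ...same bootstrap_servers, topic_id, and ssl_* options as above...
    # "" disables client-side hostname verification, mirroring the broker's
    # empty ssl.endpoint.identification.algorithm; weaker security.
    ssl_endpoint_identification_algorithm => ""
  }
}
```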
1 Answer
k2arahey:
We hit the same problem today: starting Kafka with SASL_SSL and two listeners (each with a different IP and DNS name), we reproduced your issue and got the same error when starting Logstash.
We created a multi-domain certificate, then set the options in server.properties (Kafka). After restarting the service, Logstash worked fine!
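The answer's exact option values were not preserved in this copy. As an illustration only, a two-listener setup of the kind the answer describes usually pairs named listeners with a protocol map, and every advertised hostname/IP must appear in the certificate's SAN (all names and addresses below are hypothetical):

```
# Hypothetical sketch, not the answerer's actual values
listener.security.protocol.map=EXTERNAL:SSL,INTERNAL:SSL
listeners=EXTERNAL://kafka.example.com:9093,INTERNAL://10.0.0.5:9094
advertised.listeners=EXTERNAL://kafka.example.com:9093,INTERNAL://10.0.0.5:9094
inter.broker.listener.name=INTERNAL
```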