萨拉玛不能和Kafka服务器通话

mefy6pfw  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(420)

所以我试图配置一个sarama(kafka的本地go客户机)producer客户机。我已经相应地配置了tls,确保客户端证书是使用正确的密码生成的。我初始化客户端的go代码如下所示:

import (
    "crypto/tls"
    "crypto/x509"
    "encoding/pem"
    "io/ioutil"
    "net"
    "path/filepath"

    "github.com/Shopify/sarama"
    log "github.com/sirupsen/logrus"
)

const (
    certFile = "client_ingestion_client.pem"
    keyFile  = "client_ingestion_client.key"
)

func InitKafkaClient(host string, port string, certPath string) (sarama.AsyncProducer, error) {

    cf := filepath.Join(certPath, certFile)
    kf := filepath.Join(certPath, keyFile)

    // Log cert and key path
    log.Debugln(cf)
    log.Debugln(kf)

    // Read the cert in
    certIn, err := ioutil.ReadFile(cf)

    if err != nil {
        log.Error("cannot read cert", err)
        return nil, err
    }

    // Read & decode the encrypted key file with the pass to make tls work
    keyIn, err := ioutil.ReadFile(kf)
    if err != nil {
        log.Error("cannot read key", err)
        return nil, err
    }

    // Decode and decrypt our PEM block as DER
    decodedPEM, _ := pem.Decode([]byte(keyIn))
    decrypedPemBlock, err := x509.DecryptPEMBlock(decodedPEM, []byte("m4d3ups3curity4k4fka?"))
    if err != nil {
        log.Error("cannot decrypt pem block", err)
        return nil, err
    }

    // Parse the DER encoded block as PEM
    rsaKey, err := x509.ParsePKCS1ParrivateKey(decrypedPemBlock)
    if err != nil {
       log.Error("failed to parse rsa as pem", err)
       return nil, err
    }

    // Marshal the pem encoded RSA key to bytes in memory
    pemdata := pem.EncodeToMemory(
       &pem.Block{
            Type:  "RSA PRIVATE KEY",
            Bytes: x509.MarshalPKCS1PrivateKey(rsaKey),
        },
    )
    if err != nil {
        log.Error("cannot marshal rsa as pem in memory", err)
        return nil, err
    }

    // Load our decrypted key pair
    crt, err := tls.X509KeyPair(certIn, pemdata)
    if err != nil {
        log.Error("cannot load key pair", err)
        return nil, err
    }
    config := sarama.NewConfig()
    config.Net.TLS.Enable = true
    config.Net.TLS.Config = &tls.Config{
        Certificates: []tls.Certificate{crt},
        CipherSuites: []uint16{
           tls.TLS_RSA_WITH_AES_128_GCM_SHA256,
        },
    }

    // Setting this allows us not to read from successes channel
    config.Producer.Return.Successes = false
    // Setting this allows us not to read from errors channel
    config.Producer.Return.Errors = false
    client, err := sarama.NewClient([]string{net.JoinHostPort(host, port)}, config)
    if err != nil {
        return nil, err
    }
    return sarama.NewAsyncProducerFromClient(client)
}

当我初始化代码时,我得到一个错误: time="2018-01-19T15:31:38Z" level=error msg="Error trying to setup kafka: kafka: client has run out of available brokers to talk to (Is your cluster reachable?)" 我已经验证了Kafka主机是可访问的,可以连接到。见下文。
我通过检查go代码的输出到 openssl rsa -in client_ingestion_client.key -out decrypted.key 命令。我还确保使用带有正确标志的keytool正确生成密钥,包括这里建议的-keylag rsa标志。
我也跑了 openssl s_client -connect $KAFKA_HOST:$KAFKA_PORT 得到以下回应

verify error:num=19:self signed certificate in certificate chain
139901934057376:error:1408E0F4:SSL routines:ssl3_get_message:unexpected message:s3_both.c:408:

验证错误是好的,因为我使用的是自签名证书,但我不知道下面的错误是什么。也许这就是我问题的原因?
此外,我得到以下信息:

Requested Signature Algorithms: ECDSA+SHA512:RSA+SHA512:ECDSA+SHA384:RSA+SHA384:ECDSA+SHA256:RSA+SHA256:DSA+SHA256:ECDSA+SHA224:RSA+SHA224:DSA+SHA224:ECDSA+SHA1:RSA+SHA1:DSA+SHA1
Shared Requested Signature Algorithms: ECDSA+SHA512:RSA+SHA512:ECDSA+SHA384:RSA+SHA384:ECDSA+SHA256:RSA+SHA256:DSA+SHA256:ECDSA+SHA224:RSA+SHA224:DSA+SHA224:ECDSA+SHA1:RSA+SHA1:DSA+SHA1
    Peer signing digest: SHA512
    Server Temp Key: ECDH, P-256, 256 bits
    ---
    SSL handshake has read 4668 bytes and written 169 bytes
    ---
    New, TLSv1/SSLv3, Cipher is ECDHE-RSA-AES128-GCM-SHA256
    Server public key is 2048 bit
    Secure Renegotiation IS supported
    Compression: NONE
    Expansion: NONE
    No ALPN negotiated
    SSL-Session:
        Protocol  : TLSv1.2
        Cipher    : ECDHE-RSA-AES128-GCM-SHA256
        Session-ID: 5A6216C765EF33BC85FACE82B01BC506358F4D62C77817A1F7EEFB50941DAEC9
        Session-ID-ctx:
        Master-Key: F8641FBF63A0AC7AB2D6D941C421DCA44550448524DADF4F0A7943F7928E65D5773E60A45212A7F320B250595AA6737B
        Key-Arg   : None
        Krb5 Principal: None
        PSK identity: None
        PSK identity hint: None
        Start Time: 1516377799
        Timeout   : 300 (sec)
        Verify return code: 19 (self signed certificate in certificate chain)
    ---

由于此密码在openssl协议中引用: ECDHE-RSA-AES128-GCM-SHA256 我试着加上这个 tls.TLS_RSA_WITH_AES_128_GCM_SHA256 我的围棋代码,这似乎是一个密切匹配,但我得到了同样的错误信息,在围棋与它说,它已经用完了可用的经纪人交谈。

bvk5enib

bvk5enib1#

所以我找到了我的问题。原来kafka部署的子域有一个自签名证书,所以我必须设置 InsecureSkipVerify: true 在客户端的config.net.tls.config结构中。所以代码看起来像:

config.Net.TLS.Config = &tls.Config{
    Certificates: []tls.Certificate{crt},
    InsecureSkipVerify: true,
}

也不需要包含密码套件。

相关问题