Kafka - 无法找到到请求目标的有效证书路径(PKIX path building failed)

gorkyyrv  于 2023-02-28  发布在  Apache
关注(0)|答案(2)|浏览(571)

我正在使用 Docker 搭建一个 Apache Kafka 环境。
我将整个环境配置为使用SSL,到目前为止,一切都很好......当我运行docker-compose时,一切都正常运行,没有错误。
问题是当我向connect容器发送POST以创建与MySQL的连接并在Kafka主题内进行复制时。
我收到了一条关于SSL的错误消息,但我不明白错误是什么,因为只有在为MySQL创建生产者配置时才会出现这种情况
docker ps -a

CONTAINER ID   IMAGE                                              COMMAND                  CREATED         STATUS                            PORTS                                        NAMES
f4585e3fc2d4   chethanuk/kafka-connect:5.3.1                      "/etc/confluent/dock…"   6 minutes ago   Up 6 minutes (health: starting)   0.0.0.0:8083->8083/tcp, 9092/tcp             connect
4288ea7d0a3c   confluentinc/cp-schema-registry                    "/etc/confluent/dock…"   6 minutes ago   Up 6 minutes                      8081/tcp, 0.0.0.0:8181->8181/tcp             schema-registry
7d8571d525ff   confluentinc/cp-kafka:latest                       "/etc/confluent/dock…"   6 minutes ago   Up 6 minutes                      0.0.0.0:9092->9092/tcp                       broker
fb77012c0b7d   confluentinc/cp-zookeeper:latest                   "/etc/confluent/dock…"   6 minutes ago   Up 6 minutes                      2888/tcp, 0.0.0.0:2181->2181/tcp, 3888/tcp   zookeeper
08ae4c80b339   mysql:8                                            "docker-entrypoint.s…"   2 days ago      Up 30 hours                                                                    mysql

.env

SSL_SECRET=datahub
ZK_HOST=zookeeper
ZK_PORT=2181
BROKER_HOST=broker
BROKER_PORT=9092
SR_HOST=schema-registry
SR_PORT=8181
CON=connect
CON_PORT=8083
HOST=localhost

docker-compose.yml

---
version: '3'
services:
  # ZooKeeper node; its container/host name and client port are taken from
  # the ZK_HOST and ZK_PORT variables.
  zookeeper:
    image: "confluentinc/cp-zookeeper:latest"
    hostname: "${ZK_HOST}"
    container_name: "${ZK_HOST}"
    environment:
      ZOOKEEPER_CLIENT_PORT: "${ZK_PORT}"
      ZOOKEEPER_SERVER_ID: "1"
    ports:
      # Client port published on the host under the same number.
      - "${ZK_PORT}:${ZK_PORT}"

  # Kafka broker exposing a single SSL listener; key material is read from
  # the ./secrets directory mounted at /etc/kafka/secrets.
  broker:
    image: "confluentinc/cp-kafka:latest"
    hostname: "${BROKER_HOST}"
    container_name: "${BROKER_HOST}"
    depends_on:
      - "${ZK_HOST}"
    ports:
      - "${BROKER_PORT}:${BROKER_PORT}"
    volumes:
      - "./secrets:/etc/kafka/secrets"
    environment:
      # Identity and cluster wiring.
      KAFKA_BROKER_ID: "1"
      KAFKA_ZOOKEEPER_CONNECT: "${ZK_HOST}:${ZK_PORT}"
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: "1"
      # Listener and protocol selection (SSL everywhere).
      KAFKA_ADVERTISED_LISTENERS: "SSL://${BROKER_HOST}:${BROKER_PORT}"
      KAFKA_SECURITY_PROTOCOL: "SSL"
      KAFKA_SECURITY_INTER_BROKER_PROTOCOL: "SSL"
      # Clients are not asked to present a certificate.
      KAFKA_SSL_CLIENT_AUTH: "none"
      # Keystore/truststore file names plus the credentials file name
      # (cert_creds), all expected inside the mounted secrets directory.
      KAFKA_SSL_KEYSTORE_FILENAME: "broker.keystore.jks"
      KAFKA_SSL_KEYSTORE_CREDENTIALS: "cert_creds"
      KAFKA_SSL_KEY_CREDENTIALS: "cert_creds"
      KAFKA_SSL_TRUSTSTORE_FILENAME: "broker.truststore.jks"
      KAFKA_SSL_TRUSTSTORE_CREDENTIALS: "cert_creds"

  # Schema Registry served over HTTPS and talking to the broker over SSL.
  # The same keystore/truststore pair is used for both the REST listener
  # (SCHEMA_REGISTRY_SSL_*) and the Kafka store client
  # (SCHEMA_REGISTRY_KAFKASTORE_SSL_*).
  schema-registry:
    image: "confluentinc/cp-schema-registry"
    hostname: "${SR_HOST}"
    container_name: "${SR_HOST}"
    depends_on:
      - "${ZK_HOST}"
      - "${BROKER_HOST}"
    ports:
      - "${SR_PORT}:${SR_PORT}"
    volumes:
      - "./secrets:/etc/schema-registry/secrets"
    environment:
      # REST endpoint.
      SCHEMA_REGISTRY_HOST_NAME: "${SR_HOST}"
      SCHEMA_REGISTRY_LISTENERS: "https://0.0.0.0:${SR_PORT}"
      SCHEMA_REGISTRY_SCHEMA_REGISTRY_INTER_INSTANCE_PROTOCOL: "https"
      # REST clients must present a certificate.
      SCHEMA_REGISTRY_SSL_CLIENT_AUTH: "true"
      # Key material for the REST listener.
      SCHEMA_REGISTRY_SSL_KEYSTORE_LOCATION: "/etc/schema-registry/secrets/schema-registry.keystore.jks"
      SCHEMA_REGISTRY_SSL_KEYSTORE_PASSWORD: "${SSL_SECRET}"
      SCHEMA_REGISTRY_SSL_KEY_PASSWORD: "${SSL_SECRET}"
      SCHEMA_REGISTRY_SSL_TRUSTSTORE_LOCATION: "/etc/schema-registry/secrets/schema-registry.truststore.jks"
      SCHEMA_REGISTRY_SSL_TRUSTSTORE_PASSWORD: "${SSL_SECRET}"
      # Kafka store (where schemas are persisted).
      SCHEMA_REGISTRY_KAFKASTORE_TOPIC: "_schemas2"
      SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: "${ZK_HOST}:${ZK_PORT}"
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: "SSL://${BROKER_HOST}:${BROKER_PORT}"
      SCHEMA_REGISTRY_KAFKASTORE_SECURITY_PROTOCOL: "SSL"
      # Key material for the Kafka store client.
      SCHEMA_REGISTRY_KAFKASTORE_SSL_KEYSTORE_LOCATION: "/etc/schema-registry/secrets/schema-registry.keystore.jks"
      SCHEMA_REGISTRY_KAFKASTORE_SSL_KEYSTORE_PASSWORD: "${SSL_SECRET}"
      SCHEMA_REGISTRY_KAFKASTORE_SSL_KEY_PASSWORD: "${SSL_SECRET}"
      SCHEMA_REGISTRY_KAFKASTORE_SSL_TRUSTSTORE_LOCATION: "/etc/schema-registry/secrets/schema-registry.truststore.jks"
      SCHEMA_REGISTRY_KAFKASTORE_SSL_TRUSTSTORE_PASSWORD: "${SSL_SECRET}"

  # Kafka Connect worker with an HTTPS REST listener. It talks to the broker
  # over SSL and serialises record values with the AvroConverter, which
  # registers schemas at https://${SR_HOST}:${SR_PORT}.
  connect:
    user: '0'
    image: chethanuk/kafka-connect:5.3.1
    hostname: '${CON}'
    container_name: ${CON}
    depends_on:
      - ${ZK_HOST}
      - ${BROKER_HOST}
      - ${SR_HOST}
    ports:
      - "${CON_PORT}:${CON_PORT}"
    environment:
      CONNECT_LISTENERS: 'https://0.0.0.0:${CON_PORT}'
      CONNECT_REST_ACCESS_CONTROL_ALLOW_METHODS: 'GET,POST,PUT,DELETE,OPTIONS'
      CONNECT_REST_ACCESS_CONTROL_ALLOW_ORIGIN: '*'
      CONNECT_BOOTSTRAP_SERVERS: 'SSL://${BROKER_HOST}:${BROKER_PORT}'
      CONNECT_REST_ADVERTISED_HOST_NAME: ${CON}
      CONNECT_REST_PORT: ${CON_PORT}
      CONNECT_GROUP_ID: compose-connect-group
      CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
      CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
      CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: 'https://${SR_HOST}:${SR_PORT}'
      # The converter's Schema Registry client is configured separately from
      # the worker's own ssl.* settings. Without a truststore holding the CA
      # it cannot validate the registry certificate, and schema registration
      # fails with "PKIX path building failed". The keystore is supplied as
      # well because the registry is started with SSL client auth enabled.
      # NOTE(review): older Schema Registry client versions may ignore these
      # schema.registry.ssl.* settings; if the 5.3.1 image does, pass
      # javax.net.ssl.trustStore/keyStore system properties through
      # KAFKA_OPTS instead - confirm against the image in use.
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_SSL_TRUSTSTORE_LOCATION: /etc/kafka/secrets/connect.truststore.jks
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_SSL_TRUSTSTORE_PASSWORD: ${SSL_SECRET}
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_SSL_KEYSTORE_LOCATION: /etc/kafka/secrets/connect.keystore.jks
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_SSL_KEYSTORE_PASSWORD: ${SSL_SECRET}
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_SSL_KEY_PASSWORD: ${SSL_SECRET}
      CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_ZOOKEEPER_CONNECT: '${ZK_HOST}:${ZK_PORT}'
      CLASSPATH: /usr/share/java/monitoring-interceptors/monitoring-interceptors-5.2.1.jar
      CONNECT_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
      CONNECT_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
      CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components"
      CONNECT_LOG4J_LOGGERS: org.apache.zookeeper=ERROR,org.I0Itec.zkclient=ERROR,org.reflections=ERROR
      # Worker-level SSL settings (REST listener and the worker's Kafka clients).
      CONNECT_SSL_CLIENT_AUTH: 'true'
      CONNECT_SECURITY_PROTOCOL: SSL
      CONNECT_SSL_KEY_PASSWORD: ${SSL_SECRET}
      CONNECT_SSL_TRUSTSTORE_LOCATION: /etc/kafka/secrets/connect.truststore.jks
      CONNECT_SSL_TRUSTSTORE_PASSWORD: ${SSL_SECRET}
      CONNECT_SSL_KEYSTORE_LOCATION: /etc/kafka/secrets/connect.keystore.jks
      CONNECT_SSL_KEYSTORE_PASSWORD: ${SSL_SECRET}
      # SSL settings for the producers used by source tasks.
      CONNECT_PRODUCER_SECURITY_PROTOCOL: SSL
      CONNECT_PRODUCER_BOOTSTRAP_SERVERS: 'SSL://${BROKER_HOST}:${BROKER_PORT}'
      CONNECT_PRODUCER_SSL_TRUSTSTORE_LOCATION: /etc/kafka/secrets/producer.truststore.jks
      CONNECT_PRODUCER_SSL_TRUSTSTORE_PASSWORD: ${SSL_SECRET}
      # SSL settings for the consumers used by sink tasks.
      CONNECT_CONSUMER_SECURITY_PROTOCOL: SSL
      CONNECT_CONSUMER_BOOTSTRAP_SERVERS: 'SSL://${BROKER_HOST}:${BROKER_PORT}'
      CONNECT_CONSUMER_SSL_TRUSTSTORE_LOCATION: /etc/kafka/secrets/consumer.truststore.jks
      CONNECT_CONSUMER_SSL_TRUSTSTORE_PASSWORD: ${SSL_SECRET}
    volumes:
      - ./secrets:/etc/kafka/secrets

create-cert.sh

#!/bin/bash
#
# Generates a self-signed CA plus, for every component listed in the loop
# below, a keystore holding a CA-signed certificate and a truststore holding
# the CA certificate. Results are written to ./secrets together with a
# cert_creds file containing the store password.
#
# The password defaults to "datahub" and can be overridden by exporting
# SSL_SECRET before running the script.

set -o nounset \
    -o errexit

password="${SSL_SECRET:-datahub}"

# tmp/ holds the CA private key, so remove it however the script exits.
trap 'rm -rf tmp' EXIT
# Command output is discarded below; without this an errexit abort is silent.
trap 'echo " FAILED (line $LINENO)" >&2' ERR

printf 'Deleting previous (if any)...'
rm -rf secrets tmp
mkdir -p secrets tmp
echo " OK!"

# Generate CA key and self-signed CA certificate
printf 'Creating CA...'
openssl req -new -x509 \
    -keyout tmp/datahub-ca.key \
    -out tmp/datahub-ca.crt \
    -days 365 \
    -subj '/CN=ca.datahub/OU=test/O=datahub/L=paris/C=fr' \
    -passin "pass:$password" \
    -passout "pass:$password" >/dev/null 2>&1
echo " OK!"

for i in 'broker' 'producer' 'consumer' 'schema-registry' 'connect'
do
    printf 'Creating cert and keystore of %s...' "$i"

    # Create the keystore with a fresh RSA key pair
    keytool -genkey -noprompt \
        -alias "$i" \
        -dname "CN=$i, OU=test, O=datahub, L=paris, C=fr" \
        -keystore "secrets/$i.keystore.jks" \
        -keyalg RSA \
        -storepass "$password" \
        -keypass "$password" >/dev/null 2>&1

    # Create a CSR for the key
    keytool -keystore "secrets/$i.keystore.jks" -alias "$i" -certreq \
        -file "tmp/$i.csr" \
        -storepass "$password" -keypass "$password" >/dev/null 2>&1

    # Sign the CSR with the CA
    openssl x509 -req \
        -CA tmp/datahub-ca.crt -CAkey tmp/datahub-ca.key \
        -in "tmp/$i.csr" -out "tmp/$i-ca-signed.crt" \
        -days 365 -CAcreateserial \
        -passin "pass:$password" >/dev/null 2>&1

    # Import the CA certificate first so the signed certificate chains to it
    keytool -keystore "secrets/$i.keystore.jks" -alias CARoot -import -noprompt \
        -file tmp/datahub-ca.crt \
        -storepass "$password" -keypass "$password" >/dev/null 2>&1

    # Import the signed certificate back into the keystore
    keytool -keystore "secrets/$i.keystore.jks" -alias "$i" -import \
        -file "tmp/$i-ca-signed.crt" \
        -storepass "$password" -keypass "$password" >/dev/null 2>&1

    # Create truststore and import the CA cert.
    keytool -keystore "secrets/$i.truststore.jks" -alias CARoot -import -noprompt \
        -file tmp/datahub-ca.crt \
        -storepass "$password" -keypass "$password" >/dev/null 2>&1

    echo " OK!"
done

# Credentials file holding the store password.
printf '%s\n' "$password" > secrets/cert_creds

echo "SUCCEEDED"

Postman:POST https://localhost:8083/connectors

{
   "name":"MySQL-Demo",
   "config":{
      "connector.class":"io.debezium.connector.mysql.MySqlConnector",
      "database.hostname":"<ip>",
      "database.port":"3306",
      "database.user":"root",
      "database.password":"mysql",
      "database.server.id":"1",
      "database.server.name":"mysql",
      "database.history.kafka.bootstrap.servers":"broker:9092",
      "database.history.kafka.topic":"dbhistory.demo",
      "database.history.producer.security.protocol":"SSL",
      "database.history.producer.ssl.keystore.location":"/etc/kafka/secrets/connect.keystore.jks",
      "database.history.producer.ssl.keystore.password":"datahub",
      "database.history.producer.ssl.truststore.location":"/etc/kafka/secrets/connect.truststore.jks",
      "database.history.producer.ssl.truststore.password":"datahub",
      "database.history.producer.ssl.key.password":"datahub",
      "database.history.consumer.security.protocol":"SSL",
      "database.history.consumer.ssl.keystore.location":"/etc/kafka/secrets/connect.keystore.jks",
      "database.history.consumer.ssl.keystore.password":"datahub",
      "database.history.consumer.ssl.truststore.location":"/etc/kafka/secrets/connect.truststore.jks",
      "database.history.consumer.ssl.truststore.password":"datahub",
      "database.history.consumer.ssl.key.password":"datahub",
      "include.schema.changes":"true",
      "table.whitelist":"demo.test"
   }
}

connect 容器的日志

[2021-05-22 18:58:27,342] INFO [Producer clientId=connector-producer-MySQL-Demo-0] Failed authentication with broker/192.168.16.3 (SSL handshake failed) (org.apache.kafka.common.network.Selector)
[2021-05-22 18:58:27,343] ERROR [Producer clientId=connector-producer-MySQL-Demo-0] Connection to node -1 (broker/192.168.16.3:9092) failed authentication due to: SSL handshake failed (org.apache.kafka.clients.NetworkClient)
[2021-05-22 18:58:27,674] INFO Reading structure of database 'demo' (io.debezium.connector.mysql.MySqlSnapshotChangeEventSource)
[2021-05-22 18:58:28,374] INFO [Producer clientId=connector-producer-MySQL-Demo-0] Failed authentication with broker/192.168.16.3 (SSL handshake failed) (org.apache.kafka.common.network.Selector)
[2021-05-22 18:58:28,375] ERROR [Producer clientId=connector-producer-MySQL-Demo-0] Connection to node -1 (broker/192.168.16.3:9092) failed authentication due to: SSL handshake failed (org.apache.kafka.clients.NetworkClient)
[2021-05-22 18:58:28,851] INFO Snapshot step 6 - Persisting schema history (io.debezium.relational.RelationalSnapshotChangeEventSource)
[2021-05-22 18:58:29,605] INFO [Producer clientId=connector-producer-MySQL-Demo-0] Failed authentication with broker/192.168.16.3 (SSL handshake failed) (org.apache.kafka.common.network.Selector)
[2021-05-22 18:58:29,606] ERROR [Producer clientId=connector-producer-MySQL-Demo-0] Connection to node -1 (broker/192.168.16.3:9092) failed authentication due to: SSL handshake failed (org.apache.kafka.clients.NetworkClient)
[2021-05-22 18:58:29,788] ERROR Failed to send HTTP request to endpoint: https://schema-registry:8181/subjects/mysql-value/versions (io.confluent.kafka.schemaregistry.client.rest.RestService)
javax.net.ssl.SSLHandshakeException: PKIX path building failed: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target
    at sun.security.ssl.Alert.createSSLException(Alert.java:131)
    at sun.security.ssl.TransportContext.fatal(TransportContext.java:324)
    at sun.security.ssl.TransportContext.fatal(TransportContext.java:267)
    at sun.security.ssl.TransportContext.fatal(TransportContext.java:262)
    at sun.security.ssl.CertificateMessage$T12CertificateConsumer.checkServerCerts(CertificateMessage.java:645)
    at sun.security.ssl.CertificateMessage$T12CertificateConsumer.onCertificate(CertificateMessage.java:464)
    at sun.security.ssl.CertificateMessage$T12CertificateConsumer.consume(CertificateMessage.java:360)
    at sun.security.ssl.SSLHandshake.consume(SSLHandshake.java:377)
    at sun.security.ssl.HandshakeContext.dispatch(HandshakeContext.java:444)
    at sun.security.ssl.HandshakeContext.dispatch(HandshakeContext.java:422)
    at sun.security.ssl.TransportContext.dispatch(TransportContext.java:182)
    at sun.security.ssl.SSLTransport.decode(SSLTransport.java:156)
    at sun.security.ssl.SSLSocketImpl.decode(SSLSocketImpl.java:1197)
    at sun.security.ssl.SSLSocketImpl.readHandshakeRecord(SSLSocketImpl.java:1106)
    at sun.security.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:398)
    at sun.security.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:370)
    at sun.net.www.protocol.https.HttpsClient.afterConnect(HttpsClient.java:559)
    at sun.net.www.protocol.https.AbstractDelegateHttpsURLConnection.connect(AbstractDelegateHttpsURLConnection.java:185)
    at sun.net.www.protocol.http.HttpURLConnection.getOutputStream0(HttpURLConnection.java:1340)
    at sun.net.www.protocol.http.HttpURLConnection.getOutputStream(HttpURLConnection.java:1315)
    at sun.net.www.protocol.https.HttpsURLConnectionImpl.getOutputStream(HttpsURLConnectionImpl.java:264)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:241)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:322)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:422)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:414)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:400)
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.registerAndGetId(CachedSchemaRegistryClient.java:140)
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:196)
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:172)
    at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:71)
    at io.confluent.connect.avro.AvroConverter$Serializer.serialize(AvroConverter.java:131)
    at io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:80)
    at org.apache.kafka.connect.storage.Converter.fromConnectData(Converter.java:62)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.lambda$convertTransformedRecord$2(WorkerSourceTask.java:290)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.convertTransformedRecord(WorkerSourceTask.java:290)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:316)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:240)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: sun.security.validator.ValidatorException: PKIX path building failed: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target
    at sun.security.validator.PKIXValidator.doBuild(PKIXValidator.java:456)
    at sun.security.validator.PKIXValidator.engineValidate(PKIXValidator.java:323)
    at sun.security.validator.Validator.validate(Validator.java:271)
    at sun.security.ssl.X509TrustManagerImpl.validate(X509TrustManagerImpl.java:315)
    at sun.security.ssl.X509TrustManagerImpl.checkTrusted(X509TrustManagerImpl.java:223)
    at sun.security.ssl.X509TrustManagerImpl.checkServerTrusted(X509TrustManagerImpl.java:129)
    at sun.security.ssl.CertificateMessage$T12CertificateConsumer.checkServerCerts(CertificateMessage.java:629)
    ... 42 more
Caused by: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target
    at sun.security.provider.certpath.SunCertPathBuilder.build(SunCertPathBuilder.java:141)
    at sun.security.provider.certpath.SunCertPathBuilder.engineBuild(SunCertPathBuilder.java:126)
    at java.security.cert.CertPathBuilder.build(CertPathBuilder.java:280)
    at sun.security.validator.PKIXValidator.doBuild(PKIXValidator.java:451)
    ... 48 more
[2021-05-22 18:58:29,799] INFO WorkerSourceTask{id=MySQL-Demo-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask)
[2021-05-22 18:58:29,799] INFO WorkerSourceTask{id=MySQL-Demo-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask)
[2021-05-22 18:58:29,800] ERROR WorkerSourceTask{id=MySQL-Demo-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask)
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.convertTransformedRecord(WorkerSourceTask.java:290)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:316)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:240)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.connect.errors.DataException: Failed to serialize Avro data from topic mysql :
    at io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:83)
    at org.apache.kafka.connect.storage.Converter.fromConnectData(Converter.java:62)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.lambda$convertTransformedRecord$2(WorkerSourceTask.java:290)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
    ... 11 more
Caused by: org.apache.kafka.common.errors.SerializationException: Error serializing Avro message
Caused by: javax.net.ssl.SSLHandshakeException: PKIX path building failed: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target
    at sun.security.ssl.Alert.createSSLException(Alert.java:131)
    at sun.security.ssl.TransportContext.fatal(TransportContext.java:324)
    at sun.security.ssl.TransportContext.fatal(TransportContext.java:267)
    at sun.security.ssl.TransportContext.fatal(TransportContext.java:262)
    at sun.security.ssl.CertificateMessage$T12CertificateConsumer.checkServerCerts(CertificateMessage.java:645)
    at sun.security.ssl.CertificateMessage$T12CertificateConsumer.onCertificate(CertificateMessage.java:464)
    at sun.security.ssl.CertificateMessage$T12CertificateConsumer.consume(CertificateMessage.java:360)
    at sun.security.ssl.SSLHandshake.consume(SSLHandshake.java:377)
    at sun.security.ssl.HandshakeContext.dispatch(HandshakeContext.java:444)
    at sun.security.ssl.HandshakeContext.dispatch(HandshakeContext.java:422)
    at sun.security.ssl.TransportContext.dispatch(TransportContext.java:182)
    at sun.security.ssl.SSLTransport.decode(SSLTransport.java:156)
    at sun.security.ssl.SSLSocketImpl.decode(SSLSocketImpl.java:1197)
    at sun.security.ssl.SSLSocketImpl.readHandshakeRecord(SSLSocketImpl.java:1106)
    at sun.security.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:398)
    at sun.security.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:370)
    at sun.net.www.protocol.https.HttpsClient.afterConnect(HttpsClient.java:559)
    at sun.net.www.protocol.https.AbstractDelegateHttpsURLConnection.connect(AbstractDelegateHttpsURLConnection.java:185)
    at sun.net.www.protocol.http.HttpURLConnection.getOutputStream0(HttpURLConnection.java:1340)
    at sun.net.www.protocol.http.HttpURLConnection.getOutputStream(HttpURLConnection.java:1315)
    at sun.net.www.protocol.https.HttpsURLConnectionImpl.getOutputStream(HttpsURLConnectionImpl.java:264)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:241)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:322)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:422)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:414)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:400)
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.registerAndGetId(CachedSchemaRegistryClient.java:140)
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:196)
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:172)
    at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:71)
    at io.confluent.connect.avro.AvroConverter$Serializer.serialize(AvroConverter.java:131)
    at io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:80)
    at org.apache.kafka.connect.storage.Converter.fromConnectData(Converter.java:62)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.lambda$convertTransformedRecord$2(WorkerSourceTask.java:290)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.convertTransformedRecord(WorkerSourceTask.java:290)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:316)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:240)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: sun.security.validator.ValidatorException: PKIX path building failed: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target
    at sun.security.validator.PKIXValidator.doBuild(PKIXValidator.java:456)
    at sun.security.validator.PKIXValidator.engineValidate(PKIXValidator.java:323)
    at sun.security.validator.Validator.validate(Validator.java:271)
    at sun.security.ssl.X509TrustManagerImpl.validate(X509TrustManagerImpl.java:315)
    at sun.security.ssl.X509TrustManagerImpl.checkTrusted(X509TrustManagerImpl.java:223)
    at sun.security.ssl.X509TrustManagerImpl.checkServerTrusted(X509TrustManagerImpl.java:129)
    at sun.security.ssl.CertificateMessage$T12CertificateConsumer.checkServerCerts(CertificateMessage.java:629)
    ... 42 more
Caused by: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target
    at sun.security.provider.certpath.SunCertPathBuilder.build(SunCertPathBuilder.java:141)
    at sun.security.provider.certpath.SunCertPathBuilder.engineBuild(SunCertPathBuilder.java:126)
    at java.security.cert.CertPathBuilder.build(CertPathBuilder.java:280)
    at sun.security.validator.PKIXValidator.doBuild(PKIXValidator.java:451)
    ... 48 more
[2021-05-22 18:58:29,805] ERROR WorkerSourceTask{id=MySQL-Demo-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask)
n3h0vuf2

n3h0vuf21#

AvroConverter需要更多配置才能使用https
请注意,在安全环境中使用Avro时,需要添加*.converter.schema.registry.ssl.属性
例如

key.converter.schema.registry.ssl.truststore.location=<location>
key.converter.schema.registry.ssl.truststore.password=<truststore-password>
key.converter.schema.registry.ssl.keystore.location=<keystore-location>
key.converter.schema.registry.ssl.keystore.password=<keystore-password>
key.converter.schema.registry.ssl.key.password=<key-password>

value.converter.schema.registry.ssl.truststore.location=<location>
value.converter.schema.registry.ssl.truststore.password=<truststore-password>
value.converter.schema.registry.ssl.keystore.location=<keystore-location>
value.converter.schema.registry.ssl.keystore.password=<keystore-password>
value.converter.schema.registry.ssl.key.password=<key-password>

https://docs.confluent.io/platform/current/schema-registry/connect.html#avro
您可能还需要考虑添加CONNECT_ADMIN_变量来设置AdminClient SSL属性

ffscu2ro

ffscu2ro2#

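在我们的场景中,使用的是 Kafka 3.2.1 和 Confluent Schema Registry 7.2.1,生产者和消费者都是基于 Java 的。在生产者和消费者的属性中添加/设置以下属性后,错误得以解决:
configProps.put(SchemaRegistryClientConfig.CLIENT_NAMESPACE + SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, sslTrustStoreLocation);
configProps.put(SchemaRegistryClientConfig.CLIENT_NAMESPACE + SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, sslTrustStorePassword);
该类中已经定义了命名空间常量:SchemaRegistryClientConfig.CLIENT_NAMESPACE = "schema.registry."。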
这解决了问题。

相关问题