Confluent Replicator connector failing because Principal could not be determined from Subject

mmvthczy  posted on 2021-06-04  in  Kafka

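I've run into a problem while trying to replicate from one Confluent Cloud cluster to another. I have been following the Confluent documentation on how to do this, but I keep hitting the same failure, which I have to assume is caused by an error in my configuration.
The configuration is as follows: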

{
  "name": "kafka_topics_replication",
  "config": {
    "name": "kafka_topics_replication",
    "connector.class": "io.confluent.connect.replicator.ReplicatorSourceConnector",
    "topic.whitelist": "topics",
    "src.kafka.bootstrap.servers": "source-broker:9092",
    "src.kafka.security.protocol": "SASL_SSL",
    "src.kafka.security.mechanism": "PLAIN",
    "src.kafka.client.id": "src-to-dst-replicator",
    "src.kafka.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"src-username\" password=\"src-password\" serviceName=\"Kafka\";",
    "confluent.topic.replication.factor": "3",
    "dest.topic.replication.factor": "3",
    "dest.kafka.bootstrap.servers": "dest-broker.cloud:9092",
    "dest.kafka.security.protocol": "SASL_SSL",
    "dest.kafka.sasl.mechanism": "PLAIN",
    "dest.kafka.client.id": "src-to-dst-replicator",
    "dest.kafka.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"dst-username\" password=\"dst-password\" serviceName=\"Kafka\";"
  },
  "tasks": [],
  "type": "source"
}

The connector starts, but keeps logging the following error:

[2020-07-14 14:45:15,568] WARN [kafka_topics_replication|worker] [AdminClient clientId=src-to-dst-replicator] Error connecting to node source-broker:9092 (id: -1 rack: null) (org.apache.kafka.clients.NetworkClient:969)
java.io.IOException: Channel could not be created for socket java.nio.channels.SocketChannel[closed]
    at org.apache.kafka.common.network.Selector.buildAndAttachKafkaChannel(Selector.java:348)
    at org.apache.kafka.common.network.Selector.registerChannel(Selector.java:329)
    at org.apache.kafka.common.network.Selector.connect(Selector.java:256)
    at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:964)
    at org.apache.kafka.clients.NetworkClient.ready(NetworkClient.java:294)
    at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.sendEligibleCalls(KafkaAdminClient.java:1018)
    at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.processRequests(KafkaAdminClient.java:1260)
    at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1203)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.KafkaException: org.apache.kafka.common.errors.SaslAuthenticationException: Failed to configure SaslClientAuthenticator
    at org.apache.kafka.common.network.SaslChannelBuilder.buildChannel(SaslChannelBuilder.java:228)
    at org.apache.kafka.common.network.Selector.buildAndAttachKafkaChannel(Selector.java:338)
    ... 8 more
Caused by: org.apache.kafka.common.errors.SaslAuthenticationException: Failed to configure SaslClientAuthenticator
Caused by: org.apache.kafka.common.KafkaException: Principal could not be determined from Subject, this may be a transient failure due to Kerberos re-login
    at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.firstPrincipal(SaslClientAuthenticator.java:616)
    at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.<init>(SaslClientAuthenticator.java:200)
    at org.apache.kafka.common.network.SaslChannelBuilder.buildClientAuthenticator(SaslChannelBuilder.java:274)
    at org.apache.kafka.common.network.SaslChannelBuilder.lambda$buildChannel$1(SaslChannelBuilder.java:216)
    at org.apache.kafka.common.network.KafkaChannel.<init>(KafkaChannel.java:142)
    at org.apache.kafka.common.network.SaslChannelBuilder.buildChannel(SaslChannelBuilder.java:224)
    at org.apache.kafka.common.network.Selector.buildAndAttachKafkaChannel(Selector.java:338)
    at org.apache.kafka.common.network.Selector.registerChannel(Selector.java:329)
    at org.apache.kafka.common.network.Selector.connect(Selector.java:256)
    at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:964)
    at org.apache.kafka.clients.NetworkClient.ready(NetworkClient.java:294)
    at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.sendEligibleCalls(KafkaAdminClient.java:1018)
    at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.processRequests(KafkaAdminClient.java:1260)
    at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1203)
    at java.lang.Thread.run(Thread.java:748)

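Googling hasn't turned up anything that applies to this situation. I'm hoping someone here can at least point me in the right direction.
Thanks, -Ryan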

mlmc2os5  #1

The connector configuration appears to be missing some properties, for example:

src.kafka.ssl.endpoint.identification.algorithm=https
dest.kafka.ssl.endpoint.identification.algorithm=https
src.kafka.request.timeout.ms=20000
dest.kafka.request.timeout.ms=20000
src.kafka.retry.backoff.ms=500
dest.kafka.retry.backoff.ms=500

In addition, the properties src.kafka.sasl.jaas.config and dest.kafka.sasl.jaas.config appear to use the wrong format for their values.
Instead of:

org.apache.kafka.common.security.plain.PlainLoginModule required username=\"<CLUSTER_API_KEY>\" password=\"<CLUSTER_API_SECRET>\" serviceName=\"Kafka\";

use:

"org.apache.kafka.common.security.plain.PlainLoginModule required username=\"<CLUSTER_API_KEY>\" password=\"<CLUSTER_API_SECRET>\";"

You can find more information about this configuration here.
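
To make these suggestions concrete, here is a minimal Python sketch that applies them to a connector config dictionary before it is submitted. The helper names (`plain_jaas_config`, `apply_suggested_fixes`) are hypothetical and not part of any Confluent tooling. The sketch also makes one assumption that the answer does not state: the question's config sets `src.kafka.security.mechanism` while the destination side uses `dest.kafka.sasl.mechanism`, so the source key is treated as a likely typo and renamed. Kafka clients default `sasl.mechanism` to GSSAPI, which would be consistent with the Kerberos-related message in the log, but that reading is unconfirmed.

```python
import json

# Properties the answer lists as missing; applied under both the
# src.kafka.* and dest.kafka.* prefixes.
SUGGESTED_CLIENT_PROPS = {
    "ssl.endpoint.identification.algorithm": "https",
    "request.timeout.ms": "20000",
    "retry.backoff.ms": "500",
}


def plain_jaas_config(api_key: str, api_secret: str) -> str:
    """Build the JAAS value in the form the answer recommends (no serviceName)."""
    return (
        "org.apache.kafka.common.security.plain.PlainLoginModule required "
        f'username="{api_key}" password="{api_secret}";'
    )


def apply_suggested_fixes(config: dict, credentials: dict) -> dict:
    """Return a copy of a Replicator config with the suggestions applied.

    `credentials` maps a prefix ("src.kafka" or "dest.kafka") to a
    (api_key, api_secret) tuple.
    """
    fixed = dict(config)
    for prefix, (api_key, api_secret) in credentials.items():
        # Add the missing client properties without overwriting existing ones.
        for name, value in SUGGESTED_CLIENT_PROPS.items():
            fixed.setdefault(f"{prefix}.{name}", value)
        # Replace the JAAS value with the recommended format.
        fixed[f"{prefix}.sasl.jaas.config"] = plain_jaas_config(api_key, api_secret)
        # Assumption (not from the answer): "<prefix>.security.mechanism" was
        # meant to be "<prefix>.sasl.mechanism", as on the dest side.
        misnamed = f"{prefix}.security.mechanism"
        if misnamed in fixed:
            fixed.setdefault(f"{prefix}.sasl.mechanism", fixed.pop(misnamed))
    return fixed


if __name__ == "__main__":
    # Source-side subset of the config from the question.
    original = {
        "src.kafka.bootstrap.servers": "source-broker:9092",
        "src.kafka.security.protocol": "SASL_SSL",
        "src.kafka.security.mechanism": "PLAIN",
    }
    fixed = apply_suggested_fixes(original, {"src.kafka": ("src-username", "src-password")})
    print(json.dumps(fixed, indent=2))
```

`json.dumps` escapes the inner double quotes as `\"`, so the printed JAAS value has the same shape as the string shown in the answer and can be pasted into the `config` object of the connector definition.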
