无法在分布式模式下启动kafka connect进行ElasticSearch

mzaanser  于 2021-06-05  发布在  Kafka
关注(0)|答案(2)|浏览(549)

我试图启动Kafka连接在分布式模式,即使在独立也无法继续
这是我的ElasticSearchFlume属性

name=elasticsearch-sink
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
tasks.max=5
topics=fsp-audit
key.ignore=true
connection.url=https://****.amazonaws.com
type.name=kafka-connect
errors.tolerance = all
errors.deadletterqueue.topic.name = fsp-dlq-audit-event

这是我的connect-distributed.properties

bootstrap.servers=***:9092,***:9092,***:9092
group.id=connect-cluster
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
offset.storage.topic=connect-offsets
offset.storage.replication.factor=1
schema.enabled=false
config.storage.topic=connect-configs
config.storage.replication.factor=1
status.storage.topic=connect-status
status.storage.replication.factor=1
offset.flush.interval.ms=10000
plugin.path=/usr/local/confluent/share/java

我还创建了三个topi的前期工作

connect-offsets
connect-configs
connect-status

我在ec2上运行这个,并使用msk作为kafka。我检查了从ec2到msk的连接,我可以远程发送
我得到这个错误

[2020-01-30 08:53:12,126] INFO [AdminClient clientId=adminclient-1] Metadata update failed (org.apache.kafka.clients.admin.internals.AdminMetadataManager:237)
org.apache.kafka.common.errors.TimeoutException: Timed out waiting to send the call.
[2020-01-30 08:53:12,145] INFO [AdminClient clientId=adminclient-1] Metadata update failed (org.apache.kafka.clients.admin.internals.AdminMetadataManager:237)
org.apache.kafka.common.errors.TimeoutException: Timed out waiting to send the call.
[2020-01-30 08:53:12,149] ERROR Stopping due to error (org.apache.kafka.connect.cli.ConnectDistributed:83)
org.apache.kafka.connect.errors.ConnectException: Failed to connect to and describe Kafka cluster. Check worker's broker connection and security properties.
        at org.apache.kafka.connect.util.ConnectUtils.lookupKafkaClusterId(ConnectUtils.java:64)
        at org.apache.kafka.connect.util.ConnectUtils.lookupKafkaClusterId(ConnectUtils.java:45)
        at org.apache.kafka.connect.cli.ConnectDistributed.startConnect(ConnectDistributed.java:94)
        at org.apache.kafka.connect.cli.ConnectDistributed.main(ConnectDistributed.java:77)
Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment.
        at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
        at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
        at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
        at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
        at org.apache.kafka.connect.util.ConnectUtils.lookupKafkaClusterId(ConnectUtils.java:58)

问:如果我必须在分布式模式下运行kafka connect,我是否必须使用多个ec2/vm?

jum4pzuy

jum4pzuy1#

好的,在查看更多细节之后,我发现问题出在nacl中,它阻塞了几个子网的ip地址。
所以,我检查了msk-side-security-group/network-acl/route表,发现它们没有问题。这意味着问题可能与ec2示例有关,因此我检查了该示例的安全组/路由表,发现它们配置正确。
但是,在检查网络acl时 (acl-***) 在附加了ec2示例之后,我看到有一个入站规则,允许临时端口使用0.0.0.0/0,这应该允许代理与ec2示例对话。然而,在查看出站规则时,我看到它只允许存在b-2的子网范围,但它没有任何显式的出站规则来允许b-3 (10.**.**.0/24) 或b-4 (10.**.**.0/24) 子网范围。
当我添加新规则时,我就能够ping并成功地连接

6uxekuva

6uxekuva2#

如果我必须在分布式模式下运行kafka connect,我是否必须使用多个ec2/vm?
不,您可以运行一个“psuedo分布式”示例。独立和分布式的主要区别在于它如何处理偏移量和配置的存储。

相关问题