无法在分布式模式下启动kafka connect进行ElasticSearch

mzaanser  于 2021-06-05  发布在  Kafka
关注(0)|答案(2)|浏览(572)

我试图启动Kafka连接在分布式模式,即使在独立也无法继续
这是我的ElasticSearchFlume属性

  1. name=elasticsearch-sink
  2. connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
  3. tasks.max=5
  4. topics=fsp-audit
  5. key.ignore=true
  6. connection.url=https://****.amazonaws.com
  7. type.name=kafka-connect
  8. errors.tolerance = all
  9. errors.deadletterqueue.topic.name = fsp-dlq-audit-event

这是我的connect-distributed.properties

  1. bootstrap.servers=***:9092,***:9092,***:9092
  2. group.id=connect-cluster
  3. key.converter=org.apache.kafka.connect.json.JsonConverter
  4. value.converter=org.apache.kafka.connect.json.JsonConverter
  5. key.converter.schemas.enable=false
  6. value.converter.schemas.enable=false
  7. offset.storage.topic=connect-offsets
  8. offset.storage.replication.factor=1
  9. schema.enabled=false
  10. config.storage.topic=connect-configs
  11. config.storage.replication.factor=1
  12. status.storage.topic=connect-status
  13. status.storage.replication.factor=1
  14. offset.flush.interval.ms=10000
  15. plugin.path=/usr/local/confluent/share/java

我还创建了三个topi的前期工作

  1. connect-offsets
  2. connect-configs
  3. connect-status

我在ec2上运行这个,并使用msk作为kafka。我检查了从ec2到msk的连接,我可以远程发送
我得到这个错误

  1. [2020-01-30 08:53:12,126] INFO [AdminClient clientId=adminclient-1] Metadata update failed (org.apache.kafka.clients.admin.internals.AdminMetadataManager:237)
  2. org.apache.kafka.common.errors.TimeoutException: Timed out waiting to send the call.
  3. [2020-01-30 08:53:12,145] INFO [AdminClient clientId=adminclient-1] Metadata update failed (org.apache.kafka.clients.admin.internals.AdminMetadataManager:237)
  4. org.apache.kafka.common.errors.TimeoutException: Timed out waiting to send the call.
  5. [2020-01-30 08:53:12,149] ERROR Stopping due to error (org.apache.kafka.connect.cli.ConnectDistributed:83)
  6. org.apache.kafka.connect.errors.ConnectException: Failed to connect to and describe Kafka cluster. Check worker's broker connection and security properties.
  7. at org.apache.kafka.connect.util.ConnectUtils.lookupKafkaClusterId(ConnectUtils.java:64)
  8. at org.apache.kafka.connect.util.ConnectUtils.lookupKafkaClusterId(ConnectUtils.java:45)
  9. at org.apache.kafka.connect.cli.ConnectDistributed.startConnect(ConnectDistributed.java:94)
  10. at org.apache.kafka.connect.cli.ConnectDistributed.main(ConnectDistributed.java:77)
  11. Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment.
  12. at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
  13. at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
  14. at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
  15. at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
  16. at org.apache.kafka.connect.util.ConnectUtils.lookupKafkaClusterId(ConnectUtils.java:58)

问:如果我必须在分布式模式下运行kafka connect,我是否必须使用多个ec2/vm?

jum4pzuy

jum4pzuy1#

好的,在查看更多细节之后,我发现问题出在nacl中,它阻塞了几个子网的ip地址。
所以,我检查了msk-side-security-group/network-acl/route表,发现它们没有问题。这意味着问题可能与ec2示例有关,因此我检查了该示例的安全组/路由表,发现它们配置正确。
但是,在检查网络acl时 (acl-***) 在附加了ec2示例之后,我看到有一个入站规则,允许临时端口使用0.0.0.0/0,这应该允许代理与ec2示例对话。然而,在查看出站规则时,我看到它只允许存在b-2的子网范围,但它没有任何显式的出站规则来允许b-3 (10.**.**.0/24) 或b-4 (10.**.**.0/24) 子网范围。
当我添加新规则时,我就能够ping并成功地连接

6uxekuva

6uxekuva2#

如果我必须在分布式模式下运行kafka connect,我是否必须使用多个ec2/vm?
不,您可以运行一个“psuedo分布式”示例。独立和分布式的主要区别在于它如何处理偏移量和配置的存储。

相关问题