我正在使用github.com/shopify/sarama包与kafka进行交互。在我目前的方法中,我可以连接到代理并毫无问题地获取所有主题名称(下面是消费者代码)。
但是,当我尝试使用管理客户端(下面的管理代码)删除某些主题时,我收到“dial tcp:lookup ip-x-x-xx.ec2.internal:no-such host”错误。
我不明白为什么我会收到这个错误。我将非常感谢任何提示或可能的解决办法。
消费者
config := sarama.NewConfig()
config.Consumer.Return.Errors = true
//get broker
cluster, err := sarama.NewConsumer("localhost:9092", config)
if err != nil {
panic(err)
}
defer func() {
if err := cluster.Close(); err != nil {
panic(err)
}
}()
//get all topic from cluster
topics, _ := cluster.Topics()
管理员
config := sarama.NewConfig()
config.Consumer.Return.Errors = true
config.Version = sarama.V2_4_0_0
//admin broker
admin, err := sarama.NewClusterAdmin("localhost:9092", config)
if err != nil {
panic(err)
}
defer func() {
if err := admin.Close(); err != nil {
panic(err)
}
}()
topic := []string{"test-topic"}
output := admin.DeleteTopic(topic)
if output == nil {
fmt.Printf(" delete - %s\n", topic[0])
} else {
fmt.Println(output)
}
注意,我正在通过转发ssh端口通过bastion示例连接到远程服务器。
更新
凝固后 sarama.Logger = log.New(os.Stdout, "[sarama] ", log.LstdFlags)
我得到以下信息:
[sarama] 2020/03/25 02:08:03 Initializing new client
[sarama] 2020/03/25 02:08:03 client/metadata fetching metadata for all topics from broker localhost:9092
[sarama] 2020/03/25 02:08:04 Connected to broker at localhost:9092 (unregistered)
[sarama] 2020/03/25 02:08:04 client/brokers registered new broker #1001 at ip-x-x-x-1.ec2.internal:9092
[sarama] 2020/03/25 02:08:04 client/brokers registered new broker #1003 at ip-x-x-x-2.ec2.internal:9092
[sarama] 2020/03/25 02:08:04 client/brokers registered new broker #1002 at ip-x-x-x-3.ec2.internal:9092
[sarama] 2020/03/25 02:08:04 Successfully initialized new client
[sarama] 2020/03/25 02:08:04 Failed to connect to broker ip-x-x-x-3.ec2.internal:9092: dial tcp: lookup ip-x-x-x-3.ec2.internal: no such host`
更新2
我的kafka server.properties:
advertised.listeners=INTERNAL://ip-x-x-x-1.ec2.internal:9091,EXTERNAL_INSECURE://ip-x-x-x-1.ec2.internal:9092
listeners=INTERNAL://:9091,EXTERNAL_INSECURE://:9092
listener.security.protocol.map=INTERNAL:SSL,EXTERNAL_INSECURE:PLAINTEXT
2条答案
按热度按时间fwzugrvs1#
当客户机连接到代理时(在您的情况下
localhost:9092
)代理提供集群中所有其他代理的客户机详细信息。您可以在日志中看到:初始连接
经纪人详情:
您的问题是,您的客户机随后将使用这些代理详细信息与集群进行进一步的通信。代理提供的这些地址称为播发侦听器。也就是说,经纪人“宣传”的听众。
这意味着您的客户机必须能够解析并连接到代理在初始连接时返回的侦听器的主机和端口。
为什么要入侵你的电脑
/etc/hosts
有效的方法是,您的本地客户端可以将这些地址解析回localhost,然后ssh转发工作。但这只是一个黑客。你应该设置你的
advertised.listeners
在代理配置中,指向客户端可以解析的地址(不需要任何客户端)/etc/hosts
黑客)。有关详细信息,请参阅https://rmoff.net/2018/08/02/kafka-listeners-explained/
编辑:为了清楚起见,您应该设置
advertised.listeners
在每个代理上,将其发送到客户端可以解析的地址—因此,如果是通过localhost ssh转发,请设置advertised.listeners
至localhost:9092
.33qvvth12#
所以将代理地址添加到已知主机
/etc/hosts
在我的本地机器上执行代码就完成了。/etc/主机:
尽管如此,我还是不明白为什么我在使用时不需要做这一步
sarama.NewConsumer()
.