我已经开始学习Kafka了。试着对它进行基本操作。我在一个关于“经纪人”的问题上结巴了。
我的kafka正在运行,但是当我想创建一个分区时。
from kafka import TopicPartition
(ERROR THERE) consumer = KafkaConsumer(bootstrap_servers='localhost:1234')
consumer.assign([TopicPartition('foobar', 2)])
msg = next(consumer)
回溯(最后一次调用):文件“,”第1行,在文件“/usr/local/lib/python2.7/dist packages/kafka/consumer/group.py”中,第284行,在init self.\u client=kafkaclient(metrics=self.\u metrics,**self.config)文件“/usr/local/lib/python2.7/dist packages/kafka/client\u async.py”中,第202行,在init self.config['api\u version']=self.check\u version(timeout=check\u timeout)file“/usr/local/lib/python2.7/dist packages/kafka/client\u async.py”的第791行,在check\u version raise errors.nobrokersavable()kafka.errors.nobrokersavable:nobrokersavable中
6条答案
按热度按时间vxf3dgd41#
不能在使用者内创建分区。分区是在创建主题时创建的。例如,使用命令行工具:
这将创建一个新主题“mynewtopic”,其中包含10个分区(编号从0到9)和复制因子3(看到了吗http://docs.confluent.io/3.0.0/kafka/post-deployment.html#admin-运营和https://kafka.apache.org/documentation.html#quickstart_createtopic)
如果你打电话给
assign()
,这意味着您要使用相应的分区,并且该分区必须已经存在。avkwfej42#
看起来您希望开始使用消息而不是创建分区。不过,你能在1234港到达Kafka吗?9092是Kafka的默认端口也许你可以试试这个。如果找到了正确的端口,但应用程序仍会产生错误,则可以尝试使用控制台使用者来测试设置:
bin/kafka-console-producer.sh --broker-list localhost:<yourportnumber> --topic foobar
控制台使用者是标准kafka发行版的一部分。也许这能让你更接近问题的根源。wgx48brx3#
我在Kafka流媒体时也犯了同样的错误。下面的代码解决了我的错误:我们需要在kafkaproducer中定义api版本。
z9zf31ra4#
nobrokersavailable可能是kafka配置中主机名配置错误的答案。
mepcadol5#
对我来说,问题是防火墙规则,因为我在谷歌云上运行Kafka。
它昨天对我有用,今天我绞尽脑汁想了一个小时为什么它不再有用了。
由于我的本地系统的公共ip地址每次连接到不同的lan或wifi时都会发生变化,因此我必须在防火墙规则中允许本地系统的公共ip。我建议使用固定公共ip的连接,或者在切换/更改连接时检查此连接。
配置中的这些小更改需要花费太多的时间来调试和修复它们。我觉得浪费了一个小时。
dly7yett6#
不知道这个答案是否仍然相关,但最近在一个无法从主机windows操作系统访问的vboxvm代理中解决了同样的问题。既然您在kafkaconsumer中提到了bootsrap\u服务器,我假设您使用的至少是kafka 0.10.0.0
请找找
advertised.listeners
属性,并将其设置为PLAINTEXT://localhost:9092
或者PLAINTEXT://<broker_ip>:9092
但在设置之前,请确保可以从用户运行的环境访问代理(通过ping localhost
).另外,您需要重新启动kafka服务器和consumer/producer(无论正在运行什么),然后尝试发送/接收。
例如,如果您正在运行vm,您可能希望使用仅主机适配器使代理可以从主机访问
注意:此配置适用于kafka服务器>=0.10.x.x,但不适用于0.8.2.x。尚未检查0.9.0.x