我是Kafka的新手,正在尝试建立一个服务对服务的消息传递平台。以下是我的设置:
Kafka0.9.0.1
Zookeeper3.4.8
Kafkapython 1.3.3
我的应用程序创建 KafkaProducer
我从中向一个具有6个分区的主题发送消息流。我还创建了7 KafkaConsumer
s(在单个 group_id
,其中6个被分配给6个分区,一个处于空闲状态(这是预期的)。当producer正在流化时,我将分区计数增加到7,并期望流不会分布在7个分区上,从而唤醒空闲的使用者。但是,在我通过重新启动应用程序对新添加的分区进行重新初始化之前,生产者似乎不会选择它。我通过运行以下命令来扩展分区计数: kafka-topics --alter --zookeeper localhost:2181 --topic test --partitions 7
有没有一种方法可以让生产者在不重新初始化分区计数的情况下获取分区计数的变化?
以下是相关的代码片段:
制作人
class Producer(threading.Thread):
daemon = True
def __init__(self, name, manager):
super(Producer, self).__init__()
self.producer = KafkaProducer(bootstrap_servers='localhost:9092')
def run(self):
while not self.killed:
if not self.q.empty():
self._busy()
self.producer.send('test', value=self.q.get())
else:
self._free()
消费者
class Consumer(threading.Thread):
daemon = True
def __init__(self, name, manager):
super(Consumer, self).__init__()
self.consumer = KafkaConsumer(bootstrap_servers='localhost:9092',
group_id='test_group',
client_id="Consumer " + self.name)
self.consumer.subscribe(['test'])
def run(self):
while not self.killed:
messages = self.consumer.poll()
for topic, records in messages.iteritems():
print self.consumer.config['client_id'] + ": " + str(records)
1条答案
按热度按时间lyfkaqu11#
我遇到了一个可能类似的问题,并找到了解决办法。我在这里写过:librdkafka制作人如何了解kafka中的新主题划分
如果您的测试太短,这可能就是生产者没有了解新分区的原因。默认情况下,参数topic.metadata.refresh.interval.ms为300000(以毫秒为单位),因此代理将每5分钟刷新生产者中的元数据。如果您的测试在添加分区后花费了5分钟以上,那么这不是原因。