需要平衡Kafka消费者任务

sh7euo9m  于 2023-10-15  发布在  Apache
关注(0)|答案(1)|浏览(167)

我需要有一个Kafka生产者和4个消费者在python的平衡队列。
我的主题bash代码:

  1. kafka-topics --bootstrap-server localhost:9092 --create --topic numbers --partitions 4 --replication-factor 1

例如,当我发送生产者消息时,Kafka将消息平均分配给消费者。但是我需要检查消费者是否完成工作,新消息是否分配给消费者。
这有助于我平衡和提高进程速度。
我的消费者代码:

  1. import json, time
  2. from kafka import KafkaConsumer
  3. print("Connecting to consumer ...")
  4. consumer = KafkaConsumer(
  5. 'numbers',
  6. bootstrap_servers=['localhost:9092'],
  7. auto_offset_reset='earliest',
  8. enable_auto_commit=True,
  9. group_id='my-group',
  10. value_deserializer=lambda x: json.loads(x.decode('utf-8')))
  11. for message in consumer:
  12. print(f"{message.value}")
  13. time.sleep(1)

我的生产者代码:

  1. from time import sleep
  2. from json import dumps
  3. from kafka import KafkaProducer
  4. producer = KafkaProducer(bootstrap_servers=['localhost:9092'], value_serializer=lambda x: dumps(x).encode('utf-8'))
  5. for e in range(100):
  6. data = {'number' : e}
  7. producer.send('numbers', value=data)
  8. print(f"Sending data : {data}")
  9. sleep(5)
cuxqih21

cuxqih211#

并发消费者

为了实现这样的功能,您需要创建4个具有相同groupId的消费者。你可以使用Python Threads来实现这一点。

  1. import threading
  2. from kafka import KafkaConsumer
  3. def consumer_thread(consumer):
  4. for message in consumer:
  5. print(message)
  6. if __name__ == "__main__":
  7. consumer_group_id = "my-group"
  8. bootstrap_servers = ["localhost:29092"]
  9. topic = "my-topic"
  10. consumers = []
  11. for i in range(4):
  12. consumer = KafkaConsumer(
  13. topic,
  14. group_id=consumer_group_id,
  15. bootstrap_servers=bootstrap_servers,
  16. )
  17. print("starting consumer")
  18. thread = threading.Thread(target=consumer_thread, args=(consumer,))
  19. consumers.append(thread)
  20. thread.start()
  21. for thread in consumers:
  22. thread.join()

然后,这些消费者中的每一个将被绑定到每个分区,一起轮询消息。

静态密钥/无密钥警告

我还看到你的生产者中的键是一个静态值(null); Kafka实际上会将所有消息发送到同一个分区。如果消息中的键相同,则倾向于这样做。在这种情况下,即使您有4个消费者绑定到4个分区,也只有一个消费者会逐个处理消息,因为所有消息都将在同一个分区中结束。

展开查看全部

相关问题