我需要有一个Kafka生产者和4个消费者在python的平衡队列。
我的主题bash代码:
kafka-topics --bootstrap-server localhost:9092 --create --topic numbers --partitions 4 --replication-factor 1
例如,当我发送生产者消息时,Kafka将消息平均分配给消费者。但是我需要检查消费者是否完成工作,新消息是否分配给消费者。
这有助于我平衡和提高进程速度。
我的消费者代码:
import json, time
from kafka import KafkaConsumer
print("Connecting to consumer ...")
consumer = KafkaConsumer(
'numbers',
bootstrap_servers=['localhost:9092'],
auto_offset_reset='earliest',
enable_auto_commit=True,
group_id='my-group',
value_deserializer=lambda x: json.loads(x.decode('utf-8')))
for message in consumer:
print(f"{message.value}")
time.sleep(1)
我的生产者代码:
from time import sleep
from json import dumps
from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers=['localhost:9092'], value_serializer=lambda x: dumps(x).encode('utf-8'))
for e in range(100):
data = {'number' : e}
producer.send('numbers', value=data)
print(f"Sending data : {data}")
sleep(5)
1条答案
按热度按时间cuxqih211#
并发消费者
为了实现这样的功能,您需要创建4个具有相同groupId的消费者。你可以使用Python Threads来实现这一点。
然后,这些消费者中的每一个将被绑定到每个分区,一起轮询消息。
静态密钥/无密钥警告
我还看到你的生产者中的键是一个静态值(null); Kafka实际上会将所有消息发送到同一个分区。如果消息中的键相同,则倾向于这样做。在这种情况下,即使您有4个消费者绑定到4个分区,也只有一个消费者会逐个处理消息,因为所有消息都将在同一个分区中结束。