我在Kafka里有三个消费者和一个生产者。当生产者发送所有消息时(在我的简单代码中有100条消息),这些消息被划分给三个消费者,我的主要问题就是这种消息的划分。有时一条消息可能很长,这就是为什么一个消费者可能无法快速回答所有消息,但另一个快速回答所有消息的消费者却变得无所事事。
如何让队列中的所有消息,以及消费者何时完成工作,然后从生产者接收下一条消息?(当然,我不知道消费者是否收到来自生产者或主题的消息,我是这个领域的初学者)谢谢你完全指导我。
我拍了一个关于工作过程的视频,请观看。视频显示,一名消费者已经完成工作,处于空闲状态,但另外两名消费者正在跑步。
Movie link。
我的代码:主题:
kafka-topics --bootstrap-server localhost:9092 --create --topic numbers --partitions 4 --replication-factor 1
producer.py:
from time import sleep
from json import dumps
from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers=['localhost:9092'], value_serializer=lambda x: dumps(x).encode('utf-8'))
for e in range(100):
data = {'number' : e}
producer.send('numbers', value=data)
print(f"Sending data : {data}")
consumer0.py:
import json
from kafka import KafkaConsumer
print("Connecting to consumer ...")
consumer = KafkaConsumer(
'numbers',
bootstrap_servers=['localhost:9092'],
auto_offset_reset='earliest',
enable_auto_commit=True,
group_id='my-group',
value_deserializer=lambda x: json.loads(x.decode('utf-8')))
for message in consumer:
print(f"{message.value}")
consumer1.py:
import json
from kafka import KafkaConsumer
import time
print("Connecting to consumer ...")
consumer = KafkaConsumer(
'numbers',
bootstrap_servers=['localhost:9092'],
auto_offset_reset='earliest',
enable_auto_commit=True,
group_id='my-group',
value_deserializer=lambda x: json.loads(x.decode('utf-8')))
for message in consumer:
time.sleep(1)
print(f"{message.value}")
consumer2.py:
import json
from kafka import KafkaConsumer
import time
print("Connecting to consumer ...")
consumer = KafkaConsumer(
'numbers',
bootstrap_servers=['localhost:9092'],
auto_offset_reset='earliest',
enable_auto_commit=True,
group_id='my-group',
value_deserializer=lambda x: json.loads(x.decode('utf-8')))
for message in consumer:
time.sleep(2)
print(f"{message.value}")
1条答案
按热度按时间0pizxfdo1#
这些消息在三个消费者之间分配
这可不能保证Kafka对记录进行批处理,根据批处理的大小,所有记录可能最终都在一个分区中。
您可以发送带有键的数据来强制执行不同的分区,假设每个键都将被散列并以分区数为模得到一个唯一的值,但是生产者不能强制将消息广播到同一组的所有消费者,而不将相同的事件复制到每个分区
很快就变得无所事事。
然后产生更多的数据。如果您希望一批发送100条记录,并且永远不发送新事件,那么您可能并不真正需要流处理
每当消费者完成了他们的工作,就从生产者那里接收下一条消息
这正是你的for循环所做的
我不知道消费者是否从生产者或主题接收消息
您可以使用使用者滞后度量监视工具来检测这一点