如何给每一个在Kafka闲下来的消费者传递信息?

qxgroojn  于 2023-10-15  发布在  Apache
关注(0)|答案(1)|浏览(109)

我在Kafka里有三个消费者和一个生产者。当生产者发送所有消息时(在我的简单代码中有100条消息),这些消息被划分给三个消费者,我的主要问题就是这种消息的划分。有时一条消息可能很长,这就是为什么一个消费者可能无法快速回答所有消息,但另一个快速回答所有消息的消费者却变得无所事事。
如何让队列中的所有消息,以及消费者何时完成工作,然后从生产者接收下一条消息?(当然,我不知道消费者是否收到来自生产者或主题的消息,我是这个领域的初学者)谢谢你完全指导我。
我拍了一个关于工作过程的视频,请观看。视频显示,一名消费者已经完成工作,处于空闲状态,但另外两名消费者正在跑步。
Movie link
我的代码:主题:

kafka-topics --bootstrap-server localhost:9092 --create  --topic numbers  --partitions 4 --replication-factor 1

producer.py:

from time import sleep
from json import dumps
from kafka import KafkaProducer
  
producer = KafkaProducer(bootstrap_servers=['localhost:9092'], value_serializer=lambda x: dumps(x).encode('utf-8'))
  
for e in range(100):
   data = {'number' : e}
   producer.send('numbers', value=data)
   print(f"Sending data : {data}")

consumer0.py:

import json 
from kafka import KafkaConsumer

print("Connecting to consumer ...")
consumer = KafkaConsumer(
    'numbers',
     bootstrap_servers=['localhost:9092'],
     auto_offset_reset='earliest',
     enable_auto_commit=True,
     group_id='my-group',
     value_deserializer=lambda x: json.loads(x.decode('utf-8')))

for message in consumer:
 print(f"{message.value}")

consumer1.py:

import json 
from kafka import KafkaConsumer
import time

print("Connecting to consumer ...")
consumer = KafkaConsumer(
    'numbers',
     bootstrap_servers=['localhost:9092'],
     auto_offset_reset='earliest',
     enable_auto_commit=True,
     group_id='my-group',
     value_deserializer=lambda x: json.loads(x.decode('utf-8')))

for message in consumer:
 time.sleep(1)
 print(f"{message.value}")

consumer2.py:

import json 
from kafka import KafkaConsumer
import time

print("Connecting to consumer ...")
consumer = KafkaConsumer(
    'numbers',
     bootstrap_servers=['localhost:9092'],
     auto_offset_reset='earliest',
     enable_auto_commit=True,
     group_id='my-group',
     value_deserializer=lambda x: json.loads(x.decode('utf-8')))

for message in consumer:
 time.sleep(2)
 print(f"{message.value}")
0pizxfdo

0pizxfdo1#

这些消息在三个消费者之间分配
这可不能保证Kafka对记录进行批处理,根据批处理的大小,所有记录可能最终都在一个分区中。
您可以发送带有键的数据来强制执行不同的分区,假设每个键都将被散列并以分区数为模得到一个唯一的值,但是生产者不能强制将消息广播到同一组的所有消费者,而不将相同的事件复制到每个分区
很快就变得无所事事。
然后产生更多的数据。如果您希望一批发送100条记录,并且永远不发送新事件,那么您可能并不真正需要流处理
每当消费者完成了他们的工作,就从生产者那里接收下一条消息
这正是你的for循环所做的
我不知道消费者是否从生产者或主题接收消息
您可以使用使用者滞后度量监视工具来检测这一点

相关问题