我是新来Kafka,所以如果我错过了什么,请让我知道?用例如下所示,有三个kafka使用者正在运行,从中读取的消息经过处理并通过post调用发送到客户端api,但是客户端所花费的时间(对于测试,我使用了8分钟)不是固定的,因此它可以大于max.poll.timeout(对于本例,我使用了6分钟)。为此,我尝试在客户机发回响应时暂停()消费者,然后使用poll()调用resume()从kafka代理获取下一条消息。我尝试用下面的代码在异步线程中进行处理,但是这不是异步执行的,重新平衡正在发生,然后所有的使用者都离开了组。我如何处理这个用例,为什么代码不能异步执行?
版本:
python 2.7版
Kafkapython 1.4.6
巨魔2.2.1
import trollius as asyncio
from trollius import From
for message in consumer:
consumer.pause(*consumer.assignment())
loop = asyncio.get_event_loop()
tasks = [
asyncio.ensure_future(thread_message_processor(consumer)),
]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()
.....
@asyncio.coroutine
def thread_message_processor(consumer):
# for testing I am using sleep as time taken by client API for processing
yield From(asyncio.sleep(480))
consumer.resume(*consumer.assignment())
暂无答案!
目前还没有任何答案,快来回答吧!