在asyncio循环中未调用异步函数

zu0ti5jz  于 2021-06-04  发布在  Kafka
关注(0)|答案(0)|浏览(394)

我的初始代码使用 aiokafka 制片人客户。

import asyncio
import aiokafka

async def foo(loops):
    producer = aiokafka.AIOKafkaProducer(
        loop=loop, bootstrap_servers='localhost:9092'
    )
    await producer.start()

    try:
        for _ in range(loops):
            await producer.send("my_topic", b"Hello world!")
    finally:
        await producer.stop()

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(foo(1000))

然而,我认为 producer.send() 如果我们避免 await 在进入its的下一个迭代之前 for 循环。不幸的是,下面修改的代码给出了错误
runtimeerror:此事件循环已在运行。
我想第二个 loop.run_until_complete() 是造成这种情况的原因,但我不知道如何重写代码而不调用第二次。达到预期效果的正确方法是什么?

import asyncio
import aiokafka

async def foo(loops):
    producer = aiokafka.AIOKafkaProducer(
        loop=loop, bootstrap_servers='localhost:9092'
    )
    await producer.start()

    try:
        tasks = []
        for _ in range(N):
            tasks.append(producer.send("my_topic", b"Hello"))
        loop.run_until_complete(asyncio.gather(*tasks))
    finally:
        await producer.stop()

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(foo(1000))

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题