none使用asyncio阻塞'while true'

yfwxisqw  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(547)

使用下面的代码,我尝试使用asyncio启动两个无限循环:

async def do_job_1():
    while True :
        print('do_job_1')
        await asyncio.sleep(5)

async def do_job_2():
    while True :
        print('do_job_2')
        await asyncio.sleep(5)

if __name__ == '__main__':
    asyncio.run(do_job_1())
    asyncio.run(do_job_2())
``` `do_job_1` 阻碍 `do_job_2` ,作为 `do_job_2` 从不打印do\u job\u 1。我犯了什么错误?
最终我试图转换Kafka消费代码:

from confluent_kafka import Consumer, KafkaError

settings = {
'bootstrap.servers': 'localhost:9092',
'group.id': 'mygroup',
'client.id': 'client-1',
'enable.auto.commit': True,
'session.timeout.ms': 6000,
'default.topic.config': {'auto.offset.reset': 'smallest'}
}

c = Consumer(settings)

c.subscribe(['mytopic'])

try:
while True:
msg = c.poll(0.1)
if msg is None:
continue
elif not msg.error():
print('Received message: {0}'.format(msg.value()))
elif msg.error().code() == KafkaError._PARTITION_EOF:
print('End of partition reached {0}/{1}'
.format(msg.topic(), msg.partition()))
else:
print('Error occured: {0}'.format(msg.error().str()))

except KeyboardInterrupt:
pass

finally:
c.close()

取自https://www.confluent.io/blog/introduction-to-apache-kafka-for-python-programmers 所以我可以并行处理Kafka消息。
okxuctiv

okxuctiv1#

help(asyncio.run) :
它应该作为asyncio程序的主要入口点,理想情况下应该只调用一次。
但你可以用 asyncio.gather 加入任务:

import asyncio

async def do_job_1():
    while True :
        print('do_job_1')
        await asyncio.sleep(5)

async def do_job_2():
    while True :
        print('do_job_2')
        await asyncio.sleep(5)

async def main():
    await asyncio.gather(do_job_1(), do_job_2())

if __name__ == '__main__':
    asyncio.run(main())

相关问题