python 如何异步处理redis消息?

ffscu2ro  于 2023-01-24  发布在  Python
关注(0)|答案(1)|浏览(196)

我必须异步处理来自redis的每一条消息,下面是我对aioredis的尝试:

import asyncio
import aioredis

async def reader(channel: aioredis.client.PubSub):
    while True:
        data = None
        try:
            message = await channel.get_message(ignore_subscribe_messages=True)
            if message is not None:
                print(f"(Reader) Message Received: {message}")
                data = message["data"]
        except asyncio.TimeoutError:
            pass

        if data is not None:
            await process_message(data)

async def process_message(message):
    print(f"start process {message=}")
    await asyncio.sleep(10)
    print(f"+processed {message=}")

async def publish(redis, channel, message):
    print(f"-->publish {message=} to {channel=}")
    result = await redis.publish(channel, message)
    print("     +published")
    return result

async def main():
    redis = aioredis.from_url("redis://localhost")
    pubsub = redis.pubsub()
    await pubsub.subscribe("channel:1", "channel:2")

    future = asyncio.create_task(reader(pubsub))

    await publish(redis, "channel:1", "Hello")
    await publish(redis, "channel:2", "World")

    await future

if __name__ == "__main__":
    asyncio.run(main())

问题是,如果前一条消息没有处理,那么aioredis就不会执行get_message,而是一条一条地处理。
如何解决这个问题?

fkvaft9z

fkvaft9z1#

我找到解决办法了。
应使用asyncio.ensure_future(process_message(data))代替await process_message(data)
这个想法来自AIORedis and PUB/SUB aren't asnyc

相关问题