我必须异步处理来自redis的每一条消息,下面是我对aioredis的尝试:
import asyncio
import aioredis
async def reader(channel: aioredis.client.PubSub) -> None:
    """Poll *channel* forever and pass each message payload to process_message.

    Subscribe/unsubscribe confirmations are skipped via
    ``ignore_subscribe_messages=True``. The loop has no exit condition, so
    this coroutine only ends when it is cancelled or an exception escapes.

    Args:
        channel: A PubSub object that has already been subscribed.
    """
    while True:
        try:
            message = await channel.get_message(ignore_subscribe_messages=True)
        except asyncio.TimeoutError:
            # NOTE(review): no timeout is configured in this block, so this
            # handler may never fire -- confirm before removing it.
            continue

        if message is None:
            # Nothing was available on this poll; try again.
            continue

        print(f"(Reader) Message Received: {message}")
        payload = message["data"]
        if payload is not None:
            # Awaiting inline means the next get_message() call does not
            # happen until process_message() has returned, so messages are
            # handled strictly one after another.
            await process_message(payload)
async def process_message(message) -> None:
    """Simulate slow handling of one message.

    Prints a start marker, sleeps for 10 seconds without blocking the event
    loop, then prints a completion marker.

    Args:
        message: The payload to "process"; it is only echoed in the output.
    """
    print(f"start process {message=}")
    # Stand-in for real work; asyncio.sleep yields control to other tasks.
    await asyncio.sleep(10)
    print(f"+processed {message=}")
async def publish(redis, channel, message):
    """Publish *message* on *channel* and log before and after the call.

    Args:
        redis: Client object exposing an awaitable ``publish(channel, message)``.
        channel: Name of the channel to publish to.
        message: Payload to send.

    Returns:
        Whatever ``redis.publish`` returns (presumably the number of
        subscribers that received the message -- confirm against the client
        documentation).
    """
    print(f"-->publish {message=} to {channel=}")
    result = await redis.publish(channel, message)
    print(" +published")
    return result
async def main() -> None:
    """Run the pub/sub demo against a Redis server on localhost.

    Subscribes to ``channel:1`` and ``channel:2``, starts ``reader`` as a
    background task, publishes one message to each channel, and then waits
    on the reader task.
    """
    redis = aioredis.from_url("redis://localhost")
    pubsub = redis.pubsub()
    await pubsub.subscribe("channel:1", "channel:2")

    # Start the reader as a task so it runs concurrently with the publishes
    # below; the reference is kept so the task can be awaited afterwards.
    future = asyncio.create_task(reader(pubsub))

    await publish(redis, "channel:1", "Hello")
    await publish(redis, "channel:2", "World")

    # reader() loops without an exit condition, so this wait only ends when
    # the task is cancelled or raises.
    await future
if __name__ == "__main__":
    # Run the demo only when executed as a script, not when imported.
    asyncio.run(main())
问题是,如果前一条消息还没有处理完,aioredis就不会执行下一次get_message,
消息只能一条一条地依次处理。
如何解决这个问题?
1条答案
按热度按时间fkvaft9z1#
我找到解决办法了。
应使用
asyncio.ensure_future(process_message(data))
代替await process_message(data),这样reader不必等待处理完成,就可以立即继续接收下一条消息。
这个想法来自AIORedis and PUB/SUB aren't asnyc