Python WebSocket客户端以不同的时间间隔定期发送消息

5lhxktic  于 2023-01-01  发布在  Python
关注(0)|答案(1)|浏览(410)

我想发送两个不同的消息到一个websocket服务器,但是使用不同的时间间隔。例如:
1.第一条消息应每2秒发送一次。
1.第二条消息应每5秒发送一次。

async def send_first_message(websocket):
    while True: 
        await websocket.send("FIRST MESSAGE")
        response = await websocket.recv()
        await asyncio.sleep(2)


async def send_second_message():
    while True: 
        async with websockets.connect(f"ws://{IP}:{PORT}") as websocket:

            asyncio.create_task(send_first_message(websocket))

            while True:
                await websocket.send("SECOND MESSAGE")
                response = await websocket.recv()
                await asyncio.sleep(5)

asyncio.run(send_second_message())

如果我像这样运行代码,我会得到:
"运行时错误:当另一个协同程序正在等待下一个消息时,无法调用recv "
如果我注解掉其中一个"await websocket.recv()",它会正常工作几秒钟,然后抛出:
"RuntimeError未接收或发送关闭帧"

wqlqzqxt

wqlqzqxt1#

在您尝试在任务中执行的操作(同步请求-响应交互)与协议和库期望您执行的操作(异步消息)之间存在一些脱节。
在编写异步代码时,您需要查看库/协议/服务期望什么是原子操作,可以与其他所有操作异步发生,以及您希望什么是一系列同步操作,然后您需要在库中找到将支持它的原语。在Websockets的情况下,原子操作是一个消息在两个方向上发送,所以你不能期望websockets同步两个消息流。
或者换句话说,你希望每个发送消息都有同步响应,但是websocket并不是设计来处理交叉的同步请求的,你已经向websocket服务器发送了一条消息,你想得到对该消息的响应。但是你还在同一个websocket上发送了另一个消息,你也想得到对它的响应。你的客户端websocket库不能'不区分用于第一请求的消息和用于第二请求的消息(因为从网络套接字协议层来看,这是一个无意义的概念,所以库通过将网络套接字上可以阻塞的recv操作限制为一个来实施这一点)。
所以...

  • 选项1-在单独的插槽上执行多项任务 *

从库限制一个websocket到一个阻塞recv的事实来看,协议中满足要求的原语是websocket本身。如果这些是单独的请求,你需要单独的阻塞响应(所以只有在这些响应可用时才继续请求任务),那么你可以有单独的websocket连接,并阻塞每个响应。

    • 客户端1.py**
async def send_first_message():
    async with websockets.connect(f"ws://{IP}:{PORT}") as websocket:
        while True: 
            await websocket.send("FIRST MESSAGE")
            response = await websocket.recv()
            print(response)
            await asyncio.sleep(2)

async def send_second_message():
    async with websockets.connect(f"ws://{IP}:{PORT}") as websocket:
        while True: 
            await websocket.send("SECOND MESSAGE")
            response = await websocket.recv()
            print(response)
            await asyncio.sleep(5)

async def main():
    asyncio.create_task(send_first_message())
    asyncio.create_task(send_second_message())
    await asyncio.Future()

asyncio.run(main())

然而,选项1并不是真正的websocket或异步方式。

  • 选项2-采用异步 *

要在单个websocket上执行此操作,您需要异步接收两个发送任务的响应。
如果您实际上并不关心send_ * 函数是否得到响应,那么您可以轻松地做到这一点...

    • 客户端2.py**
async def send_first_message(websocket):
    while True: 
        await websocket.send("FIRST MESSAGE")
        await asyncio.sleep(2)

async def send_second_message(websocket):
    while True:
        await websocket.send("SECOND MESSAGE")
        await asyncio.sleep(5)
        
async def receive_message(websocket):
    while True:
        response = await websocket.recv()
        print(response)

async def main():
    async with websockets.connect(f"ws://{IP}:{PORT}") as websocket:
        asyncio.create_task(send_first_message(websocket))
        asyncio.create_task(send_second_message(websocket))
        asyncio.create_task(receive_message(websocket))
        await asyncio.Future()

asyncio.run(main())
  • 备选方案3 *

但是如果你想把对请求的响应排成一行,并保持在一个单独的websocket上呢?你需要某种方式来知道任何特定的响应是针对哪个请求的。大多数需要这种交互的Web服务会让你在消息中发送一个ID给服务器,一旦响应准备好,它就会用这个ID作为引用来响应。
还有一种方法可以让消息任务阻塞并等待具有正确ID的响应,方法是将响应排队并定期检查它们。

    • 客户端3.py**
unhandled_responses = {}

async def send_first_message(websocket):
    while True:
        req_id = random.randint(0,65535)
        message = json.dumps({'id': req_id, 'message': 'FIRST MESSAGE'})
        await websocket.send(message)
        response = await block_for_response(req_id)
        print(response)
        await asyncio.sleep(2)

async def send_second_message(websocket):
    while True:
        req_id = random.randint(0,65535)
        message = json.dumps({'id': req_id, 'message': 'SECOND MESSAGE'})
        await websocket.send(message)
        response = await block_for_response(req_id)
        print(response)
        await asyncio.sleep(5)

async def block_for_response(id):
    while True:
        response = unhandled_responses.pop(id, None)
        if response:
            return response
        await asyncio.sleep(0.1)

async def receive_message(websocket):
    while True:
        response = json.loads(await websocket.recv())
        unhandled_responses[response['id']] = response

async def main():
    async with websockets.connect(f"ws://{IP}:{PORT}") as websocket:
        asyncio.create_task(send_first_message(websocket))
        asyncio.create_task(send_second_message(websocket))
        asyncio.create_task(receive_message(websocket))
        await asyncio.Future()

asyncio.run(main())

为了完整起见,在我的测试中客户端与之对话的服务器代码。

    • 服务器. py**
import asyncio
import websockets

async def server_endpoint(websocket):
    try:
        while True:
            recv_msg = await websocket.recv()
            response = recv_msg
            await websocket.send(response)
    except Exception as ex:
        print(str(ex))

async def main():
    async with websockets.serve(server_endpoint, "localhost", 8765):
        await asyncio.Future()  # run forever

if __name__ == "__main__":
    asyncio.run(main())

相关问题