在python中测试WebSocket可用性的简单方法

gzszwxb4  于 2023-03-08  发布在  Python
关注(0)|答案(3)|浏览(161)

我使用下面的代码来测试本地WebSocket服务器是否正在运行:

import asyncio
import websockets

async def hello():
    async with websockets.connect('ws://127.0.0.1:8000/ws/registration/123') as websocket:
        await websocket.send(json_data)

asyncio.get_event_loop().run_until_complete(hello())

有没有不使用asyncio的更简单的方法来实现这一点?例如:

import asyncio
import websockets

conn = websockets.connect('ws://127.0.0.1:8000/ws/registration/123')
conn.send('hello')

基本上,我只是试图找到最简单的方法来测试我的WebSocket服务器是否正在监听和接收特定url的消息。

vnjpjtjt

vnjpjtjt1#

async_to_sync不是让这个过程变得更复杂了吗?为什么不创建一个普通的test_url函数呢?

def test_url(url, data=""):
    async def inner():
        async with websockets.connect(url) as websocket:
            await websocket.send(data)
    return asyncio.get_event_loop().run_until_complete(inner())

test_url("ws://127.0.0.1:8000/ws/registration/123")
mitkmikd

mitkmikd2#

可以使用async_to_sync执行上述操作,例如:

from asgiref.sync import async_to_sync
import websockets

def test_url(url, data=""):
    conn = async_to_sync(websockets.connect)(url)
    async_to_sync(conn.send)(data)

test_url("ws://127.0.0.1:8000/ws/registration/123")

请注意,“握手”可能不会在这里完成,因为它需要被接受的两种方式,但上述应使您能够测试,以确保网址被正确路由,等等。

efzxgjgh

efzxgjgh3#

如果你需要禁用证书验证,你可以这样做:

#!/usr/bin/env python3 

import asyncio
import ssl
import websockets 

def test_url(url, data=""):
    async def inner():
        ctx = ssl.create_default_context()
        ctx.check_hostname = False
        ctx.verify_mode = ssl.CERT_NONE
        async with websockets.connect(url, ssl=ctx) as websocket:
            await websocket.send(data)
    return asyncio.get_event_loop().run_until_complete(inner())

test_url("ws://127.0.0.1:8000/ws/registration/123")

相关问题