How to send data to a WebSocket from outside a Django Channels consumer

vuktfyat  posted on 2023-04-22  in  Go

I am trying to send data to a WebSocket from outside the consumer.
This is what I have set up so far:

  • settings.py
CHANNEL_LAYERS = {
    "default": {
        "BACKEND": "channels_redis.core.RedisChannelLayer",
        "CONFIG": {
            "hosts": [("127.0.0.1", 6379)],
        },
    },
}
  • routing.py
from django.urls import path

from .consumers import CoinsListConsumer

websocket_urlpatterns = [
    path('ws/coins/', CoinsListConsumer.as_asgi())
]
  • asgi.py
import os

from django.core.asgi import get_asgi_application
from channels.auth import AuthMiddlewareStack
from channels.routing import ProtocolTypeRouter, URLRouter
from channels.security.websocket import AllowedHostsOriginValidator

from apps.coins.routing import websocket_urlpatterns

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'config.settings.local')

application = ProtocolTypeRouter({
    'http': get_asgi_application(),
    'websocket': AllowedHostsOriginValidator(
        AuthMiddlewareStack(
            URLRouter(
                websocket_urlpatterns,
            )
        )
    )
})
  • consumers.py
import logging

from channels.generic.websocket import AsyncWebsocketConsumer

logger = logging.getLogger(__name__)


class CoinsListConsumer(AsyncWebsocketConsumer):
    async def connect(self):
        logger.info('Websocket was connected.')
        await self.accept()

    async def disconnect(self, code):
        logger.warning('Websocket was disconnected.')

    async def receive(self, text_data=None, bytes_data=None):
        return await super().receive(text_data, bytes_data)

So far this works: when I open a view… the WebSocket connects fine.
The problem appears when I try to send data to the WebSocket:

def send_data_to_websocket_coins_list_view(data: List[Dict]) -> None:
    """Send data to websocket coins list view """
    async_to_sync(channel_layer.send)(json.dumps(data))

This does not work and raises the following error: TypeError: send() missing 1 required positional argument: 'message'
According to the documentation, this should be done with the following code:

async_to_sync(channel_layer.send)("channel_name", json.dumps({...}))

That does not work either and raises the following error: AssertionError: message is not a dict
What is the problem, and what should I do?

zte4gxcn  #1

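You don't need to dump the data. The message has to be a dict (not a JSON string), and send takes the channel name as its first argument:

async_to_sync(channel_layer.send)("channel_name", {"type": "coins.update", "data": data})

Here "coins.update" is an assumed message type; the consumer would need a matching coins_update handler.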
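
For reference, both errors in the question come from the shape of the call: the channel layer's send takes a channel name first and then the message, and the message must be a dict rather than a JSON string or a list. Below is a minimal sketch of a helper that keeps those two rules in one place. The names build_coins_message and send_coins, the "coins.update" type and the "coins" key are hypothetical, and the channel layer is passed in as a parameter instead of being looked up.

```python
from typing import Any, Dict, List


def build_coins_message(data: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Wrap the payload in a dict, since the channel layer rejects str/list messages."""
    # "coins.update" is a hypothetical type. Channels dispatches it to a consumer
    # method named coins_update (dots in the type become underscores).
    return {"type": "coins.update", "coins": data}


async def send_coins(
    channel_layer: Any, channel_name: str, data: List[Dict[str, Any]]
) -> None:
    """Send to a single channel: the channel name first, then the dict message."""
    await channel_layer.send(channel_name, build_coins_message(data))
```

From synchronous code this could be called as async_to_sync(send_coins)(get_channel_layer(), channel_name, data), where channel_name is the value a consumer exposes as self.channel_name. Since that name is only known after a client connects, sending to a group is often the more practical option.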

trnvg8h3  #2

You can use it in an APIView, for example:

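import json

from asgiref.sync import async_to_sync
from channels.layers import get_channel_layer
from rest_framework import status
from rest_framework.response import Response
from rest_framework.views import APIView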

class NotificationView(APIView):
    def post(self, request, *args, **kwargs):
        # Process the notification
        notification_data = {'message': 'New notification'}

        # Send the notification to the WebSocket consumer
        channel_layer = get_channel_layer()
        async_to_sync(channel_layer.group_send)(
            'notifications',
            {'type': 'send_notification', 'text': json.dumps(notification_data)}
        )

        return Response(status=status.HTTP_200_OK)
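
A group_send like the one above only reaches sockets whose consumer has joined the 'notifications' group and defines a handler named after the message type. The following is a rough sketch of that consumer side, written as a plain mixin so it does not import Channels itself. NotificationsGroupMixin is a hypothetical name, and the sketch assumes the host class is an AsyncWebsocketConsumer, which provides channel_layer, channel_name, accept and send.

```python
from typing import Any, Dict


class NotificationsGroupMixin:
    """Hypothetical mixin; list it before AsyncWebsocketConsumer in the bases."""

    group_name = "notifications"  # must match the group passed to group_send

    async def connect(self) -> None:
        # Join the group so that group_send from a view reaches this socket.
        await self.channel_layer.group_add(self.group_name, self.channel_name)
        await self.accept()

    async def disconnect(self, code: int) -> None:
        # Leave the group so the layer stops routing messages to a closed socket.
        await self.channel_layer.group_discard(self.group_name, self.channel_name)

    async def send_notification(self, event: Dict[str, Any]) -> None:
        # Invoked for group messages whose "type" is "send_notification".
        await self.send(text_data=event["text"])
```

Under these assumptions the consumer from the question could be declared as class CoinsListConsumer(NotificationsGroupMixin, AsyncWebsocketConsumer), keeping its own logging in connect and disconnect if needed.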
