Relaying Redis subscription messages to a WebSocket

56lgkhnf posted on 2021-06-08 in Redis

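I subscribe to Redis with a pattern subscription (*:*:*) and receive data once a minute. The same process also runs as a WebSocket server: it tracks which channels each client has asked for, and when Redis delivers data for one of those channels, that data should be pushed straight to the matching WebSocket clients.
For example: if a ws-client wants to subscribe to a channel named binance:btc-usdt:1m, and Redis delivers data for it every minute, how can I send that data to the ws-client as soon as it arrives from Redis?
What is the most efficient way to do this?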

export {};
const redis = require("redis");
const WebSocket = require("ws");

const subscriber = redis.createClient();
subscriber.psubscribe("*:*:*");
const wss = new WebSocket.Server({ port: 8080 });

subscriber.on("pmessage", function (pattern, channel, message) {
  console.log(message);
});

wss.on("connection", function connection(ws) {
  ws.on("message", function incoming(message) {
    console.log("Server Received: %s", message);
  });

  ws.send("something from server");
});
Answer 1, by vs91vp4v

ws.ts

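import { IWSClient } from "./types/IChannel";

export default class WSHandler {
  // Maps a Redis channel name to the WebSocket clients subscribed to it.
  sockets: Record<string, IWSClient[]> = {};

  // Register a client for a channel, ignoring repeated subscriptions
  // so the same client is never sent a message twice.
  bindChannelWS(channel: string, client: IWSClient) {
    if (this.sockets[channel] === undefined) {
      this.sockets[channel] = [client];
    } else if (this.sockets[channel].indexOf(client) === -1) {
      this.sockets[channel].push(client);
    }
  }

  // Forward a Redis message, unchanged, to every client bound to the channel.
  publishToChannel(channel: string, message: string) {
    if (this.sockets[channel] !== undefined) {
      this.sockets[channel].forEach(function (client: IWSClient) {
        client.send(message);
      });
    }
  }
}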
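The handler above never removes a client once it has been bound, so sockets that disconnect stay in the channel lists. One possible way to handle this is sketched below. It is not part of the original answer: `ChannelRegistry`, `Sendable`, and `unsubscribeAll` are hypothetical names, and `Sendable` only stands in for `IWSClient`, whose definition is not shown here.

```typescript
// Minimal shape of a client: anything with a send() method.
// (Hypothetical stand-in for IWSClient.)
interface Sendable {
  send(data: string): void;
}

class ChannelRegistry {
  // channel name -> clients subscribed to that channel
  private channels: Record<string, Sendable[]> = Object.create(null);

  // Bind a client to a channel; a repeated subscription is a no-op.
  subscribe(channel: string, client: Sendable): void {
    const clients = this.channels[channel] || (this.channels[channel] = []);
    if (clients.indexOf(client) === -1) clients.push(client);
  }

  // Remove a client from every channel, e.g. when its socket closes.
  unsubscribeAll(client: Sendable): void {
    Object.keys(this.channels).forEach((channel) => {
      const remaining = (this.channels[channel] || []).filter(
        (c) => c !== client
      );
      if (remaining.length === 0) {
        delete this.channels[channel]; // drop channels nobody listens to
      } else {
        this.channels[channel] = remaining;
      }
    });
  }

  // Send a message to every subscriber; returns how many clients got it.
  publish(channel: string, message: string): number {
    const clients = this.channels[channel];
    if (clients === undefined) return 0;
    clients.forEach((c) => c.send(message));
    return clients.length;
  }
}
```

With the `ws` package, `unsubscribeAll(ws)` would typically be called from the socket's "close" event handler inside the "connection" callback.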

index.ts

export {};
const redis = require("redis");
const WebSocket = require("ws");
import { IWSClient } from "./types/IChannel";
import WSHandler from "./ws";

// Subscribe to every channel matching the pattern, e.g. binance:btc-usdt:1m.
const subscriber = redis.createClient();
subscriber.psubscribe("*:*:*");
const wss = new WebSocket.Server({ port: 8080 });

const wsHandler = new WSHandler();

// Relay each Redis message to the clients bound to its channel.
subscriber.on("pmessage", function (
  pattern: string,
  channel: string,
  message: string
) {
  wsHandler.publishToChannel(channel, message);
});

wss.on("connection", function connection(ws: IWSClient) {
  // Each client message is expected to be a JSON array of channel names.
  ws.on("message", function incoming(_subMessages: string) {
    let subMessages: Array<string>;
    try {
      subMessages = JSON.parse(_subMessages);
    } catch (err) {
      return; // ignore malformed JSON instead of crashing the server
    }
    if (!Array.isArray(subMessages)) return;
    console.log("Server Received: ", subMessages);
    subMessages.forEach((channel) => wsHandler.bindChannelWS(channel, ws));
  });

  ws.send(JSON.stringify({ connection: "Initiated" }));
});
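The server above expects each client message to be a JSON array of channel names, such as `["binance:btc-usdt:1m"]`. A rough sketch of validating that request before binding anything follows. The function name `parseSubscribeRequest` and the exact channel-name rule (three non-empty, colon-separated parts) are assumptions modeled on the example channel in the question, not part of the original answer.

```typescript
// Matches names with exactly three non-empty, colon-separated parts,
// e.g. "binance:btc-usdt:1m". This rule is an assumption.
const CHANNEL_NAME = /^[^:\s]+:[^:\s]+:[^:\s]+$/;

// Returns the valid, de-duplicated channel names from a raw client
// message, or an empty array if the message is not a JSON array.
function parseSubscribeRequest(raw: string): string[] {
  let parsed: unknown;
  try {
    parsed = JSON.parse(raw);
  } catch {
    return []; // not JSON at all
  }
  if (!Array.isArray(parsed)) return [];

  const channels: string[] = [];
  parsed.forEach((item) => {
    if (
      typeof item === "string" &&
      CHANNEL_NAME.test(item) &&
      channels.indexOf(item) === -1
    ) {
      channels.push(item);
    }
  });
  return channels;
}
```

In the "message" handler, the result could be looped over in place of the directly parsed array, so that malformed or unexpected input never reaches `bindChannelWS`.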
