NextJS WebSocket模式问题(如何从Websocket中提取流?)

lnvxswe2  于 2023-03-18  发布在  其他
关注(0)|答案(1)|浏览(305)

我正在用NextJS制作一个加密交易应用程序。它通过WebSocket从Bybit交换API获取所有硬币的价格流(WebSocket是必须的,因为你不能通过常规请求获得实时更新,而且它们有速率限制)。
所以,问题是:在nextJS中,您只能在useEffect钩子内使用WebSocket。并且此钩子必须返回WebSocket关闭方法,以便在刷新/导航到另一个页面等时关闭WebSocket,正确维护它,而不是在刷新时创建新的相同WebSocket等。
所以,这个WebSocket订阅了所有的硬币和它们的价格,并且持续地流传输,比如一秒钟很多次。所以,我不能使用整个流,因为这会导致不断的重渲染。所以我在流中实现了过滤逻辑,只更新应用中当前selectedCoin的状态。并且只在这个硬币的价格改变的时候更新状态。所以现在它工作得很好,组件只在价格改变的时候重新呈现。太好了。
但是...它只适用于单个硬币。而且应用程序需要同时支持多个硬币...所以...举个例子,当我在应用程序内换硬币时,它应该被传递到WebSocket流,开始过滤它的价格。但是,WebSocket已经在那里了,我不能给它传递一个新的参数。因此,唯一的方法是关闭当前的WebSocket,并打开一个新的硬币或几个,如果有不同的硬币打开位置订单等,但..关闭和打开一个新的websocket,正如我所看到的,不是最好的方法,因为有时候它会因为某种原因而无法关闭或者无法打开......它不稳定而且“有缺陷......”所以最好的方法是打开一个单独的WebSocket,让它永远运行下去,以某种方式从中提取流,只有在此之后,过滤所需的硬币,并有能力改变参数进行过滤。
简而言之,我需要应用程序同时支持许多硬币。为此,我需要能够过滤实时WebSocket流,而不需要在我选择一个新硬币或多个硬币在应用程序内活动时进行新的websocket连接...我希望我解释得很好...
所以我想知道如何实现它... Redux会有帮助吗?乍一看它不会。nextJS 13 app folder会有帮助吗?idk似乎没有。唯一可能的方法似乎是,它把这个WebSocket放在服务器端,然后制作另一个websocket来连接服务器和客户端,并将流传递到客户端..但同样,它甚至可能idk,我可以通过另一个WebSocket?idk来传输第一个硬币价格流吗?或者可能有一个更简单的实现?
下面是代码:)下面是WebSocket的useEffect

useEffect(() => {
        const getPrices = async (ws: WebSocket, setter, selectedCoin: string, coinsHistory: string[]) => {
            await trader.getPrices(ws, setter, selectedCoin, coinsHistory);
        };

        const pricesWS = new WebSocket(trader.pricesUrl);
        getPrices(pricesWS, setPrice, selectedCoin, coinsHistory);

        return () => {
            if (pricesWS.readyState === 1) pricesWS.close();
        };
    }, [coinsHistory]);

但正如我提到的,它并不理想,因为它经常无法关闭或打开。此外,一个新的WebSocket将只更新价格,如果将有一个更新的硬币,所以有可能,并将有一个很大的滞后,许多硬币在获得价格,这是不可接受的交易应用程序
这是过滤掉所需硬币的方法,但正如我提到的,它只适用于单个预先传递的硬币,所以阅读代码没有多大意义。来自WebSocket的流应该以某种方式不断提取到另一个地方,并且应该有类似的逻辑来过滤掉活动硬币

getPrices = async (ws: WebSocket, setter: any, selectedCoin: string, coinsHistory: string[]): Promise<any> => {
    const res = await axios.get(`https://api-testnet.bybit.com/v5/market/instruments-info?category=linear`);

    ws.onopen = () => {
        console.log('prices webSocket opened');

        const pingInterval = setInterval(() => {
            ws.send(JSON.stringify({ op: 'ping' }));
        }, 30000);

        ws.send(JSON.stringify({ op: 'ping' }));

        ws.onerror = () => {
            clearInterval(pingInterval);
            console.log('ERROR!!!!');
        };
        ws.onclose = () => {
            clearInterval(pingInterval);
            console.log('prices WS closed!');
        };

        const coins = res.data.result.list;

        for (let coin of coins) {
            ws.send(
                JSON.stringify({
                    op: 'subscribe',
                    args: [`tickers.${coin.symbol}`],
                })
            );
        }

        let symbols = [];

        ws.onmessage = e => {
            let res = JSON.parse(e.data.toString('utf8'));

            if (res.data?.symbol && /USDT/i.test(res.data?.symbol)) {
                let stream = res.data;

                if (stream.lastPrice) {
                    //take streams only with the last price
                    if (!symbols.some(arr => arr.value === stream.symbol)) {
                        symbols.push({ value: stream.symbol, label: stream.symbol, price: stream.lastPrice });
                    }
                }
                let newData = symbols.slice(0);
                //update price
                newData.forEach(coin => {
                    //only for coinHistory coins
                    if (coinsHistory.includes(coin.value)) {
                        if (stream.symbol === coin.value && stream.lastPrice && coin.price !== stream.lastPrice) {
                            let tempArray = [];
                            coin.price = stream.lastPrice;
                            console.log('price update');
                            tempArray.push({ price: stream.lastPrice, label: stream.symbol });
                            setter(tempArray);
                        }
                    }
                });
            }
        };
    };
};

所以我希望我的问题已经很清楚了。先谢谢你。

cetgtptt

cetgtptt1#

我得到了两个建议,对所有需要访问WebSocket流的页面使用实验性的appDir客户端组件,对套接字连接使用socketio。
在您的案例中使用新的客户端组件的主要原因是,这些客户端组件就像它们自己的孤岛,这意味着当内容值发生变化时,它们只会重新呈现自己,而不是整个应用。

相关问题