我试图让我的WebSocket代码自动尝试重新连接(无限期),直到成功。通过每隔x秒发送一次“ping”消息,我可以检测到管道何时损坏,并调用closeObserver
。
但是,我不确定如何启动重新连接序列。
const notificationConnectionEpic: Epic<ActionTypes, any, RootState> = (
action$,
state$
) =>
action$.pipe(
filter(isActionOf(actions.connectNotificationPipeline.request)),
switchMap(async action => {
const resp = await requireValidToken(action$, state$, params =>
AdminHubs.getHubNotificationsToken({
...params,
id: action.payload.hubId
})
);
return resp.pipe(
switchMap(v => {
if (isAction(v)) {
return of(v);
}
if (!v.ok) {
return of(
actions.connectNotificationPipeline.failure({
hubId: action.payload.hubId,
error: v.error
})
);
}
const webSocketOpen$ = new Subject();
const webSocketClose$ = new Subject();
const webSocket$ = webSocket<AdminHubs.HubNotification>({
url: v.value,
openObserver: webSocketOpen$,
closeObserver: webSocketClose$
});
const message$ = webSocket$.pipe(
map(message => actions.receiveNotification({ message })),
takeUntil(action$.ofType(HubActionConsts.NOTIFICATION_PIPE_CLOSED))
);
const ping$ = interval(1000).pipe(
map(_ => webSocket$.next("ping" as any)),
ignoreElements()
);
const open$ = webSocketOpen$.pipe(
take(1),
map(_ =>
actions.connectNotificationPipeline.success({
hubId: action.payload.hubId
})
)
);
const close$ = webSocketClose$.pipe(
// called when a network drop happens. handle reconnect?
); // also happens on net error
return merge(message$, open$, ping$, close$);
})
);
}),
mergeMap(v => v)
);
2条答案
按热度按时间oxcyiej71#
当WebSocket连接关闭时,只需再次调度
actions.connectNotificationPipeline.request
。这将重新运行此代码并创建一个新的WebSocket连接。gzszwxb42#
在过去的几周里,我一直在努力解决这个问题,并决定创建a package called SuperSocket--以防它对任何人都有帮助!它应该作为您正在使用的WebSocket观察者实现的替代品。
SuperSocket位于现有的WebSocket实现之上,并且在其他功能中,确保它重新连接,直到它成功。当然,你可以设置一个最大重试次数来避免无限循环和不必要的CPU负载:)