How to retry/reconnect to a WebSocket server with RxJS WebSocketSubject

btxsgosb  posted on 2023-06-23 in Other

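I have found various solutions for automatic reconnection with RxJS WebSocketSubject, but none of them worked for me.
In my RxJS 6.5/Angular code there is a connectToWebsocket() function that runs once at startup and contains the following: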

    const openObserver: NextObserver<Event> = {
        next: () => console.debug('[Websocket] Connected')
    };
    const closeObserver: NextObserver<CloseEvent> = {
        next: () => console.debug('[Websocket] Disconnected')
    };

    // Connect to WebSocket Server
    this.websocketConnection = new WebSocketSubject({
        url: "ws://localhost:4000",
        deserializer: e => e.data,
        openObserver,
        closeObserver
    });

After creating the subject, I subscribe to it:

    this.websocketConnection.subscribe((msg) => {
        this.data = msg;
    },
    (err) => {
        console.log("Error on connection", err);
    },
    () => {
        console.log("Connection closed");
    });

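So now, if the connection cannot be established initially, or is closed for whatever reason while connected, how do I reconnect to the WebSocket server? Do I need to write a reconnection strategy myself, or does RxJS already provide one?
Thanks in advance!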

zphenhs4  #1

Here is an Angular service with WebSocket support. Features:

  • On error, reconnects after 5 seconds
  • Authorization

    import { Injectable } from "@angular/core";
    import { Observable, timer } from "rxjs";
    import { webSocket, WebSocketSubject } from "rxjs/webSocket";
    import { tap, retryWhen, delayWhen } from "rxjs/operators";

    import { ApiConfigService } from "../../api-config.service";
    import { AuthStorageService } from "../../auth/auth-storage.service";

    @Injectable({
        providedIn: 'root'
    })
    export class WebsocketService {
      // Delay before each reconnection attempt, in milliseconds
      private readonly RECONNECT_INTERVAL: number = 5000;

      private socketsBaseUrl: string;
      private socketClientId: string = '';
      private socket$: WebSocketSubject<any> | undefined;
      private messages$: Observable<MessageEvent> | undefined;

      constructor(apiConfigService: ApiConfigService, private authStorageService: AuthStorageService) {
        this.socketsBaseUrl = apiConfigService.socketsBaseUrl;
      }

      // Note: cfg is accepted but never read here; retryWhen below always retries after an error.
      public connect(jobId: string, cfg: { reconnect: boolean } = { reconnect: false }): Observable<MessageEvent> {
        if (!this.socket$ || this.socket$.closed || !this.messages$) {
          this.socket$ = this.createNewWebSocket(jobId);
          // pipe() returns a plain Observable, not a Subject, so it is typed as one
          this.messages$ = this.socket$.pipe(
            tap({
                error: error => console.log(error),
            }),
            // On error, wait RECONNECT_INTERVAL, then resubscribe, which reopens the socket
            retryWhen(errors =>
                errors.pipe(
                    tap(val => console.log('[WebSocket] trying to reconnect', val)),
                    delayWhen(_ => timer(this.RECONNECT_INTERVAL))
                )
            )
          );
        }
        return this.messages$;
      }

      private createNewWebSocket(jobId: string): WebSocketSubject<any> {
        // Client id combines a timestamp with the job id
        const clientId: number = Date.now();
        this.socketClientId = `${clientId}-job${jobId}`;
        // The authorization value travels as a URL-encoded query parameter
        const authHeader: string = encodeURIComponent(this.authStorageService.getAuthorizationHeaderValue());
        const socketsUrl: string = `${this.socketsBaseUrl}/${this.socketClientId}?authorization=${authHeader}`;
        return webSocket({
            url: socketsUrl,
            openObserver: {
                next: () => {
                  console.log('[WebSocket] connection established');
                }
              },
            closeObserver: {
                next: () => {
                  console.log('[WebSocket] connection closed');
                  // Drop the closed socket and build a fresh stream; the new socket
                  // only connects once that stream is subscribed.
                  this.socket$ = undefined;
                  this.connect(jobId, { reconnect: true });
                }
            },
        });
      }

      public close(): void {
          if (this.socket$ !== undefined) {
            // complete() closes the underlying WebSocket connection
            this.socket$.complete();
            this.socket$ = undefined;
            console.log('[WebSocket] connection closed');
          }
      }
    }

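this.connect(jobId, { reconnect: true }); - reconnect: true is the option you are looking for. Note that connect() as posted never reads cfg; the reconnection itself comes from the retryWhen operator.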
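
The service above waits a fixed 5 seconds before every reconnection attempt. If a server stays down for a while, you may prefer delays that grow between attempts. Below is a minimal sketch of such a delay policy in plain TypeScript. The names reconnectDelay, BackoffOptions and DEFAULT_BACKOFF, as well as the default values, are hypothetical and are not part of RxJS or of the service above.

```typescript
// Hypothetical helper: capped exponential backoff for reconnection delays.
interface BackoffOptions {
  baseMs: number; // delay before the first retry
  factor: number; // multiplier applied for each further attempt
  maxMs: number;  // upper bound for any single delay
}

// Assumed defaults, chosen only for illustration.
const DEFAULT_BACKOFF: BackoffOptions = { baseMs: 1000, factor: 2, maxMs: 30000 };

// attempt is zero-based: 0 for the first retry, 1 for the second, and so on.
// Negative or fractional values are clamped to a non-negative integer.
function reconnectDelay(attempt: number, opts: BackoffOptions = DEFAULT_BACKOFF): number {
  const safeAttempt = Math.max(0, Math.floor(attempt));
  const delay = opts.baseMs * Math.pow(opts.factor, safeAttempt);
  return Math.min(delay, opts.maxMs);
}
```

Inside the service, this could replace the fixed interval with something like delayWhen((_, attempt) => timer(reconnectDelay(attempt))), since delayWhen passes the emission index as its second argument. As far as I can tell that index counts errors over the lifetime of the retryWhen subscription, so it does not reset after a successful reconnect; treat that as something to verify for your RxJS version.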
