SpringBootWebSockets通过rxjs角型websockets发送数据

0yycz8jy  于 2021-07-13  发布在  Java
关注(0)|答案(0)|浏览(274)

我是websocket的新手,在编写SpringBoot后端和angular前端之间的实时websocket通信方面有点吃力。用例非常简单,它是关于项目实时竞价的。所以我们的想法是有一些“房间”,我们想订阅新的信息。
我当前的websocket服务:

@Injectable({
  providedIn: 'root'
})
export class WebSocketService {

  connections: Array<ConnectionForHighestBid>;
  RETRY_SECONDS = 10;

  constructor() {
  }

  connect(itemId: number): Observable<any> {
    return of('http://localhost:8090').pipe(
      map(apiUrl => apiUrl.replace(/^http/, 'ws') + '/ws-item/' + itemId + '/highest-bid'),
      switchMap(wsUrl => {
        const connection = this.connections.filter(c => c.itemId === itemId);

        // In case we already connected for given item id
        if (connection && connection.length === 1) {
          return connection[0].connection$;
        } else {
          const newConnection: ConnectionForHighestBid = {
            itemId,
            connection$: webSocket(wsUrl)
          };
          this.connections.push(newConnection);

          return newConnection.connection$;
        }
      }),
      retryWhen((errors) => errors.pipe(delay(this.RETRY_SECONDS)))
    );
  }
}

export interface ConnectionForHighestBid {
  itemId: number;
  connection$: WebSocketSubject<any>;
}

以及我正在执行以下操作的一个项的内部组件(this.itemid是我要订阅websocket通信的项的id:

this.webSockedService.connect(this.itemId).pipe(
  takeUntil(this.destroyed$)
).subscribe(data => {
  // Handle the newest data here (update price, add new bid item info, ...)
  console.log(data);
});

后端正在使用spring boot websockets,目前有以下方法:

@MessageMapping("/ws-item/update/{itemId}")
@SendTo("/ws-item/{itemId}/highest-bid")
public ResponseEntity<ItemWsInfoResponse> getHighestBidDetail(@DestinationVariable("itemId") long itemId) {

    return new ResponseEntity<>(itemService.findHighestBidByItemId(itemId), HttpStatus.OK);
}

@MessageMapping("/ws-item/bid/{itemId}")
@SendTo("/ws-item/{itemId}/highest-bid")
public ResponseEntity<ItemWsInfoResponse> bid(@DestinationVariable("itemId") long itemId,
                                                              @RequestBody @NonNull @Valid ItemWsBidRequest request) {

    LocalDateTime currentTime =
            LocalDateTime.ofInstant(Instant.ofEpochMilli(System.currentTimeMillis()), ZoneId.systemDefault());

    Optional<ItemWsInfoResponse> bidOpt = itemService.bid(itemId, request, currentTime);

    return bidOpt
            .map(i -> new ResponseEntity<>(i, HttpStatus.OK))
            .orElseGet(() -> new ResponseEntity<>(HttpStatus.BAD_REQUEST));
}

在这两种情况下,输出都是一个pojo,其中包含有关商品和当前最高出价的信息。其中第一种方法是获取当前信息。我不确定我是否需要那种方法。是否可以在websocket connect上执行此操作?如果我成功连接,我会得到这个pojo。
第二种方法是bidding本身(无需深入研究,方法只需检查并创建bid并返回pojo中的当前状态)。
配置如下

/**
 * Websocket configuration
 *
 * Request URL: /ws-item
 * Subscription URL: /ws-item/{itemId}
 */
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfiguration implements WebSocketMessageBrokerConfigurer {

    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        registry.enableSimpleBroker("/ws-item");
    }

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/ws-item").setAllowedOrigins("http://localhost:4200").withSockJS();
    }
}

代码似乎还可以(或者,至少我正在“连接”到我的后端websocket层。但我真的不知道如何从前端执行投标行动。我想发送消息是要对给定的项目投标,是会做一些检查,如果成功的话,它会发送消息到“一般”池我订阅现在在那里我可以处理数据(更新项目价格,为新的出价添加新行,检查是否登录的用户当前中标等)。
所以问题是:如何在给定的连接上从fe执行bid操作(发送到bid be方法的数据现在可以是虚拟的和静态的,动态填充json是这里最小的问题)?有没有可能在连接后立即获得一些数据(所以我不需要称之为 /ws-item/update/{itemId} 连接成功后立即终止?

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题