我计划在我的通知系统中使用rsocket。我想在我的后端(java)使用SpringBootRSocket,而在我的前端,我将使用angular使用RSocketJS。
我能够快速启动一个请求流交互模型,在这个模型中,我可以拉入系统中的所有通知。请参阅my backend的代码段:
@MessageMapping("streams")
public Flux<Notification> requestStream() {
log.info("Streaming to notifications...");
return streamEventService.retrieveAllNotifications().log();
}
现在在我的前端,我有以下代码片段:
export class RsocketClientService {
// backend ws endpoint
private readonly wsURL = 'ws://localhost:7000/notification';
client: any;
socket: any
constructor() {
this.client = new RSocketClient({
serializers: {
data: JsonSerializer,
metadata: IdentitySerializer
},
setup: {
keepAlive: 10000,
lifetime: 180000,
dataMimeType: 'application/json',
metadataMimeType: 'message/x.rsocket.routing.v0',
payload: {
data: 23
}
},
transport: new RSocketWebSocketClient({
url: this.wsURL
}),
responder: new EchoResponder()
});
}
public connect() {
console.log("initializeSocket...")
this.client.connect().subscribe({
onComplete: (socket: any) => {
this.socket = socket;
this.socket.connectionStatus().subscribe( (status: any) => {
console.log("Connection status? ", status);
});
},
onError: (error: any) => {
console.error("Connection onError? " + error);
},
onSubscribe: (cancel: any) => {
console.log("Connection onSubscribe? cancel?");
}
});
}
public retrieveNotifications() {
this.socket.requestStream({
data: null,
metadata: String.fromCharCode('streams'.length) + 'streams'
})
.subscribe({
onComplete: () => {
console.log("onComplete?");
},
onError: (error: any) => {
console.error("onError? error: " + error);
},
onNext: (payload: any) => {
console.log("onNext? payload: ", payload);
},
onSubscribe: (subscription: any) => {
console.log("onSubscribe?");
subscription.request(1000000);
},
});
}
我在ui中有一个按钮,如果单击它,它将调用retrievenotifications方法,该方法将订阅我的后端requeststream中的rsocket消息Map方法。
一切正常,我可以看到我的React。现在我的问题是,如果在我的服务器上有一个新的数据插入到数据库中,那么如何从后端服务器向前端发送一个通知消息,说“嘿!新的数据被推送到了数据库中,“我有点纠结于服务器如何能够使用一个不知何故的火而忘记客户端。
暂无答案!
目前还没有任何答案,快来回答吧!