我面临的挑战是,一个现有的web服务返回一个调用2条消息,但具有不同的结构/类型。
如何在 Spring 使用webflux reactornettywebsocketclient处理此问题?i、 e.如何连接两个不同的Map器或通过多个订户处理它?
这里假设这两条响应消息的顺序总是相同的,因此第一条消息将由“info”Map器解析,第二条消息由“auth”Map器解析。
第二条消息是订户的信息。然后,服务器将继续在通道“0”上发送多条消息。当我收到第二条信息时,我如何向我的客户订阅这些信息?
客户:
WebSocketClient client = new ReactorNettyWebSocketClient();
client.execute(uri, session -> session.send(Mono.just(session.textMessage(authCommand)))
.thenMany(session.receive().take(2).doOnNext(message -> {
System.out.println(message.getPayloadAsText());
})).then()).block();
第一次从服务器返回:
{"event":"info","version":2,"serverId":"20df78274661","platform":{"status":1}}
{"event":"auth","status":"OK","chanId":0,"userId":2256812}
在服务器的两条消息之后发送的通道消息:
[0,"wu",["funding","USD",11041.78713953,0,8.090876180000123,null,null]]
[0,"wu",["funding","BTC",0.25495514,0,4.000000003445692e-8,null,null]]
[0,"wu",["funding","EUR",2139.03965717,0,0.00965717000008226,null,null]]
感谢您提前提出任何建议和想法。
1条答案
按热度按时间abithluo1#
在研究了webflux和reactor的一些资料之后,我采用了以下方法。我使用的处理器,你可以连接多个订户。然后将此处理器连接到websocketclient中。
关于不同处理器的一个很好的总结可以在这里找到
如何在java的reactor中使用处理器
使用这种方法,我将能够动态添加订户。因此,当我检测到频道消息时,我可以使用过滤器为该频道号向处理器添加一个新的订户。下面的伪代码说明了这种行为。