Spring Boot 如何在RSocket服务器失败时恢复RSocket会话

kgsdhlau  于 2024-01-06  发布在  Spring
关注(0)|答案(2)|浏览(212)

好吧,我是RSocket的新手。我试图创建一个简单的RSocket客户端和简单的RSocket服务器。从我所做的研究来看,RSocket支持恢复:
它特别有用,因为当发送包含有关最后接收到的帧的信息的RESUME帧时,客户端能够恢复连接并仅请求它尚未接收到的数据,避免服务器上不必要的负载和浪费时间尝试检索已经检索的数据。
它还说客户端负责启用恢复。我的问题是如何启用这个恢复以及如何发送那个RESUME帧。我有功能正常的客户端和服务器,但是如果我关闭服务器并再次启动它,什么也没有发生,稍后当客户端再次尝试与服务器通信时,它会抛出:java.nio.channels.ClosedException.
这是我的客户端配置:

  1. @Configuration
  2. public class ClientConfiguration {
  3. /**
  4. * Defining the RSocket client to use tcp transport on port 7000
  5. */
  6. @Bean
  7. public RSocket rSocket() {
  8. return RSocketFactory
  9. .connect()
  10. .resumeSessionDuration(Duration.ofDays(10))
  11. .mimeType(MimeTypeUtils.APPLICATION_JSON_VALUE, MimeTypeUtils.APPLICATION_JSON_VALUE)
  12. .frameDecoder(PayloadDecoder.ZERO_COPY)
  13. .transport(TcpClientTransport.create(7000))
  14. .start()
  15. .block();
  16. }
  17. /**
  18. * RSocketRequester bean which is a wrapper around RSocket
  19. * and it is used to communicate with the RSocket server
  20. */
  21. @Bean
  22. RSocketRequester rSocketRequester(RSocketStrategies rSocketStrategies) {
  23. return RSocketRequester.wrap(rSocket(), MimeTypeUtils.APPLICATION_JSON, MimeTypeUtils.APPLICATION_JSON, rSocketStrategies);
  24. }

字符串
}
这是一个RestController,我从它开始与rsocket服务器通信:

  1. @RestController
  2. public class UserDataRestController {
  3. private final RSocketRequester rSocketRequester;
  4. public UserDataRestController(RSocketRequester.Builder rSocketRequester) {
  5. this.rSocketRequester = rSocketRequester.connectTcp("localhost", 7000).block();
  6. }
  7. @GetMapping(value = "/feed/{firstName}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
  8. public Publisher<Person> feed(@PathVariable("firstName") String firstName) {
  9. return rSocketRequester
  10. .route("feedPersonData")
  11. .data(new PersonDataRequest(firstName))
  12. .retrieveFlux(Person.class);
  13. }
  14. }

wvyml7n5

wvyml7n51#

由于会话存储在内存中,因此在服务器重新启动后无法恢复。请参阅io.rsocket.resume.SessionManager#sessions
但是如果你重新连接到同一台服务器,你仍然可以保护自己免受网络问题的影响。而且你不必发送RESUME帧,客户端会为你做这件事。
您应该配置服务器:

  1. @Bean
  2. ServerRSocketFactoryProcessor serverRSocketFactoryProcessor() {
  3. return RSocketFactory.ServerRSocketFactory::resume;
  4. }

字符串
和客户端io.rsocket.RSocketFactory.ClientRSocketFactory#resume
您可以找到几乎完整的示例here

jhdbpxl9

jhdbpxl92#

由@ Alexandria 潘金提供的代码现在已被弃用。我使用此代码配置恢复一个服务器:

  1. @Bean
  2. RSocketServerCustomizer rSocketResume() {
  3. Resume resume =
  4. new Resume()
  5. .sessionDuration(Duration.ofMinutes(15))
  6. .retry(
  7. Retry.fixedDelay(Long.MAX_VALUE, Duration.ofSeconds(5))
  8. .doBeforeRetry(s -> log.debug("Disconnected. Trying to resume...")));
  9. return rSocketServer -> rSocketServer.resume(resume);
  10. }

字符串

相关问题