WebSocket connection attempt is not interrupted by a Flux timeout

7kqas0il  posted 12 months ago  in  Other

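I'm having a problem with WebSocket connections in Micronaut 3.9.0. I'm trying to set a timeout for the WebSocket connection, but despite my efforts the timeout does not seem to interrupt the connection attempt as expected.
I tried using the timeout operator on the Flux object, but it doesn't work the way I expected. The connection attempt continues past the specified timeout.
Example code: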

package com.example;

import io.micronaut.context.BeanContext;
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
import io.micronaut.websocket.WebSocketClient;
import io.micronaut.websocket.WebSocketSession;
import io.micronaut.websocket.annotation.ClientWebSocket;
import io.micronaut.websocket.annotation.OnClose;
import io.micronaut.websocket.annotation.OnMessage;
import io.micronaut.websocket.annotation.OnOpen;
import lombok.extern.slf4j.Slf4j;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Assertions;

import jakarta.inject.Inject;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;

import java.time.Duration;
import java.util.concurrent.atomic.AtomicBoolean;

@Slf4j
@MicronautTest
class WstimeoutTest {
    static AtomicBoolean isOpened = new AtomicBoolean(false);

    @Inject
    BeanContext beanContext;

    @ClientWebSocket
    static abstract class TestWebSocketClient implements AutoCloseable {
        @OnMessage
        void onMessage(String message) {
            log.info("[onMessage] Got message: {}", message);
        }

        @OnOpen
        void onOpen(WebSocketSession session) {
            log.info("[onOpen] WS session id: {}", session.getId());
            isOpened.set(true);
        }

        @OnClose
        void onClose(WebSocketSession session) {
            log.info("[onClose] WS session id: {}", session.getId());
        }
    }

    @Test
    void testWebSocketTimeout() {
        var uri = "ws://localhost";
        var timeout = Duration.ofMillis(40); // Set this to a duration too short for the WS client to establish the connection (or emit the OnOpen event)

        var webSocketClient = beanContext.getBean(WebSocketClient.class);

        Publisher<TestWebSocketClient> client = webSocketClient.connect(TestWebSocketClient.class,  uri);
        Flux.from(client)
                .timeout(timeout)
                .doOnError(throwable -> log.info("Expected error: {}", throwable.getMessage()))
                .subscribe();

        Awaitility.await().atLeast(Duration.ofMillis(250)).untilAsserted(
                () -> Assertions.assertFalse(isOpened.get(), "WebSocket should not be opened")
        );
    }
}

Test output:

2023:11:21T13:24:56.140 [parallel-1] INFO  com.example.WstimeoutTest[][] - Expected error: Did not observe any item or terminal signal within 40ms in 'switchMapNoPrefetch' (and no fallback has been configured)
2023:11:21T13:24:56.178 [default-nioEventLoopGroup-1-2] INFO com.example.WstimeoutTest[][] - [onOpen] WS session id: AOKppbvUUl5zc3/x1N0lBiXW4WU=

WebSocket should not be opened
Expected :false
Actual   :true


Is there any way to properly time out a WebSocket connection?

pbpqsu0x  #1

You are asserting on a value that is never changed in the onClose() method:

package com.example;

import io.micronaut.context.BeanContext;
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
import io.micronaut.websocket.WebSocketClient;
import io.micronaut.websocket.WebSocketSession;
import io.micronaut.websocket.annotation.ClientWebSocket;
import io.micronaut.websocket.annotation.OnClose;
import io.micronaut.websocket.annotation.OnMessage;
import io.micronaut.websocket.annotation.OnOpen;
import lombok.extern.slf4j.Slf4j;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Assertions;

import jakarta.inject.Inject;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;

import java.time.Duration;
import java.util.concurrent.atomic.AtomicBoolean;

@Slf4j
@MicronautTest
class WstimeoutTest {
    static AtomicBoolean isOpened = new AtomicBoolean(false);

    @Inject
    BeanContext beanContext;

    @ClientWebSocket
    static abstract class TestWebSocketClient implements AutoCloseable {
        @OnMessage
        void onMessage(String message) {
            log.info("[onMessage] Got message: {}", message);
        }

        @OnOpen
        void onOpen(WebSocketSession session) {
            log.info("[onOpen] WS session id: {}", session.getId());
            isOpened.set(true);
        }

        @OnClose
        void onClose(WebSocketSession session) {
            log.info("[onClose] WS session id: {}", session.getId());
            isOpened.set(false);
        }
    }

    @Test
    void testWebSocketTimeout() {
        var uri = "ws://localhost";
        var timeout = Duration.ofMillis(40); // Set this to a duration too short for the WS client to establish the connection (or emit the OnOpen event)

        var webSocketClient = beanContext.getBean(WebSocketClient.class);

        Publisher<TestWebSocketClient> client = webSocketClient.connect(TestWebSocketClient.class,  uri);
        Flux.from(client)
                .timeout(timeout)
                .doOnError(throwable -> log.info("Expected error: {}", throwable.getMessage()))
                .subscribe();

        Awaitility.await().atLeast(Duration.ofMillis(250)).untilAsserted(
                () -> Assertions.assertFalse(isOpened.get(), "WebSocket should not be opened")
        );
    }
}
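
One caveat with resetting the flag in onClose(): that callback only runs if the session actually gets closed, and nothing in the test closes it explicitly once the timeout has fired. One possible workaround is to close a session that shows up after the deadline. The sketch below illustrates the idea with plain java.util.concurrent types rather than Micronaut or Reactor. FakeSession, deadlinePassed and the 300 ms / 20 ms timings are hypothetical stand-ins, so treat it as an analogy, not as the library's behaviour.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;

final class CloseLateSession {
    /** Hypothetical stand-in for a WebSocket client; not a Micronaut type. */
    static final class FakeSession implements AutoCloseable {
        final AtomicBoolean open = new AtomicBoolean(true);

        @Override
        public void close() {
            open.set(false);
        }
    }

    static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Returns true if a session that opened after the deadline ended up closed. */
    static boolean lateSessionGetsClosed() {
        AtomicBoolean deadlinePassed = new AtomicBoolean(false);

        // Simulated slow connect: the "session" only becomes available after ~300 ms.
        CompletableFuture<FakeSession> connect = CompletableFuture.supplyAsync(() -> {
            sleep(300);
            return new FakeSession();
        });

        // Cleanup hook: if the session arrives after the caller gave up, close it.
        CompletableFuture<FakeSession> guarded = connect.whenComplete((session, error) -> {
            if (session != null && deadlinePassed.get()) {
                session.close();
            }
        });

        try {
            connect.get(20, TimeUnit.MILLISECONDS); // the caller waits only 20 ms
        } catch (TimeoutException e) {
            deadlinePassed.set(true);               // remember that we gave up
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }

        // Wait for the late session and the cleanup hook, then report its state.
        FakeSession late = guarded.join();
        return !late.open.get();
    }

    public static void main(String[] args) {
        System.out.println("late session closed: " + lateSessionGetsClosed());
    }
}
```

In the Micronaut test the equivalent would be to call close() on a TestWebSocketClient that is delivered after the timeout (it already implements AutoCloseable), at which point onClose() would get a chance to reset the flag.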


olhwl3o2  #2

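I believe I found a solution after researching and experimenting with HttpClientConfiguration. The solution involves setting the connect timeout on the HttpClient :)
Additionally, when connecting to localhost the time limit seems to be ignored, possibly because the communication happens within the same environment (at least, the code below fails even when I set httpTimeout = Duration.ofNanos(1) against a local server). To test this, I used an online echo WebSocket server. Here is the complete example code along with my results: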

package com.example;

import io.micronaut.context.BeanContext;
import io.micronaut.http.client.HttpClientConfiguration;
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
import io.micronaut.websocket.CloseReason;
import io.micronaut.websocket.WebSocketClient;
import io.micronaut.websocket.WebSocketSession;
import io.micronaut.websocket.annotation.ClientWebSocket;
import io.micronaut.websocket.annotation.OnClose;
import io.micronaut.websocket.annotation.OnError;
import io.micronaut.websocket.annotation.OnMessage;
import io.micronaut.websocket.annotation.OnOpen;
import lombok.extern.slf4j.Slf4j;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Assertions;

import jakarta.inject.Inject;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;

import java.net.URI;
import java.net.URISyntaxException;
import java.time.Duration;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

@Slf4j
@MicronautTest
class WstimeoutTest {
    static AtomicBoolean isOpened = new AtomicBoolean(false);
    static AtomicBoolean isCompleted = new AtomicBoolean(false);
    static AtomicReference<CloseReason> closeReasonAtomicReference = new AtomicReference<>();

    @Inject
    BeanContext beanContext;

    @ClientWebSocket
    static abstract class TestWebSocketClient implements AutoCloseable {
        @OnMessage
        void onMessage(String message, WebSocketSession session) {
            log.info("[onMessage] Got message: {}", message);
            session.close(CloseReason.NORMAL);
        }

        @OnOpen
        void onOpen(WebSocketSession session) {
            log.info("[onOpen] WS session id: {}", session.getId());
            isOpened.set(true);
            session.close(CloseReason.NORMAL);
        }

        @OnError
        void onError(Throwable err, WebSocketSession session) {
            log.error("[onError] WS session id: {}, with error: {}", session.getId(), err.getMessage());
        }

        @OnClose
        void onClose(CloseReason closeReason, WebSocketSession session) {
            log.info("[onClose] WS session id: {}, close code: {}", session.getId(), closeReason.getCode());
            closeReasonAtomicReference.set(closeReason);
        }
    }

    @Test
    void testWebSocketTimeout() {
        URI uri = null;
        try {
            uri = new URI("<provide address to external WS server>");
        } catch (URISyntaxException e) {
            throw new RuntimeException(e);
        }
        var fluxTimeout = Duration.ofMillis(2000);
//        var httpTimeout = Duration.ofMillis(400); // within this time client is able to establish connection
        var httpTimeout = Duration.ofMillis(30); // within this time the ws client is unable to establish websocket connection and ends/throws close code 1006

        var httpClientConfiguration = beanContext.getBean(HttpClientConfiguration.class);

        httpClientConfiguration.setConnectTimeout(httpTimeout);

        var webSocketClient = beanContext.getBean(WebSocketClient.class);

        Publisher<TestWebSocketClient> client = webSocketClient.connect(TestWebSocketClient.class, uri);

        var disposable = Flux.from(client)
                .timeout(fluxTimeout)
                .doFinally(unused -> isCompleted.set(true))
                .subscribe(
                        testWebSocketClient -> {
                            isOpened.set(true);
                            log.info("Success");
                        },
                        throwable -> {
                            isOpened.set(false);
                            log.info("Expected error: {}", throwable.getMessage(), throwable);
                        }
                );

        Awaitility.await().atMost(fluxTimeout).pollInterval(Duration.ofMillis(50))
                .untilAsserted(
                        () -> Assertions.assertTrue(isCompleted.get(), "WebSocket should be completed")
                );

        Awaitility.await().atMost(fluxTimeout).pollInterval(Duration.ofMillis(50))
                .untilAsserted(
                () -> Assertions.assertFalse(isOpened.get(), "WebSocket should not be opened")
        );
        Awaitility.await().atMost(fluxTimeout).pollInterval(Duration.ofMillis(50))
                .untilAsserted(
                () -> Assertions.assertNotEquals(CloseReason.NORMAL, closeReasonAtomicReference.get(),
                        "WebSocket should not have closed normally")
        );

        disposable.dispose();
    }
}

Output:

> Task :testClasses
2023:12:02T14:54:12.335 [Test worker] INFO  i.m.c.DefaultApplicationContext$RuntimeConfiguredEnvironment[][] - Established active environments: [test]
2023:12:02T14:54:13.286 [default-nioEventLoopGroup-1-2] INFO  com.example.WstimeoutTest[][] - Expected error: Error opening WebSocket client session: Abnormal Closure
io.micronaut.websocket.exceptions.WebSocketClientException: Error opening WebSocket client session: Abnormal Closure
    at io.micronaut.http.client.netty.websocket.NettyWebSocketClientHandler.handleCloseReason(NettyWebSocketClientHandler.java:262)
    (...)
> Task :test
BUILD SUCCESSFUL in 4s


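There appears to be no specific TimeoutException; instead, a WebSocketClientException is raised that carries the Abnormal Closure information. Also, as expected, none of the WS events (onOpen, onMessage, onClose, onError) were triggered.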
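
The log in the question (timeout error at 13:24:56.140, onOpen at 13:24:56.178) is consistent with a timeout that only stops the subscriber from waiting while the connection attempt itself carries on. Reactor's timeout signals an error downstream and cancels its upstream subscription, but whether that cancellation aborts an in-flight connect depends on the source. The following is a rough analogy using only the JDK (CompletableFuture instead of Flux; the class and method names are made up for illustration): the caller times out, yet the simulated connect still completes.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;

final class TimeoutVsCancel {
    /** Returns true if the caller timed out AND the simulated connect still "opened". */
    static boolean connectCompletesAfterCallerTimeout() {
        AtomicBoolean opened = new AtomicBoolean(false);

        // Simulated connection attempt: takes ~300 ms, then flips the "opened" flag,
        // much like onOpen() setting isOpened in the test.
        CompletableFuture<String> connect = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(300);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            opened.set(true);
            return "session";
        });

        boolean callerTimedOut = false;
        try {
            // The caller gives up after 20 ms; this does not stop the task above.
            connect.get(20, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            callerTimedOut = true;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }

        // Let the background work finish so its side effect can be observed.
        connect.join();
        return callerTimedOut && opened.get();
    }

    public static void main(String[] args) {
        System.out.println("connect finished after caller timeout: "
                + connectCompletesAfterCallerTimeout());
    }
}
```

A timeout applied to the connection attempt itself, such as the connect timeout set on HttpClientConfiguration above, limits the work rather than the wait, which would explain why it produced a failed connection instead of a late onOpen.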
