带有datagramsockets的rxjava未接收消息

axzmvihb  于 2021-07-06  发布在  Java
关注(0)|答案(0)|浏览(226)

我用python创建了一个虚拟服务器,我想用java创建一个客户机,用rxjava实现它。首先,这是python中的服务器,它只回显接收到的消息:

import socket

server_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
server_socket.bind(('', 12000))

while True:
    message, address = server_socket.recvfrom(2048)
    if (message):
        print(message.hex())
    server_socket.sendto(message, address)

所以我希望能发送信息并接收回来。对于客户端,我有3个类: Appcli -应用程序本身-运行应用程序 UdpObservable -实现一个可观察的-用于读取来自读取器的传入消息 UdpWriter -向服务器发送消息 Appcli -

public class Appcli {

    private static final int PORT_NO = 12000;

    private UdpWriter udpWriter;

    public void start() {
        udpWriter = new UdpWriter(PORT_NO);

        final Observable<DatagramPacket> udpObservable = UdpObservable.create(1024);

        disposableUdpData = udpObservable
                .observeOn(Schedulers.io())
                .subscribe(
                        new Consumer<DatagramPacket>() {
                            @Override
                            public void accept(DatagramPacket datagramPacket) throws Exception {
                                System.out.println("received datagram");
                                System.out.println(new String(datagramPacket.getData()));
                            }
                        }
                        , new Consumer<Throwable>() {
                            @Override
                            public void accept(Throwable throwable) throws Exception {
                                System.out.println("udp observable failed");
                                System.out.println("ERROR: " + throwable.getMessage());
                            }
                        }
                );

        try {
            udpWriter.sendBroadcast(hexStringToByteArray("TEST MESSAGE"));
        } catch (IOException e) {
            e.printStackTrace();
        }

        // test, used for some delay...
        for (int i = 0; i < 5; i++) {
            try {
                sleep(500);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

现在是 UdpObservable -

public class UdpObservable {
    private static Cancellable getCancellable(final DatagramSocket udpSocket) {
        return new Cancellable() {
            @Override
            public void cancel() throws Exception {
                if (!udpSocket.isClosed()) {
                    udpSocket.close();
                }
            }
        };
    }

    /**
     * creates an Observable that will emit all UDP datagrams of a UDP port.
     * <p>
     * This will be an infinite stream that ends when the observer unsubscribes, or when an error
     * occurs.
     * </p>
     */
    public static Observable<DatagramPacket> create(final int bufferSizeInBytes) {
        return Observable.create(
                new ObservableOnSubscribe<DatagramPacket>() {
                    @Override
                    public void subscribe(ObservableEmitter<DatagramPacket> emitter) throws Exception {
                        System.out.println("subscribed!");
                        final DatagramSocket udpSocket = new DatagramSocket();
                        emitter.setCancellable(getCancellable(udpSocket));
                        //noinspection InfiniteLoopStatement
                        while (true) {
                            try {
                                byte[] rcvBuffer = new byte[bufferSizeInBytes];
                                DatagramPacket datagramPacket = new DatagramPacket(rcvBuffer, rcvBuffer.length);
                                udpSocket.receive(datagramPacket);
                                if (datagramPacket.getLength() == 0) {
                                    System.out.println("Read zero bytes");
                                } else {
                                    System.out.println(Arrays.toString(datagramPacket.getData()));
                                }
                                emitter.onNext(datagramPacket);
                            } catch (Exception e) {
                                emitter.onError(e);
                            }
                        }
                    }
                }).subscribeOn(Schedulers.io());
    }
}

以及 writer -

public class UdpWriter {

    private final int portNo;
    private DatagramSocket udpSocket;

    public UdpWriter(int portNo) {
        this.portNo = portNo;
    }

    public void sendBroadcast(final byte[] data) throws IOException {
        if (udpSocket == null) {
            udpSocket = new DatagramSocket();
        }

        Completable.fromAction(new Action() {
            @Override
            public void run() throws Exception {
                byte[] dataBytes = data;
                InetAddress udpSenderAddress = Inet4Address.getByName("127.0.0.1");
                DatagramPacket datagramPacket = new DatagramPacket(dataBytes, dataBytes.length, udpSenderAddress, portNo);
                udpSocket.send(datagramPacket);
            }
        }).subscribeOn(Schedulers.io()).subscribe();
    }

}

当我运行应用程序时,我得到 subscribed! 在客户机中,实际上在服务器端,我得到了消息的输出(这表明消息是在服务器端收到的)。但是,因为 UdpObservable 我还想打印来自服务器(第行)的传入数据 49 )我希望看到它也打印出来,但什么也没发生,它没有打印出来。
我想确保我的客户机得到消息,所以我在 writer 对此:

public void sendBroadcast(final byte[] data) throws IOException {
    if (udpSocket == null) {
        udpSocket = new DatagramSocket();
    }

    Completable.fromAction(new Action() {
        @Override
        public void run() throws Exception {
            byte[] dataBytes = data;
            InetAddress udpSenderAddress = Inet4Address.getByName("127.0.0.1");
            DatagramPacket datagramPacket = new DatagramPacket(dataBytes, dataBytes.length, udpSenderAddress, portNo);
            udpSocket.send(datagramPacket);
            byte[] rcvBuffer = new byte[1024];
            DatagramPacket res = new DatagramPacket(rcvBuffer, rcvBuffer.length);
            udpSocket.receive(res);
            if (res.getLength() == 0) {
                System.out.println("Read zero bytes");
            } else {
                System.out.println(Arrays.toString(res.getData()));
                System.out.println("that was the data");
            }
        }
    }).subscribeOn(Schedulers.io()).subscribe();
}

现在它打印来自服务器的传入消息。
所以我的问题是,为什么我在观察器中没有得到任何关于订阅方法的消息?

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题