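I created a dummy server in Python, and I want to write a client for it in Java using RxJava. First, here is the Python server, which simply echoes back whatever message it receives: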
    import socket

    server_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    server_socket.bind(('', 12000))
    while True:
        message, address = server_socket.recvfrom(2048)
        if (message):
            print(message.hex())
        server_socket.sendto(message, address)
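So I expect to be able to send a message and receive it back. For the client I have 3 classes:
Appcli - the app itself, runs the application
UdpObservable - implements an Observable, used for reading incoming messages
UdpWriter - sends messages to the server

Appcli: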
    public class Appcli {
        private static final int PORT_NO = 12000;
        private UdpWriter udpWriter;

        public void start() {
            udpWriter = new UdpWriter(PORT_NO);
            final Observable<DatagramPacket> udpObservable = UdpObservable.create(1024);
            disposableUdpData = udpObservable
                    .observeOn(Schedulers.io())
                    .subscribe(
                            new Consumer<DatagramPacket>() {
                                @Override
                                public void accept(DatagramPacket datagramPacket) throws Exception {
                                    System.out.println("received datagram");
                                    System.out.println(new String(datagramPacket.getData()));
                                }
                            }
                            , new Consumer<Throwable>() {
                                @Override
                                public void accept(Throwable throwable) throws Exception {
                                    System.out.println("udp observable failed");
                                    System.out.println("ERROR: " + throwable.getMessage());
                                }
                            }
                    );
            try {
                udpWriter.sendBroadcast(hexStringToByteArray("TEST MESSAGE"));
            } catch (IOException e) {
                e.printStackTrace();
            }
            // test, used for some delay...
            for (int i = 0; i < 5; i++) {
                try {
                    sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
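A side note on the consumer in Appcli: `new String(datagramPacket.getData())` decodes the whole receive buffer, not just the bytes that arrived, so a short message comes out padded with NUL characters. The following is a minimal sketch of decoding only the received part. The class name `PacketText` and the method `payloadAsText` are made up for illustration, and treating the payload as UTF-8 text is an assumption (the real payload comes from `hexStringToByteArray`, which is not shown, and may well be binary).

```java
import java.net.DatagramPacket;
import java.nio.charset.StandardCharsets;

// Hypothetical helper, not part of the original client.
final class PacketText {

    /** Decodes only the bytes that were actually received, not the whole backing buffer. */
    static String payloadAsText(DatagramPacket packet) {
        // Assumption: the payload is UTF-8 text.
        return new String(packet.getData(), packet.getOffset(), packet.getLength(),
                StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] buffer = new byte[1024];
        byte[] message = "TEST MESSAGE".getBytes(StandardCharsets.UTF_8);
        System.arraycopy(message, 0, buffer, 0, message.length);

        // Mimics the state after receive(): a large buffer holding a short payload.
        DatagramPacket packet = new DatagramPacket(buffer, message.length);

        // Length of the text when the whole buffer is decoded (payload plus padding).
        System.out.println(new String(packet.getData(), StandardCharsets.UTF_8).length());
        // Only the received bytes.
        System.out.println(payloadAsText(packet));
    }
}
```

The same `getOffset()` / `getLength()` pair can be used with `Arrays.copyOfRange` if the payload should stay a byte array.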
Now the UdpObservable:
    public class UdpObservable {
        private static Cancellable getCancellable(final DatagramSocket udpSocket) {
            return new Cancellable() {
                @Override
                public void cancel() throws Exception {
                    if (!udpSocket.isClosed()) {
                        udpSocket.close();
                    }
                }
            };
        }

        /**
         * creates an Observable that will emit all UDP datagrams of a UDP port.
         * <p>
         * This will be an infinite stream that ends when the observer unsubscribes, or when an error
         * occurs.
         * </p>
         */
        public static Observable<DatagramPacket> create(final int bufferSizeInBytes) {
            return Observable.create(
                    new ObservableOnSubscribe<DatagramPacket>() {
                        @Override
                        public void subscribe(ObservableEmitter<DatagramPacket> emitter) throws Exception {
                            System.out.println("subscribed!");
                            final DatagramSocket udpSocket = new DatagramSocket();
                            emitter.setCancellable(getCancellable(udpSocket));
                            //noinspection InfiniteLoopStatement
                            while (true) {
                                try {
                                    byte[] rcvBuffer = new byte[bufferSizeInBytes];
                                    DatagramPacket datagramPacket = new DatagramPacket(rcvBuffer, rcvBuffer.length);
                                    udpSocket.receive(datagramPacket);
                                    if (datagramPacket.getLength() == 0) {
                                        System.out.println("Read zero bytes");
                                    } else {
                                        System.out.println(Arrays.toString(datagramPacket.getData()));
                                    }
                                    emitter.onNext(datagramPacket);
                                } catch (Exception e) {
                                    emitter.onError(e);
                                }
                            }
                        }
                    }).subscribeOn(Schedulers.io());
        }
    }
And the writer:
    public class UdpWriter {
        private final int portNo;
        private DatagramSocket udpSocket;

        public UdpWriter(int portNo) {
            this.portNo = portNo;
        }

        public void sendBroadcast(final byte[] data) throws IOException {
            if (udpSocket == null) {
                udpSocket = new DatagramSocket();
            }
            Completable.fromAction(new Action() {
                @Override
                public void run() throws Exception {
                    byte[] dataBytes = data;
                    InetAddress udpSenderAddress = Inet4Address.getByName("127.0.0.1");
                    DatagramPacket datagramPacket = new DatagramPacket(dataBytes, dataBytes.length, udpSenderAddress, portNo);
                    udpSocket.send(datagramPacket);
                }
            }).subscribeOn(Schedulers.io()).subscribe();
        }
    }
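When I run the app, I get "subscribed!" on the client, and on the server side I do see the message printed (which shows that the server received it). But since UdpObservable is also supposed to print the incoming data from the server (line 49), I expected to see that printed as well. Nothing happens; it is never printed.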
I wanted to make sure my client can actually receive the message, so I changed the method in the writer to this:
    public void sendBroadcast(final byte[] data) throws IOException {
        if (udpSocket == null) {
            udpSocket = new DatagramSocket();
        }
        Completable.fromAction(new Action() {
            @Override
            public void run() throws Exception {
                byte[] dataBytes = data;
                InetAddress udpSenderAddress = Inet4Address.getByName("127.0.0.1");
                DatagramPacket datagramPacket = new DatagramPacket(dataBytes, dataBytes.length, udpSenderAddress, portNo);
                udpSocket.send(datagramPacket);
                byte[] rcvBuffer = new byte[1024];
                DatagramPacket res = new DatagramPacket(rcvBuffer, rcvBuffer.length);
                udpSocket.receive(res);
                if (res.getLength() == 0) {
                    System.out.println("Read zero bytes");
                } else {
                    System.out.println(Arrays.toString(res.getData()));
                    System.out.println("that was the data");
                }
            }
        }).subscribeOn(Schedulers.io()).subscribe();
    }
Now it prints the incoming message from the server.
So my question is: why don't I get any message in the observer's subscribe method?
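One likely explanation, offered as a sketch under assumptions rather than a confirmed diagnosis: the Python server replies to the `address` it got from `recvfrom`, which is the source address and port of the socket that sent the packet. UdpWriter and UdpObservable each call `new DatagramSocket()`, so they are bound to two different ephemeral ports, and the echo would go to the writer's socket only. That would also fit the observation that receiving on the writer's own socket works. The plain-Java example below reproduces this without RxJava. The class `EchoPortDemo`, its methods, and the in-process echo thread standing in for the Python server are all made up for illustration.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.net.SocketTimeoutException;
import java.nio.charset.StandardCharsets;

// Hypothetical demo class, not part of the original client.
final class EchoPortDemo {

    /** Waits up to timeoutMs for one datagram; returns its text, or null on timeout. */
    static String receiveOrNull(DatagramSocket socket, int timeoutMs) throws IOException {
        socket.setSoTimeout(timeoutMs);
        DatagramPacket packet = new DatagramPacket(new byte[1024], 1024);
        try {
            socket.receive(packet);
        } catch (SocketTimeoutException e) {
            return null;
        }
        return new String(packet.getData(), packet.getOffset(), packet.getLength(),
                StandardCharsets.UTF_8);
    }

    /** Sends one message from `sender` and reports what `sender` and `other` each received. */
    static String[] run() {
        InetAddress loopback = InetAddress.getLoopbackAddress();
        try (DatagramSocket server = new DatagramSocket(0, loopback);
             DatagramSocket sender = new DatagramSocket();   // plays the role of UdpWriter's socket
             DatagramSocket other = new DatagramSocket()) {  // plays the role of UdpObservable's socket

            // Stand-in for the Python server: echo one datagram back to its source address.
            Thread echo = new Thread(() -> {
                try {
                    DatagramPacket in = new DatagramPacket(new byte[2048], 2048);
                    server.receive(in);
                    server.send(new DatagramPacket(in.getData(), in.getLength(),
                            in.getSocketAddress()));
                } catch (IOException ignored) {
                    // Socket was closed before a packet arrived; nothing to echo.
                }
            });
            echo.setDaemon(true);
            echo.start();

            byte[] payload = "TEST MESSAGE".getBytes(StandardCharsets.UTF_8);
            sender.send(new DatagramPacket(payload, payload.length, loopback,
                    server.getLocalPort()));

            String onSender = receiveOrNull(sender, 2000);
            String onOther = receiveOrNull(other, 300);
            return new String[] {onSender, onOther};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        String[] result = run();
        System.out.println("sending socket received: " + result[0]);
        System.out.println("other socket received:   " + result[1]);
    }
}
```

If this is indeed the cause, one way to get the echo into the Observable would be to share a single `DatagramSocket` between the writer and the Observable (for example by passing it into `UdpObservable.create`), so that the socket the server replies to is the one being read in the receive loop.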