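I am trying to use an Akka queue to process data coming from a TCP connection. Initially I had the connection as part of the stream, but that did not give me the flexibility I needed for control and feedback.
The processing itself worked well in that setup, but I cannot get the queue to work: even when there is data, nothing seems to be processed. (I also tried a blocking queue and could see the queue fill up, but still nothing was processed.) Since the same code worked as part of a stream, the code itself looks OK, so I expect I need to run it in a different way.
Here is my code: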
private static class Qu16ConnectionClient extends AbstractActor {
    final InetSocketAddress remote;
    final ActorRef listener;
    final SourceQueueWithComplete<ByteBuffer> sourceQueue;
    private final BlockingQueue<String> nextCmd = new LinkedBlockingQueue<>();

    public static Props props(InetSocketAddress remote, ActorRef listener) {
        return Props.create(Qu16ConnectionClient.class, remote, listener);
    }

    public Qu16ConnectionClient(InetSocketAddress remote, ActorRef listener) {
        this.remote = remote;
        this.listener = listener;
        final ActorRef tcp = Tcp.get(getContext().getSystem()).manager();
        tcp.tell(TcpMessage.connect(remote), getSelf());
        int bufferSize = 10;
        sourceQueue = Source.<ByteBuffer>queue(bufferSize, OverflowStrategy.dropTail())
                .map(input -> Hex.encodeHexString(input.array()))
                .to(Sink.foreach(this::startProcessing))
                .run(getContext().getSystem());
    }

    @Override
    public Receive createReceive() {
        return receiveBuilder()
                .match(String.class, message -> {
                    Logger.debug("Incoming cmd: " + message);
                    nextCmd.put(message);
                })
                .match(
                        Tcp.CommandFailed.class,
                        msg -> {
                            listener.tell("failed", getSelf());
                            getContext().stop(getSelf());
                        })
                .match(
                        Tcp.Connected.class,
                        msg -> {
                            listener.tell(msg, getSelf());
                            getSender().tell(TcpMessage.register(getSelf()), getSelf());
                            getContext().become(connected(getSender()));
                        })
                .build();
    }

    private Receive connected(final ActorRef connection) {
        return receiveBuilder()
                .match(
                        ByteString.class,
                        msg -> {
                            Logger.debug("Received ByteString:" + Hex.encodeHexString(msg.toArray()));
                            connection.tell(TcpMessage.write((ByteString) msg), getSelf());
                        })
                .match(
                        Tcp.CommandFailed.class,
                        msg -> {
                            Logger.warn("Command failed on TCP");
                        })
                .match(
                        Tcp.Received.class,
                        msg -> {
                            sourceQueue.offer(msg.data().asByteBuffer());
                            String hexCmd;
                            if (!nextCmd.isEmpty()) {
                                hexCmd = nextCmd.take();
                            } else {
                                hexCmd = "fe";
                            }
                            connection.tell(TcpMessage.write(ByteString.fromArray(Hex.decodeHex(hexCmd))), getSelf());
                        })
                .matchEquals(
                        "close",
                        msg -> {
                            Logger.debug("Received close");
                            connection.tell(TcpMessage.close(), getSelf());
                        })
                .match(
                        Tcp.ConnectionClosed.class,
                        msg -> {
                            Logger.debug("TCP Connection closed.");
                            getContext().stop(getSelf());
                        })
                .build();
    }

    private void startProcessing(String fullResult) {
        try {
            Logger.debug("Receiving data: " + fullResult);
            // Do the processing
        } catch (Exception e) {
            Logger.warn("Problem processing result: " + e.getMessage(), e);
        }
    }
}
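
One possible cause worth ruling out (an assumption, not a confirmed diagnosis): the map stage calls input.array() on the buffer obtained from msg.data().asByteBuffer(). As far as I can tell from the Akka API docs, asByteBuffer returns a read-only ByteBuffer, and in plain Java array() throws on a read-only buffer. If that happens inside the stream, the stream would fail on the first element and later offers would appear to do nothing. The sketch below uses only the JDK to show that behavior and a copy-based alternative. The class ReadOnlyBufferDemo and its helpers copyBytes, toHex and arrayThrows are hypothetical names made up for this illustration, and toHex is only a stand-in for Hex.encodeHexString.

```java
import java.nio.ByteBuffer;

// Hypothetical demo class; nothing here comes from Akka or from the code above.
final class ReadOnlyBufferDemo {

    // Copies the remaining bytes out of a buffer without calling array(),
    // so it also works for read-only buffers.
    static byte[] copyBytes(ByteBuffer buffer) {
        ByteBuffer view = buffer.duplicate(); // leaves the caller's position untouched
        byte[] out = new byte[view.remaining()];
        view.get(out);
        return out;
    }

    // Plain hex encoder, a stand-in for Hex.encodeHexString.
    static String toHex(byte[] bytes) {
        StringBuilder sb = new StringBuilder(bytes.length * 2);
        for (byte b : bytes) {
            sb.append(String.format("%02x", b & 0xff));
        }
        return sb.toString();
    }

    // Returns true when array() is rejected for the given buffer.
    // ReadOnlyBufferException is a subclass of UnsupportedOperationException.
    static boolean arrayThrows(ByteBuffer buffer) {
        try {
            buffer.array();
            return false;
        } catch (UnsupportedOperationException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        ByteBuffer readOnly =
                ByteBuffer.wrap(new byte[] {(byte) 0xfe, 0x01}).asReadOnlyBuffer();
        System.out.println(arrayThrows(readOnly));      // prints true
        System.out.println(toHex(copyBytes(readOnly))); // prints fe01
    }
}
```

If this is indeed what happens, offering msg.data() itself (or the result of its toArray(), which the ByteString handler above already uses) instead of a ByteBuffer might avoid the problem. Inspecting the CompletionStage returned by sourceQueue.offer(...) should show whether the stream has already failed.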