一个Tcp.OuttgoingConnection从一个音频混音器收集数据,并异步发送到一个sourceQueue,后者处理数据。
发出命令后,不能保证下一位数据就是响应,如何反馈响应?
一个“脏”的方法是有一个静态变量,当处理线程暂停等待它时,我把数据放在其中,但这是非常低效的。有没有一个akka机制可以监视值的变化并给予一个未来?
这是当前代码:
public Q16SocketThread(ActorSystem system) {
Logger.debug("Loading Q16SocketThread.");
this.system = system;
final Flow<ByteString, ByteString, CompletionStage<Tcp.OutgoingConnection>> connection =
Tcp.get(system).outgoingConnection(ipAddress, port);
int bufferSize = 10;
final SourceQueueWithComplete<ByteBuffer> sourceQueue =
Source.<ByteBuffer>queue(bufferSize, OverflowStrategy.fail())
.map(input -> Hex.encodeHexString(input.array()))
.to(Sink.foreach(this::startProcessing))
.run(system);
final Flow<ByteString, ByteString, NotUsed> repl =
Flow.of(ByteString.class)
.map(ByteString::toByteBuffer)
.map(sourceQueue::offer)
.map(
text -> {
//Logger.debug("Server: " + Hex.encodeHexString(text.array()));
String hexCmd;
if (!nextCmd.isEmpty()) {
hexCmd = nextCmd.take();
} else {
hexCmd = "fe";
}
return ByteString.fromArray(Hex.decodeHex(hexCmd));
}).async();
CompletionStage<Tcp.OutgoingConnection> connectionCS = connection.join(repl).run(system);
}
@Override
public Receive createReceive() {
return receiveBuilder()
.match(String.class, message -> {
if (message.equalsIgnoreCase("start")) {
Logger.debug("Q16 thread started.");
nextCmd.put(sysExHeaderAllCall + "1201F7");
} else if (message.equalsIgnoreCase("stop")) {
Logger.debug("Stopping of data gathering");
nextCmd.put(sysExHeaderAllCall + "1200F7");
//self().tell(PoisonPill.getInstance(), ActorRef.noSender());
} else if (message.equalsIgnoreCase("version")){
Logger.debug("Requesting version.");
nextCmd.put(sysExHeaderAllCall + "1001F7");
}
}).build();
}
1条答案
按热度按时间rqdpfwrv1#
我理解为观察一个变量使用询问模式并接收一条消息。在你的例子中,你想把消息 Package 在一个未来中。这是你的意思吗?
如果是这样的话,Akka文档(https://doc.akka.io/docs/akka/2.5/futures.html#use-with-actors)中的这段内容可能会有所帮助:
通常有两种方法可以获得AbstractActor的回复:第一种是通过发送的消息(actorRef.tell(msg,sender)),仅当原始发送者是AbstractActor时才起作用),第二种是通过Future。
使用ActorRef的ask方法发送消息将返回一个Future。要等待并检索实际结果,最简单的方法是: