如何向akka队列注入数据

83qze16e  于 2021-07-06  发布在  Java
关注(0)|答案(0)|浏览(195)

我正在尝试使用akka队列来处理来自tcp连接的数据。起初,我把这种联系当作蒸汽的一部分,但它没有给我控制和反馈的灵活性。
不过,处理效果很好,但我无法让队列工作,即使有数据似乎也没有处理(也尝试了阻塞队列,并且可以看到队列填充,但没有处理)。它作为流的一部分工作,所以代码看起来还可以,我希望我需要以不同的方式执行它。
下面是我的代码:

private static class Qu16ConnectionClient extends AbstractActor {
        final InetSocketAddress remote;
        final ActorRef listener;

        final SourceQueueWithComplete<ByteBuffer> sourceQueue;      
        private final BlockingQueue<String> nextCmd = new LinkedBlockingQueue<>();   

        public static Props props(InetSocketAddress remote, ActorRef listener) {
            return Props.create(Qu16ConnectionClient.class, remote, listener);
        }

        public Qu16ConnectionClient(InetSocketAddress remote, ActorRef listener) {
            this.remote = remote;
            this.listener = listener;

            final ActorRef tcp = Tcp.get(getContext().getSystem()).manager();
            tcp.tell(TcpMessage.connect(remote), getSelf());

            int bufferSize = 10;
            sourceQueue = Source.<ByteBuffer>queue(bufferSize, OverflowStrategy.dropTail())
                    .map(input -> Hex.encodeHexString(input.array()))
                    .to(Sink.foreach(this::startProcessing))
                    .run(getContext().getSystem());
        }

        @Override
        public Receive createReceive() {
            return receiveBuilder()
                    .match(String.class, message -> {
                        Logger.debug("Incoming cmd: " + message);
                        nextCmd.put(message);
                    })
                    .match(
                            Tcp.CommandFailed.class,
                            msg -> {
                                listener.tell("failed", getSelf());
                                getContext().stop(getSelf());
                            })
                    .match(
                            Tcp.Connected.class,
                            msg -> {
                                listener.tell(msg, getSelf());
                                getSender().tell(TcpMessage.register(getSelf()), getSelf());
                                getContext().become(connected(getSender()));
                            })
                    .build();
        }

        private Receive connected(final ActorRef connection) {
            return receiveBuilder()
                    .match(
                            ByteString.class,
                            msg -> {
                                Logger.debug("Received ByteString:" + Hex.encodeHexString(msg.toArray()));
                                connection.tell(TcpMessage.write((ByteString) msg), getSelf());
                            })
                    .match(
                            Tcp.CommandFailed.class,
                            msg -> {
                                Logger.warn("Command failed oon TCP");
                            })
                    .match(
                            Tcp.Received.class,
                            msg -> {
                                sourceQueue.offer(msg.data().asByteBuffer());
                                String hexCmd;
                                if (!nextCmd.isEmpty()) {
                                    hexCmd = nextCmd.take();
                                } else {
                                    hexCmd = "fe";
                                }                                                 
                                connection.tell(TcpMessage.write(ByteString.fromArray(Hex.decodeHex(hexCmd))), getSelf());
                            })
                    .matchEquals(
                            "close",
                            msg -> {
                                Logger.debug("Received close");
                                connection.tell(TcpMessage.close(), getSelf());
                            })
                    .match(
                            Tcp.ConnectionClosed.class,
                            msg -> {
                                Logger.debug("TCP Connection closed.");
                                getContext().stop(getSelf());
                            })
                    .build();
        }

        private void startProcessing(String fullResult) {
            try {
                Logger.debug("Receiving data: " + fullResult);
                //Do the processing
            } catch (Exception e) {
                Logger.warn("Problem processing result: " + e.getMessage(), e);
            }
        }
    }

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题