akka TCP合并(concat)源流消息

5f0d552i  于 2023-01-30  发布在  其他
关注(0)|答案(1)|浏览(165)

我有有趣的问题与akka TCP流:
查看代码:

package snp.server;

import akka.Done;
import akka.NotUsed;
import akka.actor.ActorRef;
import akka.stream.javadsl.Flow;
import akka.stream.javadsl.Source;
import akka.stream.javadsl.Tcp;
import akka.stream.javadsl.Tcp.OutgoingConnection;
import akka.util.ByteString;
import akka.actor.ActorSystem;
import akka.japi.Pair;
import akka.stream.Attributes;
import akka.stream.CompletionStrategy;
import akka.stream.OverflowStrategy;
import akka.stream.javadsl.Framing;
import akka.stream.javadsl.Keep;
import akka.stream.javadsl.Sink;
import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;

public class TCPClient_SinkSource1 {

    public static void main(String[] args) throws InterruptedException, ExecutionException, TimeoutException {

        final ActorSystem system = ActorSystem.create("StreamTcpDocTest");

        final Flow<ByteString, ByteString, CompletionStage<OutgoingConnection>> connection
                = Tcp.get(system).outgoingConnection("192.168.62.130", 59090).map( f -> {
                     System.out.println("out1:" + f.utf8String());
                    return f;
                });

        final Sink<ByteString, CompletionStage<Done>> sink = Sink.foreach(f -> {
            System.out.println("from server:" + f.utf8String());
        });

        Source<ByteString, NotUsed> source = Source.range(1, 5).map(f -> ByteString.fromString(f.toString()))
                .throttle(1, Duration.ofMillis(30));

        
        Flow<ByteString, ByteString, NotUsed> clientFlow = Flow.fromSinkAndSource(sink, source).map( f -> {
                     System.out.println("out:" + f.utf8String());
                    return f;
                });     

        CompletionStage<OutgoingConnection> connectionCS = connection
         .join(clientFlow).run(system);

        connectionCS.whenComplete((d, e) -> {
            System.out.println("client conn: localAddress:" + d.localAddress()
                    + " remoteAddress:" + d.remoteAddress());
            System.out.println("e:" + e);
        });

    }
}

结果以某种方式连接,服务器回复例如为123 45。
当我增加到:* * 油门(1,持续时间毫秒(3000));**
服务器的回复是一个接一个的,正如我所期望的那样。有人能描述一下如何避免连接回复吗?

bpzcxfmw

bpzcxfmw1#

您不应将单个ByteString视为网络协议中的帧,而应使用一些实际的帧,并在使用端收集整个帧,单个ByteString可能最终会成为多个ByteString,或者在接收端反过来,这取决于发送系统和中间的网络。
正确的成帧可以由几个运营商开箱即用Akka Streams完成:https://doc.akka.io/docs/akka/current/stream/stream-io.html#using-framing-in-your-protocol
对于这个特定的例子,我想你可以看到Akka流TCP实现中的优化结果,它试图避免多个系统调用(它们是昂贵的),方法是在实际将其交给操作系统之前,在一个小的时间范围内批处理尽可能多的字节。

相关问题