使用TCP流并将其重定向到另一个接收器(使用Akka流)

krcsximq  于 2022-11-05  发布在  其他
关注(0)|答案(2)|浏览(235)

我尝试用Akka 2.4.3将TCP流重定向/转发到另一个Sink。程序应该打开一个服务器套接字,监听传入的连接,然后使用TCP流。我们的发送方不期望/接受来自我们的回复,所以我们从不发回任何东西-我们只是使用流。在构造TCP流后,我们需要将字节转换为更有用的东西,并将其发送到Sink。
到目前为止,我尝试了以下方法,但我特别纠结于如何不将TCP包发送回发送方并正确连接接收器。

  1. import scala.util.Failure
  2. import scala.util.Success
  3. import akka.actor.ActorSystem
  4. import akka.event.Logging
  5. import akka.stream.ActorMaterializer
  6. import akka.stream.scaladsl.Sink
  7. import akka.stream.scaladsl.Tcp
  8. import akka.stream.scaladsl.Framing
  9. import akka.util.ByteString
  10. import java.nio.ByteOrder
  11. import akka.stream.scaladsl.Flow
  12. object TcpConsumeOnlyStreamToSink {
  13. implicit val system = ActorSystem("stream-system")
  14. private val log = Logging(system, getClass.getName)
  15. //The Sink
  16. //In reality this is of course a real Sink doing some useful things :-)
  17. //The Sink accept types of "SomethingMySinkUnderstand"
  18. val mySink = Sink.ignore;
  19. def main(args: Array[String]): Unit = {
  20. //our sender is not interested in getting replies from us
  21. //so we just want to consume the tcp stream and never send back anything to the sender
  22. val (address, port) = ("127.0.0.1", 6000)
  23. server(system, address, port)
  24. }
  25. def server(system: ActorSystem, address: String, port: Int): Unit = {
  26. implicit val sys = system
  27. import system.dispatcher
  28. implicit val materializer = ActorMaterializer()
  29. val handler = Sink.foreach[Tcp.IncomingConnection] { conn =>
  30. println("Client connected from: " + conn.remoteAddress)
  31. conn handleWith Flow[ByteString]
  32. //this is neccessary since we use a self developed tcp wire protocol
  33. .via(Framing.lengthField(4, 0, 65532, ByteOrder.BIG_ENDIAN))
  34. //here we want to map the raw bytes into something our Sink understands
  35. .map(msg => new SomethingMySinkUnderstand(msg.utf8String))
  36. //here we like to connect our Sink to the Tcp Source
  37. .to(mySink) //<------ NOT COMPILING
  38. }
  39. val tcpSource = Tcp().bind(address, port)
  40. val binding = tcpSource.to(handler).run()
  41. binding.onComplete {
  42. case Success(b) =>
  43. println("Server started, listening on: " + b.localAddress)
  44. case Failure(e) =>
  45. println(s"Server could not bind to $address:$port: ${e.getMessage}")
  46. system.terminate()
  47. }
  48. }
  49. class SomethingMySinkUnderstand(x:String) {
  50. }
  51. }

更新:将其添加到您的build.sbt文件中,以获得必要的deps

  1. libraryDependencies += "com.typesafe.akka" % "akka-stream_2.11" % "2.4.3"
zyfwsgd6

zyfwsgd61#

handleWith需要一个Flow,即一个盒子有一个未连接的入口和一个未连接的出口。您实际上提供了一个Source,因为您通过使用to操作将FlowSink连接起来。
我认为您可以执行以下操作:

  1. conn.handleWith(
  2. Flow[ByteString]
  3. .via(Framing.lengthField(4, 0, 65532, ByteOrder.BIG_ENDIAN))
  4. .map(msg => new SomethingMySinkUnderstand(msg.utf8String))
  5. .alsoTo(mySink)
  6. .map(_ => ByteString.empty)
  7. .filter(_ => false) // Prevents sending anything back
  8. )
kxe2p93d

kxe2p93d2#

另一种(在我看来更干净)的编码方式(AKKA 2.6.x),这也将强调一个事实,即你不做任何出站流,将:

  1. val receivingPipeline = Flow
  2. .via(framing)
  3. .via(decoder)
  4. .to(mySink)
  5. val sendingNothing = Source.never[ByteString]()
  6. conn.handleWith(Flow.fromSinkAndSourceCoupled(receivingPiline, sendingNothing))

相关问题