我从akka streams开始,我想构建一个服务器作为stream来接收 Http.IncomingConnection
并将收到的信息作为普通接收器发送给Kafka。
我声明我的来源如下:
val serverSource: Source[Http.IncomingConnection, Future[Http.ServerBinding]] =
Http().bind(interface = "localhost", port = "8080")
然后,我想从httprequest的主体中提取消息(字符串),最后将字符串发送给kafka。流程如下所示:
val bindingFuture: Future[Http.ServerBinding] = serverSource
.map(???) //Here, I need to extract the message
.map(message => new ProducerRecord[String, String](topic, message.result(2 seconds)))
.runWith(akka.kafka.scaladsl.Producer.plainSink(producerSettings))
但是,我不知道如何提取信息。我想这样做:
val requestHandler: HttpRequest => HttpResponse = {
case HttpRequest(POST, Uri.Path("/publish"), _, _, _) => {
HttpResponse(202, entity = "Message sent to Kafka!")
}
case r: HttpRequest =>
r.discardEntityBytes() // important to drain incoming HTTP Entity stream
HttpResponse(404, entity = "Unknown resource!")
}
但是,使用 connection handleWithSyncHandler requestHandler
我无法获取流进程所遵循的消息。同时,我也希望能收到你的任何请求 /publish
uri,或者在流内部返回404。
有可能这样做吗?
1条答案
按热度按时间4bbkushb1#
改用指令
路由dsl将比试图处理数据更容易使用
HttpRequest
手工操作:现在可以将其传入以处理传入的请求: