我希望我的lagom订户专用服务订阅kafka主题并将消息流式传输到websocket。我有一个服务定义如下使用此文档(https://www.lagomframework.com/documentation/1.4.x/scala/messagebrokerapi.html#subscribe-作为指导方针:
// service call
def stream(): ServiceCall[Source[String, NotUsed], Source[String, NotUsed]]
// service implementation
override def stream() = ServiceCall { req =>
req.runForeach(str => log.info(s"client: %str"))
kafkaTopic().subscribe.atLeastOnce(Flow.fromFunction(
// add message to a Source and return Done
))
Future.successful(//some Source[String, NotUsed])
然而,我不太明白如何处理我的Kafka信息。这个 Flow.fromFunction
退货 [String, Done, _]
意味着我需要将这些消息(字符串)添加到在订阅服务器外部创建的源中。
所以我的问题有两个:1)如何创建一个akka流源来在运行时接收来自kafka主题订阅者的消息?2) 在流中,如何将kafka消息附加到所述源中?
1条答案
按热度按时间nafvub8i1#
你好像误解了lagom的服务api。如果您试图具体化服务调用主体中的流,那么您的调用没有任何输入;即。,
意味着当客户机提供
Source[String, NotUsed]
,服务将以实物形式响应。你的客户不是直接提供的;因此,你的签名应该是现在回答你的问题。。。
这实际上并不存在于scalagiter8模板中,但是java版本包含了一个他们称之为自治流的东西,它大致完成了您想要做的事情。
在scala中,这段代码看起来像。。。
由于您的调用不是Map到输入流,而是一个kafka主题,因此您需要执行以下操作: