我正在尝试实现一个设置,我有多个浏览器打开一个websocket连接到我的akka http服务器,以便读取所有发布到kafka主题的消息。
所以消息流应该是这样的
kafka topic -> akka-http -> websocket connection 1
-> websocket connection 2
-> websocket connection 3
现在我已经为websocket创建了一个路径:
val route: Route =
path("ws") {
handleWebSocketMessages(notificationWs)
}
然后我为我的Kafka主题创建了一个消费者:
val consumerSettings = ConsumerSettings(system,
new ByteArrayDeserializer, new StringDeserializer)
.withBootstrapServers("localhost:9092")
.withGroupId("group1")
.withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
val source = Consumer
.plainSource(consumerSettings, Subscriptions.topics("topic1"))
最后,我想将这个源代码连接到handlewebsocketmessages中的websocket
def handleWebSocketMessages: Flow[Message, Message, Any] =
Flow[Message].mapConcat {
case tm: TextMessage =>
TextMessage(source)::Nil
case bm: BinaryMessage =>
// ignore binary messages but drain content to avoid the stream being clogged
bm.dataStream.runWith(Sink.ignore)
Nil
}
这是我尝试使用 source
在短信中:
错误:(77,9)重载方法值应用于替代项:(textstream:akka.stream.scaladsl.source[string,any])akka.http.scaladsl.model.ws.textmessage(text:string)akka.http.scaladsl.model.ws.textmessage.strict不能应用于(akka.stream.scaladsl.source[org.apache.kafka.clients.consumer.consumerrecord[array[byte],string],akka.kafka.scaladsl.consumer.control])文本消息(来源::nil
我想我在这个过程中犯了很多错误,但我想说的是,最阻碍的部分是 handleWebSocketMessages
.
1条答案
按热度按时间egmofgnx1#
首先,要了解源的类型:
Source[ConsumerRecord[K, V], Control]
. 所以,你不能把它当作文本消息的参数。现在,让我们来看看websocket的观点:
为kafka源中的每条消息构建一条传出消息。消息将是来自Kafka消息的字符串转换的textmessage。
对于每个传入的消息,只需println()它
所以
Flow
可以看作是两个组成部分:Source
&那个Sink
.希望有帮助。