kafka主题到websocket

ui7jx7zq  于 2021-06-08  发布在  Kafka
关注(0)|答案(1)|浏览(434)

我正在尝试实现一个设置,我有多个浏览器打开一个websocket连接到我的akka http服务器,以便读取所有发布到kafka主题的消息。
所以消息流应该是这样的

kafka topic -> akka-http -> websocket connection 1 
                         -> websocket connection 2
                         -> websocket connection 3

现在我已经为websocket创建了一个路径:

val route: Route = 
 path("ws") {
   handleWebSocketMessages(notificationWs)
 }

然后我为我的Kafka主题创建了一个消费者:

val consumerSettings = ConsumerSettings(system,
  new ByteArrayDeserializer, new StringDeserializer)
    .withBootstrapServers("localhost:9092")
    .withGroupId("group1")
    .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
val source = Consumer
  .plainSource(consumerSettings, Subscriptions.topics("topic1"))

最后,我想将这个源代码连接到handlewebsocketmessages中的websocket

def handleWebSocketMessages: Flow[Message, Message, Any] =
  Flow[Message].mapConcat {
    case tm: TextMessage =>
      TextMessage(source)::Nil
    case bm: BinaryMessage =>
      // ignore binary messages but drain content to avoid the stream being clogged
      bm.dataStream.runWith(Sink.ignore)
      Nil
  }

这是我尝试使用 source 在短信中:
错误:(77,9)重载方法值应用于替代项:(textstream:akka.stream.scaladsl.source[string,any])akka.http.scaladsl.model.ws.textmessage(text:string)akka.http.scaladsl.model.ws.textmessage.strict不能应用于(akka.stream.scaladsl.source[org.apache.kafka.clients.consumer.consumerrecord[array[byte],string],akka.kafka.scaladsl.consumer.control])文本消息(来源::nil
我想我在这个过程中犯了很多错误,但我想说的是,最阻碍的部分是 handleWebSocketMessages .

egmofgnx

egmofgnx1#

首先,要了解源的类型: Source[ConsumerRecord[K, V], Control] . 所以,你不能把它当作文本消息的参数。
现在,让我们来看看websocket的观点:
为kafka源中的每条消息构建一条传出消息。消息将是来自Kafka消息的字符串转换的textmessage。
对于每个传入的消息,只需println()它
所以 Flow 可以看作是两个组成部分: Source &那个 Sink .

val incomingMessages: Sink[Message, NotUsed] =
  Sink.foreach(println(_))

val outgoingMessages: Source[Message, NotUsed] =
  source
    .map { consumerRecord => TextMessage(consumerRecord.record.value) }

val handleWebSocketMessages: Flow[Message, Message, Any]  
  = Flow.fromSinkAndSource(incomingMessages, outgoingMessages)

希望有帮助。

相关问题