在网页上显示Kafka消息

qojgxg4l  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(576)

我有一个javaspring应用程序,带有一个tomcat服务器,可以监听kafka主题。我想在网页上以实时模式显示所有消息。因此,当Kafka消息到达后端时,我希望在我的网页上看到它。我不知道把Kafka的信息直接推到前端并显示在网页上的好方法。有没有人能帮我找到一个解决方案和一些能帮我的例子?谢谢!

cclgggtu

cclgggtu1#

我已经为我的上一个雇主用java实现了这样一个系统,尽管没有使用spring/tomcat。它正在消费来自Kafka的消息,并将它们提供给一个web套接字,以显示在浏览器中。我遵循的方法是使用akkastreamkafka和akkahttp来支持web套接字。这样做的好处是两者都基于akka流,这使得它很容易适合流数据。虽然可以将akkahttp嵌入到运行在tomcat中的spring应用程序中,但它可能不再是最自然的选择,因为spring框架已经对kafka和websockets提供了自己的支持。但是,如果您对这两种方法都不熟悉,那么使用akka方法可能是最简单的,其核心逻辑是这样的(我不能分享工作中的代码,所以我只是从文档中的示例中总结出来,没有经过测试):

public Route createRoute(ActorSystem system) {
  return path("ws", () -> {
    ConsumerSettings<byte[], String> consumerSettings = ConsumerSettings.create(system, new ByteArrayDeserializer(), new StringDeserializer())
      .withBootstrapServers("localhost:9092")
      .withGroupId(UUID.randomUUID().toString()) //this is so that each client gets all messages. To be able to resume from where a client left off in case of disconnects, you can generate in on the client side and pass in the request
      .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

    return handleWebSocketMessages(
      Flow.fromSinkAndSourceCoupled(
        Sink.ignore(),
        Consumer.committableSource(consumerSettings, Subscriptions.topics("topic1"))
          .map(msg -> TextMessage.create(msg.record().value()))
      )
    );
  }
}

要公开此路由,您可以遵循简约示例,唯一的区别是您定义的路由需要actorsystem:

final Http http = Http.get(system);
final ActorMaterializer materializer = ActorMaterializer.create(system);

final Flow<HttpRequest, HttpResponse, NotUsed> routeFlow = createRoute(system).flow(system, materializer);
final CompletionStage<ServerBinding> binding = http.bindAndHandle(routeFlow,
    ConnectHttp.toHost("localhost", 8080), materializer);

一旦您将消息发布到websocket,前端代码当然将取决于您选择的ui框架,从javascript使用ws消息的最简单代码是:

this.connection = new WebSocket('ws://url-to-your-ws-endpoint');
this.connection.onmessage = evt => { 
  // display the message

为了方便地在ui中显示消息,您希望格式是方便的,比如json。如果您的kafka消息已经不是json了,那么第一个代码段中的反序列化程序就是从这里来的,您可以在反序列化程序中将其转换为一个方便的json字符串,或者稍后在源对象上调用的.map()中进行转换。
或者,如果轮询是一个选项,您也可以考虑使用现成的kafka rest代理,那么您只需要构建前端。

相关问题