用服务器发送的事件重播kafka主题

roqulrg3  于 2021-06-04  发布在  Kafka
关注(0)|答案(0)|浏览(277)

我正在考虑下面的用例,并想验证这种方法在概念上是否有效。
目标是在spring中公开一个长时间运行的serversentedevent(sse)端点,为每个传入连接重播相同的kafka主题(使用一些特定于用户的过滤)。
苏格兰和南方能源公司是这样暴露的:

@GetMapping("/sse")
  public SseEmitter sse() {
    SseEmitter sseEmitter = new SseEmitter();

    Executors
          .newSingleThreadExecutor()
          .execute(() -> dummyDataProducer.generate()  // kafka ultimately
                .forEach(payload -> {
                  try {
                    sseEmitter.send(payload);
                  } catch (IOException ex) {
                    sseEmitter.completeWithError(ex);
                  }
                }));

    return sseEmitter;
  }

从另一边,有一个 KafkaListener 方法( ConcurrentKafkaListenerContainerFactory 已使用):

@KafkaListener(topics = "${app.kafka.topic1}")
  public void receive(
        @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) Integer id,
        @Payload Object payload) {
    // do something ...
  }

据我所知,kafka消费者应用程序使用一个线程从单个主题读取数据。这在某种程度上违背了使用sse的想法,即为每个传入连接创建一个专用的长时间运行的线程。
对于这个用例,这是一种有效的方法吗?如果是这样,如何正确地完成这一点?

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题