带nodejs的基于kafka的发布/订阅

nhn9ugyo  于 2021-06-07  发布在  Kafka
关注(0)|答案(0)|浏览(194)

我正在尝试用apachekafka开发一个pub/sub系统。restapi是用nodejs编写的。我正在使用的图书馆不是Kafka。用户传入要从中使用的队列的主题和偏移量(起始位置),api返回前10条消息。

consumer.init().then(function() {
  console.log("Consumer Ready");
});
app.get('/:topic/:off', function(req, res) {
console.log(req.params.off);
consumer.subscribe(req.params.topic, 0, {
  offset: req.params.off
},
function(messageSet, topic, partition) {
  var msg = "";
  messageSet.some(function(m) {
    msg += m.message.value.toString('utf8') + " ";
    if (parseInt(m.offset, 10) > parseInt(req.params.off, 10) + 10) {
      res.send("Results: " + req.params.off + "  " + msg);
      return true;
    }
  });
});
});

我正在使用hurl-it在线发出http请求。无论何时我提出请求,都需要450到2000毫秒的时间。这是一个很好的响应时间吗?如果没有,如何提高api的速度?
如果你需要其他信息,请告诉我。提前谢谢。
编辑:我的Kafka节点代码。

var Consumer = new kafka.Consumer;
var client1 = new kafka.Client;
var consumer = new Consumer(client1, [{
    topic: 'TutorialTopic',
    partition: 0,
    offset: 0
}], {
    autoCommit: true,
    fromOffset: true
});

app.get('/consume/:off', function(req, res) {
    console.log("In Consume");

    consumer.setOffset('TutorialTopic', 0, req.params.off);
    consumer.on('message', function(message) {
        console.log("Consumer Ready");
        console.log(message);
    });

    res.send("Consumed");

});

由于库提供了一个“on”事件处理程序,当两个或多个用户同时访问我的网站时,我如何识别?使用kafka节点从特定偏移量访问消息的最佳方式是什么?

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题