将kafka事件的消息发送到套接字

nfzehxib  于 2021-09-13  发布在  Java
关注(0)|答案(0)|浏览(337)

我正在尝试创建一个应用程序(在nodejs中)以将kafka事件流式传输到套接字连接。我使用的库是:kafkajs和socket.io
我可以通过以下方式对一个插座连接执行相同操作:

  1. const activeUsers = new Set();
  2. const kafka = new Kafka({
  3. clientId: 'BA-Webapp',
  4. brokers: ['XX.XX.XX.XX:9092']
  5. })
  6. const attentionConsumer = kafka.consumer({ groupId: 'XXX' })
  7. // App setup
  8. const PORT = 5000;
  9. const app = express();
  10. const server = app.listen(PORT, function () {
  11. console.log(`Listening on port ${PORT}`);
  12. console.log(`http://localhost:${PORT}`);
  13. });
  14. // Static files
  15. app.use(express.static("public"));
  16. // Socket setup
  17. const io = socket(server);
  18. io.on("connection", async function (socket) {
  19. console.log("Client connected")
  20. socket.on('start-stream', async clientUserId => {
  21. console.log("Client message received")
  22. socket.userId = clientUserId;
  23. activeUsers.add(clientUserId);
  24. console.log(activeUsers)
  25. await attentionConsumer.connect()
  26. await attentionConsumer.subscribe({ topic: 'on_message', fromBeginning: false })
  27. await attentionConsumer.run({
  28. eachMessage: async ({ topic, partition, message }) => {
  29. io.send(message.value.toString(), clientUserId)
  30. },
  31. })
  32. console.log(`Received message => ${message}`)
  33. })
  34. });

我在上面所做的是,每次客户端连接并发送消息开始流时,我都会创建一个消费者连接,订阅一个主题,每次主题收到消息时,我都会将其发送回客户端。
这适用于一个客户机,一旦第二个客户机连接,我就会得到预期的错误,即consumer已经在运行。
如何在套接字范围外启动使用者(以便在应用程序加载时启动),以及在套接字内接收任何消息流时启动使用者(仅当存在可用的套接字连接时)?
我是新手,任何帮助都将不胜感激。

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题