我正在尝试创建一个应用程序(在nodejs中)以将kafka事件流式传输到套接字连接。我使用的库是:kafkajs和socket.io
我可以通过以下方式对一个插座连接执行相同操作:
const activeUsers = new Set();
const kafka = new Kafka({
clientId: 'BA-Webapp',
brokers: ['XX.XX.XX.XX:9092']
})
const attentionConsumer = kafka.consumer({ groupId: 'XXX' })
// App setup
const PORT = 5000;
const app = express();
const server = app.listen(PORT, function () {
console.log(`Listening on port ${PORT}`);
console.log(`http://localhost:${PORT}`);
});
// Static files
app.use(express.static("public"));
// Socket setup
const io = socket(server);
io.on("connection", async function (socket) {
console.log("Client connected")
socket.on('start-stream', async clientUserId => {
console.log("Client message received")
socket.userId = clientUserId;
activeUsers.add(clientUserId);
console.log(activeUsers)
await attentionConsumer.connect()
await attentionConsumer.subscribe({ topic: 'on_message', fromBeginning: false })
await attentionConsumer.run({
eachMessage: async ({ topic, partition, message }) => {
io.send(message.value.toString(), clientUserId)
},
})
console.log(`Received message => ${message}`)
})
});
我在上面所做的是,每次客户端连接并发送消息开始流时,我都会创建一个消费者连接,订阅一个主题,每次主题收到消息时,我都会将其发送回客户端。
这适用于一个客户机,一旦第二个客户机连接,我就会得到预期的错误,即consumer已经在运行。
如何在套接字范围外启动使用者(以便在应用程序加载时启动),以及在套接字内接收任何消息流时启动使用者(仅当存在可用的套接字连接时)?
我是新手,任何帮助都将不胜感激。
暂无答案!
目前还没有任何答案,快来回答吧!