将kafka事件的消息发送到套接字

nfzehxib  于 2021-09-13  发布在  Java
关注(0)|答案(0)|浏览(277)

我正在尝试创建一个应用程序(在nodejs中)以将kafka事件流式传输到套接字连接。我使用的库是:kafkajs和socket.io
我可以通过以下方式对一个插座连接执行相同操作:

const activeUsers = new Set();
const kafka = new Kafka({
    clientId: 'BA-Webapp',
    brokers: ['XX.XX.XX.XX:9092']
})

const attentionConsumer = kafka.consumer({ groupId: 'XXX' })

// App setup
const PORT = 5000;
const app = express();
const server = app.listen(PORT, function () {
    console.log(`Listening on port ${PORT}`);
    console.log(`http://localhost:${PORT}`);
});

// Static files
app.use(express.static("public"));

// Socket setup
const io = socket(server);

io.on("connection", async function (socket) {

    console.log("Client connected")

    socket.on('start-stream', async clientUserId => {
        console.log("Client message received")
        socket.userId = clientUserId;
        activeUsers.add(clientUserId);
        console.log(activeUsers)
        await attentionConsumer.connect()
        await attentionConsumer.subscribe({ topic: 'on_message', fromBeginning: false })

        await attentionConsumer.run({
            eachMessage: async ({ topic, partition, message }) => {

                io.send(message.value.toString(), clientUserId)
            },
        })
        console.log(`Received message => ${message}`)
    })
});

我在上面所做的是,每次客户端连接并发送消息开始流时,我都会创建一个消费者连接,订阅一个主题,每次主题收到消息时,我都会将其发送回客户端。
这适用于一个客户机,一旦第二个客户机连接,我就会得到预期的错误,即consumer已经在运行。
如何在套接字范围外启动使用者(以便在应用程序加载时启动),以及在套接字内接收任何消息流时启动使用者(仅当存在可用的套接字连接时)?
我是新手,任何帮助都将不胜感激。

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题