我正在将redis发布/子系统转换为redis流,以便可以为服务器发送的事件添加一些容错性。
传统的订阅方式很简单:
import { createClient } from 'redis';
const redisOptions = {
url: `${process.env.REDIS_URL}/0`
}
const redis = createClient(redisOptions);
redis.setMaxListeners(100000);
redis.on("message", (channel, message) => {
console.log(channel);
console.log(message);
});
redis.subscribe('foo');
这会永久阻塞,并保持连接打开。在这种情况下,发布到redis将添加到您的日志中。
const json = { a: 1, b: 2 };
redis.publish('foo', JSON.stringify(json));
切换到流,使用 XREAD
而不是订阅 XADD
而不是发布,数据就大不相同了。我正在挣扎的部分是阻塞。
redis.xread('BLOCK', 0, 'STREAMS', 'foo', '$', (err, str) => {
if (err) return console.error('Error reading from stream:', err);
str.forEach(message => {
console.log(message);
});
}
发送消息时,我的“订阅”会接收第一条消息,但不会记录更多消息。
1条答案
按热度按时间6xfqseft1#
说实话,我之所以问这个问题,是因为谷歌对我没有好处,而且我找不到其他人发布关于这个问题的帖子。希望这有帮助!
所以,
XREAD
只在第一次呼叫时阻塞。它将在一个设定的时间段内等待(如果您将时间设置为0,则无限期等待),但一旦它接收到数据,它的职责就被认为已完成,并且它将取消阻止。为了保持“订阅”的活力,你需要打电话XREAD
同样,使用流中最新的id。这将替换初始值$
我们传递了它。递归似乎是一个完美的解决方案: