Node.js中Redis的并发问题

pbwdgjma  于 12个月前  发布在  Redis
关注(0)|答案(1)|浏览(114)

问题

我在Redis中有一个名为stocks的键,它的值是1000。假设300个客户同时请求购买(每个客户购买100只股票)。最后,只有10个客户可以购买。

解决方案

我知道它不会工作,但假设我的buy函数是这样的:

/**
 * @param {import("redis").RedisClientType} instance
 * @param {string} clientId
 * @param {number} n
 */
async function buy(instance, clientId, n) {
  // --- (1) ----
  // Get current number of stocks
  let stocks = await instance.GET("stocks");
  stocks = parseInt(stocks);

  // --- (2) ----
  // Validate if the stocks remaining are enough to be bought
  if (stocks < n) {
    console.log("error: company does not have enough stocks");
    return new Error("error: company does not have enough stocks");
  }

  // --- (3) ----
  // Update the current stocks of the company and log who has bought stocks
  await instance.INCRBY("stocks", -n);
  console.log("client @%s has bought %s stocks successfully!", clientId, n);
}

字符串
为了测试它,我写了一个函数,它调用buy函数300次:

const redis = require("redis");
const crypto = require("crypto");
const { buy } = require("./buy");

async function main(customers = 300) {
  const instance = await redis
    .createClient({ url: "redis://localhost:6379" })
    .connect();

  // --- (1) ----
  // set stocks
  await instance.SET("stocks", 1000);

  // --- (2) ----
  // buy 100 stocks concurrentlly for each customer
  let pool = [];
  for (let i = 0; i < customers; i++) {
    let userId = crypto.randomBytes(4).toString("hex");
    pool.push(buy_v3(instance, userId, 100));
  }
  await Promise.all(pool);

  // --- (3) ----
  // Get the remaining stocks
  let shares = await instance.GET("stocks");
  console.log("the number of free shares the company has is: %s", shares);

  await instance.disconnect();
}
main();


输出量:

...
error: company does not have enough stocks
error: company does not have enough stocks
error: company does not have enough stocks
error: company does not have enough stocks
the number of free stocks the company has is: -29000


正如我所说,它没有工作,但要修复,我使用这种方法:

/**
 * @param {import("redis").RedisClientType} instance
 * @param {string} clientId
 * @param {number} n
 */
async function buy(instance, clientId, n) {
  try {
    await instance.executeIsolated(async (client) => {
      // --- (1) ----
      // Get current number of stocks
      let stocks = await client.GET("stocks");
      stocks = parseInt(stocks);

      // --- (2) ----
      // Validate if the stocks remaining are enough to be bought
      if (stocks < n) {
        throw new Error("error: company does not have enough stocks");
      }

      // --- (3) ----
      // Update the current stocks of the company
      await client.INCRBY("stocks", -n);
    });

    console.log("client @%s has bought %s stocks successfully!", clientId, n);
  } catch (err) {
    console.log(err.message);
  }
}


如果你再测试一次,你会看到这样的结果:

...
error: company does not have enough stocks
error: company does not have enough stocks
error: company does not have enough stocks
error: company does not have enough stocks
the number of free stocks the company has is: 0


这意味着它工作没有问题。
问题
上面的解决方案工作得很好,但我对executeIsolated函数有点困惑。据我所知,它只是创建一个新的连接(你可以在这里看到),当你想在独占连接上运行命令时,它很有用,比如watch命令。
谁能解释一下executeIsolated在我的例子中的确切作用?

pgccezyw

pgccezyw1#

问题是,当有并发请求时,不能保证第n个请求的SET将在第n +1个请求的GET之前运行。例如,如果有2个并发请求,命令应该按以下顺序执行:

> GET stocks
"100"
> INCRBY stocks -100
(integer) 0
> GET stocks
"0"

字符串
但它们可能会按以下顺序执行:

> GET stocks
"100"
> GET stocks
"100"
> INCRBY stocks -100
(integer) 0
> INCRBY stocks -100
(integer) -100


为了修复它,你应该使用一个Redis function(从redis 7.0开始可用)或Lua Script,看起来像这样:

local stocks = redis.call('GET', KEYS[1])
if stocks < ARGS[1] then
  return redis.error_reply('company does not have enough stocks')
end

redis.call('SET', KEYS[1], stocks - ARGS[1])
return redis.status_reply('OK')


关于为什么使用executeIsolated“修复”这个问题-可能有两个原因:
1.池大小为1,这实际上创建了一个队列
1.池中没有“空闲”连接,创建新连接所需的时间比执行GET所需的时间长。

相关问题