javascript 限制Node js中的Q promise并发

yshpjwxd  于 2022-12-28  发布在  Java
关注(0)|答案(4)|浏览(132)

有没有办法限制在节点js中同时执行的并发Q承诺的数量?
我正在构建一个webscraper,它必须请求和解析3000多个页面,并且在不限制的情况下,我的一些请求没有得到及时响应,因此连接停止,所需的响应(html代码)变得不可用。
为了解决这个问题,我发现限制请求的数量我的问题就消失了。
我尝试过以下方法,但没有效果:

我需要请求一个url数组,一次只做一个请求,当数组中的所有url都完成后,返回数组中的结果。

function processWebsite() {
  //computed by this stage
  urls = [u1,u2,u3,u4,l5,u6,u7,u8,u9];

  var promises = throttle(urls,1,myfunction);

  // myfunction returns a Q promise and takes a considerable 
  // amount of time to resolve (approximately 2-5 minutes)
  
  Q.all(promises).then(function(results){
      //work with the results of the promises array
  });
}
jv4diomz

jv4diomz1#

我会这样做,它将迭代每个URL,构建一个承诺链,在前一个完成时运行,并使用请求结果数组进行解析。

return urls.reduce(function(acc, url){
    return acc.then(function(results)
        return myfunction(url).then(function(requestResult){
             return results.concat(requestResult)
        });
    });
}, Q.resolve([]));

你也可以把它变成一个帮手:

var results = map(urls, myfunction);

function map(items, fn){
    return items.reduce(function(acc, item){
        return acc.then(function(results)
            return fn(item).then(function(result){
                 return results.concat(result)
            });
        });
    }, Q.resolve([])
}

注意,bluebird promise库有一个帮助器来简化这类事情。

return Bluebird.map(urls, myfunction, {concurrency: 1});
rqdpfwrv

rqdpfwrv2#

下面是我尝试为Q创建一个节流的map函数。

function qMap(items, worker, concurrent) {
    var result = Q.defer();
    var work = [];
    var working = 0;
    var done = 0;

    concurrent = parseInt(concurrent, 10) || 1;

    function getNextIndex() {
        var i;
        for (i = 0; i < items.length; i++) {
            if (typeof work[i] === "undefined") return i;
        }
    }
    function doneWorking() {
        working--;
        done++;
        result.notify( +((100 * done / items.length).toFixed(1)) );
        if (!startWorking() && done === items.length) {
            result.resolve(work);
        }
    }
    function startWorking() {
        var index = getNextIndex();
        if (typeof index !== "undefined" && working < concurrent) {
            working++;
            work[index] = worker(items[index]).finally(doneWorking);
            return true;
        }
    }
    while (startWorking());
    return result.promise;
}

它接受

  • 要处理的items数组(在您的示例中为URL),
  • worker(必须是接受项目并返回承诺的函数)
  • 以及在任何给定时间要工作的最大值concurrent个项目。

它回来了

  • 一个承诺
  • 当所有工人都完成时,决定一系列已确定的承诺。

它没有失败,你必须检查个别承诺,以确定整体的运作状态。
在您的情况下,您可以这样使用它,例如15个并发请求:

// myfunction returns a Q promise and takes a considerable 
// amount of time to resolve (approximately 2-5 minutes)

qMap(urls, myfunction, 15)
.progress(function (percentDone) {
    console.log("progress: " + percentDone);
})
.done(function (urlPromises) {
    console.log("all done: " + urlPromises);
});
4ioopgfo

4ioopgfo3#

您可以在then()块中请求新的url

myFunction(urls[0]).then(function(result) {
  myFunction(urls[1]).then(function(result) {
    myFunction(urls[2]).then(function(result) {
      ...
    });
  });
});

当然,这是它的动态行为,我会维护一个队列,一旦一个承诺被解析,就从队列中取出一个url,然后再发出另一个请求,也许还有一个hash对象将url和结果联系起来。
第二次拍摄:

var urls = ...;
var limit = ...;
var dequeue = function() {
  return an array containing up to limit
};

var myFunction = function(dequeue) {
  var urls = dequeue();

  $q.all(process urls);
};

myFunction(dequeue).then(function(result) {
  myFunction(dequeue).then(function(result) {
    myFunction(dequeue).then(function(result) {
      ...
    });
  });
});
2o7dmzc5

2o7dmzc54#

没有外部库。只有普通的JS。

如果您真的只需要一次一个请求,那么使用async/await就很容易:

async function processWebsite(urls) {
  const responsesAndErrors = new Array(urls.length);
  for (let i = 0; i < urls.length; i++) {
    try {
      responsesAndErrors[i] = await processPage(urls[i]);
    } catch(error) {
      responsesAndErrors[i] = error;
    }
  }
  return responsesAndErrors;
}

async function processPage(url) {
  console.log(`${url} started`);
  const delay = Math.floor(Math.random() * 1500);
  return new Promise((resolve, reject) => {
    setTimeout(() => {
      if (delay <= 1000) {
        console.log(`${url} finished successfully`);
        resolve(`${url} success`);
      } else {
        console.log(`${url} finished with error`);
        reject(`${url} error`);
      }
    }, delay);
  });
}

const urls = new Array(10).fill('url').map((url, index) => `${url}_${index + 1}`);

processWebsite(urls).then(responses => console.log('All', responses));

如果您需要一次超过1个请求,那么可以使用递归来解决。
其思想是,最初我们立即获取允许的最大页数,并且每个请求在完成时都应该递归地启动一个新请求。
在本例中,我将成功的响应与错误一起填充,并处理所有页面,但如果您希望在第一次失败时终止批处理,则可以稍微修改算法。

async function processWebsite(urls, limit) {
  limit = Math.min(urls.length, limit);

  return new Promise((resolve, reject) => {
    const responsesOrErrors = new Array(urls.length);
    let startedCount = 0;
    let finishedCount = 0;
    let hasErrors = false;

    function recursiveProcess() {
      let index = startedCount++;

      processPage(urls[index])
        .then(res => {
          responsesOrErrors[index] = res;
        })
        .catch(error => {
          responsesOrErrors[index] = error;
          hasErrors = true;
        })
        .finally(() => {
          finishedCount++;
          if (finishedCount === urls.length) {
            hasErrors ? reject(responsesOrErrors) : resolve(responsesOrErrors);
          } else if (startedCount < urls.length) {
            recursiveProcess();
          }
        });
    }

    for (let i = 0; i < limit; i++) {
      recursiveProcess();
    }
  });
}

async function processPage(url) {
  console.log(`${url} started`);
  const delay = Math.floor(Math.random() * 1500);
  return new Promise((resolve, reject) => {
    setTimeout(() => {
      if (delay <= 1000) {
        console.log(`${url} finished successfully`);
        resolve(`${url} success`);
      } else {
        console.log(`${url} finished with error`);
        reject(`${url} error`);
      }
    }, delay);
  });
}

const urls = new Array(10).fill('url').map((url, index) => `${url}_${index + 1}`);

processWebsite(urls, 3)
  .then(responses => console.log('All successfull', responses))
  .catch(responsesWithErrors => console.log('All with several failed', responsesWithErrors));

相关问题