在 JavaScript 中以固定的缓冲池大小(并发数)并发执行 Promise

e37o9pze  于 2022-12-28  发布在  Java
关注(0)|答案(4)|浏览(119)

我有一个返回 Promise 的函数,它需要执行 n 次,每次使用不同的参数。我希望对这些 Promise 进行调度,使脚本在任意时刻都只同时处理 3~4 个 Promise。
我现在用的是 Promise.all:它同时执行 3 个,等这 3 个 Promise 全部完成(resolve)之后,才继续执行接下来的 3 个。
怎样才能做到:这 3 个当中只要有一个完成,就立即启动下一个,同时始终保证最多只有 3 个在并行执行?

// Batching attempt from the question: on every index that is a positive
// multiple of 3, run tasks[i], tasks[i-1] and tasks[i-2] together and wait
// for the whole group before the loop moves on.
// NOTE(review): tasks[0] is never passed to doTaskFunction, and any tasks
// located after the last multiple-of-3 index are skipped as well.
for( var i = 0; i < tasks.length; i++){

    // Only act on i = 3, 6, 9, ...
    if( i > 0 && i%3 == 0 ){

      // The await blocks the loop until all three calls have fulfilled, so a
      // slow task in the group delays the start of the next group.
      await Promise.all([
       doTaskFunction(tasks[i]),
        doTaskFunction(tasks[i-1]),
        doTaskFunction(tasks[i-2]),
      ]);
    }

  }
lp0sw83n

lp0sw83n1#

使用es6-promise-pool可以相当容易地实现这一点:

// Builds a promise that fulfils with 'foo' after two seconds.
const fooAfterTwoSeconds = () =>
    new Promise((resolve) => {
        setTimeout(resolve, 2000, 'foo');
    });

// Queue of promise factories. Nothing runs until a factory is called, which
// is what lets the pool decide when each piece of work starts.
const tasks = [
    (param) => fooAfterTwoSeconds(), // `param` shows where an argument could be accepted
    () => fooAfterTwoSeconds(),
    () => fooAfterTwoSeconds(),
    () => Promise.resolve(1),
    () => Promise.resolve(2),
    () => Promise.resolve(3),
];

// 1-based counter used only for the progress log line.
let count = 1;

/**
 * Producer handed to the pool: removes the first remaining factory from
 * `tasks`, invokes it and returns whatever it returns. Returns `null` once
 * the queue is empty.
 */
const promiseProducer = () => {
    if (!tasks.length) {
        return null;
    }

    console.log(`processing ${count++}`);
    const nextTask = tasks.shift();
    return nextTask(); // optionally you could pass a parameter here
};
 
 // Hand the producer and the concurrency limit to es6-promise-pool's PromisePool.
 const pool = new PromisePool(promiseProducer, 3); // concurrent Promises set to 3
 // start() returns a promise; it is used below to log when the pool is finished
 // (presumably after every produced promise has fulfilled — confirm in the library docs).
 const poolPromise = pool.start();

 poolPromise.then(() => { console.log('done!'); })
<script src="https://cdn.jsdelivr.net/npm/es6-promise-pool@2.5.0/es6-promise-pool.min.js"></script>
a9wyjsp7

a9wyjsp72#

我把自己基于生成器(generator)的一个简单(naive)实现放在这里!:)

/**
 * Generator that yields the elements of `arr` one at a time, in index order.
 * The length is re-read on every step, exactly like an index-based loop.
 */
function* myIteratorFactory(arr) {
  let position = 0;
  while (position < arr.length) {
    yield arr[position];
    position += 1;
  }
}


/**
 * Creates a promise factory. Each call to the returned function logs a
 * "just started" line immediately, waits `ms` milliseconds, logs a
 * "just ended" line, and then fulfils with `undefined`.
 */
function delayPromise(text, ms) {
  return async function () {
    console.log('[%s] Promise with value %s just started', new Date().toISOString(), text);
    await new Promise((wake) => {
      setTimeout(wake, ms);
    });
    console.log('[%s] Promise with value %s just ended', new Date().toISOString(), text);
  };
}

// Five delayed-promise factories; no timer starts until a factory is invoked.
var promArr = [
  ['hi', 1500],
  ['alex', 2000],
  ['how', 1700],
  ['are', 1800],
  ['you', 1500],
].map(([text, ms]) => delayPromise(text, ms))

// Number of promises currently in flight; read and updated by executor().
var que = 0
// One shared iterator, so every executor() call continues where the last stopped.
var myIterator = myIteratorFactory(promArr)


/**
 * Keeps up to 3 promise factories from `myIterator` running at the same time.
 * `r` is the completion callback; it is invoked when the in-flight counter
 * `que` is back at 0 after a promise has fulfilled.
 *
 * NOTE(review): only a fulfilment handler is attached, so a rejected promise
 * never decrements `que`. Also, if the iterator is already exhausted on the
 * very first call, the function returns without ever calling `r`.
 */
function executor(r) {

  // Top the pool up until 3 promises are in flight.
  while (que < 3) {
    var next = myIterator.next()
    // Nothing left to start; promises already running finish on their own.
    if (next.done) return;

    // next.value is a factory; calling it starts the work.
    next.value()
      .then(() => {
        // Free the slot first, then try to start a replacement. The order
        // matters: the check below must see the count after the refill.
        que--
        executor(r)
        if (que == 0) r()
      })
    que++
  }



}
// Kick off the pool and log once everything has completed.
executor(() => console.log('i am done for today!'))
fnatzsnv

fnatzsnv3#

如果您不想使用任何插件/依赖项,您可以使用此解决方案。
假设您的数据位于名为datas的数组中
1.创建一个函数来处理datas数组中的单条数据,我们将其命名为processData()
2.创建一个函数,在while循环中逐个调用processData(),直到datas数组中没有数据为止,我们将该函数命名为bufferedExecution()
3.创建一个大小为buffer_size的数组
4.用bufferedExecution()的调用结果(即它返回的Promise)填充该数组
5.用Promise.all()或Promise.allSettled()等待它们全部完成
下面是一个可运行的示例:其中的数据是数字,处理操作会等待一段时间后返回该数字的平方,并且会随机拒绝(reject)。

// Work queue: the integers 1 through 13.
// this datas array should not contain undefined values for this code to work
const datas = Array.from({ length: 13 }, (_, index) => index + 1);

// How many bufferedExecution() loops are started side by side.
const buffer_size = 3;
// Receives one { input, result } or { input, error } record per processed item.
const finishedPromises = [];

// change this function to your actual function that processes data
/**
 * Demo worker: waits 1500 ms, then either fulfils with `item` squared or
 * rejects with the string "error message". The outcome is decided by a
 * random draw made after the wait.
 */
async function processData(item) {
  // wait for some time
  await new Promise((wake) => {
    setTimeout(wake, 1500);
  });

  // randomly resolve or reject
  const succeeded = Math.random() > 0.5;
  if (!succeeded) {
    // The rejection value is a plain string; callers store it as-is.
    throw "error message";
  }
  return item ** 2;
}

/**
 * Worker loop. Repeatedly removes the first remaining element of the shared
 * `datas` queue, passes it to `callback`, and appends the outcome to the
 * shared `finishedPromises` array as `{ input, result }` on success or
 * `{ input, error }` on failure. A single call processes items one after
 * another; starting several calls gives that many items in flight at once.
 *
 * @param {(item: *) => *} callback - Processes one item; may return a promise.
 * @param {number} i - Worker id, used only in the progress log line.
 * @returns {Promise<void>} Fulfils once the queue is empty. It does not reject
 *   for callback failures, because those are recorded instead of propagated.
 */
async function bufferedExecution(callback, i) {
  // Loop on the queue length instead of a sentinel value, so `undefined` or
  // `null` entries are processed rather than ending this worker early.
  while (datas.length > 0) {
    const input = datas.shift();
    // just to show which function is running (index of function in array)
    console.log(`running function id: ${i}`);
    try {
      // process data with the supplied callback
      const result = await callback(input);
      finishedPromises.push({ input, result });
    } catch (error) {
      // rejected (or threw), so record the error instead of a result
      finishedPromises.push({ input, error });
    }
  }
}

// Start `buffer_size` worker loops up front. Every bufferedExecution() call
// begins pulling from `datas` as soon as it is invoked, so the workers run
// concurrently; `buffer` only holds their promises (worker ids are 1-based).
const buffer = Array.from({ length: buffer_size }, (_, index) =>
  bufferedExecution(processData, index + 1)
);

// Runs once every worker has settled: the collected records are in
// finishedPromises. Each record carries the `input`; check for `error`
// first, otherwise read `result`.
const reportResults = () => {
  console.log("all done");
  console.log(finishedPromises);
};

// Logs anything thrown while reporting.
const reportFailure = (err) => {
  console.log(err);
};

Promise.allSettled(buffer).then(reportResults).catch(reportFailure);

输出

// waits a while
running function id: 1
running function id: 2
running function id: 3
// waits a while
running function id: 1
running function id: 2
running function id: 3
// waits a while
running function id: 1
running function id: 2
running function id: 3
// waits a while
running function id: 1
running function id: 2
running function id: 3
// waits a while
running function id: 1
all done
[
  { input: 1, error: 'error message' },
  { input: 2, result: 4 },
  { input: 3, result: 9 },
  { input: 4, result: 16 },
  { input: 5, error: 'error message' },
  { input: 6, result: 36 },
  { input: 7, result: 49 },
  { input: 8, error: 'error message' },
  { input: 9, result: 81 },
  { input: 10, result: 100 },
  { input: 11, result: 121 },
  { input: 12, error: 'error message' },
  { input: 13, result: 169 }
]
s3fp2yjn

s3fp2yjn4#

**不使用外部库,只用原生 JS。**

它可以用递归来解决。
其思想是,最初我们立即运行最大允许数量的任务,并且这些任务中的每一个都应该在其完成时递归地启动一个新任务。
在本例中,我把成功的响应和错误一起收集到同一个结果数组中,并且会执行完所有请求;如果您希望在第一次失败时就终止批处理,可以稍微修改一下算法。

/**
 * Runs `doTaskFunction` for every entry of `tasks`, keeping at most `limit`
 * calls in flight. Whenever one call settles, the next pending task starts.
 * Every task is executed even if earlier ones fail.
 *
 * @param {Array} tasks - Task descriptors, passed one by one to doTaskFunction.
 * @param {number} limit - Maximum number of concurrent calls; must be >= 1.
 * @returns {Promise<Array>} Fulfils with the results in task order when every
 *   task succeeded. Rejects with the same-shaped array (results mixed with
 *   errors, in task order) when at least one task failed. Rejects with a
 *   RangeError when `limit` is below 1 or not a number.
 */
async function batchExecute(tasks, limit) {
  // An empty batch has nothing to wait for. Without this guard no task would
  // ever finish and the returned promise would stay pending forever.
  if (tasks.length === 0) {
    return [];
  }
  // Written as a negated comparison so that NaN is rejected too. A limit
  // below 1 would start no task at all and the promise would never settle.
  if (!(limit >= 1)) {
    throw new RangeError(`limit must be a number >= 1, got ${limit}`);
  }

  const workerCount = Math.min(tasks.length, Math.floor(limit));
  const responsesOrErrors = new Array(tasks.length);
  let nextIndex = 0;
  let hasErrors = false;

  // Each worker claims the next unstarted index until none are left. The
  // claim is synchronous, so two workers can never take the same task.
  async function worker() {
    while (nextIndex < tasks.length) {
      const index = nextIndex++;
      try {
        responsesOrErrors[index] = await doTaskFunction(tasks[index]);
      } catch (error) {
        responsesOrErrors[index] = error;
        hasErrors = true;
      }
    }
  }

  // Workers never reject (failures are recorded above), so Promise.all only
  // waits here for the whole batch to drain.
  await Promise.all(Array.from({ length: workerCount }, () => worker()));

  if (hasErrors) {
    // Existing callers expect the mixed array itself as the rejection value.
    throw responsesOrErrors;
  }
  return responsesOrErrors;
}

/**
 * Demo task: logs that `task` started, waits a random 0-1499 ms, then
 * fulfils with "<task> success" when the drawn delay is at most 1000 ms, or
 * rejects with the string "<task> error" otherwise. The outcome is logged
 * right before settling.
 */
async function doTaskFunction(task) {
  console.log(`${task} started`);
  const delay = Math.floor(Math.random() * 1500);

  await new Promise((wake) => {
    setTimeout(wake, delay);
  });

  if (delay > 1000) {
    console.log(`${task} finished with error`);
    // The rejection value is a plain string; callers store it as-is.
    throw `${task} error`;
  }

  console.log(`${task} finished successfully`);
  return `${task} success`;
}

// Ten demo task names: "task_1" ... "task_10".
const tasks = Array.from({ length: 10 }, (_, index) => `task_${index + 1}`);

// Run the batch with at most 3 tasks in flight and report the outcome.
(async () => {
  try {
    const responses = await batchExecute(tasks, 3);
    console.log('All successfull', responses);
  } catch (responsesWithErrors) {
    console.log('All with several failed', responsesWithErrors);
  }
})();

相关问题