js 并发控制怎么依次得到每个请求的结果?

写了一段 js 控制并发数的逻辑,已经实现了控制并发,但是我想在最下面的 for 循环中得到每个 task 执行后的 response。 目前只能得到最开始 5 个的,这是为什么?

const createPool = (task, { concurrency } = {}) => {
  const pool = [];
  let runningCount = 0;
  return function (i) {
    return new Promise((resolve, reject) => {
      pool.push(() => task(i));
      function run() {
        while (pool.length && runningCount < concurrency) {
          const runTask = pool.shift();
          runningCount++;
          runTask()
            .then((val) => {
              resolve(val)
            })
            .catch((e) => reject(e))
            .finally(() => {
              runningCount--;
              run();
            });
        }
      }
      run();
    });
  };
};

// 简化场景,模拟业务调用
const originFetchData = (id) => {
  return new Promise((resolve) => {
    setTimeout(() => resolve(id), 2000);
  });
};
const fetchData = createPool(originFetchData, { concurrency: 5 });

for (let i = 0; i < 100; i++) {
  fetchData(i).then((response) => {
    console.log("response ---", response);  // TODO problem : 想在这里得到每个 task 的 response
  });
}

目前的一个结果:

我知道可以用一个数组 or 对象存放批处理的结果,但是现在的场景需要单独处理每个 response

Help,Thanks!

阅读 786
3 个回答

100次 for 循环是同步执行的,也就是说一开始就会产生 100 个 并发任务,但由于并发数的控制,只能进去 5 个,后 95 个的 Promise 都被丢弃了,当后面 95 次 异步任务 触发的时候实际上都是用的前面 5 次 闭包 中的 reslovereject

因此这里需要用一个 map 缓存一下 Promiseresolvereject

const createPool = (task, { concurrency } = {}) => {
  let runningCount = 0;
  const pool = [];
  const promiseMap = new Map();

  return function (i) {
    return new Promise((resolve, reject) => {
      promiseMap.set(i, { resolve, reject });

      pool.push(() => task(i));

      function run() {
        while (pool.length && runningCount < concurrency) {
          const runTask = pool.shift();
          runningCount++;
          runTask()
            .then((val) => {
              const { resolve } = promiseMap.get(val)
              resolve(val)
            })
            .catch((e) => reject(e))
            .finally(() => {
              runningCount--;
              run();
            });
        }
      }
      run();
    });
  };
};

你在 createPool 里面不是限制了执行次数么

while (pool.length && runningCount < concurrency)
...

然后你初始化 fetchData 的时候 限制了 执行 5 次

const fetchData = createPool(originFetchData, { concurrency: 5 })

根据 @木木剑光 的解答进行略微的修改。

const createPool = (task, { concurrency } = {}) => {
  const pool = [];
  let runningCount = 0;
  const promiseMap = new Map();

  return function (i) {
    return new Promise((resolve, reject) => {
      pool.push({ id: i, fn: () => task(i) });//modify1: 队列中以{id,fn}的方式存储任务
      promiseMap.set(i, { resolve, reject });
      function run() {
        while (pool.length && runningCount < concurrency) {
          const runTask = pool.shift();
          runningCount++;
          runTask
            .fn()
            .then((val) => {
              const { resolve } = promiseMap.get(runTask.id); // modify2: 使用当前任务的 id 去获取其 promise 的 resolve
              resolve(val);
            })
            .catch((e) => {
              const { reject } = promiseMap.get(runTask.id);// modify3: 使用当前任务的 id 去获取其 promise 的 reject
              reject(e);
            })
            .finally(() => {
              runningCount--;
              run();
            });
        }
      }
      run();
    });
  };
};

// 简化场景,模拟业务调用
const originFetchData = (id) => {
  return new Promise((resolve,reject) => {
    setTimeout(() => (id % 2 === 0 ? resolve(id) : reject(id)), 2000);
  });
};
const fetchData = createPool(originFetchData, { concurrency: 5 });

for (let i = 0; i < 20; i++) {
  fetchData(i).then((response) => {
    console.log("response ---", response); // TODO problem : 想在这里得到每个 task 的 response
  }).catch(e=>console.error('err--',e));
}

结果正常了。

撰写回答
你尚未登录,登录后可以
  • 和开发者交流问题的细节
  • 关注并接收问题和回答的更新提醒
  • 参与内容的编辑和改进,让解决方法与时俱进
推荐问题
宣传栏