Promise 已经成为 JavaScript 里处理异步操作的标准方式。然而,当我们需要同时处理多个异步任务时,如何有效地控制 Promise 的并发,就成为了一个影响性能和用户体验的关键问题。

假设,你需要同时请求 100 个接口来获取数据。如果一股脑地发起所有请求,可能会导致以下问题:

  • 浏览器并发限制:浏览器对同一域名的并发请求数量有限制(通常是 6-8 个)。过多的请求会被阻塞,导致页面加载缓慢
  • 服务器压力过大:大量并发请求可能会给服务器带来巨大的压力,导致响应变慢甚至崩溃
  • 资源竞争:多个异步任务同时访问共享资源(例如数据库连接、文件等),可能会导致资源竞争和死锁
  • 用户体验差:页面长时间处于加载状态,用户体验极差

因此,我们需要对 Promise 的并发进行控制,在保证任务执行效率的同时,避免对系统资源造成过大的压力

Promise.all:并行执行,统一返回

Promise.all 接收一个 Promise 数组作为参数,并行执行所有 Promise,并在所有 Promise 都 fulfilled 后,返回一个包含所有结果的数组。

const promise1 = Promise.resolve(1);
const promise2 = Promise.resolve(2);
const promise3 = Promise.resolve(3);

Promise.all([promise1, promise2, promise3])
    .then(results => {
        console.log(results); // 输出:[1, 2, 3]
    });

适用场景: 多个异步任务之间没有依赖关系,可以并行执行。

注意: 如果其中任何一个 Promise 被 rejected,Promise.all 会立即 rejected,并且只返回第一个 rejected 的原因。

Promise.allSettled:并行执行,返回所有状态

Promise.allSettledPromise.all 类似,也是并行执行所有 Promise,但它会等待所有 Promise 都 settled(fulfilled 或 rejected),并返回一个包含所有 Promise 状态和结果(或原因)的数组。

const promise1 = Promise.resolve(1);
const promise2 = Promise.reject("Error");
const promise3 = Promise.resolve(3);

Promise.allSettled([promise1, promise2, promise3])
    .then(results => {
        console.log(results);
        /* 输出:
        [
            { status: 'fulfilled', value: 1 },
            { status: 'rejected', reason: 'Error' },
            { status: 'fulfilled', value: 3 }
        ]
        */
	});

适用场景: 需要获取所有 Promise 的执行结果,无论它们是 fulfilled 还是 rejected。

Promise.race:并行执行,谁快用谁

Promise.race 接收一个 Promise 数组作为参数,并行执行所有 Promise,只要其中任何一个 Promise settled(fulfilled 或 rejected),Promise.race 就会返回该 Promise 的结果(或原因)。

const promise1 = new Promise(resolve => setTimeout(resolve, 500, 1));
const promise2 = new Promise(resolve => setTimeout(resolve, 100, 2));

Promise.race([promise1, promise2])
	.then(result => {
        console.log(result); // 输出:2(因为 promise2 更快)
    });

适用场景: 只需要获取最快完成的 Promise 的结果,例如设置请求超时。

Promise.any (ES2021):并行执行,返回第一个 fulfilled

Promise.any 接收一个 Promise 数组作为参数,并行执行所有 Promise,只要其中任何一个 Promise fulfilled,Promise.any 就会返回该 Promise 的结果。如果所有 Promise 都 rejected,则返回一个 AggregateError

const promise1 = Promise.reject("Error 1");
const promise2 = new Promise(resolve => setTimeout(resolve, 100, 2));
const promise3 = Promise.reject("Error 3");

Promise.any([promise1, promise2, promise3])
	.then(result => {
        console.log(result); // 输出:2
    })
    .catch(error => {
        console.error(error); // 如果所有 Promise 都 rejected,则会捕获到 AggregateError
    });

适用场景: 需要获取第一个成功的 Promise 的结果。

自定义并发控制函数:限制最大并发数

Promise.all 等方法虽然可以并行执行 Promise,但无法控制并发数量。我们可以自己实现一个函数来限制最大并发数。

async function limitedConcurrency(tasks, limit) {
    const results = [];
    const running = [];
    let current = 0;
    
    while (current < tasks.length || running.length > 0) {
        if (running.length < limit && current < tasks.length) {
            const task = tasks[current](); // 执行任务函数,得到 Promise
            running.push(task);
            current++;

            task.then(result => {
                results.push(result);
                running.splice(running.indexOf(task), 1);
            });
        } else {
            await Promise.race(running); // 等待任意一个 Promise 完成
        }
    }
    
    return Promise.all(results);
}

使用示例:

const tasks = [];
for (let i = 0; i < 10; i++) {
    tasks.push(() => new Promise(resolve => {
        setTimeout(() => {
            console.log(`Task ${i} complete`);
            resolve(i);
        }, Math.random() * 1000);
    }));
}

limitedConCurrency(tasks, 3) // 最大并发数为 3
	.then(results => {
        console.log("All tasks completed:", results);
    });

原理:

  • tasks: 一个包含任务函数的数组,每个任务函数返回一个 Promise。
  • limit: 最大并发数。
  • results: 存储所有任务的结果。
  • running: 存储当前正在执行的任务(Promise)。
  • current: 指向下一个要执行的任务。
  • while 循环:只要还有任务未执行或有任务正在执行,就继续循环。
  • if 条件:如果当前正在执行的任务数量小于 limit 且还有任务未执行,则取出下一个任务执行,并将其添加到 running 数组中。
  • task.then():监听任务完成,将结果添加到 results 数组,并将任务从 running 数组中移除。
  • await Promise.race(running):如果当前正在执行的任务数量已达到 limit,则等待任意一个任务完成。
  • Promise.all(results): 等待所有任务执行, 并返回结果。

使用第三方库:p-limitasync-pool

有一些成熟的第三方库可以更方便地实现 Promise 并发控制,例如:

  • p-limit 一个轻量级的 Promise 并发控制库。
const pLimit = require('p-limit');

const limit = pLimit(3); // 最大并发数为 3

const tasks = []
for (let i = 0; i < 10; i++) {
    tasks.push(limit(() => new Promise(resolve => {
        setTimeout(() => {
            console.log(`Task ${i} completed`);
            resolve(i);
        }, Math.random() * 1000);
    })));
}

Promise.all(tasks)
	.then(results => {
        console.log("All tasks completed:", results);
    });
  • async-pool 一个支持多种并发策略的 Promise 并发控制库。

使用 Generator 函数和 yield 关键字

Generator 函数可以暂停和恢复执行,结合 yield 关键字,可以实现更细粒度的并发控制。

async function* taskGenerator(tasks) {
    for (const task of tasks) {
        yield task();
    }
}

async function runTasks(tasks, limit) {
    let pool = [];
    let results = [];
    for await (let result of taskGenerator(tasks)) {
        pool.push(result);
        results.push(result);
        if (pool.length >= limit) {
            await Promise.race(pool);
            pool = pool.filter(p => p.status != 'fulfilled' && p.status != 'rejected') // 手动维护
        }

    }
    return Promise.all(results)
}

使用消息队列

对于非常大量的异步任务,且允许一定的延迟,可以使用消息队列(例如 RabbitMQ,Kafka 等),将任务放入队列,然后由多个消费者并行处理。