Node.js 中的 co 模块

Node.js 中的 co 模块

Node.js 中的 co 模块主要用于 Generator 函数的自动执行,可以使我们以同步的形式编写异步代码

实例一

先来看两个对比实例,传统方式下,sayhello 是一个异步函数,执行 helloworld 会先输出 'world' 再输出 'hello'

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
setTimeout(() => {
console.log('!')
}, 0)

function sayhello() {
return Promise.resolve('hello').then(function (hello) {
console.log(hello)
})
}

function helloworld() {
sayhello()
console.log('world')
}

helloworld()

// 'world'
// 'hello'
// !

这是因为 Promise 是基于任务队列机制的(详细可以参考 JavaScript 并发模型),即当前代码执行完的时候才会触发,但是会在下一个 EventLoop 之前执行(注意与 setTimeout 区分开来)

实例二

我们将上面的示例换一种写法,调整成 Promise + Generator 的方式来试试,也就是模拟一下 co 当中的实现方式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
function co(gen) {
var it = gen()
var ret = it.next()
ret.value.then(function (res) {
it.next(res)
})
}

function sayhello() {
return Promise.resolve('hello').then(function (hello) {
console.log(hello)
})
}

co(function* helloworld() {
yield sayhello()
console.log('world')
})

// 'hello'
// 'world'

我们模拟实现了 co 函数,首先生成一个迭代器,然后执行一遍 next(),得到的 value 是一个 Promise 对象,promise.then() 里面再执行 next(),运行后可以发现,结果就是我们想要的先输出 'hello' 再输出 'world'

从上面示例可以看出,Generator 函数体可以挂载在 yield 语句处,直到下一次执行 next(),我们本章当中将要介绍的 co 模块的思路也就是利用了 Generator 的这个特性,将异步操作跟在 yield 后面,当异步操作完成并返回结果后,再触发下一次 next() ,当然,跟在 yield 后面的异步操作需要遵循一定的规范 thunkspromises

从上面示例我们也可以简单的推算出 co 的主要功能有下面这些

  • 异步流程控制,依次执行 Generator 函数内的每个位于 yield 后的 Promise 对象,并在 Promise 的状态改变后,把其将要传递给 reslove 函数的结果或传递给 reject 函数的错误返回出来,可供外部来进行传递值等操作,这些 Promise 是串行执行的
  • yield 后是 Promise 对象的数组或属性值是 Promise 对象的对象,则返回出结构相同的 Promise 执行结果数组(对象),并且这些 Promise 是并行执行的
  • co 自身的返回值也是一个 Promise 对象,可供继续使用

run

由上面的示例我们可以发现,Generator 函数的自动执行需要一种机制,即当异步操作有了结果,能够自动交回执行权,有两种方法可以做到这一点

  • 回调函数,将异步操作进行包装,暴露出回调函数,在回调函数里面交回执行权
  • Promise 对象,将异步操作包装成 Promise 对象,用 then 方法交回执行权

在看 co 源码之前,我们先来尝试着自己实现一下,也就是稍微完善一下上面的方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// 第一版
function run(gen) {
var gen = gen()
function next(data) {
var result = gen.next(data)
if (result.done) return
// 判断 result.value 是否是 Promise,如果是就添加 then 函数,不是就直接执行
if (isPromise(result.value)) {
result.value.then(data => {
next(data)
})
} else {
result.value(next)
}
}
next()
}

function isPromise(obj) {
return typeof obj.then == 'function'
}

上面我们已经完成了一个基本版的启动器函数,支持 yield 后跟回调函数或者 Promise 对象,但是并不完善,比如我们没有针对 Generator 进行错误捕获,所以我们可以考虑将其封装成一个 Promise 的形式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
// 第二版
function run(gen) {
var gen = gen()
return new Promise((resolve, reject) => {
function next(data) {
try {
var result = gen.next(data)
} catch (e) {
return reject(e)
}
if (result.done) {
return resolve(result.value)
}
var value = toPromise(result.value)
value.then(data => {
next(data)
}, e => {
reject(e)
})
}
next()
})
}

function isPromise(obj) {
return typeof obj.then == 'function'
}

function toPromise(obj) {
if (isPromise(obj)) return obj
if (typeof obj == 'function') return thunkToPromise(obj)
return obj
}

function thunkToPromise(fn) {
return new Promise(function (resolve, reject) {
fn(function (err, res) {
if (err) return reject(err)
resolve(res)
})
})
}

在这一版当中,我们返回了一个 Promise

  • result.donetrue 的时候,我们将该值 resolve(result.value)
  • 如果执行的过程中出现错误,被 catch 住,我们会将原因 reject(e)
  • 其次,我们会使用 thunkToPromise 将回调函数包装成一个 Promise,然后统一的添加 then 函数

最后,我们再来看看 co 源码当中具体是如何实现的

co

源码实现如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
// co 的核心代码如下
function co(gen) {

// 保持当前函数的上下文
var ctx = this;

// 截取 co 输入的参数,剔除 arguments 中的第一个参数,即 gen 对象,剩余参数作为 gen 的入参
var args = slice.call(arguments, 1)

// co 函数整个的返回值是一个 promise 实例,包装了传递的 generator 函数内所有 promise 的执行
// 这也是它为什么可以使用 then 和 catch 的根源
return new Promise(function (resolve, reject) {

// 判断传入的 gen 是否为函数,如果是则执行,将结果赋值给 gen 对象
if (typeof gen === 'function') gen = gen.apply(ctx, args);

// 根据 generator 函数执行结果是否存在 next 字段,判断 gen 是否为 generator 迭代器对象
// 若不是,则调用 resolve 返回最外层的 promise 对象状态
if (!gen || typeof gen.next !== 'function') return resolve(gen);

// 如果是 generator 迭代器对象,开始控制 gen.next() 方法的调用
// 调用入口函数,成功方法
onFulfilled();

// 入口函数,将传递进来的 generator 函数执行到第一个 yield 处来开启第一次的异步调用
// 另外也用作内部使用,作为 resolve 方法,处理异步结果,并继续调用下一个 promise
function onFulfilled(res) {
var ret;
try {
// res 为此次调用的 Peomise 结果
// 利用 generator 函数的特性,调用 next() 方法的参数,会作为 yield 的返回值
// 并将异步操作的结果返回给 ret.value
ret = gen.next(res);
} catch (e) {
// 如果发生错误,则把 Peomise 状态指为 rejected,并且把错误结果返回出去
return reject(e);
}

// 将 generator 函数执行 next() 后的结果再次传入 next() 方法,实现串行调用
next(ret);
}

// 上面的 onFulfilled 函数作为内部的成功方法,下面这个则作为失败方法
// 实现和上面类似
function onRejected(err) {
var ret;
try {
ret = gen.throw(err);
} catch (e) {
return reject(e);
}
next(ret);
}

// 首先需要明确,generator 函数在执行完毕后会返回 { done: true, value: undefined }
function next(ret) {

// 如果执行完成,直接调用 resolve 把 promise 置为成功状态
if (ret.done) return resolve(ret.value);

// 然后把 yield 的值(ret.value)转换成 promise(ctx 为上面保存的 this)
// 支持 promise,generator,generatorFunction,array,object
var value = toPromise.call(ctx, ret.value);

// 成功转换就可以直接给新的 promise 添加 onFulfilled, onRejected
// 当新的 promise 状态变成结束态(成功或失败),就会调用对应的回调,整个 next 链路就执行下去了

// 之所以可以一直 next() 下去,这是因为上面的 ret.value 是一个 Peomise 对象
// 当 return value.then(onFulfilled, onRejected) 以后,意味着又要去执行上面的 onFulfilled 了
// 然后会在 onFulfilled 里面再次去调用 next(ret)
// 这样就会一直循环下去,直到完成整个链的操作

if (value && isPromise(value)) return value.then(onFulfilled, onRejected);

// 如果以上情况都没发生,则参数为非 thunk 函数和 promise 对象
// 那么就将 promise 对象的状态改为 rejected,从而终止执行
return onRejected(new TypeError('You may only yield a function, promise, generator, array, or object, '
+ 'but the following object was passed: "' + String(ret.value) + '"'));
}
});
}

核心代码入口是 onFulfilled,无论如何第一次的 next(ret) 是一定要执行的,因为 generator 必须要 next() 一下的,但是 co 实际上有两种调用方式,分为有参数和无参数的,很明显以上是无参数的 generator 执行器,那么有参数的 wrap 呢?co 为我们提供了简单的包装

1
2
3
4
5
6
7
8
9
// 为有参数的 generator 调用,提供简单包装
co.wrap = function (fn) {
createPromise.__generatorFunction__ = fn;
return createPromise;
function createPromise() {
// 把 arguments 给 fn 当参数
return co.call(this, fn.apply(this, arguments));
}
};

通过 callapply 的组合使用来传递 arguments

辅助函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
// 将传入的参数对象转换为 Promise 对象
function toPromise(obj) {

if (!obj) return obj;

// 如果 obj 已经是 Promise 对象,则直接返回
if (isPromise(obj)) return obj;

// 如果 generator 是函数或对象,则直接把 obj 作为参数传入 co 函数,并返回
if (isGeneratorFunction(obj) || isGenerator(obj)) return co.call(this, obj);

// 如果 obj 是函数,则直接视为符合 thunk 规范的函数直接转换为 Promise
// 关于 thunk 函数可以参考 阮一峰老师的 Thunk 函数的含义和用法
if ('function' == typeof obj) return thunkToPromise.call(this, obj);

// 如果是数组,把数组中每个元素转化为内部 Promise,然后使用 Promise.all 并行计算
if (Array.isArray(obj)) return arrayToPromise.call(this, obj);

// 如果是对象,则遍历对象中的每个 key 对应的 value,然后使用 Promise.all 并行计算
if (isObject(obj)) return objectToPromise.call(this, obj);

// 最后返回
return obj;

}

// 这里的 obj 为数组
// 所以使用 Array.map 方法,分别对数组中的每一个元素递归执行 toPromise 方法
// 再利用 Promise.all 方法,在所有给定的可迭代完成时执行 resolve(或者第一个代码失败)并返回结果
function arrayToPromise(obj) {
return Promise.all(obj.map(toPromise, this));
}

// thunk 转为 promise
function thunkToPromise(fn) {
var ctx = this;
return new Promise(function (resolve, reject) {
fn.call(ctx, function (err, res) {
if (err) return reject(err);
if (arguments.length > 2) res = slice.call(arguments, 1);
resolve(res);
});
});
}

// 这里的 obj 为对象
function objectToPromise(obj) {

// results 是将用于返回的对象,使用和 obj 相同的构造函数
var results = new obj.constructor();

// Object.keys 方法用于返回对象的所有的属性名
var keys = Object.keys(obj);

// 用于保存所有对象属性的 Promise 的数组
var promises = [];

// 利用 for 循环来实现并行的异步调用
for (var i = 0; i < keys.length; i++) {

var key = keys[i];

// 转换为 Promise 对象
var promise = toPromise.call(this, obj[key]);

// 如果是 promise 对象,直接调用 defer 函数,添加到 promises 数组中,否则直接将结果返回给 result[key]
if (promise && isPromise(promise)) defer(promise, key);
else results[key] = obj[key];
}

// 将 Promise 数组传入 Promise.all,待 defer 函数中 results 对象的属性都赋值完毕后,返回最终的 results 对象
// 使得后续调用的 then() 可以获得此处的 results
return Promise.all(promises).then(function () {
return results;
});

// key 对应的元素成功转化为 Promise 对象后,调用 Promise 的 resovle 方法
// 以便在 results 中获取每个 Promise 对象成功执行后的结果
function defer(promise, key) {
results[key] = undefined;
promises.push(promise.then(function (res) {
results[key] = res;
}));
}
}

经过上面这些步骤,我们可以得到 yield 后面只能是函数、Promise 对象、Generator 函数、Generator 迭代器对象、数组(元素仅限之前的 4 类)和 Object(对应 value 仅限定之前的 4 类),现在可以把 co 串行调用 generator 函数中 yield 的过程总结如下

  • 首先进入最外层的 Promise
  • 通过入口 onFilfilled() 方法,将 generator 函数运行至第一个 yield 处,执行该 yield 后边的异步操作,并将结果传入 next 方法
    • 如果 next 中传入结果的 donetrue(已经完成),则返回最外层 Promiseresolve
    • 如果 next 中传入结果的 donefasle(表示还没执行完),则返回 value(即 yield 后边的对象)然后查看是否可以转化为内部 Promise 对象。如无法转化则抛出错误,返回最外层 Promisereject
  • 若能转化为 Promise 对象,则通过 then(onFilfilled, onRejected) 开始执行
  • onFilfilled() 或者 onRejected() 内部调用再次调用 next() 方法,实现串行执行 yield,并将 yield 后边的对象传递给 next(),依次重复(实现链式调用)
  • 所有 yield 执行返回,将最后的 return 值返回给最外层 Promiseresovle 方法,结束 cogenerator 函数的调用

参考

评论

Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

×