基本概念
协程:多个线程互相协作,完成异步任务
协程有点像函数,又有点像线程。它的运行流程大致如下:
第一步,协程A开始执行。
第二步,协程A执行到一半,进入暂停,执行权转移到协程B。
第三步,(一段时间后)协程B交还执行权。
第四步,协程A恢复执行。
上面流程的协程A
,就是异步任务,因为它分成两段(或多段)执行
function* asyncJob() {
// ...其他代码
var f = yield readFile(fileA);
// ...其他代码
}
复制代码
协程遇到yield
命令就暂停,等到执行权返回,再从暂停的地方继续往后执行。它的最大优点,就是代码的写法非常像同步操作,如果去除yield
命令,简直一模一样。
异步任务的封装
虽然 Generator
函数将异步操作表示得很简洁
,但是流程管理却不方便
(即何时执行第一阶段、何时执行第二阶段)。
var fetch = require('node-fetch');
function* gen(){
var url = 'https://api.github.com/users/github';
var result = yield fetch(url);
console.log(result.bio);
}
var g = gen();
var result = g.next();
result.value.then(function(data){
return data.json();
}).then(function(data){
g.next(data);
});
复制代码
首先执行 Generator
函数,获取遍历器对象,然后使用next
方法(第二行),执行异步任务的第一阶段。由于Fetch
模块返回的是一个 Promise
对象,因此要用then
方法调用下一个next
方法。
Thunk 函数是自动执行 Generator 函数的一种方法
“传值调用”(call by value)
即在进入函数体之前,就计算x + 5的值(等于 6),再将这个值传入函数f。C 语言就采用这种策略。
var x = 1;
function f(m) {
return m * 2;
}
f(x + 5)
// 传值调用时,等同于
f(6)
复制代码
“传名调用”(call by name)
即直接将表达式x + 5传入函数体,只在用到它的时候求值。Haskell 语言采用这种策略。
var x = 1;
function f(m) {
return m * 2;
}
f(x + 5)
// 传名调用时,等同于
(x + 5) * 2
复制代码
Thunk 函数的含义 ,“传名调用”的一种实现策略
编译器的“传名调用”实现,往往是将参数放到一个临时函数之中
,再将这个临时函数传入函数体。这个临时函数就叫做 Thunk
函数。
Thunk
函数是“传名调用”的一种实现策略,用来替换某个表达式。
function f(m) {
return m * 2;
}
f(x + 5);
// 等同于
var thunk = function () {
return x + 5;
};
function f(thunk) {
return thunk() * 2;
}
复制代码
JavaScript 语言的 Thunk 函数
1、JavaScript
语言是传值调用,它的 Thunk
函数含义有所不同
2、在JavaScript
语言中,Thunk
函数替换的不是表达式,而是多参数函数
,将其替换成一个只接受回调函数作为参数
的单参数函数
3、js的Thunk:相当于把需要传递多个参数的函数(其中一个是cb),分为三次执行
第一次执行Thunk接收要执行的函数(fn)返回一个函数
第二次执行传递除cb以外的参数(…args)
第三次执行只传递cb
// 正常版本的readFile(多参数版本)
fs.readFile(fileName, callback);
// Thunk版本的readFile(单参数版本)
var Thunk = function (fileName) {
return function (callback) {
return fs.readFile(fileName, callback);
};
};
//调用Thunk 返回只接受cb的函数
var readFileThunk = Thunk(fileName);
//再次调用 只需要传入cb
readFileThunk(callback);
复制代码
Trunk函数封装
任何函数,只要参数有回调函数,就能写成 Thunk 函数的形式
// ES5版本
var Thunk = function(fn){
return function (){
var args = Array.prototype.slice.call(arguments);
return function (callback){
args.push(callback);
return fn.apply(this, args);
}
};
};
// ES6版本
const Thunk = function(fn) {
return function (...args) {
return function (callback) {
//因为aegs是一个数组 所以call的时候需要解构赋值
return fn.call(this, ...args, callback);
//或者
//args.push(callback)
//return fn.apply(this, args);
}
};
};
var readFileThunk = Thunk(fs.readFile);
readFileThunk(fileA)(callback);
复制代码
示例
const Thunk = function(fn) {
return function (...args) {
return function (callback) {
//因为aegs是一个数组 所以call的时候需要解构赋值
return fn.call(this, ...args, callback);
//或者
//args.push(callback)
//return fn.apply(this, args);
}
};
};
function f(a, cb) {
cb(a);
}
const ft = Thunk(f); //形成不销毁的作用域
// ft就是第一个return 的函数
// function (){
// var args = Array.prototype.slice.call(arguments);
// console.log(args);
// return function (callback){
// args.push(callback);
// return fn.apply(this, args);
// }
// };
ft(1)(console.log)
// 1
//ft(1)得到第二个return的 只接受cb的函数
// function (callback){
// args.push(callback);
// return fn.apply(this, args);
// }
//ft(1)(console.log)相当于
f(1,console.log)
复制代码
Thunkify 模块
生产环境的转换器,建议使用 Thunkify 模块。
首先是安装。
$ npm install thunkify
复制代码
使用方式如下(要连续调用两次依次传入 所需参数,保证最后传入的只有一个
)
var thunkify = require('thunkify');
var fs = require('fs');
var read = thunkify(fs.readFile);
read('package.json')(function(err, str){
// ...
});
//相当于
fs.readFile(fileName,cb)
复制代码
Thunkify的源码主要多了一个检查机制,变量called确保回调函数只运行一次。
var thunkify = require('thunkify');
function f(a, b, callback){
var sum = a + b;
callback(sum);
callback(sum);
}
var ft = thunkify(f);
var print = console.log.bind(console);
ft(1, 2)(print);
// 3 cb只执行了一次
复制代码
Generator 函数的流程管理
function* gen() {
// ...
}
var g = gen();
var res = g.next();
while(!res.done){
console.log(res.value);
res = g.next();
}
复制代码
上面代码中,Generator
函数gen
会自动执行完所有步骤,但是不适合异步操作
,无法保证前一步执行完,才能执行后一步。
var fs = require('fs');
var thunkify = require('thunkify');
var readFileThunk = thunkify(fs.readFile);
//Generator函数 的每一个yield必须是一个异步Thunk函数
var gen = function* (){
var r1 = yield readFileThunk('/etc/fstab');
console.log(r1.toString());
var r2 = yield readFileThunk('/etc/shells');
console.log(r2.toString());
};
var g = gen();
var r1 = g.next();
//r1={value:[Function],done:false}
//调用value传入回调函数 回调函数执行
r1.value(function (err, data) {
if (err) throw err;
var r2 = g.next(data);
r2.value(function (err, data) {
if (err) throw err;
g.next(data);
});
});
复制代码
仔细查看上面的代码,可以发现 Generator
函数的执行过程,其实是将同一个回调函数,反复传入next方法的value属性
。这使得我们可以用递归来自动完成这个过程。
Thunk 函数的自动流程管理
Thunk 函数真正的威力,在于可以自动执行 Generator 函数
有了这个执行器,执行 Generator
函数方便多了。不管内部有多少个异步操作,直接把Generator
函数传入run
函数即可。
前提是每一个异步操作,都要是 Thunk 函数,也就是说,跟在yield命令后面的必须是 Thunk 函数
function run(fn) {
var gen = fn();
function next(err, data) {
var result = gen.next(data);
if (result.done) return;
result.value(next);
}
next();
}
function* g() {
// ...
}
run(g);
复制代码
完整示例
//Thunk函数
var Thunk = function(fn){
return function (){
var args = Array.prototype.slice.call(arguments);
return function (callback){
args.push(callback);
return fn.apply(this, args);
}
};
};
//调用 Thunk 生成第一个return后的函数
var readFileThunk = Thunk(fs.readFile);
//函数g封装了n个异步的读取文件操作
var g = function* (){
var f1 = yield readFileThunk('fileA');
var f2 = yield readFileThunk('fileB');
// ...
var fn = yield readFileThunk('fileN');
};
//自动执行 Generator 函数
function run(fn) {
//调用Generator 函数
var gen = fn();
function next(err, data) {
var result = gen.next(data);
if (result.done) return;
result.value(next);
}
next();
}
run(g);
复制代码
上面代码的run
函数,就是一个 Generator 函数的自动执行器
。
内部的next
函数就是 Thunk
的回调函数。
next
函数先将指针移到 Generator
函数的下一步(gen.next
方法),然后判断 Generator
函数是否结束(result.done
属性),如果没结束,就将next
函数再传入 Thunk
函数(result.value
属性),否则就直接退出。
co 模块
一个小工具,用于 Generator
函数的自动执行。
下面是一个 Generator
函数,用于依次读取两个文件。
var gen = function* () {
var f1 = yield readFile('/etc/fstab');
var f2 = yield readFile('/etc/shells');
console.log(f1.toString());
console.log(f2.toString());
};
复制代码
co
模块可以让你不用编写 Generator 函数的执行器
。
var co = require('co');
co(gen);
复制代码
上面代码中,Generator
函数只要传入co
函数,就会自动执行。
co
函数返回一个Promise
对象,因此可以用then
方法添加回调函数。
co(gen).then(function (){
console.log('Generator 函数执行完成');
});
复制代码
异步完成交回执行权的实现
Generator
就是一个异步操作的容器。它的自动执行需要一种机制,当异步操作有了结果,能够自动交回执行权。
两种方法可以做到这一点。
(1)回调函数。将异步操作包装成 Thunk
函数,在回调函数里面交回执行权
(2)Promise
对象。将异步操作包装成 Promise
对象,用then
方法交回执行权
co
模块其实就是将两种自动执行器(Thunk
函数和 Promise
对象),包装成一个模块。
使用 co 的前提条件
是,Generator 函数的yield命令后面
,只能是 Thunk 函数
或 Promise 对象
。
基于 Promise 对象的自动执行
1、首先,把fs模块的readFile方法包装成一个 Promise 对象
var fs = require('fs');
var readFile = function (fileName){
return new Promise(function (resolve, reject){
fs.readFile(fileName, function(error, data){
if (error) return reject(error);
resolve(data);
});
});
};
var gen = function* (){
var f1 = yield readFile('/etc/fstab');
var f2 = yield readFile('/etc/shells');
console.log(f1.toString());
console.log(f2.toString());
};
复制代码
2、然后,手动执行上面的 Generator 函数
var g = gen();
g.next().value.then(function(data){
g.next(data).value.then(function(data){
g.next(data);
});
});
复制代码
3、根据手动执行,写出一个自动执行器。
function run(gen){
var g = gen();
function next(data){
var result = g.next(data);
//查当前是否为 Generator 函数的最后一步,如果是就返回。
if (result.done) return result.value;
//使用then方法,为返回值加上回调函数,然后再次调用next函数。
result.value.then(function(data){
next(data);
});
}
next();
}
run(gen);
复制代码
处理并发的异步操作
co
支持并发的异步操作
,即允许某些操作同时进行,等到它们全部完成,才进行下一步。
这时,要把并发的操作
都放在数组或对象里面
,跟在yield
语句后面。
// 数组的写法
co(function* () {
var res = yield [
Promise.resolve(1),
Promise.resolve(2)
];
console.log(res);
}).catch(onerror);
// 对象的写法
co(function* () {
var res = yield {
1: Promise.resolve(1),
2: Promise.resolve(2),
};
console.log(res);
}).catch(onerror);
复制代码
示例
//利用 数组循环
co(function* () {
var values = [n1, n2, n3];
yield values.map(somethingAsync);
});
function* somethingAsync(x) {
// do something async
return y
}
复制代码
上面的代码允许并发三个somethingAsync
异步操作,等到它们全部完成,才会进行下一步。
处理 Stream
Node
提供Stream
模式读写数据,特点是一次
只处理数据的一部分
,数据分成一块块依次处理
,就好像“数据流”
一样。这对于处理大规模数据
非常有利。
Stream
模式使用 EventEmitter API
,会释放三个事件:
- data事件:下一块数据块已经准备好了。
- end事件:整个“数据流”处理完了。
- error事件:发生错误。
使用Promise.race()
函数,可以判断这三个事件之中哪一个最先发生,只有当data
事件最先发生时,才进入下一个数据块的处理。从而,我们可以通过一个while
循环,完成所有数据的读取。
const co = require('co');
const fs = require('fs');
const stream = fs.createReadStream('./les_miserables.txt');
let valjeanCount = 0;
co(function*() {
while(true) {
const res = yield Promise.race([
new Promise(resolve => stream.once('data', resolve)),
new Promise(resolve => stream.once('end', resolve)),
new Promise((resolve, reject) => stream.once('error', reject))
]);
if (!res) {
break;
}
stream.removeAllListeners('data');
stream.removeAllListeners('end');
stream.removeAllListeners('error');
valjeanCount += (res.toString().match(/valjean/ig) || []).length;
}
console.log('count:', valjeanCount); // count: 1120
});
复制代码
上面代码采用 Stream
模式读取《悲惨世界》的文本文件,对于每个数据块都使用stream.once
方法,在data、end、error
三个事件上添加一次性回调函数。变量res
只有在data
事件发生时才有值,然后累加每个数据块之中valjean
这个词出现的次数。