ES6(13)Generator的异步应用

基本概念

协程:多个线程互相协作,完成异步任务

协程有点像函数,又有点像线程。它的运行流程大致如下:

第一步,协程A开始执行。

第二步,协程A执行到一半,进入暂停,执行权转移到协程B。

第三步,(一段时间后)协程B交还执行权。

第四步,协程A恢复执行。

上面流程的协程A,就是异步任务,因为它分成两段(或多段)执行

function* asyncJob() {
  // ...其他代码
  var f = yield readFile(fileA);
  // ...其他代码
}
复制代码

协程遇到yield命令就暂停,等到执行权返回,再从暂停的地方继续往后执行。它的最大优点,就是代码的写法非常像同步操作,如果去除yield命令,简直一模一样。

异步任务的封装

虽然 Generator 函数将异步操作表示得很简洁,但是流程管理却不方便(即何时执行第一阶段、何时执行第二阶段)。

var fetch = require('node-fetch');

function* gen(){
  var url = 'https://api.github.com/users/github';
  var result = yield fetch(url);
  console.log(result.bio);
}

var g = gen();
var result = g.next();

result.value.then(function(data){
  return data.json();
}).then(function(data){
  g.next(data);
});
复制代码

首先执行 Generator 函数,获取遍历器对象,然后使用next方法(第二行),执行异步任务的第一阶段。由于Fetch模块返回的是一个 Promise 对象,因此要用then方法调用下一个next方法。

Thunk 函数是自动执行 Generator 函数的一种方法

“传值调用”(call by value)

即在进入函数体之前,就计算x + 5的值(等于 6),再将这个值传入函数f。C 语言就采用这种策略。

var x = 1;

function f(m) {
  return m * 2;
}

f(x + 5)
// 传值调用时,等同于
f(6)
复制代码

“传名调用”(call by name)

即直接将表达式x + 5传入函数体,只在用到它的时候求值。Haskell 语言采用这种策略。

var x = 1;

function f(m) {
  return m * 2;
}

f(x + 5)
// 传名调用时,等同于
(x + 5) * 2
复制代码

Thunk 函数的含义 ,“传名调用”的一种实现策略

编译器的“传名调用”实现,往往是将参数放到一个临时函数之中,再将这个临时函数传入函数体。这个临时函数就叫做 Thunk 函数。

Thunk 函数是“传名调用”的一种实现策略,用来替换某个表达式。

function f(m) {
  return m * 2;
}

f(x + 5);

// 等同于

var thunk = function () {
  return x + 5;
};

function f(thunk) {
  return thunk() * 2;
}
复制代码

JavaScript 语言的 Thunk 函数

1、JavaScript 语言是传值调用,它的 Thunk 函数含义有所不同

2、在JavaScript语言中,Thunk 函数替换的不是表达式,而是多参数函数,将其替换成一个只接受回调函数作为参数单参数函数

3、js的Thunk:相当于把需要传递多个参数的函数(其中一个是cb),分为三次执行

第一次执行Thunk接收要执行的函数(fn)返回一个函数
第二次执行传递除cb以外的参数(…args)
第三次执行只传递cb
// 正常版本的readFile(多参数版本)
fs.readFile(fileName, callback);

// Thunk版本的readFile(单参数版本)
var Thunk = function (fileName) {
  return function (callback) {
    return fs.readFile(fileName, callback);
  };
};

//调用Thunk 返回只接受cb的函数
var readFileThunk = Thunk(fileName);

//再次调用 只需要传入cb
readFileThunk(callback);
复制代码

Trunk函数封装

任何函数,只要参数有回调函数,就能写成 Thunk 函数的形式
// ES5版本
var Thunk = function(fn){
  return function (){
    var args = Array.prototype.slice.call(arguments);
    return function (callback){
      args.push(callback);
      return fn.apply(this, args);
    }
  };
};

// ES6版本
const Thunk = function(fn) {
  return function (...args) {
    return function (callback) {
      //因为aegs是一个数组  所以call的时候需要解构赋值
      return fn.call(this, ...args, callback);
      //或者
      //args.push(callback)
      //return fn.apply(this, args);
    }
  };
};

var readFileThunk = Thunk(fs.readFile);
readFileThunk(fileA)(callback);
复制代码
示例
const Thunk = function(fn) {
  return function (...args) {
    return function (callback) {
      //因为aegs是一个数组  所以call的时候需要解构赋值
      return fn.call(this, ...args, callback);
      //或者
      //args.push(callback)
      //return fn.apply(this, args);
    }
  };
};

function f(a, cb) {
  cb(a);
}
const ft = Thunk(f);	//形成不销毁的作用域
// ft就是第一个return 的函数
// function (){
//     var args = Array.prototype.slice.call(arguments);
//     console.log(args);
//     return function (callback){
//         args.push(callback);
//         return fn.apply(this, args);
//     }
// };

ft(1)(console.log)  
// 1
//ft(1)得到第二个return的 只接受cb的函数
// function (callback){
//     args.push(callback);
//     return fn.apply(this, args);
// }

//ft(1)(console.log)相当于
f(1,console.log)

复制代码

Thunkify 模块

生产环境的转换器,建议使用 Thunkify 模块。

首先是安装。

$ npm install thunkify
复制代码

使用方式如下(要连续调用两次依次传入 所需参数,保证最后传入的只有一个

var thunkify = require('thunkify');
var fs = require('fs');

var read = thunkify(fs.readFile);
read('package.json')(function(err, str){
  // ...
});

//相当于
fs.readFile(fileName,cb)
复制代码

Thunkify的源码主要多了一个检查机制,变量called确保回调函数只运行一次。

var thunkify = require('thunkify');
function f(a, b, callback){
  var sum = a + b;
  callback(sum);
  callback(sum);
}

var ft = thunkify(f);
var print = console.log.bind(console);
ft(1, 2)(print);
// 3  cb只执行了一次
复制代码

Generator 函数的流程管理

function* gen() {
  // ...
}

var g = gen();
var res = g.next();

while(!res.done){
  console.log(res.value);
  res = g.next();
}
复制代码

上面代码中,Generator 函数gen会自动执行完所有步骤,但是不适合异步操作,无法保证前一步执行完,才能执行后一步。

var fs = require('fs');
var thunkify = require('thunkify');
var readFileThunk = thunkify(fs.readFile);

//Generator函数 的每一个yield必须是一个异步Thunk函数
var gen = function* (){
  var r1 = yield readFileThunk('/etc/fstab');
  console.log(r1.toString());
  var r2 = yield readFileThunk('/etc/shells');
  console.log(r2.toString());
};

var g = gen();

var r1 = g.next();
//r1={value:[Function],done:false}

//调用value传入回调函数  回调函数执行
r1.value(function (err, data) {
  if (err) throw err;
  var r2 = g.next(data);
  r2.value(function (err, data) {
    if (err) throw err;
    g.next(data);
  });
});
复制代码

仔细查看上面的代码,可以发现 Generator 函数的执行过程,其实是将同一个回调函数,反复传入next方法的value属性。这使得我们可以用递归来自动完成这个过程。

Thunk 函数的自动流程管理

Thunk 函数真正的威力,在于可以自动执行 Generator 函数

有了这个执行器,执行 Generator 函数方便多了。不管内部有多少个异步操作,直接把Generator函数传入run函数即可。

前提是每一个异步操作,都要是 Thunk 函数,也就是说,跟在yield命令后面的必须是 Thunk 函数

function run(fn) {
  var gen = fn();

  function next(err, data) {
    var result = gen.next(data);
    if (result.done) return;
    result.value(next);
  }

  next();
}

function* g() {
  // ...
}

run(g);
复制代码

完整示例

//Thunk函数
var Thunk = function(fn){
  return function (){
    var args = Array.prototype.slice.call(arguments);
    return function (callback){
      args.push(callback);
      return fn.apply(this, args);
    }
  };
};

//调用 Thunk  生成第一个return后的函数
var readFileThunk = Thunk(fs.readFile);

//函数g封装了n个异步的读取文件操作
var g = function* (){
  var f1 = yield readFileThunk('fileA');
  var f2 = yield readFileThunk('fileB');
  // ...
  var fn = yield readFileThunk('fileN');
};

//自动执行 Generator 函数
function run(fn) {
	//调用Generator 函数
  var gen = fn();

  function next(err, data) {
    var result = gen.next(data);
    if (result.done) return;
    result.value(next);
  }

  next();
}

run(g);
复制代码

上面代码的run函数,就是一个 Generator 函数的自动执行器

内部的next函数就是 Thunk 的回调函数。

next函数先将指针移到 Generator 函数的下一步(gen.next方法),然后判断 Generator 函数是否结束(result.done属性),如果没结束,就将next函数再传入 Thunk 函数(result.value属性),否则就直接退出。

co 模块

一个小工具,用于 Generator 函数的自动执行。

下面是一个 Generator 函数,用于依次读取两个文件。

var gen = function* () {
  var f1 = yield readFile('/etc/fstab');
  var f2 = yield readFile('/etc/shells');
  console.log(f1.toString());
  console.log(f2.toString());
};
复制代码

co 模块可以让你不用编写 Generator 函数的执行器

var co = require('co');
co(gen);
复制代码

上面代码中,Generator 函数只要传入co函数,就会自动执行。

co函数返回一个Promise对象,因此可以用then方法添加回调函数。

co(gen).then(function (){
  console.log('Generator 函数执行完成');
});
复制代码

异步完成交回执行权的实现

Generator 就是一个异步操作的容器。它的自动执行需要一种机制,当异步操作有了结果,能够自动交回执行权。
两种方法可以做到这一点。

(1)回调函数。将异步操作包装成 Thunk 函数,在回调函数里面交回执行权

(2)Promise 对象。将异步操作包装成 Promise 对象,用then方法交回执行权

co 模块其实就是将两种自动执行器(Thunk 函数和 Promise 对象),包装成一个模块。

使用 co 的前提条件是,Generator 函数的yield命令后面,只能是 Thunk 函数 Promise 对象

基于 Promise 对象的自动执行

1、首先,把fs模块的readFile方法包装成一个 Promise 对象

var fs = require('fs');

var readFile = function (fileName){
  return new Promise(function (resolve, reject){
    fs.readFile(fileName, function(error, data){
      if (error) return reject(error);
      resolve(data);
    });
  });
};

var gen = function* (){
  var f1 = yield readFile('/etc/fstab');
  var f2 = yield readFile('/etc/shells');
  console.log(f1.toString());
  console.log(f2.toString());
};
复制代码

2、然后,手动执行上面的 Generator 函数

var g = gen();

g.next().value.then(function(data){
  g.next(data).value.then(function(data){
    g.next(data);
  });
});
复制代码

3、根据手动执行,写出一个自动执行器。

function run(gen){
  var g = gen();

  function next(data){
    var result = g.next(data);
    //查当前是否为 Generator 函数的最后一步,如果是就返回。
    if (result.done) return result.value;
    //使用then方法,为返回值加上回调函数,然后再次调用next函数。
    result.value.then(function(data){
      next(data);
    });
  }

  next();
}

run(gen);
复制代码

处理并发的异步操作

co 支持并发的异步操作,即允许某些操作同时进行,等到它们全部完成,才进行下一步。

这时,要把并发的操作都放在数组或对象里面,跟在yield语句后面。

// 数组的写法
co(function* () {
  var res = yield [
    Promise.resolve(1),
    Promise.resolve(2)
  ];
  console.log(res);
}).catch(onerror);

// 对象的写法
co(function* () {
  var res = yield {
    1: Promise.resolve(1),
    2: Promise.resolve(2),
  };
  console.log(res);
}).catch(onerror);
复制代码

示例

//利用 数组循环
co(function* () {
  var values = [n1, n2, n3];
  yield values.map(somethingAsync);
});

function* somethingAsync(x) {
  // do something async
  return y
}
复制代码

上面的代码允许并发三个somethingAsync异步操作,等到它们全部完成,才会进行下一步。

处理 Stream

Node 提供 Stream 模式读写数据,特点是一次只处理数据的一部分,数据分成一块块依次处理,就好像“数据流”一样。这对于处理大规模数据非常有利。

Stream 模式使用 EventEmitter API,会释放三个事件:

  • data事件:下一块数据块已经准备好了。
  • end事件:整个“数据流”处理完了。
  • error事件:发生错误。

使用Promise.race()函数,可以判断这三个事件之中哪一个最先发生,只有当data事件最先发生时,才进入下一个数据块的处理。从而,我们可以通过一个while循环,完成所有数据的读取。

const co = require('co');
const fs = require('fs');

const stream = fs.createReadStream('./les_miserables.txt');
let valjeanCount = 0;

co(function*() {
  while(true) {
    const res = yield Promise.race([
      new Promise(resolve => stream.once('data', resolve)),
      new Promise(resolve => stream.once('end', resolve)),
      new Promise((resolve, reject) => stream.once('error', reject))
    ]);
    if (!res) {
      break;
    }
    stream.removeAllListeners('data');
    stream.removeAllListeners('end');
    stream.removeAllListeners('error');
    valjeanCount += (res.toString().match(/valjean/ig) || []).length;
  }
  console.log('count:', valjeanCount); // count: 1120
});
复制代码

上面代码采用 Stream 模式读取《悲惨世界》的文本文件,对于每个数据块都使用stream.once方法,在data、end、error三个事件上添加一次性回调函数。变量res只有在data事件发生时才有值,然后累加每个数据块之中valjean这个词出现的次数。

© 版权声明
THE END
喜欢就支持一下吧
点赞0 分享