Step源码解读_Node异步流程控制2

一.基本用法

Step最大的特色是只提供了1个API,用法看一眼就能记住,示例如下:

var fs = require('fs');

var Step = require('./step.js');

Step(
    function readDir() {
        fs.readdir(__dirname, this);
    },
    function readFiles(err, results) {
        if (err) throw err;
        // Create a new group
        var group = this.group();
        results.forEach(function(filename) {
            if (/\.js$/.test(filename)) {
                fs.readFile(__dirname + "/" + filename, 'utf8', group());
            }
        });
    },
    function showAll(err, files) {
        if (err) throw err;
        files.forEach(function(text) {
            console.log(text.slice(0, 20));
        });
        // console.dir(files);
    }
);

先读当前目录,再并发读取所有文件。特别简洁,像是一件小巧的艺术品:

  • Step()接受任意个函数表示顺序执行

  • 各函数中的this表示内置回调函数,收集1个结果并传递到下一个任务

  • 各函数中的this.group()表示结果分组,用来支持并发请求,收集多个结果再传递到下一个任务

二.实现思路

1.顺序执行

Step()不是构造函数,只是一个普通函数。暴露出来的唯一API,用来注册需要顺序执行的任务

Step内部收集传入的各个任务,控制顺序执行,并通过thisthis.group()注入结果收集逻辑,例如:

Step(function() {
    var _callback = this;
    setTimeout(function() {
        _callback(null, 1);
    }, 50);
    setTimeout(function() {
        _callback(null, 2, 4);
    }, 30);
}, function(err, res) {
    if (err) throw err;
    console.log(res);
});

this_callback)收集结果并传入下一个任务,res接到2。存在2个问题:

  • 4丢了

  • 1没有输出

4丢了是因为Step限定每个任务的返回结果都是1个值,所以每个任务的第一个参数是异常err,第二个是上一个任务的返回结果res

1没有输出是因为this()表示传递当前任务结果给下一环,标志着当前任务结束,因此后来的重复this()无效。更进一步,正因为this无法收集多个结果,所以需要this.group()创建分组来处理并发情况

2.并发请求

为了处理并发请求,专门添了一个group分组方法,观察之前的示例:

// Create a new group
var group = this.group();
results.forEach(function(filename) {
    if (/\.js$/.test(filename)) {
        fs.readFile(__dirname + "/" + filename, 'utf8', group());
    }
});

可以发现,利用分组收集多个结果有3个步骤:

  1. this.group()初始化一个group,准备接收多个结果

  2. group()注册并发请求

  3. group()()收集结果,并移除对应的注册项,等待所有注册项都完成后把结果数组传给下一环

具体实现方法类似于EventProxy中的all(),内置计数器按顺序记录结果,最后把收集到的结果数组传递下去

3.异常处理

Step的异常处理比较丑,示例如下:

Step(function() {
    //...
}, function(err, res) {
    if (err) throw err;
    //...
}, function(err, res) {
    if (err) throw err;
    //...
});

除第一环外,后续所有任务内都要人工置顶if...throw。也就是说,异常处理机制要求手动后抛err,否则异常无法传入最后的回调函数(最后一环)。如下:

Step(function() {
    throw new Error('error occurs');
}, function(err) {
    // if (err) throw err;
    this(null, 'ok');
}, function(err, res) {
    if (!err) {
        console.log(res);   // ok
    }
});

第一环的异常在第二环弄丢了,第三环没有收到异常。丢个异常没什么,但不会报错有点吓人,也就是说,如果忘记了人工置顶if...throw的话,代码可能静默出错,这很危险

丢失异常的根本原因是每一个环节执行时都被包在try块里,如有异常则传入下一环,通过参数传入的异常不手动再抛自然就丢失了

三.源码分析

基本结构如下:

// 1.定义Step()接收一系列任务
    // 1.1分离任务(steps)和回调(最后一环)
    // 1.2初始化各个状态变量
// 2.定义next()作为内置的回调函数(供外部使用)
    // 2.1从任务队列中取出一个并放在try块里执行
    //    同时通过this注入next回调
    // 2.2收集并传递结果/异常,执行下一环
// 3.定义next.parallel(),仅供内部使用,配合group
    // 3.1计数器加加
    // 3.2返回负责计数器减减的callback
// 4.定义next.group(),供外部使用
    // 4.1初始化计数器及状态变量
    // 4.2返回负责收集结果的callback
// 5.定义Step.fn(),延长管道(支持添加第一环leading数据准备和额外最后一环tailing收尾)
    // 5.1记录参数传入的各个任务
    // 5.2返回新函数用来接收leading数据和最后的tailing
    //    然后添上第一环和最后一环,并启动Step
// 6.暴露出API
    // 6.1按照CommonJS模块定义的方式暴露出Step()

第1步定义入口函数,第2步提供任务顺序执行支持以及异常处理,第3和4步提供并发请求支持,第5步提供数据导入支持和收尾支持,锦上添花

具体源码(附详细注释):

/*
Copyright (c) 2011 Tim Caswell <tim@creationix.com>

MIT License

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
*/

// Inspired by http://github.com/willconant/flow-js, but reimplemented and
// modified to fit my taste and the node.JS error handling system.

// 1.定义Step()接收一系列任务
    // 1.1分离任务(steps)和回调(最后一环)
    // 1.2初始化各个状态变量
// 2.定义next()作为内置的回调函数(供外部使用)
    // 2.1从任务队列中取出一个并放在try块里执行
    //    同时通过this注入next回调
    // 2.2收集并传递结果/异常,执行下一环
// 3.定义next.parallel(),仅供内部使用,配合group
    // 3.1计数器加加
    // 3.2返回负责计数器减减的callback
// 4.定义next.group(),供外部使用
    // 4.1初始化计数器及状态变量
    // 4.2返回负责收集结果的callback
// 5.定义Step.fn(),延长管道(支持添加第一环leading数据准备和额外最后一环tailing收尾)
    // 5.1记录参数传入的各个任务
    // 5.2返回新函数用来接收leading数据和最后的tailing
    //    然后添上第一环和最后一环,并启动Step
// 6.暴露出API
    // 6.1按照CommonJS模块定义的方式暴露出Step()
function Step() {
  var steps = Array.prototype.slice.call(arguments),
      pending, counter, results, lock;
  // steps是调用者传入的一系列函数,最后一个是回调,之前的都是异步/同步任务
  // pending表示待执行项数
  // counter表示并行执行的总项数
  // results表示收集到的并行执行的steps的结果,results[0]为err,再往后与各step对应
  // lock表示正在处理/执行下一个step

  // Define the main callback that's given as `this` to the steps.
  // 主回调函数,即steps中的this
  function next() {
    counter = pending = 0;

    // Check if there are no steps left
    if (steps.length === 0) {
      // Throw uncaught errors
      if (arguments[0]) {
        throw arguments[0];
      }
      return;
    }

    // Get the next step to execute
    // 从头取一个step
    var fn = steps.shift();
    results = [];

    // Run the step in a try..catch block so exceptions don't get out of hand.
    // steps的执行被放到了try中,未捕获的异常通过next(e)传递给下一个step
    try {
      lock = true;
      //!!! steps中,this指向next的原因
      var result = fn.apply(next, arguments);
    } catch (e) {
      // Pass any exceptions on through the next callback
      next(e);
    }

    if (counter > 0 && pending == 0) {
      // If parallel() was called, and all parallel branches executed
      // synchronously, go on to the next step immediately.
      // 控制steps并行执行
      next.apply(null, results);
    } else if (result !== undefined) {
      // If a synchronous return is used, pass it to the callback
      // 把同步step的返回值传入下一个step
      next(undefined, result);
    }

    lock = false;
  }

  // Add a special callback generator `this.parallel()` that groups stuff.
  next.parallel = function () {
    // 每次调用更新计数器与待执行项数
    var index = 1 + counter++;  // 第一个空位留给err,结果从results[1]开始往后放
    pending++;

    return function () {
      // 回调执行时,更新待执行项数
      pending--;
      // Compress the error from any result to the first argument
      if (arguments[0]) {
        results[0] = arguments[0];
      }
      // Send the other results as arguments
      results[index] = arguments[1];
      //!!! 丢弃第3个参数及之后的所有参数
      if (!lock && pending === 0) {
        // When all parallel branches done, call the callback
        // 没有待执行项了,执行下一个step
        next.apply(null, results);
      }
else {
    console.warn('pending ' + pending + ' lock ' + lock);///
}
    };
  };

  // Generates a callback generator for grouped results
  next.group = function () {
    var localCallback = next.parallel();
    var counter = 0;
    var pending = 0;
    var result = [];
    var error = undefined;

    function check() {
console.log('from ' + arguments[0] + ' pending ' + pending);///
      if (pending === 0) {
        // When group is done, call the callback
        localCallback(error, result);
      }
    }
    //! 避免因为this.group()之后无group()调用而无法触发最后一个回调的情况
    process.nextTick(check); // Ensures that check is called at least once

    // Generates a callback for the group
    return function () {
      // 类似于parallel,更新计数器和待执行项数
      var index = counter++;
      pending++;
      return function () {
        pending--;
        // Compress the error from any result to the first argument
        if (arguments[0]) {
          error = arguments[0];
        }
        // Send the other results as arguments
        result[index] = arguments[1];
        // if (!lock) { check(); }
if (!lock) { check('tail'); }///
      };
    };
  };

  // Start the engine an pass nothing to the first step.
  // 仅仅为了初始化counter和padding
  next();
}

// Tack on leading and tailing steps for input and output and return
// the whole thing as a function.  Basically turns step calls into function
// factories.
Step.fn = function StepFn() {
  var steps = Array.prototype.slice.call(arguments);

  // 返回的函数会把接收到的最后一个函数参数作为额外的最后一环
  // 其余的作为参数传入第一环
  return function () {
    var args = Array.prototype.slice.call(arguments);

    // Insert a first step that primes the data stream
    // 插入一步数据流准备
    var toRun = [function () {
      this.apply(null, args);
    }].concat(steps);

    // If the last arg is a function add it as a last step
    // 即tailing hook的实现
    if (typeof args[args.length-1] === 'function') {
      toRun.push(args.pop());
    }


    Step.apply(null, toRun);
  }
}


// Hook into commonJS module systems
if (typeof module !== 'undefined' && "exports" in module) {
  module.exports = Step;
}

所有细节都在注释里,需要注意的是nextTick check的问题,直觉感觉有错,github也有朋友提出了相同的疑问,但经过多次测试确认nextTick check一般情况下没问题(测试中没有找到二般情况用例),而且该库已经经过相当长的时间检验了,姑且可以放心使用

Step没有继续发展“壮大”,决定保持其小巧精致,但对于Step存在的缺陷,原作者有一些新的改进想法,提出了twostepsafereturn,有兴趣可以观摩研究,原作者战斗力很高,观摩稳赚不赔

P.S.原作者对于异步控制库的设计理念是:

The general idea is to not impose structure on the programmer, but rather help them write good callback based functions.

让大家用得轻松,而不是强加一堆结构让人去习惯

四.总结

Step更像是个魔术,库还能这么设计?

撇开小巧简单等优点不谈,Step存在以下问题:

  • this指向内部函数,导致各个step中嵌套函数不便使用(that = this或者使用ES6箭头函数)

  • 错误处理机制要求每个step以if...throw开始,否则错误无法通过中间step传递至最后的callback

  • 状态丢弃机制,注定无法应对复杂依赖的情况

this的问题其实没有想象的那么严重,错误处理要求人工置顶if...throw确实有点难受,状态丢弃机制必然不适用于复杂情况,但足够应付大多数场景了

参考资料

  • 《深入浅出NodeJS》

发表评论

电子邮件地址不会被公开。 必填项已用*标注

*

code