`

async异步流程控制模块源码

    博客分类:
  • node
 
阅读更多

 Collections 集合处理

async.forEachOf  |  eachOf(object, iterator, callback)

  • 实现功能:遍历object对象执行iterator,报错或遍历执行完成时调用callback(error);callback(error)函数的触发时机需要手动在iterator中设置。
  • 源码解读:_keyIterator函数借用闭包遍历对象或数组(闭包中缓存数组的index值、或对象属性集合的index值),执行时获取不同的index值展开遍历,遍历终止时返回null。iterator若为异步延迟函数,每次启动执行时completed+1,延迟完成调用only_once(done)函数时completed-1,completed在源码中发挥作用没那么明确。iterator函数的尾参only_once(done)通过内部函数封装后输出给使用者,实现类似Promise模块将resolve、reject函数封装后输出给模块的调用者。callback函数在内部的执行条件是在外部调用过程中携带error错误,或者key=null、completed=0,即遍历执行完毕。
async.forEachOf =
    async.eachOf = function (object, iterator, callback) {
        callback = _once(callback || noop);
        object = object || [];

        var iter = _keyIterator(object);
        var key, completed = 0;

        while ((key = iter()) != null) {// 反复执行闭包函数iter遍历对象
            completed += 1;
            // only_once(done)中调用外部函数callback,机理同Promise-resolve相似
            // 内部函数对外部函数的影响是参数
            iterator(object[key], key, only_once(done));
        }

        if (completed === 0) callback(null);

        function done(err) {
            completed--;
            if (err) {
                callback(err);
            }
            // Check key is null in case iterator isn't exhausted
            // and done resolved synchronously.
            else if (key === null && completed <= 0) {
                callback(null);
            }
        }
    };

 

  • 主要问题:报错不影响后续iterator函数执行,参看async.some方法实现的中断回调的执行,捕获到err时,生成status=false状态,根据status状态调用回调函数callback。参看async.applyEach针对数组元素为函数的特殊情况,函数参数相同。
  • 官方示例:遍历读取json文件,报错或单次读取操作完成时执行callback回调,区别是携带参数为err或null。
var obj = {dev: "/dev.json", test: "/test.json", prod: "/prod.json"};
var configs = {};

async.forEachOf(obj, function (value, key, callback) {
    fs.readFile(__dirname + value, "utf8", function (err, data) {
        if (err) return callback(err);// 报错时执行callback
        try {
            configs[key] = JSON.parse(data);
        } catch (e) {
            return callback(e);
        }
        callback();// 执行完成时调用function(err)
    });
}, function (err) {
    if (err) console.error(err.message);
    // configs is now a map of JSON data
    doSomethingWith(configs);
});

 

async.forEach  |  each(arr, iterator, callback)

  • 实现功能:实现功能同async.eachOf,只是iterator函数中没有数组的index值,或者没有对象属性集合的index值。
  • 源码解读:_without函数用于去除iterator函数携带的数组index值参数。
async.forEach =
async.each = function (arr, iterator, callback) {
     return async.eachOf(arr, _withoutIndex(iterator), callback);
};
 
  • 官方示例:遍历文件信息。
async.each(openFiles, function(file, callback) {

    // Perform operation on file here.
    console.log('Processing file ' + file);

    if( file.length > 32 ) {
      console.log('This file name is too long');
      callback('File name too long');
    } else {
      // Do work to process file here
      console.log('File processed');
      callback();
    }
}, function(err) {
    // if any of the file processing produced an error, err would equal that error
    if( err ) {
      // One of the iterations produced an error.
      // All processing will now stop.
      console.log('A file failed to process');
    } else {
      console.log('All files have been processed successfully');
    }
});

 

 async.forEachOfSeries |  eachOfSeries(obj, iterator, callback)

  •  实现功能:async.eachOf对obj遍历调用的iterator函数有延迟时,同步执行该延迟。eachOfSeries方法则在一个延迟函数iterator完成后再调用下一个延迟函数iterator,实现依赖于使用者调用尾参only_once(done)函数的时机。和async.eachOf方法不同的另一点是,callback回调触发时机为捕获到错误或者遍历执行完毕时。
  • 源码解读:通过_keyIterator函数控制遍历对象或数组的节奏,在延迟执行完成后获取下一元素项执行iterator函数,或者通过完整遍历依次获取元素项执行iterator函数(async.eachOf方法中实现),通常的each方法不能做到。
async.forEachOfSeries =
    async.eachOfSeries = function (obj, iterator, callback) {
        callback = _once(callback || noop);
        obj = obj || [];
        var nextKey = _keyIterator(obj);
        var key = nextKey();
        function iterate() {
            var sync = true;
            if (key === null) {
                return callback(null);
            }
            iterator(obj[key], key, only_once(function (err) {
                if (err) {
                    callback(err);
                }
                else {
                    key = nextKey();
                    if (key === null) {
                        return callback(null);
                    } else {
                        if (sync) {
                            async.setImmediate(iterate);
                        } else {
                            iterate();
                        }
                    }
                }
            }));
            sync = false;
        }
        iterate();
    };

 

  • 示例:阻塞式读取文件,遍历完成或报错时打印错误消息,终止遍历。
var obj = {dev: "/dev.json", test: "/test.json", prod: "/prod.json"};
var configs = {};

async.forEachOf(obj, function (value, key, callback) {
    fs.readFile(__dirname + value, "utf8", function (err, data) {
        if (err) return callback(err);
        try {
            configs[key] = JSON.parse(data);
            callback(); // 启动下一次读取
        } catch (e) {
            return callback(e);
        }
    });
}, function (err) {
    if (err) console.error(err.message);
    // configs is now a map of JSON data
    doSomethingWith(configs);
});

 

 async.forEachSeries |  eachSeries(arr, iterator, callback)

  • 实现功能:实现功能同async.eachOfSeries,只是iterator函数中没有数组的index值,或者没有对象属性集合的index值。
  • 源码解读:_without函数用于去除iterator函数携带的数组index值参数。
async.forEachSeries =
async.eachSeries = function (arr, iterator, callback) {
    return async.eachOfSeries(arr, _withoutIndex(iterator), callback);
};

 

  async.forEachOfLimit  |  eachOfLimit(obj, iterator, callback)

  •  实现功能:当limit=1时,eachOfLimit方法同async.eachOfSeries相似,也是上一次延时执行完成后再执行下一次延时函数,callback回调的触发时机也是报错或遍历完成后。当limit=obj.lenth(数组的长度,或对象属性集合的长度)时,eachOfLimit方法同async.eachOf相似,延时函数iterator同步执行,callback回调的触发时机是报错或遍历完成,在延时过程中报错,没法阻止下一个iterator函数的执行。limit在1和obj.length之间,limit个数的iterator同步启动,当一个延时执行完成,加载下一个iterator函数。iterator同时执行个数限制为limit。
  • 源码解读:
async.forEachOfLimit =
    async.eachOfLimit = function (obj, limit, iterator, callback) {
        _eachOfLimit(limit)(obj, iterator, callback);
    };

    function _eachOfLimit(limit) {

        return function (obj, iterator, callback) {
            callback = _once(callback || noop);
            obj = obj || [];
            var nextKey = _keyIterator(obj);
            if (limit <= 0) {
                return callback(null);
            }
            var done = false;
            var running = 0;// 记录iterator执行个数,iterator启动前+1,执行完成后-1
            var errored = false;

            (function replenish () {
                if (done && running <= 0) {
                    return callback(null);
                }

                while (running < limit && !errored) {
                    var key = nextKey();
                    if (key === null) {
                        done = true;
                        if (running <= 0) {
                            callback(null);
                        }
                        return;
                    }
                    running += 1;
                    iterator(obj[key], key, only_once(function (err) {
                        running -= 1;
                        if (err) {
                            callback(err);
                            errored = true;
                        }
                        else {
                            replenish();
                        }
                    }));
                }
            })();
        };
    }

 

 async.forEachLimit  |  eachLimit(arr, iterator, callback)

  • 实现功能:实现功能同async.eachOfLimit,只是iterator函数中没有数组的index值,或者没有对象属性集合的index值。
  • 源码解读:_without函数用于去除iterator函数携带的数组index值参数。
async.forEachLimit =
    async.eachLimit = function (arr, limit, iterator, callback) {
        return _eachOfLimit(limit)(arr, _withoutIndex(iterator), callback);
    };

 

  async.map  |  mapSeries  |  mapLimit(arr, iterator, callback)

  •  实现功能:async.map调用async.eachOf方法(延时函数同时执行),async.mapSeries调用async.eachOfSeries方法(延时函数逐个执行),async.mapLimit调用async.eachOfLimit方法(限制延时函数同时执行的个数)。三者均遍历obj执行iterator函数,主要目的是获取处理后的最终值results,并传给最终的回调函数callback进行处理。
  • 源码:
function doParallel(fn) {
    return function (obj, iterator, callback) {
        return fn(async.eachOf, obj, iterator, callback);
    };
}
function doParallelLimit(fn) {
    return function (obj, limit, iterator, callback) {
        return fn(_eachOfLimit(limit), obj, iterator, callback);
    };
}
function doSeries(fn) {
    return function (obj, iterator, callback) {
        return fn(async.eachOfSeries, obj, iterator, callback);
    };
}

function _asyncMap(eachfn, arr, iterator, callback) {
    callback = _once(callback || noop);
    arr = arr || [];
    var results = _isArrayLike(arr) ? [] : {};
    eachfn(arr, function (value, index, callback) {
        iterator(value, function (err, v) {
            results[index] = v;
            callback(err);
        });
    }, function (err) {
        callback(err, results);
    });
}

async.map = doParallel(_asyncMap);
async.mapSeries = doSeries(_asyncMap);
async.mapLimit = doParallelLimit(_asyncMap);

 

  •  源码:iterator加工获得results,并传给callback作后续处理
async.map(['file1','file2','file3'], fs.stat, function(err, results) {
    // results is now an array of stats for each file
});

 

async.inject  |  foldl  |  reduce(arr, memo, iterator, callback)

  •  实现功能:调用async.eachOfSeries方法(延时函数逐个执行),三者均遍历obj执行iterator函数,主要目的是处理并更新memo,并传给最终的回调函数callback进行处理。
  • 源码:
async.inject =
    async.foldl =
    async.reduce = function (arr, memo, iterator, callback) {
        async.eachOfSeries(arr, function (x, i, callback) {
            iterator(memo, x, function (err, v) {
                memo = v;// 更新memo
                callback(err);// 错误处理
            });
        }, function (err) {
            callback(err, memo);//memo不是通过传值或者闭包驻留,由上级作用域赋予
        });
    };

 

  •  官方示例
async.reduce([1,2,3], 0, function(memo, item, callback) {
    // pointless async:
    process.nextTick(function() {
        callback(null, memo + item) // 更新memo等
    });
}, function(err, result) {
    // result is now equal to the last value of memo, which is 6
});

 

async.foldr  |  reduceRight(arr, memo, iterator, callback)

  • 实现功能:同async.reduce相似,区别是自右向左遍历。
  • 源码解读:
async.foldr =
    async.reduceRight = function (arr, memo, iterator, callback) {
        var reversed = _map(arr, identity).reverse();
        async.reduce(reversed, memo, iterator, callback);
    };

 

 async.transform(arr, memo, iterator, callback)

  •  实现功能:同async.reduce方法相似,transform方法在每次延时执行完成后处理memo,等到各延时均执行完成后,调用callback获取并处理memo。transform方法内部调用async.eachOf方法,因此各延时函数近乎同一时间执行,非阻塞式实现。
  • 源码解读:
async.transform = function (arr, memo, iterator, callback) {
        if (arguments.length === 3) {
            callback = iterator;
            iterator = memo;
            memo = _isArray(arr) ? [] : {};
        }

        async.eachOf(arr, function(v, k, cb) {
            iterator(memo, v, k, cb);
        }, function(err) {
            callback(err, memo);
        });
    };

 

  • 官方示例:
async.transform([1,2,3], function(acc, item, index, callback) {
    // pointless async:
    process.nextTick(function() {
        acc.push(item * 2)
        callback(null)
    });
}, function(err, result) {
    // result is now equal to [2, 4, 6]
});

 

async.transform({a: 1, b: 2, c: 3}, function (obj, val, key, callback) {
    setImmediate(function () {
        obj[key] = val * 2;
        callback();
    })
}, function (err, result) {
    // result is equal to {a: 2, b: 4, c: 6}
})

 

   async.select | filter |  selectSeries | filterSeries  |  selectLimit | filterLimit(arr, iterator, callback)

  •  实现功能:async.filter调用async.eachOf方法(延时函数同时执行),async.filterSeries调用async.eachOfSeries方法(延时函数逐个执行),async.filterLimit调用async.eachOfLimit方法(限制延时函数同时执行的个数)。三者均遍历arr执行iterator函数,主要目的是iterator回调传参v过滤arr,arr的最终值是由原来的元素项构成的数组形式,arr最初为对象时,最终值由对象的键值构成数组,并将该最终值传给最终的回调函数callback进行处理。
  • 源码解读:
function _filter(eachfn, arr, iterator, callback) {
        var results = [];
        eachfn(arr, function (x, index, callback) {
            iterator(x, function (v) {
                if (v) {
                    results.push({index: index, value: x});
                }
                callback();
            });
        }, function () {
            callback(_map(results.sort(function (a, b) { // 由回调函数重置数组的元素项为results的value键值
                return a.index - b.index;
            }), function (x) {
                return x.value;
            }));
        });
    }

    // 调用async.eachOf方法遍历obj执行iterator,通过iterator的回调传参v过滤,返回数组results
    // 经过处理后,构成按序排列原元素值的数组,最终传给callback回调
    async.select =
    async.filter = doParallel(_filter);

    async.selectLimit =
    async.filterLimit = doParallelLimit(_filter);

    async.selectSeries =
    async.filterSeries = doSeries(_filter);

 

  •  官方示例:
async.filter(['file1','file2','file3'], function(filePath, callback) {
    fs.access(filePath, function(err) {
        callback(null, !err)
    });
}, function(err, results) {
    // results now equals an array of the existing files 以数组形式返回存在的文件
});

 

 async.reject  |  rejectSeries   |  rejectLimit(arr, iterator, callback)

  •  实现功能:与async.filter的不同之处是,async.filter方法凭借iterator的回调参数v为真值保留arr原始项,async.reject凭借v为否时保留arr原始项。
  • 源码解读:
function _reject(eachfn, arr, iterator, callback) {
        _filter(eachfn, arr, function(value, cb) {
            iterator(value, function(v) {
                cb(!v);
            });
        }, callback);
    }

    // 与async.filter的不同是,reject方法的过滤条件是,iterator的回调传参v为否值
    async.reject = doParallel(_reject);
    async.rejectLimit = doParallelLimit(_reject);
    async.rejectSeries = doSeries(_reject);

 

  • 官方示例:
async.reject(['file1','file2','file3'], function(filePath, callback) {
    fs.access(filePath, function(err) {
        callback(null, !err)
    });
}, function(err, results) {
    // results now equals an array of missing files 为不存在的文件进行后续生成文件操作
    createFiles(results);
});

 

  async.any  |  some(arr, iterator, callback)

  •  实现功能:async.any|some方法调用ansyc.eachOf方法,当arr中有一项符合条件时,回调函数获得参数即为true。回调触发时机为遍历的每个阶段arr有一项符合条件时,或者在遍历执行完成时,当遍历执行完成时触发回调函数,回调函数不携带参数。
  • 源码解读:遍历的每个阶段有符合条件项时都能触发回调的原由是,以getResult(true, x)替换eachOf方法中的err,err为真,也即触发回调。不符合条件时,err为否,也即不触发回调,除非遍历过程执行完毕。中断遍历的实现是,arr中有一项符合条件,将cb改写为false,调用callback()回调,因为没有携带参数,eachOf方法相关代码也便不执行了。
// check、getResult均为传递函数,一个转化为布尔值,一个原样输出
    function _createTester(eachfn, check, getResult) {
        return function(arr, limit, iterator, cb) {
            function done() {
                if (cb) cb(getResult(false, void 0));
            }
            function iteratee(x, _, callback) {
                if (!cb) return callback();
                // 参数function(v){}由内部函数传给外部,作为iterator的回调,执行done回调
                iterator(x, function (v) {// v由外部函数传入,x是arr的元素项
                    if (cb && check(v)) {
                        // v为真、cb存在时执行cb回调,cb和iterator都赋值为false
                        // 因为iteratee中首先判断cb是否为否值,后续遍历直接进入callback回调
                        // 进入callback回调,callback在遍历执行完成时才有意义
                        cb(getResult(true, x));
                        cb = iterator = false;
                    }
                    callback();
                });
            }
            if (arguments.length > 3) {
                eachfn(arr, limit, iteratee, done);
            } else {
                cb = iterator;
                iterator = limit;
                eachfn(arr, iteratee, done);
            }
        };
    }

    // 调用async.eachOf方法,arr参数中有一项满足条件时,回调获得结果为真
    // 回调时机为遍历的每个阶段执行完毕
    async.any =
    async.some = _createTester(async.eachOf, toBool, identity);

 

  • 官方示例:原示例callback首参为null,将无机会触发回调。
async.some(['file1','file2','file3'], function(filePath, callback) {
    fs.access(filePath, function(err) {
        callback(!err)
    });
}, function(result) {
    // 有一项满足条件时,result为true
    // if result is true then at least one of the files exists
});

 

  async.someLimit(arr, limit, iterator, callback)

  •  实现功能:async.someLimit方法同ansyc.some方法,只是对同时触发的iterator个数作了限制。
  • 源码解读:
async.someLimit = _createTester(async.eachOfLimit, toBool, identity);

 

 async.all  |  every(arr, iterator, callback)

  •  实现功能:async.all|every方法调用ansyc.eachOf方法,当arr中每一项符合条件时,回调函数获得参数即为true。回调触发时机为在遍历执行完成时。当遍历每个阶段执行完成时,回调函数不携带参数,因而也就不触发回调。
  • 源码解读:有一项不满足条件中断回调的原由是,cb传入否值,以及callback参数为空,回调不得执行。最末一次时,cb赋值为空,done函数不得执行,回调也不能顺利执行。
// check、getResult均为传递函数,取反
    function _createTester(eachfn, check, getResult) {
        return function(arr, limit, iterator, cb) {
            function done() {
                // cb回调为真值,遍历完成后才触发执行
                if (cb) cb(getResult(false, void 0));
            }
            function iteratee(x, _, callback) {
                if (!cb) return callback();
                // 参数function(v){}由内部函数传给外部,作为iterator的回调,执行done回调
                iterator(x, function (v) {// v由外部函数传入,x是arr的元素项
                    // v传入真值,check永远为否,cb回调不执行,callback只在遍历完成时执行
                    // v为否值,cb回调参数为否值,回调不执行
                    if (cb && check(v)) {
                        cb(getResult(true, x));
                        cb = iterator = false;
                    }
                    callback();
                });
            }
            if (arguments.length > 3) {
                eachfn(arr, limit, iteratee, done);
            } else {
                cb = iterator;
                iterator = limit;
                eachfn(arr, iteratee, done);
            }
        };
    }

    // 调用async.eachOf方法,arr参数中每一项满足条件时,回调获得结果为真
    // 回调时机为遍历执行完成时
    async.all =
    async.every = _createTester(async.eachOf, notId, notId);

 

  • 官方示例:原示例callback首参为null,将无机会触发回调。
async.some(['file1','file2','file3'], function(filePath, callback) {
    fs.access(filePath, function(err) {
        callback(!err)
    });
}, function(result) {
    // 每一项满足条件时,result为true
});

 

  async.everyLimit(arr, limit, iterator, callback)

  •  实现功能:async.everyLimit方法同ansyc.every方法,只是对同时触发的iterator个数作了限制。
  • 源码解读:
async.everyLimit = _createTester(async.eachOfLimit, notId, notId);

 

 async.detect | detectSeries (arr, iterator, callback)

 async.detectLimit (arr, iterator, callback)

  •  实现功能:async.detect方法同ansyc.some方法相同,当arr中有一项满足条件,触发回调,不同于some方法的是,some方法传递给回调函数的参数为v判断条件,detect传参为arr中第一个符合条件的元素项。detectSeries方法上一个延时函数完成后调用下一个延时函数;detectLimit方法对同时执行的迭代器iterator作了限制。
  • 源码解读:
function _findGetResult(v, x) {
        return x;
    }
    // 与some方法相同,调用了_createTester函数,差别是some传参为v判断条件,detect传参为arr的元素项
    async.detect = _createTester(async.eachOf, identity, _findGetResult);
    async.detectSeries = _createTester(async.eachOfSeries, identity, _findGetResult);
    async.detectLimit = _createTester(async.eachOfLimit, identity, _findGetResult);

 

  • 官方示例:
async.detect(['file1','file2','file3'], function(filePath, callback) {
    fs.access(filePath, function(err) {
        callback(!err)
    });
}, function(result) {
    // result now equals the first file in the list that exists
});

 

  async.sortBy (arr, iterator, callback)

  • 实现功能 :async.sortBy实现arr的排序,遍历执行完成后将重新排序的arr传给回调。
  • 源码解读:
// 根据用户设置的criteria排序
    async.sortBy = function (arr, iterator, callback) {
        async.map(arr, function (x, callback) {
            iterator(x, function (err, criteria) {
                if (err) {
                    callback(err);
                }
                else {
                    callback(null, {value: x, criteria: criteria});
                }
            });
        }, function (err, results) {
            if (err) {
                return callback(err);
            }
            else {
                callback(null, _map(results.sort(comparator), function (x) {
                    return x.value;
                }));
            }

        });

        function comparator(left, right) {
            var a = left.criteria, b = right.criteria;
            return a < b ? -1 : a > b ? 1 : 0;
        }
    };

 

  • 官方示例:
async.sortBy(['file1','file2','file3'], function(file, callback) {
    fs.stat(file, function(err, stats) {
        callback(err, stats.mtime);
    });
}, function(err, results,result) {
    // results is now the original array of files sorted by
    // modified date 根据修改时间排序的文件
});

 

async.sortBy([1,9,3,5], function(x, callback) {
    callback(null, x*-1);    //<- x*-1 instead of x, turns the order around
}, function(err,result) {
    // result callback
});

 

   async.concat  |  concatSeries(arr, iterator, callback)

  • 实现功能 :async.concat方法用于遍历arr执行iterator函数过程中没有报错的项
  • 源码解读:
// 拼接arr元素项传参给回调,前提是没有捕获到错误,捕获到错误则略过
    function _concat(eachfn, arr, fn, callback) {
        var result = [];
        eachfn(arr, function (x, index, cb) {
            fn(x, function (err, y) {
                result = result.concat(y || []);
                cb(err);
            });
        }, function (err) {
            callback(err, result);
        });
    }
    async.concat = doParallel(_concat);
    async.concatSeries = doSeries(_concat);

 

  • 官方示例
async.concat(['dir1','dir2','dir3'], fs.readdir, function(err, files) {
    // files is now a list of filenames that exist in the 3 directories
    // 对存在的目录名进行拼接
});

 

 

 

Controls Flow 流程管控

async.parallel  |  parallelLimit  |  series(tasks, callback)

  • 实现功能:async.parallel调用async.eachOf方法,遍历执行tasks中的任务函数,根据tasks的数据类型(数组或对象形式)构建最终值results,传给最终回调callback函数。async.parallelLimit调用async.eachOfLimit方法,async.series调用async.eachOfSeries方法,其他原理相同。tasks或者以键值对形式存储任务函数,或者以数组形式存储任务函数。results通过作用域实现在task任务函数和最终回调callback中传递。
  • 源码解读:
function _parallel(eachfn, tasks, callback) {
        callback = callback || noop;
        var results = _isArrayLike(tasks) ? [] : {};

        eachfn(tasks, function (task, key, callback) {
            task(_restParam(function (err, args) {
                if (args.length <= 1) {
                    args = args[0];
                }
                results[key] = args;// 根据tasks数据类型构建results,传给最终回调,results根据作用域传递
                callback(err);
            }));
        }, function (err) {
            callback(err, results);
        });
    }

    // 遍历执行tasks中任务函数,数组或对象形式构建results,传给回调函数callback
    async.parallel = function (tasks, callback) {
        _parallel(async.eachOf, tasks, callback);
    };

    async.parallelLimit = function(tasks, limit, callback) {
        _parallel(_eachOfLimit(limit), tasks, callback);
    };

    async.series = function(tasks, callback) {
        _parallel(async.eachOfSeries, tasks, callback);
    };

 

  • 官方示例:示例一为数组形式,数组二为对象形式。
async.parallel([
    function(callback) {
        setTimeout(function() {
            callback(null, 'one');
        }, 200);
    },
    function(callback) {
        setTimeout(function() {
            callback(null, 'two');
        }, 100);
    }
],
// optional callback
function(err, results) {
    // the results array will equal ['one','two'] even though
    // the second function had a shorter timeout.
});

 

// an example using an object instead of an array
async.parallel({
    one: function(callback) {
        setTimeout(function() {
            callback(null, 1);
        }, 200);
    },
    two: function(callback) {
        setTimeout(function() {
            callback(null, 2);
        }, 100);
    }
}, function(err, results) {
    // results is now equals to: {one: 1, two: 2}
});

 

async.applyEach  |  applyEachSeries(fns, args, callback)

  • 实现功能:async.applyEach方法调用async.eachOf遍历执行fns数组中的函数元素项,fns函数参数均相同,且为args、callback。async.applyEachSeries方法调用async.eachOfSeries,为阻塞式调用。
  • 源码解读:
 function _applyEach(eachfn) {
        return _restParam(function(fns, args) {
            var go = _restParam(function(args) {
                var that = this;
                var callback = args.pop();// 尾项为回调函数
                return eachfn(fns, function (fn, _, cb) {
                    fn.apply(that, args.concat([cb]));
                },
                callback);
            });
            if (args.length) {
                return go.apply(this, args);
            }
            else {
                return go;
            }
        });
    }

    // 调用async.eachOf遍历执行fns函数,fns函数参数均相同,且为args、callback
    async.applyEach = _applyEach(async.eachOf);
    async.applyEachSeries = _applyEach(async.eachOfSeries);

 

  • 官方示例:
async.applyEach([enableSearch, updateSchema], 'bucket', callback);

// partial application example:
async.each(
    buckets,
    async.applyEach([enableSearch, updateSchema]),
    callback
);

 

async.auto(tasks, concurrency, callback)

  •  实现功能:由tasks中处理函数获得结果results,该结果值传给最终处理函数callback作为参数,再作后续处理。特殊情况下,当tasks中任务函数需要在另一函数执行完毕后调用,async.auto会自动根据依赖关系,率先执行无依赖的任务函数,接着执行依赖函数已经执行的任务函数。当函数执行过程中报错时,最终回调callback捕获到错误err,而results由先前及本次执行函数的结果值构成。concurrency限制同时执行的任务个数。async.auto用来处理异步延时函数时,需要设置task[task.length - 1]尾项任务函数中callback回调的触发时机,进而调用依赖函数执行。
  • 源码解读:对依赖函数的处理,若tasks中任务函数无依赖时,执行task[task.length - 1](taskCallback, results),即tasks任务对象中每个键值下的尾项数组元素function(callback, results),该函数中,调用并执行taskCallback(err,result)函数,其一是更新results[k]=result输出给最终回调的参数值,其二是调用taskComplete遍历依赖函数队列listeners,加载其中依赖函数已经执行的任务函数。若tasks中任务函数有依赖时,且依赖函数尚未执行,将该任务函数存放到依赖函数队列listeners中。
// 自动以无依赖的函数到依赖执行完成的函数顺序执行函数
    // tasks以函数映射名为键,函数队列为值,尾参是处理results的回调函数
    async.auto = function (tasks, concurrency, callback) {
        if (typeof arguments[1] === 'function') {
            // concurrency is optional, shift the args.
            callback = concurrency;
            concurrency = null;
        }
        callback = _once(callback || noop);
        var keys = _keys(tasks);
        var remainingTasks = keys.length;// tasks对象共有多少键,即共多少任务

        // 没有任务,直接调用回调函数
        if (!remainingTasks) {
            return callback(null);
        }
        if (!concurrency) {
            concurrency = remainingTasks;// concurrency同时执行的任务数,默认为tasks对象有多少键
        }

        var results = {};
        var runningTasks = 0;// 跑的任务数,与concurrency比较以对同时跑的任务数作限制

        var hasError = false;

        // 需要依赖函数完成加载后才执行的函数,通过addListener方法添加到listeners队列
        // 每当一个函数加载完成,调用taskComplete执行依赖加载完成的函数项,调用removeListener更新listeners队列
        var listeners = [];
        function addListener(fn) {
            listeners.unshift(fn);
        }
        function removeListener(fn) {
            var idx = _indexOf(listeners, fn);
            if (idx >= 0) listeners.splice(idx, 1);
        }

        // remainingTasks-1,遍历listeners执行依赖加载完成的函数项
        function taskComplete() {
            remainingTasks--;
            _arrayEach(listeners.slice(0), function (fn) {
                fn();
            });
        }

        // tasks中任务函数已执行完毕的情况下,remainingTasks=0,执行最终回调函数callback
        addListener(function () {
            if (!remainingTasks) {
                callback(null, results);
            }
        });

        _arrayEach(keys, function (k) {
            if (hasError) return;
            var task = _isArray(tasks[k]) ? tasks[k]: [tasks[k]];

            // 因为startIndex为0,_restParam作用是将taskCallback赋值为function(err,args),处理results
            var taskCallback = _restParam(function(err, args) {
                runningTasks--;
                if (args.length <= 1) {
                    args = args[0];
                }
                if (err) {
                    var safeResults = {};
                    _forEachOf(results, function(val, rkey) {
                        safeResults[rkey] = val;
                    });
                    safeResults[k] = args;// 关于k值的驻留???
                    hasError = true;

                    // 报错直接调用最终回调函数
                    callback(err, safeResults);

                // 没有错误,执行taskComplete,遍历listeners执行依赖加载完成的函数项
                }else {
                    results[k] = args;// results由tasks的末尾项task[task.length - 1]参数带入
                    async.setImmediate(taskComplete);
                }
            });

            // 检查依赖的函数是否存在,依赖可以是数组形式,但不能依赖自身
            var requires = task.slice(0, task.length - 1);// 排除最后一项
            // prevent dead-locks
            var len = requires.length;
            var dep;
            while (len--) {
                if (!(dep = tasks[requires[len]])) {
                    throw new Error('Has nonexistent dependency in ' + requires.join(', '));
                }
                if (_isArray(dep) && _indexOf(dep, k) >= 0) {
                    throw new Error('Has cyclic dependencies');
                }
            }

            // 依赖函数x在results中,执行函数k不在results中,且同时跑的函数个数runningTasks小于限制
            function ready() {
                // _reduce遍历数组项对尾参memo作处理,迭代器函数iterator中首参为memo,次参为元素项
                return runningTasks < concurrency && _reduce(requires, function (a, x) {
                    return (a && results.hasOwnProperty(x));
                }, true) && !results.hasOwnProperty(k);
            }

            // task[task.length - 1]末尾项为函数,之前为依赖
            // taskCallback,通过调用taskComplete,遍历listeners执行依赖加载完成的函数项
            if (ready()) {
                runningTasks++;
                task[task.length - 1](taskCallback, results);
            }else {
                addListener(listener);
            }

            // 添加listeners函数队列的依赖函数,执行时runningTasks+1,删除listeners队列依赖
            function listener() {
                if (ready()) {
                    runningTasks++;
                    removeListener(listener);
                    task[task.length - 1](taskCallback, results);
                }
            }
        });
    };
 
  • 官方示例:tasks键值对任务函数末尾项参数为callback、results,官方示例写反
async.auto({
    // this function will just be passed a callback
    readData: async.apply(fs.readFile, 'data.txt', 'utf-8'),
    showData: ['readData', function(cb,results) {
        // results.readData is the file's contents
        // ...
    }]
}, callback);
 
async.auto({
    get_data: function(callback) {
        console.log('in get_data');
        // async code to get some data
        callback(null, 'data', 'converted to array');
    },
    make_folder: function(callback) {
        console.log('in make_folder');
        // async code to create a directory to store a file in
        // this is run at the same time as getting the data
        callback(null, 'folder');
    },
    write_file: ['get_data', 'make_folder', function(callback, results)) {
        console.log('in write_file', JSON.stringify(results));
        // once there is some data and the directory exists,
        // write the data to a file in the directory
        callback(null, 'filename');
    }],
    email_link: ['write_file', function(callback, results) {
        console.log('in email_link', JSON.stringify(results));
        // once the file is written let's email a link to it...
        // results.write_file contains the filename returned by write_file.
        callback(null, {'file':results.write_file, 'email':'user@example.com'});
    }]
}, function(err, results) {
    console.log('err = ', err);
    console.log('results = ', results);
});

 

 async.retry(times, task, callback)

  •  实现功能:task任务报错时以times设置的重置执行次数,反复调用task任务并执行,直到task任务成功时或执行到最末一个task任务时,调用最终回调函数callback。async.retry方法通过调用async.series方法实现,当times中设置了重复执行task任务的等待时间interval时,async.series方法的任务队列中添加setTimeout(fn,interval)延迟函数,fn中调用下一个任务队列函数seriesCallback(null)。成功时终止执行任务队列函数、报错时继续执行下一个任务队列函数的实现,利用async.series方法任务队列函数中内部回调函数seriesCallback传参为否时,调用下一个任务队列函数,为真时,调用最终回调。因此,当task执行成功时,seriesCallback传参为真,终止执行任务队列函数;当task任务函数执行失败时,seriesCallback传参为否,继续执行下一个任务队列函数。
  • 源码解读:
// 报错时重复尝试task任务函数,直到最末一个task任务或某个task任务执行成功时,跳向最终回调
    async.retry = function(times, task, callback) {
        var DEFAULT_TIMES = 5;
        var DEFAULT_INTERVAL = 0;

        var attempts = [];

        var opts = {
            times: DEFAULT_TIMES,// task执行次数
            interval: DEFAULT_INTERVAL// task执行完成间隙时间
        };

        function parseTimes(acc, t){
            if(typeof t === 'number'){
                acc.times = parseInt(t, 10) || DEFAULT_TIMES;
            } else if(typeof t === 'object'){
                acc.times = parseInt(t.times, 10) || DEFAULT_TIMES;
                acc.interval = parseInt(t.interval, 10) || DEFAULT_INTERVAL;
            } else {
                throw new Error('Unsupported argument type for \'times\': ' + typeof t);
            }
        }

        var length = arguments.length;
        if (length < 1 || length > 3) {
            throw new Error('Invalid arguments - must be either (task), (task, callback), (times, task) or (times, task, callback)');
        } else if (length <= 2 && typeof times === 'function') {
            callback = task;
            task = times;
        }
        if (typeof times !== 'function') {
            parseTimes(opts, times);
        }
        opts.callback = callback;
        opts.task = task;

        function wrappedTask(wrappedCallback, wrappedResults) {
            function retryAttempt(task, finalAttempt) {
                // seriesCallback为task任务执行完成后回调函数,通过async.series方法设置
                // seriesCallback首参等同async.series方法中执行函数的回调函数参数err,正常执行需为否
                // 为真跳到最终回调
                // seriesCallback在async.series方法的意义是调用下一个task或最终回调
                return function(seriesCallback) {
                    // task首参为函数,该函数中调用async.series方法seriesCallback,也就是设置回调时机
                    task(function(err, result){
                        // err为否,执行成功的前提下,跳到最终回调,结束task任务的反复执行
                        // err为真,忽略该错误,继续执行task任务
                        // 到最末一个task任务时,执行完毕即调用最终回调
                        seriesCallback(!err || finalAttempt, {err: err, result: result});
                    }, wrappedResults);
                };
            }

            // 两个task任务之间添加等待时间
            function retryInterval(interval){
                return function(seriesCallback){
                    setTimeout(function(){
                        seriesCallback(null);
                    }, interval);
                };
            }

            while (opts.times) {

                var finalAttempt = !(opts.times-=1);
                attempts.push(retryAttempt(opts.task, finalAttempt));
                // 两个task任务之间添加等待时间
                if(!finalAttempt && opts.interval > 0){
                    attempts.push(retryInterval(opts.interval));
                }
            }

            // function(done,data)作为async.series方法的最终回调,done为是否报错,data为传入数据
            async.series(attempts, function(done, data){
                // data为tasks处理获得的最终值,获取尾项,其余项为中间处理值
                data = data[data.length - 1];
                (wrappedCallback || opts.callback)(data.err, data.result);
            });
        }

        // If a callback is passed, run this as a controll flow
        // 没有callback参数项时,输出为函数,需要传递wrappedCallback与wrappedResults
        return opts.callback ? wrappedTask() : wrappedTask;
    };

 

  • 示例:
function(callback) {
        async.retry({times:5, interval:1000}, function(cb) {
            do_task();
            var some_err = '';
            var some_result = '';
            cb(some_err, some_result);// 启用下一个任务函数或最终回调
        }, function(err, result) {
            callback(err, result);
        });
    },

 

async.iterator( tasks )

  • 实现功能:async模块内部_keyIterator函数遍历数组或对象执行时机是可控。当遍历对象是数组时,执行一次,去除数组元素的首项,执行第二次,取出数组元素的第二项。async.itertor方法同该数组、对象遍历函数相似,以可控的方式遍历执行tasks函数队列。调用async.itertor方法返回tasks队列首函数的执行函数,又通过执行函数返回次函数的执行函数,构成链式调用。特别的,可以当前执行函数的next方法获取下一个任务函数的执行函数,在async.waterfall方法中有使用。内部_keyIterator函数通过闭包实现,async.itertor方法通过返回函数实现。
  • 源码解读:
// 以可控的方式遍历执行tasks函数队列
    // makeCallback构建执行函数,并作为返回值
    // 执行函数调用过程中,执行tasks的任务函数,并调用makeCallback函数构建下一个任务函数的执行函数作为返回值
    // 执行函数的next方法获取下一个任务函数的执行函数
    async.iterator = function (tasks) {
        function makeCallback(index) {
            function fn() {// 执行函数
                if (tasks.length) {
                    tasks[index].apply(null, arguments);
                }
                return fn.next();
            }
            fn.next = function () {
                return (index < tasks.length - 1) ? makeCallback(index + 1): null;
            };
            return fn;
        }
        return makeCallback(0);
    };

 

  • 示例:
var a=async.intertor([function(){
       console.log(1)
},function(){
       console.log(2)
}]);
var b=a();// 打印1
b(); // 打印2
a.next()() // 打印2

 

 async.waterfall(tasks, callback)

  • 实现功能:async.intertor(tasks)方法为顺序执行tasks任务函数,上一个任务函数和下一个任务函数的关联只是影响执行时机,对下一个任务函数的参数无影响。async.auto(tasks, concurrency, callback)方法可以设置下一组任务函数的参数,但是最终回调函数callback的参数由多个任务函数处理值拼接后形成。async.parallel | series方法,最终回调函数获得参数同auto方法,由历次任务函数结果值拼接成数组或对象构成。比较下,async.waterfall(tasks, callback)方法也可以设置下一个任务函数的参数,并且最终回调callback的参数也由最后一个任务函数传参形成,似水珠似滚落,修改后传给下一个回调;async.auto像滚雪球,越滚越大。
  • 源码解读:
// 顺序执行tasks任务函数,并为任务函数传递参数,报错或任务函数执行完毕调用callback
    async.waterfall = function (tasks, callback) {
        callback = _once(callback || noop);
        if (!_isArray(tasks)) {
            var err = new Error('First argument to waterfall must be an array of functions');
            return callback(err);
        }
        if (!tasks.length) {
            return callback();
        }
        function wrapIterator(iterator) {
            return _restParam(function (err, args) {
                if (err) {
                    callback.apply(null, [err].concat(args));
                }
                else {
                    var next = iterator.next();// async.iterator方法获取下一个执行函数
                    if (next) {
                        args.push(wrapIterator(next));
                    }
                    else {
                        args.push(callback);
                    }
                    // 调用ensureAsync避免堆栈溢出
                    // 执行iterator函数,函数尾参为下一个iterator函数或最终回调callback
                    // 同时为下一个iterator函数或最终回调callback传递参数
                    ensureAsync(iterator).apply(null, args);
                }
            });
        }
        wrapIterator(async.iterator(tasks))();
    };

 

  •  官方示例:
async.waterfall([
    function(callback) {
        callback(null, 'one', 'two');
    },
    function(arg1, arg2, callback) {
        // arg1 now equals 'one' and arg2 now equals 'two'
        callback(null, 'three');
    },
    function(arg1, callback) {
        // arg1 now equals 'three'
        callback(null, 'done');
    }
], function (err, result) {
    // result now equals 'done'
});

 

async.whilst ( test, intertor, callback)

async.doWhilst ( intertor, test, callback)

async.until ( test, intertor, callback)

async.doUntil ( intertor, test, callback)

  • 实现功能:根据test校验结果执行intertor,test返回为真,执行intertor,test为否,执行最终回调callback。async.whilst方法和async.doWhilst方法的差别是前者可能执行也可能不执行intertor,完成看test校验结果,后者intertor必然执行一遍,随后看test校验结果。test获得参数和intertor获得参数相同,由使用者提供。until方法与whilst方法不同之处是,until方法作取反校验。
  • 源码解读:
// test合格后,执行iterator,否则跳到最终回调callback,目的是处理变量
    async.whilst = function (test, iterator, callback) {
        callback = callback || noop;
        if (test()) {
            var next = _restParam(function(err, args) {// args由外部函数传入
                if (err) {
                    callback(err);
                } else if (test.apply(this, args)) {
                    iterator(next);// 调用下一个iterator
                } else {
                    callback.apply(null, [null].concat(args));// 执行完成后最终回调,参数为null、args
                }
            });
            iterator(next);// 第一个iterator
        } else {
            callback(null);
        }
    };

    // iterator必然执行一次,其他看test检验是否合格
    async.doWhilst = function (iterator, test, callback) {
        var calls = 0;
        return async.whilst(function() {
            return ++calls <= 1 || test.apply(this, arguments);
        }, iterator, callback);
    };

    // 取反校验
    async.until = function (test, iterator, callback) {
        return async.whilst(function() {
            return !test.apply(this, arguments);
        }, iterator, callback);
    };

    async.doUntil = function (iterator, test, callback) {
        return async.doWhilst(iterator, function() {
            return !test.apply(this, arguments);
        }, callback);
    };

 

  • 官方示例:
var count = 0;
async.whilst(
    function() { return count < 5; },
    function(callback) {
        count++;
        setTimeout(function() {
            callback(null, count);
        }, 1000);
    },
    function (err, n) {
        // 5 seconds have passed, n = 5
    }
);

 

async.during ( test, intertor, callback)

async.doDuring ( intertor, test, callback)

  • 实现功能:相比whilst方法,during方法支持异步校验,test通过传递check函数作为参数的方式,校验check函数的参数是否为真,check函数的执行时机由使用者把握,whilst方法中test函数执行时机固定。
  • 源码解读:
// 同whilst,不同的是whilst方法校验立即执行,during方法校验需要等待使用者向test回调函数中注入参数
    // 可以实现异步校验,test回调参数通过延时函数获得
    async.during = function (test, iterator, callback) {
        callback = callback || noop;

        var next = _restParam(function(err, args) {
            if (err) {
                callback(err);
            } else {
                args.push(check);// args可以在同步处理或异步处理后使用
                test.apply(this, args);// args尾参中传入check,使用者调用执行
            }
        });

        var check = function(err, truth) {
            if (err) {
                callback(err);
            } else if (truth) {
                iterator(next);
            } else {
                callback(null);
            }
        };

        test(check);
    };

    async.doDuring = function (iterator, test, callback) {
        var calls = 0;
        async.during(function(next) {
            if (calls++ < 1) {
                next(null, true);
            } else {
                test.apply(this, arguments);
            }
        }, iterator, callback);
    };

 

  • 官方示例:
var count = 0;

async.during(
    function (callback) {
        // 可设置异步函数,在异步函数中执行callback校验回调,实现异步校验
        return callback(null, count < 5);
    },
    function (callback) {
        count++;
        setTimeout(callback, 1000);
    },
    function (err) {
        // 5 seconds have passed
    }
);

 

async.queue(worker, concurrency)

async.cargo(worker, payload)

  • 实现功能:async.queue添加任务函数并执行,提供暂停和重启功能。通过构建queue对象实现,任务函数在woker函数执行过程中启动。queue方法中参数concurrency用以限制同步执行的任务函数个数,cargo方法中参数concurrency设为1,即一次执行一个任务单元;cargo方法中参数payload设置预加载的任务单元中包含几个任务函数,这几条任务函数和下一个任务单元为同步执行关系,queue方法中payload设置为1,即一个任务单元只有一条任务函数。
  • 源码解读:实现过程中queue.tasks缓存待执行的任务函数,push方法调用_insert注册任务函数时添加,process执行任务单元时减少,kill方法清空,idle方法判断任务函数是否执行完毕;workers正在执行任务单元个数,process方法自增1,process方法中设置的回调函数_next函数自减1,表示该任务单元执行完成;workersList正在执行的任务单元,process方法添加,_next函数清除。unshift、push方法添加任务函数更新queue.tasks任务函数队列,并调用process方法执行任务函数;process方法设置任务单元,任务单元中包含的函数,调用worker方法执行任务函数的回调,启动下一个任务函数的执行;pause方法终止执行;resume方法恢复执行。saturated、empty、drain方法通过改写实现其功能,saturated方法当任务单元执行数量达到限制时执行,empty方法当process方法校验任务函数队列queue.tasks为空时执行,drain方法当所有任务执行完成后调用。
// 加载或执行任务函数(回调函数),任务函数的目的是启动任务函数中的回调函数
    function _queue(worker, concurrency, payload) {
        if (concurrency == null) {
            concurrency = 1;
        }
        else if(concurrency === 0) {
            throw new Error('Concurrency must not be zero');
        }
        // q.tasks待执行的任务函数队列添加任务
        function _insert(q, data, pos, callback) {// pos头部插入任务队列q.tasks
            if (callback != null && typeof callback !== "function") {
                throw new Error("task callback must be a function");
            }
            q.started = true;
            if (!_isArray(data)) {
                data = [data];
            }
            if(data.length === 0 && q.idle()) {
                // call drain immediately if there are no tasks
                return async.setImmediate(function() {
                    q.drain();
                });
            }
            _arrayEach(data, function(task) {
                var item = {
                    data: task,
                    callback: callback || noop
                };

                if (pos) {
                    q.tasks.unshift(item);
                } else {
                    q.tasks.push(item);
                }

                if (q.tasks.length === q.concurrency) {
                    q.saturated();
                }
            });
            async.setImmediate(q.process);
        }
        // 一个任务执行完成后回调,执行任务数workers-1,更新执行任务队列workersList
        // 执行任务固有的回调函数task.callback,调用执行下一个任务
        function _next(q, tasks) {
            return function(){
                workers -= 1;

                var removed = false;
                var args = arguments;
                // 更新workersList执行任务队列
                _arrayEach(tasks, function (task) {
                    _arrayEach(workersList, function (worker, index) {
                        if (worker === task && !removed) {
                            workersList.splice(index, 1);
                            removed = true;
                        }
                    });

                    // 执行任务固有的回调函数
                    task.callback.apply(task, args);
                });
                if (q.tasks.length + workers === 0) {
                    q.drain();
                }
                q.process();
            };
        }

        var workers = 0;// 执行中的任务数量
        var workersList = [];// 执行中的任务
        var q = {
            tasks: [],// 任务函数队列
            concurrency: concurrency,// 同步执行的worker个数
            payload: payload,
            saturated: noop,// 空函数,改写后实现功能,同步执行达到限制时执行
            empty: noop,// 改写实现功能,任务为空时执行
            drain: noop,// 改写实现功能,所有任务都执行完成后调用
            started: false,
            paused: false,
            push: function (data, callback) {// 尾部插入
                _insert(q, data, false, callback);
            },
            kill: function () {// 清空q.tasks任务函数队列
                q.drain = noop;
                q.tasks = [];
            },
            unshift: function (data, callback) {// 头部插入
                _insert(q, data, true, callback);
            },
            process: function () {
                while(!q.paused && workers < q.concurrency && q.tasks.length){

                    // 从q.tasks中取出任务,并更新q.tasks需要执行的任务函数队列
                    // tasks一个任务单元,任务单元中的任务同步执行
                    var tasks = q.payload ?
                        q.tasks.splice(0, q.payload) :
                        q.tasks.splice(0, q.tasks.length);// 预加载的任务函数数量

                    var data = _map(tasks, function (task) {
                        return task.data;
                    });

                    if (q.tasks.length === 0) {
                        q.empty();
                    }
                    workers += 1;
                    workersList.push(tasks[0]);
                    var cb = only_once(_next(q, tasks));
                    worker(data, cb);// _queue函数参数worker执行过程中调用cb函数执行下一个任务函数
                }
            },
            length: function () {// 需要执行的任务
                return q.tasks.length;
            },
            running: function () {// 同步执行的任务数
                return workers;
            },
            workersList: function () {// 同步执行的任务
                return workersList;
            },
            idle: function() {// 所有任务都已执行完成
                return q.tasks.length + workers === 0;
            },
            pause: function () {// 中断process函数执行
                q.paused = true;
            },
            resume: function () {// 恢复执行
                if (q.paused === false) { return; }
                q.paused = false;
                var resumeCount = Math.min(q.concurrency, q.tasks.length);
                // Need to call q.process once per concurrent
                // worker to preserve full concurrency after pause
                for (var w = 1; w <= resumeCount; w++) {
                    async.setImmediate(q.process);
                }
            }
        };
        return q;
    }

    // 返回queue对象,听过该对象添加或执行任务函数,通过调用worker函数执行任务函数
    // concurrency限制同步执行的任务个数
    async.queue = function (worker, concurrency) {
        // push进queue对象的任务函数通过调用worker执行其功能
        var q = _queue(function (items, cb) {
            worker(items[0], cb);
        }, concurrency, 1);

        return q;
    };
    
    // 返回queue对象,听过该对象添加或执行任务函数,通过调用worker函数执行任务函数
    // payload单次执行的任务单元中包含几条任务函数,这几条任务函数和下一个任务单元同步执行
    async.cargo = function (worker, payload) {
        return _queue(worker, 1, payload);
    };

 

  • 官方示例:
// create a queue object with concurrency 2
var q = async.queue(function(task, callback) {
    console.log('hello ' + task.name);
    callback();
}, 2);

// assign a callback
q.drain = function() {
    console.log('all items have been processed');
};

// add some items to the queue 添加任务函数并执行,任务函数在queue方法首参callback中执行
q.push({name: 'foo'}, function(err) {
    // 先打印task.name 即foo,后打印finished processing foo
    console.log('finished processing foo');
});
q.push({name: 'bar'}, function (err) {
    console.log('finished processing bar');
});

// add some items to the queue (batch-wise)
q.push([{name: 'baz'},{name: 'bay'},{name: 'bax'}], function(err) {
    console.log('finished processing item');
});

// add some items to the front of the queue
q.unshift({name: 'bar'}, function (err) {
    console.log('finished processing bar');
});

 

// create a cargo object with payload 2
var cargo = async.cargo(function(tasks, callback) {
    for (var i=0; i<tasks.length; i++) {
        console.log('hello ' + tasks[i].name);
    }
    callback();
}, 2);

// add some items
cargo.push({name: 'foo'}, function(err) {
    console.log('finished processing foo');
});
cargo.push({name: 'bar'}, function(err) {
    console.log('finished processing bar');
});
cargo.push({name: 'baz'}, function(err) {
    console.log('finished processing baz');
});

 

async.priorityQueue(worker, concurrency)

  • 实现功能:priorityQueue方法同queue方法基本相同,同时可以设置任务函数执行的优先级。priorityQueue方法内部调用queue方法,同时删除queue.unshift方法,改写queue.push方法。
  • 源码解读:通过中值法以及移位运算符将优先级较低的任务函数插到queue.tasks合适的位置值得留意
 // 内部调用async.queue方法返回queue对象,只是删除了unshift方法、改写了push方法
    // 设置任务函数执行的优先级
    async.priorityQueue = function (worker, concurrency) {

        // 比较优先级,越低越高优先级
        function _compareTasks(a, b){
            return a.priority - b.priority;
        }

        // 通过不断和拆分数组的中值作比较,得出item应该插到sequence数组中哪个位置
        function _binarySearch(sequence, item, compare) {
            var beg = -1,
                end = sequence.length - 1;
            while (beg < end) {
                var mid = beg + ((end - beg + 1) >>> 1);// 无符号右移,即除2,忽略余数
                if (compare(item, sequence[mid]) >= 0) {
                    beg = mid;
                } else {
                    end = mid - 1;
                }
            }
            return beg;
        }

        function _insert(q, data, priority, callback) {
            if (callback != null && typeof callback !== "function") {
                throw new Error("task callback must be a function");
            }
            q.started = true;
            if (!_isArray(data)) {
                data = [data];
            }
            if(data.length === 0) {
                // call drain immediately if there are no tasks
                return async.setImmediate(function() {
                    q.drain();
                });
            }
            _arrayEach(data, function(task) {
                var item = {
                    data: task,
                    priority: priority,
                    callback: typeof callback === 'function' ? callback : noop
                };

                q.tasks.splice(_binarySearch(q.tasks, item, _compareTasks) + 1, 0, item);

                if (q.tasks.length === q.concurrency) {
                    q.saturated();
                }
                async.setImmediate(q.process);
            });
        }

        // Start with a normal queue
        var q = async.queue(worker, concurrency);

        // Override push to accept second parameter representing priority
        q.push = function (data, priority, callback) {
            _insert(q, data, priority, callback);
        };

        // Remove unshift function
        delete q.unshift;

        return q;
    };

  

  • 示例:
var q = async.priorityQueue(function(task, callback) {
    console.log('hello ' + task.name);
    callback();
}, 2);

q.drain = function() {
    console.log('all items have been processed');
};

q.push({name: 'foo'}, 1,function(err) {
    console.log('finished processing foo');
});
q.push({name: 'bar'},2, function (err) {
    console.log('finished processing bar');
});

q.push([{name: 'baz'},{name: 'bay'},{name: 'bax'}],3, function(err) {
    console.log('finished processing item');
});

 

async.times | timesSeries | timesLimit( times, intertor, callback)

  • 实现功能:times方法调用map方法,timesSeries调用mapSeries方法,timesLimit调用mapLimit方法,设置iterator执行次数,传参为index值,即index遍历。
  • 源码解读:
function _times(mapper) {
        return function (count, iterator, callback) {
            // _range(count)输出为0到count构成数组
            mapper(_range(count), iterator, callback);
        };
    }

    // 设置iterator的调用次数count
    async.times = _times(async.map);
    async.timesSeries = _times(async.mapSeries);
    async.timesLimit = function (count, limit, iterator, callback) {
        return async.mapLimit(_range(count), limit, iterator, callback);
    };

 

  • 官方示例:
// Pretend this is some complicated async factory
var createUser = function(id, callback) {
    callback(null, {
        id: 'user' + id
    });
};

// generate 5 users
async.times(5, function(n, next) {
    createUser(n, function(err, user) {
        next(err, user);
    });
}, function(err, users) {
    // we should now have 5 users
});

 

async.seq | compose(fns)

  • 实现功能:返回函数,函数参数传递memo,seq方法本质调用async.reduce方法实现,遍历fns函数并执行,各回调处理memo后,将memo传递给最终回调callback函数。compose反向遍历并执行fns函数。
  • 源码解读:
// async.reduce的简化方案,遍历执行函数,对传递数据作处理后传给各回调
    async.seq = function (/* functions... */) {
        var fns = arguments;
        return _restParam(function (args) {
            var that = this;

            var callback = args[args.length - 1];
            if (typeof callback == 'function') {
                args.pop();
            } else {
                callback = noop;
            }

            async.reduce(fns, args, function (newargs, fn, cb) {
                fn.apply(that, newargs.concat([_restParam(function (err, nextargs) {
                    cb(err, nextargs);
                })]));
            },
            function (err, results) {
                callback.apply(that, [err].concat(results));
            });
        });
    };

    async.compose = function (/* functions... */) {
        return async.seq.apply(null, Array.prototype.reverse.call(arguments));
    };

 

官方示例:

 

// Requires lodash (or underscore), express3 and dresende's orm2.
// Part of an app, that fetches cats of the logged user.
// This example uses `seq` function to avoid overnesting and error
// handling clutter.
app.get('/cats', function(request, response) {
    var User = request.models.User;
    async.seq(
        _.bind(User.get, User),  // 'User.get' has signature (id, callback(err, data))
        function(user, fn) {
            user.getCats(fn);      // 'getCats' has signature (callback(err, data))
        }
    )(req.session.user_id, function (err, cats) {
        if (err) {
            console.error(err);
            response.json({ status: 'error', message: err.message });
        } else {
            response.json({ status: 'ok', message: 'Cats found', data: cats });
        }
    });
});

 

async.forever( fn, callback)

  • 实现功能:fn执行过程中,next回调执行调用fn自身,当报错时调用callback回调。
  • 源码解读:
// fn执行完毕,使用next函数,或者调用自身,或者报错时执行回调
    async.forever = function (fn, callback) {
        var done = only_once(callback || noop);
        var task = ensureAsync(fn);
        function next(err) {
            if (err) {
                return done(err);
            }
            task(next);
        }
        next();
    };

 

  • 官方示例:
async.forever(
    function(next) {
        // next is suitable for passing to things that need a callback(err [, whatever]);
        // it will result in this function being called again.
    },
    function(err) {
        // if next is called with a value in its first parameter, it will appear
        // in here as 'err', and execution will stop.
    }
);

 

 

 

Utils 工具方法

async.ensureAsync(fn)

  • 实现功能:根据fn(args,callback)是否延时函数,包装callback函数后传给fn。实现过程,ensureAsync方法接收fn的参数,改写后,再传递给fn。改写callback的目的是,当fn为同步函数时,使用async.setImmediate方法立即执行(该方法执行完成后也许立即回收堆栈),可以避免内存的浪费。
  • 源码解读:
function ensureAsync(fn) {
        // 输入参数最后一项是回调函数,sync为真同步执行状态,为否异步执行状态
        // 改写回调项,async.setImmediate跟普通调用callback的区别是没有堆栈溢出?
        return _restParam(function (args) {
            var callback = args.pop();
            args.push(function () {
                var innerArgs = arguments;
                if (sync) {
                    async.setImmediate(function () {
                        callback.apply(null, innerArgs);
                    });
                } else {
                    callback.apply(null, innerArgs);
                }
            });
            var sync = true;
            fn.apply(this, args);
            sync = false;
        });
    }

    // 改写参数函数fn中参数的回调函数callback设置,callback启动时机依然需使用者触发
    // 改写callback的目的避免同步函数的内存浪费
    async.ensureAsync = ensureAsync;

 

  • 官方示例:
function sometimesAsync(arg, callback) {
    if (cache[arg]) {
        return callback(null, cache[arg]); // this would be synchronous!!
    } else {
        doSomeIO(arg, callback); // this IO would be asynchronous
    }
}

// this has a risk of stack overflows if many results are cached in a row
// 堆栈溢出
async.mapSeries(args, sometimesAsync, done);

// this will defer sometimesAsync's callback if necessary,
// preventing stack overflows
// 避免堆栈溢出
async.mapSeries(args, async.ensureAsync(sometimesAsync), done);

 

async.ensureAsync(fn)

  • 实现功能:重设fn的参数。通过改造函数ensureAsync方法向fn中传递参数,返回fn的包装执行函数,该包装执行函数的参数也传给fn作为参数。
  • 源码解读:
// 返回fn的执行函数,目的是重设fn的参数
    async.apply = _restParam(function (fn, args) {
        return _restParam(function (callArgs) {
            return fn.apply(
                null, args.concat(callArgs)
            );
        });
    });

 

  • 官方示例:
node> var fn = async.apply(sys.puts, 'one');
node> fn('two', 'three');
one
two
three// sys.puts获得参数"one","two","three"

 

async.log | dir(fn,args)

  • 实现功能:在异步函数执行完成后打印结果,dir方法调用console.dir以dom视图打印。实现思路是,调用_resetParam重设fn函数的参数,最后一项为回调函数callback,其余项为log、dir方法的args,callback在内部封装后输出给外部,外部设置该回调的执行时机以及参数。
  • 源码解读:
function _console_fn(name) {
        return _restParam(function (fn, args) {
            // 通过内部函数向外部函数传递回调函数function(err,args)作为参数
            // 外部函数中设置回调函数的执行时机
            // 实际意义似乎和在异步函数中直接调用console.log()没差别
            fn.apply(null, args.concat([_restParam(function (err, args) {
                if (typeof console === 'object') {
                    if (err) {
                        if (console.error) {
                            console.error(err);
                        }
                    }
                    else if (console[name]) {
                        _arrayEach(args, function (x) {
                            console[name](x);
                        });
                    }
                }
            })]));
        });
    }
    async.log = _console_fn('log');
    async.dir = _console_fn('dir');// 显示dom试图
    /*async.info = _console_fn('info');
    async.warn = _console_fn('warn');
    async.error = _console_fn('error');*/
 
  • 官方示例:
// in a module
var hello = function(name, callback) {
    setTimeout(function() {
        callback(null, 'hello ' + name);
    }, 1000);
};

// in the node repl
node> async.log(hello, 'world');
'hello world'
 
// in a module
var hello = function(name, callback) {
    setTimeout(function() {
        callback(null, {hello: name});
    }, 1000);
};

// in the node repl
node> async.dir(hello, 'world');
{hello: 'world'}

 

async.memoize(fn,hasher) 

async.unmemoize(fn)

  • 实现功能:若fn已执行,且传参相同,重新调用fn的回调函数,若fn执行过程中,且fn传参相同,添加fn的回调函数。实现思路是使用queues记录fn参数下所用回调函数,执行完成时在回调函数中清空queues,当fn执行过程中,且传参相同,queues相应键值下就有值,可以做比较;在fn执行完成的回调中,使用memo记录fn参数下回调函数使用的参数,再次用同样参数执行fn时,memo相应键值下为真;初次调用作普通函数处理。memoize方法返回memoized函数。unmemoize方法将memoized函数作为普通函数处理。
  • 源码解读:
// 传参相同,fn已执行的状态下,重复调用回调函数,或fn执行中,添加回调函数
    async.memoize = function (fn, hasher) {
        var memo = {};
        var queues = {};
        var has = Object.prototype.hasOwnProperty;
        hasher = hasher || identity;
        var memoized = _restParam(function memoized(args) {
            var callback = args.pop();
            var key = hasher.apply(null, args);
            if (has.call(memo, key)) {   
                // 已执行,立即执行fn的回调
                async.setImmediate(function () {
                    callback.apply(null, memo[key]);
                });
            }
            else if (has.call(queues, key)) {
                // fn执行过程中,参数args相同,添加回调callback
                queues[key].push(callback);
            }
            else {
                queues[key] = [callback];// 执行前fn回调添加到queues队列
                fn.apply(null, args.concat([_restParam(function (args) {
                    memo[key] = args;// fn执行完成更新memo记录已执行,对象的键可以是数组、对象等数据类型
                    var q = queues[key];
                    delete queues[key];// 执行完清空该fn回调队列queues[key]
                    for (var i = 0, l = q.length; i < l; i++) {
                        q[i].apply(null, args);// 执行fn的回调
                    }
                })]));
            }
        });
        memoized.memo = memo;
        memoized.unmemoized = fn;// 没有memo记录的直接调用
        return memoized;
    };

    // 对async.memoize返回对象或普通函数fn作处理,视为普通函数处理
    async.unmemoize = function (fn) {
        return function () {
            return (fn.unmemoized || fn).apply(null, arguments);
        };
    };

 

  • 官方示例:
var slow_fn = function(name, callback) {
    // do something
    callback(null, result);
};
var fn = async.memoize(slow_fn);

// fn can now be used as if it were slow_fn   初次使用fn
fn('some name', function() {
    // callback
    console.log(1)
});

// 添加fn回调
fn('some name', function() {
    // callback
    console.log(2)
});

 

async.constant(value) 

  • 实现功能:使回调函数的首参err值为null,次参为value。
  • 源码解读:
// 重设callback首参err为null
    async.constant = _restParam(function(values) {
        var args = [null].concat(values);
        return function (callback) {
            return callback.apply(this, args);
        };
    });

 

  • 官方示例:
async.waterfall([
    async.constant(42),
    function (value, next) {
        // value === 42
    },
    //...
], callback);

async.waterfall([
    async.constant(filename, "utf8"),
    fs.readFile,
    function (fileData, next) {
        //...
    }
    //...
], callback);

async.auto({
    hostname: async.constant("https://server.net/"),
    port: findFreePort,
    launchServer: ["hostname", "port", function (options, cb) {
        startServer(options, cb);
    }],
    //...
}, callback);

 

async.wrapSync | asyncify(func) 

  • 实现功能:返回函数中传入func函数的参数(取出最末一个回调函数callback),执行func,若返回值不是带有then方法的thenable对象,直接执行回调函数callback;若返回值是thenable对象,执行该thenable对象的then方法。
  • 源码解读:
// 执行func,若返回值为thenable对象,再次执行该thenable对象的then方法,若不是,直接调用回调函数
    async.wrapSync =
    async.asyncify = function asyncify(func) {
        return _restParam(function (args) {
            var callback = args.pop();
            var result;
            try {
                result = func.apply(this, args);
            } catch (e) {
                return callback(e);
            }
            // if result is Promise object
            if (_isObject(result) && typeof result.then === "function") {
                result.then(function(value) {
                    callback(null, value);
                })["catch"](function(err) {
                    callback(err.message ? err : new Error(err));
                });
            } else {
                callback(null, result);
            }
        });
    };

 

  • 官方示例:
// passing a regular synchronous function
async.waterfall([
    async.apply(fs.readFile, filename, "utf8"),
    async.asyncify(JSON.parse),
    function (data, next) {
        // data is the result of parsing the text.
        // If there was a parsing error, it would have been caught.
    }
], callback);

// passing a function returning a promise
async.waterfall([
    async.apply(fs.readFile, filename, "utf8"),
    async.asyncify(function (contents) {
        return db.model.create(contents);
    }),
    function (model, next) {
        // `model` is the instantiated model object.
        // If there was an error, this function would be skipped.
    }
], callback);

// es6 example
var q = async.queue(async.asyncify(async function(file) {
    var intermediateStep = await processFile(file);
    return await somePromise(intermediateStep)
}));

q.push(files);

 

 

整体代码

/*!
 * async
 * https://github.com/caolan/async
 *
 * Copyright 2010-2014 Caolan McMahon
 * Released under the MIT license
 */
(function () {

    var async = {};
    function noop() {}
    function identity(v) {
        return v;
    }
    function toBool(v) {
        return !!v;
    }
    function notId(v) {
        return !v;
    }

    // global on the server, window in the browser
    var previous_async;

    // Establish the root object, `window` (`self`) in the browser, `global`
    // on the server, or `this` in some virtual machines. We use `self`
    // instead of `window` for `WebWorker` support.
    var root = typeof self === 'object' && self.self === self && self ||
            typeof global === 'object' && global.global === global && global ||
            this;

    if (root != null) {
        previous_async = root.async;
    }

    async.noConflict = function () {
        root.async = previous_async;
        return async;
    };

    // 包装执行函数,执行完成后清空
    function only_once(fn) {
        return function() {
            if (fn === null) throw new Error("Callback was already called.");
            fn.apply(this, arguments);
            fn = null;
        };
    }

    // 包装执行函数,执行完成后清空
    function _once(fn) {
        return function() {
            if (fn === null) return;
            fn.apply(this, arguments);
            fn = null;
        };
    }

    //// cross-browser compatiblity functions ////

    // 转化为字符串
    var _toString = Object.prototype.toString;

    // 是否数组
    var _isArray = Array.isArray || function (obj) {
        return _toString.call(obj) === '[object Array]';
    };

    // 是否对象
    var _isObject = function(obj) {
        var type = typeof obj;
        return type === 'function' || type === 'object' && !!obj;
    };

    // 是否数组或伪数组
    function _isArrayLike(arr) {
        return _isArray(arr) || (
            // has a positive integer length property
            typeof arr.length === "number" &&
            arr.length >= 0 &&
            arr.length % 1 === 0
        );
    }

    // 遍历数组执行回调
    function _arrayEach(arr, iterator) {
        var index = -1,
            length = arr.length;

        while (++index < length) {
            iterator(arr[index], index, arr);
        }
    }

    // 由回调函数重置数组的元素项
    function _map(arr, iterator) {
        var index = -1,
            length = arr.length,
            result = Array(length);

        while (++index < length) {
            result[index] = iterator(arr[index], index, arr);
        }
        return result;
    }

    // 由0到count构成数组
    function _range(count) {
        return _map(Array(count), function (v, i) { return i; });
    }

    // 遍历数组项用回调对memo作处理
    function _reduce(arr, iterator, memo) {
        _arrayEach(arr, function (x, i, a) {
            memo = iterator(memo, x, i, a);
        });
        return memo;
    }

    // 遍历对象的属性执行回调
    function _forEachOf(object, iterator) {
        _arrayEach(_keys(object), function (key) {
            iterator(object[key], key);
        });
    }

    // 获取元素的index,不存在是返回-1
    function _indexOf(arr, item) {
        for (var i = 0; i < arr.length; i++) {
            if (arr[i] === item) return i;
        }
        return -1;
    }

    // 获取对象的属性
    var _keys = Object.keys || function (obj) {
        var keys = [];
        for (var k in obj) {
            if (obj.hasOwnProperty(k)) {
                keys.push(k);
            }
        }
        return keys;
    };

    // 返回以数组元素项序号或对象主键作为返回值的函数,每执行一次通过闭包i+1,构成遍历
    function _keyIterator(coll) {
        var i = -1;
        var len;
        var keys;
        if (_isArrayLike(coll)) {
            len = coll.length;
            return function next() {
                i++;
                return i < len ? i : null;
            };
        } else {
            keys = _keys(coll);
            len = keys.length;
            return function next() {
                i++;
                return i < len ? keys[i] : null;
            };
        }
    }

    // 约定从返回函数的第几项开始获取参数,0、1有效,返回函数末参为index值
    // startIndex为0时,func传参为返回函数的arguments,默认为0
    // startIndex为1时,func传参为返回函数的arguments[0]、其余项
    function _restParam(func, startIndex) {
        startIndex = startIndex == null ? func.length - 1 : +startIndex;
        return function() {
            var length = Math.max(arguments.length - startIndex, 0);
            var rest = Array(length);
            for (var index = 0; index < length; index++) {
                rest[index] = arguments[index + startIndex];
            }
            switch (startIndex) {
                case 0: return func.call(this, rest);
                case 1: return func.call(this, arguments[0], rest);
            }
            // Currently unused but handle cases outside of the switch statement:
            // var args = Array(startIndex + 1);
            // for (index = 0; index < startIndex; index++) {
            //     args[index] = arguments[index];
            // }
            // args[startIndex] = rest;
            // return func.apply(this, args);
        };
    }

    // 没有index的数组项处理函数
    function _withoutIndex(iterator) {
        return function (value, index, callback) {
            return iterator(value, callback);
        };
    }

    //// exported async module functions ////

    //// nextTick implementation with browser-compatible fallback ////

    // capture the global reference to guard against fakeTimer mocks
    var _setImmediate = typeof setImmediate === 'function' && setImmediate;

    var _delay = _setImmediate ? function(fn) {
        // not a direct alias for IE10 compatibility
        _setImmediate(fn);
    } : function(fn) {
        setTimeout(fn, 0);
    };

    if (typeof process === 'object' && typeof process.nextTick === 'function') {
        async.nextTick = process.nextTick;
    } else {
        async.nextTick = _delay;
    }

    // 浏览器使用setTimeout、服务器端使用process.nextTick,在setImmediate函数不存在的前提下
    async.setImmediate = _setImmediate ? _delay : async.nextTick;

    // 实现功能同async.eachOf,只是iterator函数中没有数组的index值,或者没有对象属性集合的index值
    async.forEach =
    async.each = function (arr, iterator, callback) {
        return async.eachOf(arr, _withoutIndex(iterator), callback);
    };

    // 实现功能同async.eachOfSeries,只是iterator函数中没有数组的index值,或者没有对象属性集合的index值
    async.forEachSeries =
    async.eachSeries = function (arr, iterator, callback) {
        return async.eachOfSeries(arr, _withoutIndex(iterator), callback);
    };

    // 实现功能同async.eachOfLimit,只是iterator函数中没有数组的index值,或者没有对象属性集合的index值
    async.forEachLimit =
    async.eachLimit = function (arr, limit, iterator, callback) {
        return _eachOfLimit(limit)(arr, _withoutIndex(iterator), callback);
    };

    // 遍历object对象执行iterator,报错或遍历完成后调用callback(error)
    // callback(error)函数的触发时机需要手动在iterator中设置
    /*官方示例
    var obj = {dev: "/dev.json", test: "/test.json", prod: "/prod.json"};
    var configs = {};

    async.forEachOf(obj, function (value, key, callback) {
        fs.readFile(__dirname + value, "utf8", function (err, data) {
            if (err) return callback(err);
            try {
                configs[key] = JSON.parse(data);
            } catch (e) {
                return callback(e);
            }
            callback();
        });
    }, function (err) {
        if (err) console.error(err.message);
        // configs is now a map of JSON data
        doSomethingWith(configs);
    });*/
    async.forEachOf =
    async.eachOf = function (object, iterator, callback) {
        callback = _once(callback || noop);
        object = object || [];

        var iter = _keyIterator(object);
        var key, completed = 0;

        while ((key = iter()) != null) {// 反复执行闭包函数iter遍历对象
            completed += 1;
            // only_once(done)中调用外部函数callback,机理同Promise-resolve相似
            iterator(object[key], key, only_once(done));
        }

        if (completed === 0) callback(null);

        function done(err) {
            completed--;
            if (err) {
                callback(err);
            }
            // Check key is null in case iterator isn't exhausted
            // and done resolved synchronously.
            else if (key === null && completed <= 0) {
                callback(null);
            }
        }
    };

    // 本次iterator延迟执行完毕后,通过回调函数only_once(done)触发下一个iterator执行
    // _keyIterator函数的意义遍历的节奏变得可控,可终止,可触发,通常的each方法则不能
    async.forEachOfSeries =
    async.eachOfSeries = function (obj, iterator, callback) {
        callback = _once(callback || noop);
        obj = obj || [];
        var nextKey = _keyIterator(obj);
        var key = nextKey();
        function iterate() {
            var sync = true;
            if (key === null) {
                return callback(null);
            }
            iterator(obj[key], key, only_once(function (err) {
                if (err) {
                    callback(err);
                }
                else {
                    key = nextKey();
                    if (key === null) {
                        return callback(null);
                    } else {
                        if (sync) {
                            async.setImmediate(iterate);
                        } else {
                            iterate();
                        }
                    }
                }
            }));
            sync = false;
        }
        iterate();
    };


    // iterator同时执行个数限制为limit,延迟完成后加载下一个
    // limit=1时,eachOfLimit方法同async.eachOfSeries
    // limit=数组长度或对象属性集合的长度时,eachOfLimit方法同async.eachOf
    async.forEachOfLimit =
    async.eachOfLimit = function (obj, limit, iterator, callback) {
        _eachOfLimit(limit)(obj, iterator, callback);
    };

    function _eachOfLimit(limit) {

        return function (obj, iterator, callback) {
            callback = _once(callback || noop);
            obj = obj || [];
            var nextKey = _keyIterator(obj);
            if (limit <= 0) {
                return callback(null);
            }
            var done = false;
            var running = 0;// 记录iterator执行个数,iterator启动前+1,执行完成后-1
            var errored = false;

            (function replenish () {
                if (done && running <= 0) {
                    return callback(null);
                }

                while (running < limit && !errored) {
                    var key = nextKey();
                    if (key === null) {
                        done = true;
                        if (running <= 0) {
                            callback(null);
                        }
                        return;
                    }
                    running += 1;
                    iterator(obj[key], key, only_once(function (err) {
                        running -= 1;
                        if (err) {
                            callback(err);
                            errored = true;
                        }
                        else {
                            replenish();
                        }
                    }));
                }
            })();
        };
    }

    function doParallel(fn) {
        return function (obj, iterator, callback) {
            return fn(async.eachOf, obj, iterator, callback);
        };
    }
    function doParallelLimit(fn) {
        return function (obj, limit, iterator, callback) {
            return fn(_eachOfLimit(limit), obj, iterator, callback);
        };
    }
    function doSeries(fn) {
        return function (obj, iterator, callback) {
            return fn(async.eachOfSeries, obj, iterator, callback);
        };
    }

    function _asyncMap(eachfn, arr, iterator, callback) {
        callback = _once(callback || noop);
        arr = arr || [];
        var results = _isArrayLike(arr) ? [] : {};
        eachfn(arr, function (value, index, callback) {
            iterator(value, function (err, v) {
                results[index] = v;
                callback(err);
            });
        }, function (err) {
            callback(err, results);
        });
    }

    // 使用async.eachOf方法遍历obj执行iterator,主要目的是获取results,传给最终回调函数callback
    async.map = doParallel(_asyncMap);
    async.mapSeries = doSeries(_asyncMap);
    async.mapLimit = doParallelLimit(_asyncMap);

    // 调用eachOfSeries遍历arr,延迟完成后更新memo,最后将memo传给最终的回调函数callback
    async.inject =
    async.foldl =
    async.reduce = function (arr, memo, iterator, callback) {
        async.eachOfSeries(arr, function (x, i, callback) {
            iterator(memo, x, function (err, v) {
                memo = v;// 更新memo
                callback(err);// 错误处理
            });
        }, function (err) {
            callback(err, memo);//memo不是通过传值或者闭包驻留,由上级作用域赋予
        });
    };

    // 自右向左遍历
    async.foldr =
    async.reduceRight = function (arr, memo, iterator, callback) {
        var reversed = _map(arr, identity).reverse();
        async.reduce(reversed, memo, iterator, callback);
    };

    // 同async.reduce,主要功能是延迟后处理memo,再将memo交给callback回调,只是不阻塞延时函数执行
    async.transform = function (arr, memo, iterator, callback) {
        if (arguments.length === 3) {
            callback = iterator;
            iterator = memo;
            memo = _isArray(arr) ? [] : {};
        }

        async.eachOf(arr, function(v, k, cb) {
            iterator(memo, v, k, cb);
        }, function(err) {
            callback(err, memo);
        });
    };

    function _filter(eachfn, arr, iterator, callback) {
        var results = [];
        eachfn(arr, function (x, index, callback) {
            iterator(x, function (v) {
                if (v) {
                    results.push({index: index, value: x});
                }
                callback();
            });
        }, function () {
            callback(_map(results.sort(function (a, b) { // 由回调函数重置数组的元素项为results的value键值
                return a.index - b.index;
            }), function (x) {
                return x.value;
            }));
        });
    }

    // 调用async.eachOf方法遍历obj执行iterator,通过iterator的回调传参v过滤,返回数组results
    // 经过处理后,构成按序排列原元素值的数组,最终传给callback回调
    async.select =
    async.filter = doParallel(_filter);

    async.selectLimit =
    async.filterLimit = doParallelLimit(_filter);

    async.selectSeries =
    async.filterSeries = doSeries(_filter);

    function _reject(eachfn, arr, iterator, callback) {
        _filter(eachfn, arr, function(value, cb) {// cb在_filter函数中写就,cb为引用
            iterator(value, function(v) {
                cb(!v);
            });
        }, callback);
    }

    // 与async.filter的不同是,reject方法的过滤条件是,iterator的回调传参v为否值
    async.reject = doParallel(_reject);
    async.rejectLimit = doParallelLimit(_reject);
    async.rejectSeries = doSeries(_reject);

    // 以async.some方法解读
    // check、getResult均为传递函数,一个转化为布尔值,一个原样输出
    function _createTester(eachfn, check, getResult) {
        return function(arr, limit, iterator, cb) {
            function done() {
                if (cb) cb(getResult(false, void 0));
            }
            function iteratee(x, _, callback) {
                if (!cb) return callback();
                // 参数function(v){}由内部函数传给外部,作为iterator的回调,执行done回调
                iterator(x, function (v) {// v由外部函数传入,x是arr的元素项
                    if (cb && check(v)) {
                        // v为真、cb存在时执行cb回调,cb和iterator都赋值为false
                        // 因为iteratee中首先判断cb是否为否值,后续遍历直接进入callback回调
                        // 进入callback回调,callback在遍历执行完成时才有意义
                        cb(getResult(true, x));
                        cb = iterator = false;
                    }
                    callback();
                });
            }
            if (arguments.length > 3) {
                eachfn(arr, limit, iteratee, done);
            } else {
                cb = iterator;
                iterator = limit;
                eachfn(arr, iteratee, done);
            }
        };
    }

    // 调用async.eachOf方法,arr参数中有一项满足条件时,回调获得结果为真
    // 回调时机为遍历的每个阶段执行完毕
    async.any =
    async.some = _createTester(async.eachOf, toBool, identity);

    async.someLimit = _createTester(async.eachOfLimit, toBool, identity);

    // 调用async.eachOf方法,arr参数中每一项满足条件时,回调获得结果为真
    // 回调时机为遍历执行完成时
    async.all =
    async.every = _createTester(async.eachOf, notId, notId);

    async.everyLimit = _createTester(async.eachOfLimit, notId, notId);

    function _findGetResult(v, x) {
        return x;
    }
    // 与some方法相同,调用了_createTester函数,差别是some传参为v判断条件,detect传参为arr的元素项
    async.detect = _createTester(async.eachOf, identity, _findGetResult);
    async.detectSeries = _createTester(async.eachOfSeries, identity, _findGetResult);
    async.detectLimit = _createTester(async.eachOfLimit, identity, _findGetResult);

    // 根据用户设置的criteria排序
    async.sortBy = function (arr, iterator, callback) {
        async.map(arr, function (x, callback) {
            iterator(x, function (err, criteria) {
                if (err) {
                    callback(err);
                }
                else {
                    callback(null, {value: x, criteria: criteria});
                }
            });
        }, function (err, results) {
            if (err) {
                return callback(err);
            }
            else {
                callback(null, _map(results.sort(comparator), function (x) {
                    return x.value;
                }));
            }

        });

        function comparator(left, right) {
            var a = left.criteria, b = right.criteria;
            return a < b ? -1 : a > b ? 1 : 0;
        }
    };

    /*---------------------------------------------------------------------------------*/


    // 自动以无依赖的函数到依赖执行完成的函数顺序执行函数
    // tasks以函数映射名为键,函数队列为值,尾参是处理results的回调函数
    async.auto = function (tasks, concurrency, callback) {
        if (typeof arguments[1] === 'function') {
            // concurrency is optional, shift the args.
            callback = concurrency;
            concurrency = null;
        }
        callback = _once(callback || noop);
        var keys = _keys(tasks);
        var remainingTasks = keys.length;// tasks对象共有多少键,即共多少任务

        // 没有任务,直接调用回调函数
        if (!remainingTasks) {
            return callback(null);
        }
        if (!concurrency) {
            concurrency = remainingTasks;// concurrency同时执行的任务数,默认为tasks对象有多少键
        }

        var results = {};
        var runningTasks = 0;// 跑的任务数,与concurrency比较以对同时跑的任务数作限制

        var hasError = false;

        // 需要依赖函数完成加载后才执行的函数,通过addListener方法添加到listeners队列
        // 每当一个函数加载完成,调用taskComplete执行依赖加载完成的函数项,调用removeListener更新listeners队列
        var listeners = [];
        function addListener(fn) {
            listeners.unshift(fn);
        }
        function removeListener(fn) {
            var idx = _indexOf(listeners, fn);
            if (idx >= 0) listeners.splice(idx, 1);
        }

        // remainingTasks-1,遍历listeners执行依赖加载完成的函数项
        function taskComplete() {
            remainingTasks--;
            _arrayEach(listeners.slice(0), function (fn) {
                fn();
            });
        }

        // tasks中任务函数已执行完毕的情况下,remainingTasks=0,执行最终回调函数callback
        addListener(function () {
            if (!remainingTasks) {
                callback(null, results);
            }
        });

        _arrayEach(keys, function (k) {
            if (hasError) return;
            var task = _isArray(tasks[k]) ? tasks[k]: [tasks[k]];

            // 因为startIndex为0,_restParam作用是将taskCallback赋值为function(err,args),处理results
            var taskCallback = _restParam(function(err, args) {
                runningTasks--;
                if (args.length <= 1) {
                    args = args[0];
                }
                if (err) {
                    var safeResults = {};
                    _forEachOf(results, function(val, rkey) {
                        safeResults[rkey] = val;
                    });
                    safeResults[k] = args;// 关于k值的驻留???
                    hasError = true;

                    // 报错直接调用最终回调函数
                    callback(err, safeResults);

                // 没有错误,执行taskComplete,遍历listeners执行依赖加载完成的函数项
                }else {
                    results[k] = args;// results由tasks的末尾项task[task.length - 1]参数带入
                    async.setImmediate(taskComplete);
                }
            });

            // 检查依赖的函数是否存在,依赖可以是数组形式,但不能依赖自身
            var requires = task.slice(0, task.length - 1);// 排除最后一项
            // prevent dead-locks
            var len = requires.length;
            var dep;
            while (len--) {
                if (!(dep = tasks[requires[len]])) {
                    throw new Error('Has nonexistent dependency in ' + requires.join(', '));
                }
                if (_isArray(dep) && _indexOf(dep, k) >= 0) {
                    throw new Error('Has cyclic dependencies');
                }
            }

            // 依赖函数x在results中,执行函数k不在results中,且同时跑的函数个数runningTasks小于限制
            function ready() {
                // _reduce遍历数组项对尾参memo作处理,迭代器函数iterator中首参为memo,次参为元素项
                return runningTasks < concurrency && _reduce(requires, function (a, x) {
                    return (a && results.hasOwnProperty(x));
                }, true) && !results.hasOwnProperty(k);
            }

            // task[task.length - 1]末尾项为函数,之前为依赖
            // taskCallback,通过调用taskComplete,遍历listeners执行依赖加载完成的函数项
            if (ready()) {
                runningTasks++;
                task[task.length - 1](taskCallback, results);
            }else {
                addListener(listener);
            }

            // 添加listeners函数队列的依赖函数,执行时runningTasks+1,删除listeners队列依赖
            function listener() {
                if (ready()) {
                    runningTasks++;
                    removeListener(listener);
                    task[task.length - 1](taskCallback, results);
                }
            }
        });
    };

    // 报错时重复尝试task任务函数,直到最末一个task任务或某个task任务执行成功时,跳向最终回调
    async.retry = function(times, task, callback) {
        var DEFAULT_TIMES = 5;
        var DEFAULT_INTERVAL = 0;

        var attempts = [];

        var opts = {
            times: DEFAULT_TIMES,// task执行次数
            interval: DEFAULT_INTERVAL// task执行完成间隙时间
        };

        function parseTimes(acc, t){
            if(typeof t === 'number'){
                acc.times = parseInt(t, 10) || DEFAULT_TIMES;
            } else if(typeof t === 'object'){
                acc.times = parseInt(t.times, 10) || DEFAULT_TIMES;
                acc.interval = parseInt(t.interval, 10) || DEFAULT_INTERVAL;
            } else {
                throw new Error('Unsupported argument type for \'times\': ' + typeof t);
            }
        }

        var length = arguments.length;
        if (length < 1 || length > 3) {
            throw new Error('Invalid arguments - must be either (task), (task, callback), (times, task) or (times, task, callback)');
        } else if (length <= 2 && typeof times === 'function') {
            callback = task;
            task = times;
        }
        if (typeof times !== 'function') {
            parseTimes(opts, times);
        }
        opts.callback = callback;
        opts.task = task;

        function wrappedTask(wrappedCallback, wrappedResults) {
            function retryAttempt(task, finalAttempt) {
                // seriesCallback为task任务执行完成后回调函数,通过async.series方法设置
                // seriesCallback首参等同async.series方法中执行函数的回调函数参数err,正常执行需为否
                // 为真跳到最终回调
                // seriesCallback在async.series方法的意义是调用下一个task或最终回调
                return function(seriesCallback) {
                    // task首参为函数,该函数中调用async.series方法seriesCallback,也就是设置回调时机
                    task(function(err, result){
                        // err为否,执行成功的前提下,跳到最终回调,结束task任务的反复执行
                        // err为真,忽略该错误,继续执行task任务
                        // 到最末一个task任务时,执行完毕即调用最终回调
                        seriesCallback(!err || finalAttempt, {err: err, result: result});
                    }, wrappedResults);
                };
            }

            // 两个task任务之间添加等待时间
            function retryInterval(interval){
                return function(seriesCallback){
                    setTimeout(function(){
                        seriesCallback(null);
                    }, interval);
                };
            }

            while (opts.times) {

                var finalAttempt = !(opts.times-=1);
                attempts.push(retryAttempt(opts.task, finalAttempt));
                // 两个task任务之间添加等待时间
                if(!finalAttempt && opts.interval > 0){
                    attempts.push(retryInterval(opts.interval));
                }
            }

            // function(done,data)作为async.series方法的最终回调,done为是否报错,data为传入数据
            async.series(attempts, function(done, data){
                // data为tasks处理获得的最终值,获取尾项,其余项为中间处理值
                data = data[data.length - 1];
                (wrappedCallback || opts.callback)(data.err, data.result);
            });
        }

        // If a callback is passed, run this as a controll flow
        // 没有callback参数项时,输出为函数,需要传递wrappedCallback与wrappedResults
        return opts.callback ? wrappedTask() : wrappedTask;
    };

    // 顺序执行tasks任务函数,并为任务函数传递参数,报错或任务函数执行完毕调用callback
    async.waterfall = function (tasks, callback) {
        callback = _once(callback || noop);
        if (!_isArray(tasks)) {
            var err = new Error('First argument to waterfall must be an array of functions');
            return callback(err);
        }
        if (!tasks.length) {
            return callback();
        }
        function wrapIterator(iterator) {
            return _restParam(function (err, args) {
                if (err) {
                    callback.apply(null, [err].concat(args));
                }
                else {
                    var next = iterator.next();// async.iterator方法获取下一个执行函数
                    if (next) {
                        args.push(wrapIterator(next));
                    }
                    else {
                        args.push(callback);
                    }
                    // 调用ensureAsync避免堆栈溢出
                    // 执行iterator函数,函数尾参为下一个iterator函数或最终回调callback
                    // 同时为下一个iterator函数或最终回调callback传递参数
                    ensureAsync(iterator).apply(null, args);
                }
            });
        }
        wrapIterator(async.iterator(tasks))();
    };

    function _parallel(eachfn, tasks, callback) {
        callback = callback || noop;
        var results = _isArrayLike(tasks) ? [] : {};

        eachfn(tasks, function (task, key, callback) {
            task(_restParam(function (err, args) {
                if (args.length <= 1) {
                    args = args[0];
                }
                results[key] = args;// 根据tasks数据类型构建results,传给最终回调,results根据作用域传递
                callback(err);
            }));
        }, function (err) {
            callback(err, results);
        });
    }

    // 遍历执行tasks中任务函数,数组或对象形式构建results,传给回调函数callback
    async.parallel = function (tasks, callback) {
        _parallel(async.eachOf, tasks, callback);
    };

    async.parallelLimit = function(tasks, limit, callback) {
        _parallel(_eachOfLimit(limit), tasks, callback);
    };

    async.series = function(tasks, callback) {
        _parallel(async.eachOfSeries, tasks, callback);
    };

    // 以可控的方式遍历执行tasks函数队列
    // makeCallback构建执行函数,并作为返回值
    // 执行函数调用过程中,执行tasks的任务函数,并调用makeCallback函数构建下一个任务函数的执行函数作为返回值
    // 执行函数的next方法获取下一个任务函数的执行函数
    async.iterator = function (tasks) {
        function makeCallback(index) {
            function fn() {// 执行函数
                if (tasks.length) {
                    tasks[index].apply(null, arguments);
                }
                return fn.next();
            }
            fn.next = function () {
                return (index < tasks.length - 1) ? makeCallback(index + 1): null;
            };
            return fn;
        }
        return makeCallback(0);
    };

    // 返回fn的执行函数,目的是重设fn的参数
    async.apply = _restParam(function (fn, args) {
        return _restParam(function (callArgs) {
            return fn.apply(
                null, args.concat(callArgs)
            );
        });
    });

    // 拼接arr元素项传参给回调,前提是没有捕获到错误,捕获到错误则略过
    function _concat(eachfn, arr, fn, callback) {
        var result = [];
        eachfn(arr, function (x, index, cb) {
            fn(x, function (err, y) {
                result = result.concat(y || []);
                cb(err);
            });
        }, function (err) {
            callback(err, result);
        });
    }
    async.concat = doParallel(_concat);
    async.concatSeries = doSeries(_concat);

    // test合格后,执行iterator,否则跳到最终回调callback,目的是处理变量
    async.whilst = function (test, iterator, callback) {
        callback = callback || noop;
        if (test()) {
            var next = _restParam(function(err, args) {// args由外部函数传入
                if (err) {
                    callback(err);
                } else if (test.apply(this, args)) {
                    iterator(next);// 调用下一个iterator
                } else {
                    callback.apply(null, [null].concat(args));// 执行完成后最终回调,参数为null、args
                }
            });
            iterator(next);// 第一个iterator
        } else {
            callback(null);
        }
    };

    // iterator必然执行一次,其他看test检验是否合格
    async.doWhilst = function (iterator, test, callback) {
        var calls = 0;
        return async.whilst(function() {
            return ++calls <= 1 || test.apply(this, arguments);
        }, iterator, callback);
    };

    // 取反校验
    async.until = function (test, iterator, callback) {
        return async.whilst(function() {
            return !test.apply(this, arguments);
        }, iterator, callback);
    };

    async.doUntil = function (iterator, test, callback) {
        return async.doWhilst(iterator, function() {
            return !test.apply(this, arguments);
        }, callback);
    };

    // 同whilst,不同的是whilst方法校验立即执行,during方法校验需要等待使用者向test回调函数中注入参数
    // 可以实现异步校验,test回调参数通过延时函数获得
    async.during = function (test, iterator, callback) {
        callback = callback || noop;

        var next = _restParam(function(err, args) {
            if (err) {
                callback(err);
            } else {
                args.push(check);// args可以在同步处理或异步处理后使用
                test.apply(this, args);// args尾参中传入check,使用者调用执行
            }
        });

        var check = function(err, truth) {
            if (err) {
                callback(err);
            } else if (truth) {
                iterator(next);
            } else {
                callback(null);
            }
        };

        test(check);
    };

    async.doDuring = function (iterator, test, callback) {
        var calls = 0;
        async.during(function(next) {
            if (calls++ < 1) {
                next(null, true);
            } else {
                test.apply(this, arguments);
            }
        }, iterator, callback);
    };

    // 加载或执行任务函数(回调函数),任务函数的目的是启动任务函数中的回调函数
    function _queue(worker, concurrency, payload) {
        if (concurrency == null) {
            concurrency = 1;
        }
        else if(concurrency === 0) {
            throw new Error('Concurrency must not be zero');
        }
        // q.tasks待执行的任务函数队列添加任务
        function _insert(q, data, pos, callback) {// pos头部插入任务队列q.tasks
            if (callback != null && typeof callback !== "function") {
                throw new Error("task callback must be a function");
            }
            q.started = true;
            if (!_isArray(data)) {
                data = [data];
            }
            if(data.length === 0 && q.idle()) {
                // call drain immediately if there are no tasks
                return async.setImmediate(function() {
                    q.drain();
                });
            }
            _arrayEach(data, function(task) {
                var item = {
                    data: task,
                    callback: callback || noop
                };

                if (pos) {
                    q.tasks.unshift(item);
                } else {
                    q.tasks.push(item);
                }

                if (q.tasks.length === q.concurrency) {
                    q.saturated();
                }
            });
            async.setImmediate(q.process);
        }
        // 一个任务执行完成后回调,执行任务数workers-1,更新执行任务队列workersList
        // 执行任务固有的回调函数task.callback,调用执行下一个任务
        function _next(q, tasks) {
            return function(){
                workers -= 1;

                var removed = false;
                var args = arguments;
                // 更新workersList执行任务队列
                _arrayEach(tasks, function (task) {
                    _arrayEach(workersList, function (worker, index) {
                        if (worker === task && !removed) {
                            workersList.splice(index, 1);
                            removed = true;
                        }
                    });

                    // 执行任务固有的回调函数
                    task.callback.apply(task, args);
                });
                if (q.tasks.length + workers === 0) {
                    q.drain();
                }
                q.process();
            };
        }

        var workers = 0;// 执行中的任务数量
        var workersList = [];// 执行中的任务
        var q = {
            tasks: [],// 任务函数队列
            concurrency: concurrency,// 同步执行的worker个数
            payload: payload,
            saturated: noop,// 空函数,改写后实现功能,同步执行达到限制时执行
            empty: noop,// 改写实现功能,任务为空时执行
            drain: noop,// 改写实现功能,所有任务都执行完成后调用
            started: false,
            paused: false,
            push: function (data, callback) {// 尾部插入
                _insert(q, data, false, callback);
            },
            kill: function () {// 清空q.tasks任务函数队列
                q.drain = noop;
                q.tasks = [];
            },
            unshift: function (data, callback) {// 头部插入
                _insert(q, data, true, callback);
            },
            process: function () {
                while(!q.paused && workers < q.concurrency && q.tasks.length){

                    // 从q.tasks中取出任务,并更新q.tasks需要执行的任务函数队列
                    // tasks一个任务单元,任务单元中的任务同步执行
                    var tasks = q.payload ?
                        q.tasks.splice(0, q.payload) :
                        q.tasks.splice(0, q.tasks.length);// 预加载的任务函数数量

                    var data = _map(tasks, function (task) {
                        return task.data;
                    });

                    if (q.tasks.length === 0) {
                        q.empty();
                    }
                    workers += 1;
                    workersList.push(tasks[0]);
                    var cb = only_once(_next(q, tasks));
                    worker(data, cb);// _queue函数参数worker执行过程中调用cb函数执行下一个任务函数
                }
            },
            length: function () {// 需要执行的任务
                return q.tasks.length;
            },
            running: function () {// 同步执行的任务数
                return workers;
            },
            workersList: function () {// 同步执行的任务
                return workersList;
            },
            idle: function() {// 所有任务都已执行完成
                return q.tasks.length + workers === 0;
            },
            pause: function () {// 中断process函数执行
                q.paused = true;
            },
            resume: function () {// 恢复执行
                if (q.paused === false) { return; }
                q.paused = false;
                var resumeCount = Math.min(q.concurrency, q.tasks.length);
                // Need to call q.process once per concurrent
                // worker to preserve full concurrency after pause
                for (var w = 1; w <= resumeCount; w++) {
                    async.setImmediate(q.process);
                }
            }
        };
        return q;
    }

    // 返回queue对象,听过该对象添加或执行任务函数,通过调用worker函数执行任务函数
    // concurrency限制同步执行的任务个数
    async.queue = function (worker, concurrency) {
        // push进queue对象的任务函数通过调用worker执行其功能
        var q = _queue(function (items, cb) {
            worker(items[0], cb);
        }, concurrency, 1);

        return q;
    };

    // 内部调用async.queue方法返回queue对象,只是删除了unshift方法、改写了push方法
    // 设置任务函数执行的优先级
    async.priorityQueue = function (worker, concurrency) {

        // 比较优先级,越低越高优先级
        function _compareTasks(a, b){
            return a.priority - b.priority;
        }

        // 通过不断和拆分数组的中值作比较,得出item应该插到sequence数组中哪个位置
        function _binarySearch(sequence, item, compare) {
            var beg = -1,
                end = sequence.length - 1;
            while (beg < end) {
                var mid = beg + ((end - beg + 1) >>> 1);// 无符号右移,即除2,忽略余数
                if (compare(item, sequence[mid]) >= 0) {
                    beg = mid;
                } else {
                    end = mid - 1;
                }
            }
            return beg;
        }

        function _insert(q, data, priority, callback) {
            if (callback != null && typeof callback !== "function") {
                throw new Error("task callback must be a function");
            }
            q.started = true;
            if (!_isArray(data)) {
                data = [data];
            }
            if(data.length === 0) {
                // call drain immediately if there are no tasks
                return async.setImmediate(function() {
                    q.drain();
                });
            }
            _arrayEach(data, function(task) {
                var item = {
                    data: task,
                    priority: priority,
                    callback: typeof callback === 'function' ? callback : noop
                };

                q.tasks.splice(_binarySearch(q.tasks, item, _compareTasks) + 1, 0, item);

                if (q.tasks.length === q.concurrency) {
                    q.saturated();
                }
                async.setImmediate(q.process);
            });
        }

        // Start with a normal queue
        var q = async.queue(worker, concurrency);

        // Override push to accept second parameter representing priority
        q.push = function (data, priority, callback) {
            _insert(q, data, priority, callback);
        };

        // Remove unshift function
        delete q.unshift;

        return q;
    };

    // 返回queue对象,听过该对象添加或执行任务函数,通过调用worker函数执行任务函数
    // payload单次执行的任务单元中包含几条任务函数,这几条任务函数和下一个任务单元同步执行
    async.cargo = function (worker, payload) {
        return _queue(worker, 1, payload);
    };

    function _console_fn(name) {
        return _restParam(function (fn, args) {
            // 通过内部函数向外部函数传递回调函数function(err,args)作为参数
            // 外部函数中设置回调函数的执行时机
            // 实际意义似乎和在异步函数中直接调用console.log()没差别
            fn.apply(null, args.concat([_restParam(function (err, args) {
                if (typeof console === 'object') {
                    if (err) {
                        if (console.error) {
                            console.error(err);
                        }
                    }
                    else if (console[name]) {
                        _arrayEach(args, function (x) {
                            console[name](x);
                        });
                    }
                }
            })]));
        });
    }
    async.log = _console_fn('log');
    async.dir = _console_fn('dir');// 显示dom试图
    /*async.info = _console_fn('info');
    async.warn = _console_fn('warn');
    async.error = _console_fn('error');*/

    // 传参相同,fn已执行的状态下,重复调用回调函数,或fn执行中,添加回调函数
    async.memoize = function (fn, hasher) {
        var memo = {};
        var queues = {};
        var has = Object.prototype.hasOwnProperty;
        hasher = hasher || identity;
        var memoized = _restParam(function memoized(args) {
            var callback = args.pop();
            var key = hasher.apply(null, args);
            if (has.call(memo, key)) {   
                // 已执行,立即执行fn的回调
                async.setImmediate(function () {
                    callback.apply(null, memo[key]);
                });
            }
            else if (has.call(queues, key)) {
                // fn执行过程中,参数args相同,添加回调callback
                queues[key].push(callback);
            }
            else {
                queues[key] = [callback];// 执行前fn回调添加到queues队列
                fn.apply(null, args.concat([_restParam(function (args) {
                    memo[key] = args;// fn执行完成更新memo记录已执行,对象的键可以是数组、对象等数据类型
                    var q = queues[key];
                    delete queues[key];// 执行完清空该fn回调队列queues[key]
                    for (var i = 0, l = q.length; i < l; i++) {
                        q[i].apply(null, args);// 执行fn的回调
                    }
                })]));
            }
        });
        memoized.memo = memo;
        memoized.unmemoized = fn;// 没有memo记录的直接调用
        return memoized;
    };

    // 对async.memoize返回对象或普通函数fn作处理,视为普通函数处理
    async.unmemoize = function (fn) {
        return function () {
            return (fn.unmemoized || fn).apply(null, arguments);
        };
    };

    function _times(mapper) {
        return function (count, iterator, callback) {
            // _range(count)输出为0到count构成数组
            mapper(_range(count), iterator, callback);
        };
    }

    // 设置iterator的调用次数count
    async.times = _times(async.map);
    async.timesSeries = _times(async.mapSeries);
    async.timesLimit = function (count, limit, iterator, callback) {
        return async.mapLimit(_range(count), limit, iterator, callback);
    };

    // async.reduce的简化方案,遍历执行函数,对传递数据作处理后传给各回调
    async.seq = function (/* functions... */) {
        var fns = arguments;
        return _restParam(function (args) {
            var that = this;

            var callback = args[args.length - 1];
            if (typeof callback == 'function') {
                args.pop();
            } else {
                callback = noop;
            }

            async.reduce(fns, args, function (newargs, fn, cb) {
                fn.apply(that, newargs.concat([_restParam(function (err, nextargs) {
                    cb(err, nextargs);
                })]));
            },
            function (err, results) {
                callback.apply(that, [err].concat(results));
            });
        });
    };

    async.compose = function (/* functions... */) {
        return async.seq.apply(null, Array.prototype.reverse.call(arguments));
    };

    function _applyEach(eachfn) {
        return _restParam(function(fns, args) {
            var go = _restParam(function(args) {
                var that = this;
                var callback = args.pop();// 尾项为回调函数
                return eachfn(fns, function (fn, _, cb) {
                    fn.apply(that, args.concat([cb]));
                },
                callback);
            });
            if (args.length) {
                return go.apply(this, args);
            }
            else {
                return go;
            }
        });
    }

    // 调用async.eachOf遍历执行fns函数,fns函数参数均相同,且为args、callback
    async.applyEach = _applyEach(async.eachOf);
    async.applyEachSeries = _applyEach(async.eachOfSeries);

    // fn执行完毕,使用next函数,或者调用自身,或者报错时执行回调
    async.forever = function (fn, callback) {
        var done = only_once(callback || noop);
        var task = ensureAsync(fn);
        function next(err) {
            if (err) {
                return done(err);
            }
            task(next);
        }
        next();
    };

    function ensureAsync(fn) {
        // 输入参数最后一项是回调函数,sync为真同步执行状态,为否异步执行状态
        // 改写回调项,async.setImmediate跟普通调用callback的区别是没有堆栈溢出?
        return _restParam(function (args) {
            var callback = args.pop();
            args.push(function () {
                var innerArgs = arguments;
                if (sync) {
                    async.setImmediate(function () {
                        callback.apply(null, innerArgs);
                    });
                } else {
                    callback.apply(null, innerArgs);
                }
            });
            var sync = true;
            fn.apply(this, args);
            sync = false;
        });
    }

    // 改写参数函数fn中参数的回调函数callback设置,callback启动时机依然需使用者触发
    // 改写callback的目的避免同步函数的内存浪费
    async.ensureAsync = ensureAsync;

    // 重设callback首参err为null
    async.constant = _restParam(function(values) {
        var args = [null].concat(values);
        return function (callback) {
            return callback.apply(this, args);
        };
    });

    // 执行func,若返回值为thenable对象,再次执行该thenable对象的then方法,若不是,直接调用回调函数
    async.wrapSync =
    async.asyncify = function asyncify(func) {
        return _restParam(function (args) {
            var callback = args.pop();
            var result;
            try {
                result = func.apply(this, args);
            } catch (e) {
                return callback(e);
            }
            // if result is Promise object
            if (_isObject(result) && typeof result.then === "function") {
                result.then(function(value) {
                    callback(null, value);
                })["catch"](function(err) {
                    callback(err.message ? err : new Error(err));
                });
            } else {
                callback(null, result);
            }
        });
    };

    // Node.js
    if (typeof module === 'object' && module.exports) {
        module.exports = async;
    }
    // AMD / RequireJS
    else if (typeof define === 'function' && define.amd) {
        define([], function () {
            return async;
        });
    }
    // included directly via <script> tag
    else {
        root.async = async;
    }

}());

 

0
1
分享到:
评论

相关推荐

    async异步流程控制模块

    本篇将深入探讨`async`异步流程控制模块的相关知识点。 首先,我们需要了解JavaScript中的事件循环和回调函数。JavaScript是单线程的,为了处理耗时的I/O操作,引入了事件循环和回调机制。当执行到异步操作时,...

    Node.js-StuQ分享专题《深入浅出jsNode.js异步流程控制》完整版

    本专题《深入浅出js(Node.js)异步流程控制》将探讨Node.js中的异步编程这一核心概念,帮助开发者更深入地理解和掌握这一技术。 异步编程是Node.js的核心特性之一,它允许程序在等待I/O操作完成时继续执行其他任务...

    浅析node Async异步处理模块用例分析及常用方法介绍

    这些工具帮助开发者组织和管理异步流程,使代码更加清晰和可读。 1. **series 方法**: series 方法用于按顺序执行一组函数。每个函数在上一个函数完成之后才会开始执行,如果出现错误,系列执行将停止,并在回调...

    async-fifo异步FIFO 模块FPGA设计Verilog源码 quarus工程文件+文档说明.rar

    syn_fifo 同步FIFO 模块FPGA设计Verilog源码 quarus工程文件+文档说明,实现异步FIFO读写,且有读写地址产生和保护机制,保护机制,指的是FIFO不能读空和写满。且产生空满信号指示。 module asyn_fifo ( //input ...

    用于解构异步值的低级JS模块

    这个低级JS模块可能专注于优化异步控制流程,比如通过改善Promise的性能,提供更细粒度的错误处理,或者是实现更复杂的并发控制策略(如并行限制、优先级队列等)。它可能还包含了对async/await的扩展,例如在await...

    fifo_async_fifo_异步FIFO_

    在提供的源代码中,我们有两个主要模块:`fifo_async.v`是异步FIFO的主体实现,而`fifo_async_tb.v`是测试平台,用于验证FIFO的正确功能。 1. **读写指针模块**: - 读指针(read pointer)通常在一个时钟域中递增...

    dotnet-AspNetCore项目中非常轻量的异步任务管理模块

    1. **异步编程**:使用.NET的Task、async/await关键字来实现异步操作,确保任务在不阻塞主线程的情况下执行。 2. **定时任务**:可能采用了`System.Threading.Timer`、`Quartz.NET`或者其他轻量级定时器库来周期性地...

    winhttp 异步c++库

    2. **C++类库封装**:将WinHTTP API转换为C++的类,可以使代码更加模块化,易于理解和维护。封装可能包括创建类来代表HTTP请求、响应、会话等,并提供便利的方法来设置请求头、发送数据、接收响应等。 3. **异步...

    深入学习nodejs中的async模块的使用方法

    为了解决这个问题,Node.js社区开发了async模块,这是一个专门用于异步流程控制的模块。 async模块通过提供一系列实用的函数,可以帮助开发者以更加模块化和清晰的方式组织异步代码,减少嵌套回调(Callback Hell)...

    一个简单的异步网络通讯源代码

    在JavaScript中,可以使用async/await语法来编写更易读的异步代码。 在压缩包文件中,Readme.txt可能是包含项目说明、使用方法和注意事项的文本文件。WC和WS可能是服务器端(Server)和客户端(Client)的源代码...

    async_fifo.rar_async fifo验证_异步FIFO

    在`async_fifo`项目中,提供的源代码应该包含了上述所有关键元素,通过详细的注释和测试平台,开发者可以深入了解异步FIFO的工作原理,并将其应用于实际设计中。为了更好地理解并运用这些代码,建议仔细阅读代码,...

    js异步加载代码

    - 使用模块打包工具(如Webpack、Rollup)进行代码分割,将大型脚本拆分为小块,按需加载。 - 考虑使用CDN服务加速脚本下载。 通过以上分析,"asyLoad.js"可能是实现异步加载功能的一个库或自定义解决方案,其...

    异步套接字源码 v2.0(IOCP方式实现 支持多线程)

    这是一个套接字模块。使用内置iocp实现异步操作。使用了线程安全的队列操作回调,保证了对应域下的回调操作不需要加锁。async_socket.e。套接字模块。使用复杂,但是灵活,支持各种协议。...Tags:异步套接字源码。

    Async32 for delphi

    Async32 for Delphi 是一个针对 Delphi 开发环境的异步编程库,它提供了强大的功能,使得开发者能够在 Delphi 应用程序中轻松实现非阻塞的异步操作。异步编程是一种允许代码在等待某些操作完成时继续执行其他任务的...

    C# 异步传输文件经典实例源码

    该实例可能采用了工厂模式、单例模式或其他设计模式来组织代码,以实现可重用性、模块化和易于维护。 通过学习这个C#异步传输文件的经典实例,开发者可以掌握如何高效、安全地进行文件操作和网络通信,同时提升...

    异步FIFO Verilog源码与testbench

    在`async_fifo_tb.v` 文件中,我们找到了testbench的代码,它是验证异步FIFO功能是否正确的关键部分。Testbench通常包括以下步骤: 1. **初始化**:设置测试环境,例如初始化读写指针,填充FIFO,以及设置测试数据...

    SQLite-f047920c_sqlite3async.c_sqlite源码_

    `sqlite3async.c`文件是SQLite的一个扩展模块,它提供了一种异步接口,使得在多线程环境中可以更高效地处理数据库操作。在传统的SQLite API中,所有的数据库操作都是同步的,这意味着执行一个SQL语句时,程序会阻塞...

    FIFO实现异步通信verilog源码vivado

    标题中的“FIFO实现异步通信verilog源码vivado”意味着我们将探讨如何使用Verilog语言在Vivado环境下编写FIFO模块,以实现两个异步系统间的通信。以下将详细介绍相关知识点: 1. **FIFO的基本结构**:FIFO通常由一...

    前端开源库-fis-postprocessor-require-async

    本文将深入探讨一个名为“fis-postprocessor-require-async”的开源库,该库主要用于处理FIS(Fast Integrated System)框架中的异步模块加载机制。 FIS是一个强大的前端集成解决方案,它集成了构建、部署、调试等...

Global site tag (gtag.js) - Google Analytics