diff options
author | Alexander Early <aearly@fluid.com> | 2015-05-31 01:12:28 -0700 |
---|---|---|
committer | Alexander Early <aearly@fluid.com> | 2015-05-31 01:12:28 -0700 |
commit | d6e59ab4015d809bb2650cf913b3f6277c3c4920 (patch) | |
tree | b3d5ac33cce74f2ac74529bd806d7b49bc5a2080 | |
parent | f601150113224452d9f5c4c10ab4c7f5be9f85dd (diff) | |
download | async-d6e59ab4015d809bb2650cf913b3f6277c3c4920.tar.gz |
refactored queue and cargo to use the same logic
-rw-r--r-- | lib/async.js | 99 |
1 files changed, 27 insertions, 72 deletions
diff --git a/lib/async.js b/lib/async.js index 4e0aae6..d974d33 100644 --- a/lib/async.js +++ b/lib/async.js @@ -809,8 +809,8 @@ }); }; - async.queue = function (worker, concurrency) { - if (concurrency === undefined) { + function _queue(worker, concurrency, payload) { + if (concurrency == null) { concurrency = 1; } else if(concurrency === 0) { @@ -871,20 +871,31 @@ }, process: function () { if (!q.paused && workers < q.concurrency && q.tasks.length) { - var task = q.tasks.shift(); + // var task = q.tasks.shift(); + var tasks = payload ? + q.tasks.splice(0, payload) : + q.tasks.splice(0, q.tasks.length); + + var data = _map(tasks, function (task) { + return task.data; + }); + if (q.empty && q.tasks.length === 0) { q.empty(); } workers += 1; var cb = only_once(next); - worker(task.data, cb); + worker(data, cb); } function next() { workers -= 1; - if (task.callback) { - task.callback.apply(task, arguments); - } + var args = arguments; + _arrayEach(tasks, function (task) { + if (task.callback) { + task.callback.apply(task, args); + } + }); if (q.drain && q.tasks.length + workers === 0) { q.drain(); } @@ -916,6 +927,14 @@ } }; return q; + } + + async.queue = function (worker, concurrency) { + var q = _queue(function (items, cb) { + worker(items[0], cb); + }, concurrency, 1); + + return q; }; async.priorityQueue = function (worker, concurrency) { @@ -984,71 +1003,7 @@ }; async.cargo = function (worker, payload) { - var working = false, - tasks = []; - - var cargo = { - tasks: tasks, - payload: payload, - saturated: null, - empty: null, - drain: null, - drained: true, - push: function (data, callback) { - if (!_isArray(data)) { - data = [data]; - } - _arrayEach(data, function(task) { - tasks.push({ - data: task, - callback: typeof callback === 'function' ? callback : null - }); - cargo.drained = false; - if (cargo.saturated && tasks.length === payload) { - cargo.saturated(); - } - }); - async.setImmediate(cargo.process); - }, - process: function process() { - if (working) return; - if (tasks.length === 0) { - if(cargo.drain && !cargo.drained) cargo.drain(); - cargo.drained = true; - return; - } - - var ts = typeof payload === 'number' ? - tasks.splice(0, payload) : - tasks.splice(0, tasks.length); - - var ds = _map(ts, function (task) { - return task.data; - }); - - if(cargo.empty) cargo.empty(); - working = true; - worker(ds, function () { - working = false; - - var args = arguments; - _arrayEach(ts, function (data) { - if (data.callback) { - data.callback.apply(null, args); - } - }); - - process(); - }); - }, - length: function () { - return tasks.length; - }, - running: function () { - return working; - } - }; - return cargo; + return _queue(worker, 1, payload); }; function _console_fn(name) { |