diff options
-rw-r--r-- | README.md | 6 | ||||
-rwxr-xr-x | lib/async.js | 23 | ||||
-rwxr-xr-x | test/test-async.js | 45 |
3 files changed, 61 insertions, 13 deletions
@@ -182,7 +182,7 @@ __Arguments__ * `arr` - An array to iterate over. * `iterator(item, callback)` - A function to apply to each item in `arr`. The iterator is passed a `callback(err)` which must be called once it has - completed. If no error has occured, the `callback` should be run without + completed. If no error has occurred, the `callback` should be run without arguments or with an explicit `null` argument. * `callback(err)` - A callback which is called when all `iterator` functions have finished, or an error occurs. @@ -256,7 +256,7 @@ __Arguments__ * `limit` - The maximum number of `iterator`s to run at any time. * `iterator(item, callback)` - A function to apply to each item in `arr`. The iterator is passed a `callback(err)` which must be called once it has - completed. If no error has occured, the callback should be run without + completed. If no error has occurred, the callback should be run without arguments or with an explicit `null` argument. * `callback(err)` - A callback which is called when all `iterator` functions have finished, or an error occurs. @@ -280,7 +280,7 @@ async.eachLimit(documents, 20, requestApi, function(err){ Produces a new array of values by mapping each value in `arr` through the `iterator` function. The `iterator` is called with an item from `arr` and a callback for when it has finished processing. Each of these callback takes 2 arguments: -an `error`, and the transformed item from `arr`. If `iterator` passes an error to this +an `error`, and the transformed item from `arr`. If `iterator` passes an error to his callback, the main `callback` (for the `map` function) is immediately called with the error. Note, that since this function applies the `iterator` to each item in parallel, diff --git a/lib/async.js b/lib/async.js index 01e8afc..a13f835 100755 --- a/lib/async.js +++ b/lib/async.js @@ -821,23 +821,26 @@ pause: function () { if (q.paused === true) { return; } q.paused = true; - q.process(); }, resume: function () { if (q.paused === false) { return; } q.paused = false; - q.process(); + // Need to call q.process once per concurrent + // worker to preserve full concurrency after pause + for (var w = 1; w <= q.concurrency; w++) { + async.setImmediate(q.process); + } } }; return q; }; - + async.priorityQueue = function (worker, concurrency) { - + function _compareTasks(a, b){ return a.priority - b.priority; }; - + function _binarySearch(sequence, item, compare) { var beg = -1, end = sequence.length - 1; @@ -851,7 +854,7 @@ } return beg; } - + function _insert(q, data, priority, callback) { if (!q.started){ q.started = true; @@ -873,7 +876,7 @@ priority: priority, callback: typeof callback === 'function' ? callback : null }; - + q.tasks.splice(_binarySearch(q.tasks, item, _compareTasks) + 1, 0, item); if (q.saturated && q.tasks.length === q.concurrency) { @@ -882,15 +885,15 @@ async.setImmediate(q.process); }); } - + // Start with a normal queue var q = async.queue(worker, concurrency); - + // Override push to accept second parameter representing priority q.push = function (data, priority, callback) { _insert(q, data, priority, callback); }; - + // Remove unshift function delete q.unshift; diff --git a/test/test-async.js b/test/test-async.js index ddf2916..6a35606 100755 --- a/test/test-async.js +++ b/test/test-async.js @@ -2502,6 +2502,51 @@ exports['queue pause'] = function(test) { }, 800); } +exports['queue pause with concurrency'] = function(test) { + var call_order = [], + task_timeout = 100, + pause_timeout = 50, + resume_timeout = 300, + tasks = [ 1, 2, 3, 4, 5, 6 ], + + elapsed = (function () { + var start = +Date.now(); + return function () { return Math.floor((+Date.now() - start) / 100) * 100; }; + })(); + + var q = async.queue(function (task, callback) { + setTimeout(function () { + call_order.push('process ' + task); + call_order.push('timeout ' + elapsed()); + callback(); + }, task_timeout); + }, 2); + + q.push(tasks); + + setTimeout(function () { + q.pause(); + test.equal(q.paused, true); + }, pause_timeout); + + setTimeout(function () { + q.resume(); + test.equal(q.paused, false); + }, resume_timeout); + + setTimeout(function () { + test.same(call_order, [ + 'process 1', 'timeout 100', + 'process 2', 'timeout 100', + 'process 3', 'timeout 400', + 'process 4', 'timeout 400', + 'process 5', 'timeout 500', + 'process 6', 'timeout 500' + ]); + test.done(); + }, 800); +} + exports['queue kill'] = function (test) { var q = async.queue(function (task, callback) { setTimeout(function () { |