summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--README.md6
-rwxr-xr-xlib/async.js23
-rwxr-xr-xtest/test-async.js45
3 files changed, 61 insertions, 13 deletions
diff --git a/README.md b/README.md
index 0bea531..8f4e98c 100644
--- a/README.md
+++ b/README.md
@@ -182,7 +182,7 @@ __Arguments__
* `arr` - An array to iterate over.
* `iterator(item, callback)` - A function to apply to each item in `arr`.
The iterator is passed a `callback(err)` which must be called once it has
- completed. If no error has occured, the `callback` should be run without
+ completed. If no error has occurred, the `callback` should be run without
arguments or with an explicit `null` argument.
* `callback(err)` - A callback which is called when all `iterator` functions
have finished, or an error occurs.
@@ -256,7 +256,7 @@ __Arguments__
* `limit` - The maximum number of `iterator`s to run at any time.
* `iterator(item, callback)` - A function to apply to each item in `arr`.
The iterator is passed a `callback(err)` which must be called once it has
- completed. If no error has occured, the callback should be run without
+ completed. If no error has occurred, the callback should be run without
arguments or with an explicit `null` argument.
* `callback(err)` - A callback which is called when all `iterator` functions
have finished, or an error occurs.
@@ -280,7 +280,7 @@ async.eachLimit(documents, 20, requestApi, function(err){
Produces a new array of values by mapping each value in `arr` through
the `iterator` function. The `iterator` is called with an item from `arr` and a
callback for when it has finished processing. Each of these callback takes 2 arguments:
-an `error`, and the transformed item from `arr`. If `iterator` passes an error to this
+an `error`, and the transformed item from `arr`. If `iterator` passes an error to his
callback, the main `callback` (for the `map` function) is immediately called with the error.
Note, that since this function applies the `iterator` to each item in parallel,
diff --git a/lib/async.js b/lib/async.js
index 01e8afc..a13f835 100755
--- a/lib/async.js
+++ b/lib/async.js
@@ -821,23 +821,26 @@
pause: function () {
if (q.paused === true) { return; }
q.paused = true;
- q.process();
},
resume: function () {
if (q.paused === false) { return; }
q.paused = false;
- q.process();
+ // Need to call q.process once per concurrent
+ // worker to preserve full concurrency after pause
+ for (var w = 1; w <= q.concurrency; w++) {
+ async.setImmediate(q.process);
+ }
}
};
return q;
};
-
+
async.priorityQueue = function (worker, concurrency) {
-
+
function _compareTasks(a, b){
return a.priority - b.priority;
};
-
+
function _binarySearch(sequence, item, compare) {
var beg = -1,
end = sequence.length - 1;
@@ -851,7 +854,7 @@
}
return beg;
}
-
+
function _insert(q, data, priority, callback) {
if (!q.started){
q.started = true;
@@ -873,7 +876,7 @@
priority: priority,
callback: typeof callback === 'function' ? callback : null
};
-
+
q.tasks.splice(_binarySearch(q.tasks, item, _compareTasks) + 1, 0, item);
if (q.saturated && q.tasks.length === q.concurrency) {
@@ -882,15 +885,15 @@
async.setImmediate(q.process);
});
}
-
+
// Start with a normal queue
var q = async.queue(worker, concurrency);
-
+
// Override push to accept second parameter representing priority
q.push = function (data, priority, callback) {
_insert(q, data, priority, callback);
};
-
+
// Remove unshift function
delete q.unshift;
diff --git a/test/test-async.js b/test/test-async.js
index ddf2916..6a35606 100755
--- a/test/test-async.js
+++ b/test/test-async.js
@@ -2502,6 +2502,51 @@ exports['queue pause'] = function(test) {
}, 800);
}
+exports['queue pause with concurrency'] = function(test) {
+ var call_order = [],
+ task_timeout = 100,
+ pause_timeout = 50,
+ resume_timeout = 300,
+ tasks = [ 1, 2, 3, 4, 5, 6 ],
+
+ elapsed = (function () {
+ var start = +Date.now();
+ return function () { return Math.floor((+Date.now() - start) / 100) * 100; };
+ })();
+
+ var q = async.queue(function (task, callback) {
+ setTimeout(function () {
+ call_order.push('process ' + task);
+ call_order.push('timeout ' + elapsed());
+ callback();
+ }, task_timeout);
+ }, 2);
+
+ q.push(tasks);
+
+ setTimeout(function () {
+ q.pause();
+ test.equal(q.paused, true);
+ }, pause_timeout);
+
+ setTimeout(function () {
+ q.resume();
+ test.equal(q.paused, false);
+ }, resume_timeout);
+
+ setTimeout(function () {
+ test.same(call_order, [
+ 'process 1', 'timeout 100',
+ 'process 2', 'timeout 100',
+ 'process 3', 'timeout 400',
+ 'process 4', 'timeout 400',
+ 'process 5', 'timeout 500',
+ 'process 6', 'timeout 500'
+ ]);
+ test.done();
+ }, 800);
+}
+
exports['queue kill'] = function (test) {
var q = async.queue(function (task, callback) {
setTimeout(function () {