summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authormk <me@maxkueng.com>2014-03-28 23:41:16 +0100
committermk <me@maxkueng.com>2014-03-28 23:41:16 +0100
commit4722e785a4ebadba130be1a3d005703871b35233 (patch)
tree999d1a2222a49b817eb0ea0586dfe3f12aad30b2
parent28697107d8821a8f3501143a87c9a341e3e9140a (diff)
downloadasync-4722e785a4ebadba130be1a3d005703871b35233.tar.gz
Add queue.pause() and queue.resume()
-rw-r--r--README.md3
-rwxr-xr-xlib/async.js13
-rwxr-xr-xtest/test-async.js51
3 files changed, 66 insertions, 1 deletions
diff --git a/README.md b/README.md
index acee1b2..0a87149 100644
--- a/README.md
+++ b/README.md
@@ -1088,6 +1088,9 @@ methods:
and further tasks will be queued.
* `empty` - a callback that is called when the last item from the `queue` is given to a `worker`.
* `drain` - a callback that is called when the last item from the `queue` has returned from the `worker`.
+* `paused` - a boolean for determining whether the queue is in a paused state
+* `pause()` - a function that pauses the processing of tasks until `resume()` is called.
+* `resume()` - a function that resumes the processing of queued tasks when the queue is paused.
__Example__
diff --git a/lib/async.js b/lib/async.js
index 64398b9..cf68a84 100755
--- a/lib/async.js
+++ b/lib/async.js
@@ -723,6 +723,7 @@
saturated: null,
empty: null,
drain: null,
+ paused: false,
push: function (data, callback) {
_insert(q, data, false, callback);
},
@@ -730,7 +731,7 @@
_insert(q, data, true, callback);
},
process: function () {
- if (workers < q.concurrency && q.tasks.length) {
+ if (!q.paused && workers < q.concurrency && q.tasks.length) {
var task = q.tasks.shift();
if (q.empty && q.tasks.length === 0) {
q.empty();
@@ -758,6 +759,16 @@
},
idle: function() {
return q.tasks.length + workers === 0;
+ },
+ pause: function () {
+ if (q.paused === true) { return; }
+ q.paused = true;
+ q.process();
+ },
+ resume: function () {
+ if (q.paused === false) { return; }
+ q.paused = false;
+ q.process();
}
};
return q;
diff --git a/test/test-async.js b/test/test-async.js
index f7eca6d..295249e 100755
--- a/test/test-async.js
+++ b/test/test-async.js
@@ -2275,6 +2275,57 @@ exports['queue idle'] = function(test) {
}
}
+exports['queue pause'] = function(test) {
+ var call_order = [],
+ task_timeout = 100,
+ pause_timeout = 300,
+ resume_timeout = 500,
+ tasks = [ 1, 2, 3, 4, 5, 6 ],
+
+ elapsed = (function () {
+ var start = +Date.now();
+ return function () { return Math.floor((+Date.now() - start) / 100) * 100; };
+ })();
+
+ var q = async.queue(function (task, callback) {
+ call_order.push('process ' + task);
+ call_order.push('timeout ' + elapsed());
+ callback();
+ });
+
+ function pushTask () {
+ var task = tasks.shift();
+ if (!task) { return; }
+ setTimeout(function () {
+ q.push(task);
+ pushTask();
+ }, task_timeout);
+ }
+ pushTask();
+
+ setTimeout(function () {
+ q.pause();
+ test.equal(q.paused, true);
+ }, pause_timeout);
+
+ setTimeout(function () {
+ q.resume();
+ test.equal(q.paused, false);
+ }, resume_timeout);
+
+ setTimeout(function () {
+ test.same(call_order, [
+ 'process 1', 'timeout 100',
+ 'process 2', 'timeout 200',
+ 'process 3', 'timeout 500',
+ 'process 4', 'timeout 500',
+ 'process 5', 'timeout 500',
+ 'process 6', 'timeout 600'
+ ]);
+ test.done();
+ }, 800);
+}
+
exports['cargo'] = function (test) {
var call_order = [],
delays = [160, 160, 80];