summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlexander Early <aearly@fluid.com>2015-05-31 01:12:28 -0700
committerAlexander Early <aearly@fluid.com>2015-05-31 01:12:28 -0700
commitd6e59ab4015d809bb2650cf913b3f6277c3c4920 (patch)
treeb3d5ac33cce74f2ac74529bd806d7b49bc5a2080
parentf601150113224452d9f5c4c10ab4c7f5be9f85dd (diff)
downloadasync-d6e59ab4015d809bb2650cf913b3f6277c3c4920.tar.gz
refactored queue and cargo to use the same logic
-rw-r--r--lib/async.js99
1 files changed, 27 insertions, 72 deletions
diff --git a/lib/async.js b/lib/async.js
index 4e0aae6..d974d33 100644
--- a/lib/async.js
+++ b/lib/async.js
@@ -809,8 +809,8 @@
});
};
- async.queue = function (worker, concurrency) {
- if (concurrency === undefined) {
+ function _queue(worker, concurrency, payload) {
+ if (concurrency == null) {
concurrency = 1;
}
else if(concurrency === 0) {
@@ -871,20 +871,31 @@
},
process: function () {
if (!q.paused && workers < q.concurrency && q.tasks.length) {
- var task = q.tasks.shift();
+ // var task = q.tasks.shift();
+ var tasks = payload ?
+ q.tasks.splice(0, payload) :
+ q.tasks.splice(0, q.tasks.length);
+
+ var data = _map(tasks, function (task) {
+ return task.data;
+ });
+
if (q.empty && q.tasks.length === 0) {
q.empty();
}
workers += 1;
var cb = only_once(next);
- worker(task.data, cb);
+ worker(data, cb);
}
function next() {
workers -= 1;
- if (task.callback) {
- task.callback.apply(task, arguments);
- }
+ var args = arguments;
+ _arrayEach(tasks, function (task) {
+ if (task.callback) {
+ task.callback.apply(task, args);
+ }
+ });
if (q.drain && q.tasks.length + workers === 0) {
q.drain();
}
@@ -916,6 +927,14 @@
}
};
return q;
+ }
+
+ async.queue = function (worker, concurrency) {
+ var q = _queue(function (items, cb) {
+ worker(items[0], cb);
+ }, concurrency, 1);
+
+ return q;
};
async.priorityQueue = function (worker, concurrency) {
@@ -984,71 +1003,7 @@
};
async.cargo = function (worker, payload) {
- var working = false,
- tasks = [];
-
- var cargo = {
- tasks: tasks,
- payload: payload,
- saturated: null,
- empty: null,
- drain: null,
- drained: true,
- push: function (data, callback) {
- if (!_isArray(data)) {
- data = [data];
- }
- _arrayEach(data, function(task) {
- tasks.push({
- data: task,
- callback: typeof callback === 'function' ? callback : null
- });
- cargo.drained = false;
- if (cargo.saturated && tasks.length === payload) {
- cargo.saturated();
- }
- });
- async.setImmediate(cargo.process);
- },
- process: function process() {
- if (working) return;
- if (tasks.length === 0) {
- if(cargo.drain && !cargo.drained) cargo.drain();
- cargo.drained = true;
- return;
- }
-
- var ts = typeof payload === 'number' ?
- tasks.splice(0, payload) :
- tasks.splice(0, tasks.length);
-
- var ds = _map(ts, function (task) {
- return task.data;
- });
-
- if(cargo.empty) cargo.empty();
- working = true;
- worker(ds, function () {
- working = false;
-
- var args = arguments;
- _arrayEach(ts, function (data) {
- if (data.callback) {
- data.callback.apply(null, args);
- }
- });
-
- process();
- });
- },
- length: function () {
- return tasks.length;
- },
- running: function () {
- return working;
- }
- };
- return cargo;
+ return _queue(worker, 1, payload);
};
function _console_fn(name) {