summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlexander Early <aearly@fluid.com>2015-05-31 01:33:21 -0700
committerAlexander Early <aearly@fluid.com>2015-05-31 01:33:21 -0700
commit48e9a76c53d36e15499df41a5a5c119d2d9eb53d (patch)
tree62d2b9394c9396e4d32c37f25e466021367b711c
parentd6e59ab4015d809bb2650cf913b3f6277c3c4920 (diff)
downloadasync-48e9a76c53d36e15499df41a5a5c119d2d9eb53d.tar.gz
clean up logic in queue
-rw-r--r--lib/async.js144
1 files changed, 67 insertions, 77 deletions
diff --git a/lib/async.js b/lib/async.js
index d974d33..e3310d5 100644
--- a/lib/async.js
+++ b/lib/async.js
@@ -817,61 +817,56 @@
throw new Error('Concurrency must not be zero');
}
function _insert(q, data, pos, callback) {
- if (!q.started){
q.started = true;
- }
- if (!_isArray(data)) {
- data = [data];
- }
- if(data.length === 0) {
- // call drain immediately if there are no tasks
- return async.setImmediate(function() {
- if (q.drain) {
- q.drain();
- }
- });
- }
- _arrayEach(data, function(task) {
- var item = {
- data: task,
- callback: typeof callback === 'function' ? callback : null
- };
-
- if (pos) {
- q.tasks.unshift(item);
- } else {
- q.tasks.push(item);
- }
+ if (!_isArray(data)) {
+ data = [data];
+ }
+ if(data.length === 0) {
+ // call drain immediately if there are no tasks
+ return async.setImmediate(function() {
+ q.drain();
+ });
+ }
+ _arrayEach(data, function(task) {
+ var item = {
+ data: task,
+ callback: typeof callback === 'function' ? callback : noop
+ };
- if (q.saturated && q.tasks.length === q.concurrency) {
- q.saturated();
- }
- async.setImmediate(q.process);
- });
+ if (pos) {
+ q.tasks.unshift(item);
+ } else {
+ q.tasks.push(item);
+ }
+
+ if (q.tasks.length === q.concurrency) {
+ q.saturated();
+ }
+ async.setImmediate(q.process);
+ });
}
var workers = 0;
var q = {
tasks: [],
concurrency: concurrency,
- saturated: null,
- empty: null,
- drain: null,
+ saturated: noop,
+ empty: noop,
+ drain: noop,
started: false,
paused: false,
push: function (data, callback) {
- _insert(q, data, false, callback);
+ _insert(q, data, false, callback);
},
kill: function () {
- q.drain = null;
- q.tasks = [];
+ q.drain = noop;
+ q.tasks = [];
},
unshift: function (data, callback) {
- _insert(q, data, true, callback);
+ _insert(q, data, true, callback);
},
process: function () {
if (!q.paused && workers < q.concurrency && q.tasks.length) {
- // var task = q.tasks.shift();
var tasks = payload ?
q.tasks.splice(0, payload) :
q.tasks.splice(0, q.tasks.length);
@@ -880,7 +875,7 @@
return task.data;
});
- if (q.empty && q.tasks.length === 0) {
+ if (q.tasks.length === 0) {
q.empty();
}
workers += 1;
@@ -892,11 +887,9 @@
workers -= 1;
var args = arguments;
_arrayEach(tasks, function (task) {
- if (task.callback) {
- task.callback.apply(task, args);
- }
+ task.callback.apply(task, args);
});
- if (q.drain && q.tasks.length + workers === 0) {
+ if (q.tasks.length + workers === 0) {
q.drain();
}
q.process();
@@ -912,7 +905,6 @@
return q.tasks.length + workers === 0;
},
pause: function () {
- if (q.paused === true) { return; }
q.paused = true;
},
resume: function () {
@@ -940,52 +932,50 @@
async.priorityQueue = function (worker, concurrency) {
function _compareTasks(a, b){
- return a.priority - b.priority;
+ return a.priority - b.priority;
}
function _binarySearch(sequence, item, compare) {
var beg = -1,
end = sequence.length - 1;
while (beg < end) {
- var mid = beg + ((end - beg + 1) >>> 1);
- if (compare(item, sequence[mid]) >= 0) {
- beg = mid;
- } else {
- end = mid - 1;
- }
+ var mid = beg + ((end - beg + 1) >>> 1);
+ if (compare(item, sequence[mid]) >= 0) {
+ beg = mid;
+ } else {
+ end = mid - 1;
+ }
}
return beg;
}
function _insert(q, data, priority, callback) {
- if (!q.started){
- q.started = true;
- }
- if (!_isArray(data)) {
- data = [data];
- }
- if(data.length === 0) {
- // call drain immediately if there are no tasks
- return async.setImmediate(function() {
- if (q.drain) {
- q.drain();
- }
- });
- }
- _arrayEach(data, function(task) {
- var item = {
- data: task,
- priority: priority,
- callback: typeof callback === 'function' ? callback : null
- };
+ if (!q.started){
+ q.started = true;
+ }
+ if (!_isArray(data)) {
+ data = [data];
+ }
+ if(data.length === 0) {
+ // call drain immediately if there are no tasks
+ return async.setImmediate(function() {
+ q.drain();
+ });
+ }
+ _arrayEach(data, function(task) {
+ var item = {
+ data: task,
+ priority: priority,
+ callback: typeof callback === 'function' ? callback : noop
+ };
- q.tasks.splice(_binarySearch(q.tasks, item, _compareTasks) + 1, 0, item);
+ q.tasks.splice(_binarySearch(q.tasks, item, _compareTasks) + 1, 0, item);
- if (q.saturated && q.tasks.length === q.concurrency) {
- q.saturated();
- }
- async.setImmediate(q.process);
- });
+ if (q.tasks.length === q.concurrency) {
+ q.saturated();
+ }
+ async.setImmediate(q.process);
+ });
}
// Start with a normal queue
@@ -993,7 +983,7 @@
// Override push to accept second parameter representing priority
q.push = function (data, priority, callback) {
- _insert(q, data, priority, callback);
+ _insert(q, data, priority, callback);
};
// Remove unshift function