From 48e9a76c53d36e15499df41a5a5c119d2d9eb53d Mon Sep 17 00:00:00 2001 From: Alexander Early Date: Sun, 31 May 2015 01:33:21 -0700 Subject: clean up logic in queue --- lib/async.js | 144 +++++++++++++++++++++++++++-------------------------------- 1 file changed, 67 insertions(+), 77 deletions(-) diff --git a/lib/async.js b/lib/async.js index d974d33..e3310d5 100644 --- a/lib/async.js +++ b/lib/async.js @@ -817,61 +817,56 @@ throw new Error('Concurrency must not be zero'); } function _insert(q, data, pos, callback) { - if (!q.started){ q.started = true; - } - if (!_isArray(data)) { - data = [data]; - } - if(data.length === 0) { - // call drain immediately if there are no tasks - return async.setImmediate(function() { - if (q.drain) { - q.drain(); - } - }); - } - _arrayEach(data, function(task) { - var item = { - data: task, - callback: typeof callback === 'function' ? callback : null - }; - - if (pos) { - q.tasks.unshift(item); - } else { - q.tasks.push(item); - } + if (!_isArray(data)) { + data = [data]; + } + if(data.length === 0) { + // call drain immediately if there are no tasks + return async.setImmediate(function() { + q.drain(); + }); + } + _arrayEach(data, function(task) { + var item = { + data: task, + callback: typeof callback === 'function' ? callback : noop + }; - if (q.saturated && q.tasks.length === q.concurrency) { - q.saturated(); - } - async.setImmediate(q.process); - }); + if (pos) { + q.tasks.unshift(item); + } else { + q.tasks.push(item); + } + + if (q.tasks.length === q.concurrency) { + q.saturated(); + } + async.setImmediate(q.process); + }); } var workers = 0; var q = { tasks: [], concurrency: concurrency, - saturated: null, - empty: null, - drain: null, + saturated: noop, + empty: noop, + drain: noop, started: false, paused: false, push: function (data, callback) { - _insert(q, data, false, callback); + _insert(q, data, false, callback); }, kill: function () { - q.drain = null; - q.tasks = []; + q.drain = noop; + q.tasks = []; }, unshift: function (data, callback) { - _insert(q, data, true, callback); + _insert(q, data, true, callback); }, process: function () { if (!q.paused && workers < q.concurrency && q.tasks.length) { - // var task = q.tasks.shift(); var tasks = payload ? q.tasks.splice(0, payload) : q.tasks.splice(0, q.tasks.length); @@ -880,7 +875,7 @@ return task.data; }); - if (q.empty && q.tasks.length === 0) { + if (q.tasks.length === 0) { q.empty(); } workers += 1; @@ -892,11 +887,9 @@ workers -= 1; var args = arguments; _arrayEach(tasks, function (task) { - if (task.callback) { - task.callback.apply(task, args); - } + task.callback.apply(task, args); }); - if (q.drain && q.tasks.length + workers === 0) { + if (q.tasks.length + workers === 0) { q.drain(); } q.process(); @@ -912,7 +905,6 @@ return q.tasks.length + workers === 0; }, pause: function () { - if (q.paused === true) { return; } q.paused = true; }, resume: function () { @@ -940,52 +932,50 @@ async.priorityQueue = function (worker, concurrency) { function _compareTasks(a, b){ - return a.priority - b.priority; + return a.priority - b.priority; } function _binarySearch(sequence, item, compare) { var beg = -1, end = sequence.length - 1; while (beg < end) { - var mid = beg + ((end - beg + 1) >>> 1); - if (compare(item, sequence[mid]) >= 0) { - beg = mid; - } else { - end = mid - 1; - } + var mid = beg + ((end - beg + 1) >>> 1); + if (compare(item, sequence[mid]) >= 0) { + beg = mid; + } else { + end = mid - 1; + } } return beg; } function _insert(q, data, priority, callback) { - if (!q.started){ - q.started = true; - } - if (!_isArray(data)) { - data = [data]; - } - if(data.length === 0) { - // call drain immediately if there are no tasks - return async.setImmediate(function() { - if (q.drain) { - q.drain(); - } - }); - } - _arrayEach(data, function(task) { - var item = { - data: task, - priority: priority, - callback: typeof callback === 'function' ? callback : null - }; + if (!q.started){ + q.started = true; + } + if (!_isArray(data)) { + data = [data]; + } + if(data.length === 0) { + // call drain immediately if there are no tasks + return async.setImmediate(function() { + q.drain(); + }); + } + _arrayEach(data, function(task) { + var item = { + data: task, + priority: priority, + callback: typeof callback === 'function' ? callback : noop + }; - q.tasks.splice(_binarySearch(q.tasks, item, _compareTasks) + 1, 0, item); + q.tasks.splice(_binarySearch(q.tasks, item, _compareTasks) + 1, 0, item); - if (q.saturated && q.tasks.length === q.concurrency) { - q.saturated(); - } - async.setImmediate(q.process); - }); + if (q.tasks.length === q.concurrency) { + q.saturated(); + } + async.setImmediate(q.process); + }); } // Start with a normal queue @@ -993,7 +983,7 @@ // Override push to accept second parameter representing priority q.push = function (data, priority, callback) { - _insert(q, data, priority, callback); + _insert(q, data, priority, callback); }; // Remove unshift function -- cgit v1.2.1