diff options
Diffstat (limited to 'lib')
-rw-r--r-- | lib/eachOfRateLimit.js | 62 | ||||
-rw-r--r-- | lib/internal/TokenBucket.js | 7 |
2 files changed, 68 insertions, 1 deletions
diff --git a/lib/eachOfRateLimit.js b/lib/eachOfRateLimit.js new file mode 100644 index 0000000..462a153 --- /dev/null +++ b/lib/eachOfRateLimit.js @@ -0,0 +1,62 @@ +import TokenBucket from './internal/TokenBucket'; + +import noop from 'lodash/noop'; +import once from './once'; + +import iterator from './iterator'; +import onlyOnce from './onlyOnce'; + +/** + * The same as [`eachOf`]{@link module:Collections.eachOf} but runs a maximum of `limit` async operations at a + * time. + * + * @name eachOfRateLimit + * @static + * @memberOf module:Collections + * @method + * @see [async.eachOf]{@link module:Collections.eachOf} + * @alias forEachOfRateLimit + * @category Collection + * @param {Array|Iterable|Object} coll - A collection to iterate over. + * @param {number} limit - The maximum number of async operations at a time. + * @param {Function} iteratee - A function to apply to each + * item in `coll`. The `key` is the item's key, or index in the case of an + * array. The iteratee is passed a `callback(err)` which must be called once it + * has completed. If no error has occurred, the callback should be run without + * arguments or with an explicit `null` argument. Invoked with + * (item, key, callback). + * @param {Object} rateLimitOptions - the rate limiting options. options.bucketSize + * will limit the amount of items queued within options.interval (miliseconds). + * @param {Function} [callback] - A callback which is called when all + * `iteratee` functions have finished, or an error occurs. Invoked with (err). + */ +export default function(coll, iteratee, options, callback) { + callback = once(callback || noop); + + var tokenBucket = new TokenBucket(options.bucketSize, options.interval); + + function iterateeCallback(err) { + if (err) { + tokenBucket.empty(); + callback(err); + } + // check nextElem iterator is exhausted (elem == null) to be sure + // we don't exit immediately due to a synchronous iteratee + else if (tokenBucket.queued === 0 && elem === null) { + return callback(null); + } + } + + + function enqueue(value, key) { + tokenBucket.enqueue(function() { + iteratee(value, key, onlyOnce(iterateeCallback)); + }); + } + + var nextElem = iterator(coll); + var elem; + while((elem = nextElem()) !== null) { + enqueue(elem.value, elem.key); + } +} diff --git a/lib/internal/TokenBucket.js b/lib/internal/TokenBucket.js index 011d13e..d31ed9c 100644 --- a/lib/internal/TokenBucket.js +++ b/lib/internal/TokenBucket.js @@ -30,9 +30,14 @@ TokenBucket.prototype.enqueue = function(operation) { } function onIntervalComplete(bucket) { - bucket.queued--; + if (bucket.queued > 0) bucket.queued--; if (bucket.queue.length > 0) { // call first queued operation (bucket.queue.shift())(); } } + +TokenBucket.prototype.empty = function() { + this.queue.empty(); + this.queued = 0; +} |