diff options
Diffstat (limited to 'tools/eslint/node_modules/rx-lite/rx.lite.js')
-rw-r--r-- | tools/eslint/node_modules/rx-lite/rx.lite.js | 5000 |
1 files changed, 2844 insertions, 2156 deletions
diff --git a/tools/eslint/node_modules/rx-lite/rx.lite.js b/tools/eslint/node_modules/rx-lite/rx.lite.js index 21e9aa5b97..0603c30e2f 100644 --- a/tools/eslint/node_modules/rx-lite/rx.lite.js +++ b/tools/eslint/node_modules/rx-lite/rx.lite.js @@ -1,4 +1,4 @@ -// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. +// Copyright (c) Microsoft, All rights reserved. See License.txt in the project root for license information. ;(function (undefined) { @@ -7,15 +7,18 @@ 'object': true }; - var - freeExports = objectTypes[typeof exports] && exports && !exports.nodeType && exports, - freeSelf = objectTypes[typeof self] && self.Object && self, - freeWindow = objectTypes[typeof window] && window && window.Object && window, - freeModule = objectTypes[typeof module] && module && !module.nodeType && module, - moduleExports = freeModule && freeModule.exports === freeExports && freeExports, - freeGlobal = freeExports && freeModule && typeof global == 'object' && global && global.Object && global; + function checkGlobal(value) { + return (value && value.Object === Object) ? value : null; + } - var root = root = freeGlobal || ((freeWindow !== (this && this.window)) && freeWindow) || freeSelf || this; + var freeExports = (objectTypes[typeof exports] && exports && !exports.nodeType) ? exports : null; + var freeModule = (objectTypes[typeof module] && module && !module.nodeType) ? module : null; + var freeGlobal = checkGlobal(freeExports && freeModule && typeof global === 'object' && global); + var freeSelf = checkGlobal(objectTypes[typeof self] && self); + var freeWindow = checkGlobal(objectTypes[typeof window] && window); + var moduleExports = (freeModule && freeModule.exports === freeExports) ? freeExports : null; + var thisGlobal = checkGlobal(objectTypes[typeof this] && this); + var root = freeGlobal || ((freeWindow !== (thisGlobal && thisGlobal.window)) && freeWindow) || freeSelf || thisGlobal || Function('return this')(); var Rx = { internals: {}, @@ -38,7 +41,7 @@ var isFn = function (value) { return typeof value == 'function' || false; - } + }; // fallback for older versions of Chrome and Safari if (isFn(/x/)) { @@ -57,6 +60,7 @@ } var errorObj = {e: {}}; + function tryCatcherGen(tryCatchTarget) { return function tryCatcher() { try { @@ -65,12 +69,14 @@ errorObj.e = e; return errorObj; } - } + }; } + var tryCatch = Rx.internals.tryCatch = function tryCatch(fn) { if (!isFunction(fn)) { throw new TypeError('fn must be a function'); } return tryCatcherGen(fn); - } + }; + function thrower(e) { throw e; } @@ -168,38 +174,38 @@ var EmptyError = Rx.EmptyError = function() { this.message = 'Sequence contains no elements.'; - this.name = 'EmptyError'; Error.call(this); }; EmptyError.prototype = Object.create(Error.prototype); + EmptyError.prototype.name = 'EmptyError'; var ObjectDisposedError = Rx.ObjectDisposedError = function() { this.message = 'Object has been disposed'; - this.name = 'ObjectDisposedError'; Error.call(this); }; ObjectDisposedError.prototype = Object.create(Error.prototype); + ObjectDisposedError.prototype.name = 'ObjectDisposedError'; var ArgumentOutOfRangeError = Rx.ArgumentOutOfRangeError = function () { this.message = 'Argument out of range'; - this.name = 'ArgumentOutOfRangeError'; Error.call(this); }; ArgumentOutOfRangeError.prototype = Object.create(Error.prototype); + ArgumentOutOfRangeError.prototype.name = 'ArgumentOutOfRangeError'; var NotSupportedError = Rx.NotSupportedError = function (message) { this.message = message || 'This operation is not supported'; - this.name = 'NotSupportedError'; Error.call(this); }; NotSupportedError.prototype = Object.create(Error.prototype); + NotSupportedError.prototype.name = 'NotSupportedError'; var NotImplementedError = Rx.NotImplementedError = function (message) { this.message = message || 'This operation is not implemented'; - this.name = 'NotImplementedError'; Error.call(this); }; NotImplementedError.prototype = Object.create(Error.prototype); + NotImplementedError.prototype.name = 'NotImplementedError'; var notImplemented = Rx.helpers.notImplemented = function () { throw new NotImplementedError(); @@ -220,12 +226,12 @@ var doneEnumerator = Rx.doneEnumerator = { done: true, value: undefined }; var isIterable = Rx.helpers.isIterable = function (o) { - return o[$iterator$] !== undefined; - } + return o && o[$iterator$] !== undefined; + }; var isArrayLike = Rx.helpers.isArrayLike = function (o) { return o && o.length !== undefined; - } + }; Rx.helpers.iterator = $iterator$; @@ -239,7 +245,7 @@ case 1: return function(arg) { return func.call(thisArg, arg); - } + }; case 2: return function(value, index) { return func.call(thisArg, value, index); @@ -265,280 +271,303 @@ 'constructor'], dontEnumsLength = dontEnums.length; - /** `Object#toString` result shortcuts */ - var argsClass = '[object Arguments]', - arrayClass = '[object Array]', - boolClass = '[object Boolean]', - dateClass = '[object Date]', - errorClass = '[object Error]', - funcClass = '[object Function]', - numberClass = '[object Number]', - objectClass = '[object Object]', - regexpClass = '[object RegExp]', - stringClass = '[object String]'; - - var toString = Object.prototype.toString, - hasOwnProperty = Object.prototype.hasOwnProperty, - supportsArgsClass = toString.call(arguments) == argsClass, // For less <IE9 && FF<4 - supportNodeClass, - errorProto = Error.prototype, - objectProto = Object.prototype, - stringProto = String.prototype, - propertyIsEnumerable = objectProto.propertyIsEnumerable; - - try { - supportNodeClass = !(toString.call(document) == objectClass && !({ 'toString': 0 } + '')); - } catch (e) { - supportNodeClass = true; - } - - var nonEnumProps = {}; - nonEnumProps[arrayClass] = nonEnumProps[dateClass] = nonEnumProps[numberClass] = { 'constructor': true, 'toLocaleString': true, 'toString': true, 'valueOf': true }; - nonEnumProps[boolClass] = nonEnumProps[stringClass] = { 'constructor': true, 'toString': true, 'valueOf': true }; - nonEnumProps[errorClass] = nonEnumProps[funcClass] = nonEnumProps[regexpClass] = { 'constructor': true, 'toString': true }; - nonEnumProps[objectClass] = { 'constructor': true }; - - var support = {}; - (function () { - var ctor = function() { this.x = 1; }, - props = []; - - ctor.prototype = { 'valueOf': 1, 'y': 1 }; - for (var key in new ctor) { props.push(key); } - for (key in arguments) { } - - // Detect if `name` or `message` properties of `Error.prototype` are enumerable by default. - support.enumErrorProps = propertyIsEnumerable.call(errorProto, 'message') || propertyIsEnumerable.call(errorProto, 'name'); +var argsTag = '[object Arguments]', + arrayTag = '[object Array]', + boolTag = '[object Boolean]', + dateTag = '[object Date]', + errorTag = '[object Error]', + funcTag = '[object Function]', + mapTag = '[object Map]', + numberTag = '[object Number]', + objectTag = '[object Object]', + regexpTag = '[object RegExp]', + setTag = '[object Set]', + stringTag = '[object String]', + weakMapTag = '[object WeakMap]'; + +var arrayBufferTag = '[object ArrayBuffer]', + float32Tag = '[object Float32Array]', + float64Tag = '[object Float64Array]', + int8Tag = '[object Int8Array]', + int16Tag = '[object Int16Array]', + int32Tag = '[object Int32Array]', + uint8Tag = '[object Uint8Array]', + uint8ClampedTag = '[object Uint8ClampedArray]', + uint16Tag = '[object Uint16Array]', + uint32Tag = '[object Uint32Array]'; + +var typedArrayTags = {}; +typedArrayTags[float32Tag] = typedArrayTags[float64Tag] = +typedArrayTags[int8Tag] = typedArrayTags[int16Tag] = +typedArrayTags[int32Tag] = typedArrayTags[uint8Tag] = +typedArrayTags[uint8ClampedTag] = typedArrayTags[uint16Tag] = +typedArrayTags[uint32Tag] = true; +typedArrayTags[argsTag] = typedArrayTags[arrayTag] = +typedArrayTags[arrayBufferTag] = typedArrayTags[boolTag] = +typedArrayTags[dateTag] = typedArrayTags[errorTag] = +typedArrayTags[funcTag] = typedArrayTags[mapTag] = +typedArrayTags[numberTag] = typedArrayTags[objectTag] = +typedArrayTags[regexpTag] = typedArrayTags[setTag] = +typedArrayTags[stringTag] = typedArrayTags[weakMapTag] = false; + +var objectProto = Object.prototype, + hasOwnProperty = objectProto.hasOwnProperty, + objToString = objectProto.toString, + MAX_SAFE_INTEGER = Math.pow(2, 53) - 1; + +var keys = Object.keys || (function() { + var hasOwnProperty = Object.prototype.hasOwnProperty, + hasDontEnumBug = !({ toString: null }).propertyIsEnumerable('toString'), + dontEnums = [ + 'toString', + 'toLocaleString', + 'valueOf', + 'hasOwnProperty', + 'isPrototypeOf', + 'propertyIsEnumerable', + 'constructor' + ], + dontEnumsLength = dontEnums.length; + + return function(obj) { + if (typeof obj !== 'object' && (typeof obj !== 'function' || obj === null)) { + throw new TypeError('Object.keys called on non-object'); + } - // Detect if `prototype` properties are enumerable by default. - support.enumPrototypes = propertyIsEnumerable.call(ctor, 'prototype'); + var result = [], prop, i; - // Detect if `arguments` object indexes are non-enumerable - support.nonEnumArgs = key != 0; + for (prop in obj) { + if (hasOwnProperty.call(obj, prop)) { + result.push(prop); + } + } - // Detect if properties shadowing those on `Object.prototype` are non-enumerable. - support.nonEnumShadows = !/valueOf/.test(props); - }(1)); + if (hasDontEnumBug) { + for (i = 0; i < dontEnumsLength; i++) { + if (hasOwnProperty.call(obj, dontEnums[i])) { + result.push(dontEnums[i]); + } + } + } + return result; + }; + }()); - var isObject = Rx.internals.isObject = function(value) { - var type = typeof value; - return value && (type == 'function' || type == 'object') || false; - }; +function equalObjects(object, other, equalFunc, isLoose, stackA, stackB) { + var objProps = keys(object), + objLength = objProps.length, + othProps = keys(other), + othLength = othProps.length; - function keysIn(object) { - var result = []; - if (!isObject(object)) { - return result; + if (objLength !== othLength && !isLoose) { + return false; + } + var index = objLength, key; + while (index--) { + key = objProps[index]; + if (!(isLoose ? key in other : hasOwnProperty.call(other, key))) { + return false; } - if (support.nonEnumArgs && object.length && isArguments(object)) { - object = slice.call(object); + } + var skipCtor = isLoose; + while (++index < objLength) { + key = objProps[index]; + var objValue = object[key], + othValue = other[key], + result; + + if (!(result === undefined ? equalFunc(objValue, othValue, isLoose, stackA, stackB) : result)) { + return false; } - var skipProto = support.enumPrototypes && typeof object == 'function', - skipErrorProps = support.enumErrorProps && (object === errorProto || object instanceof Error); - - for (var key in object) { - if (!(skipProto && key == 'prototype') && - !(skipErrorProps && (key == 'message' || key == 'name'))) { - result.push(key); - } + skipCtor || (skipCtor = key === 'constructor'); + } + if (!skipCtor) { + var objCtor = object.constructor, + othCtor = other.constructor; + + if (objCtor !== othCtor && + ('constructor' in object && 'constructor' in other) && + !(typeof objCtor === 'function' && objCtor instanceof objCtor && + typeof othCtor === 'function' && othCtor instanceof othCtor)) { + return false; } + } + return true; +} - if (support.nonEnumShadows && object !== objectProto) { - var ctor = object.constructor, - index = -1, - length = dontEnumsLength; +function equalByTag(object, other, tag) { + switch (tag) { + case boolTag: + case dateTag: + return +object === +other; - if (object === (ctor && ctor.prototype)) { - var className = object === stringProto ? stringClass : object === errorProto ? errorClass : toString.call(object), - nonEnum = nonEnumProps[className]; - } - while (++index < length) { - key = dontEnums[index]; - if (!(nonEnum && nonEnum[key]) && hasOwnProperty.call(object, key)) { - result.push(key); - } - } - } - return result; - } + case errorTag: + return object.name === other.name && object.message === other.message; - function internalFor(object, callback, keysFunc) { - var index = -1, - props = keysFunc(object), - length = props.length; + case numberTag: + return (object !== +object) ? + other !== +other : + object === +other; - while (++index < length) { - var key = props[index]; - if (callback(object[key], key, object) === false) { - break; - } - } - return object; + case regexpTag: + case stringTag: + return object === (other + ''); } + return false; +} - function internalForIn(object, callback) { - return internalFor(object, callback, keysIn); - } +var isObject = Rx.internals.isObject = function(value) { + var type = typeof value; + return !!value && (type === 'object' || type === 'function'); +}; - function isNode(value) { - // IE < 9 presents DOM nodes as `Object` objects except they have `toString` - // methods that are `typeof` "string" and still can coerce nodes to strings - return typeof value.toString != 'function' && typeof (value + '') == 'string'; - } +function isObjectLike(value) { + return !!value && typeof value === 'object'; +} - var isArguments = function(value) { - return (value && typeof value == 'object') ? toString.call(value) == argsClass : false; - } +function isLength(value) { + return typeof value === 'number' && value > -1 && value % 1 === 0 && value <= MAX_SAFE_INTEGER; +} - // fallback for browsers that can't detect `arguments` objects by [[Class]] - if (!supportsArgsClass) { - isArguments = function(value) { - return (value && typeof value == 'object') ? hasOwnProperty.call(value, 'callee') : false; - }; +var isHostObject = (function() { + try { + Object({ 'toString': 0 } + ''); + } catch(e) { + return function() { return false; }; } - - var isEqual = Rx.internals.isEqual = function (x, y) { - return deepEquals(x, y, [], []); + return function(value) { + return typeof value.toString !== 'function' && typeof (value + '') === 'string'; }; +}()); - /** @private - * Used for deep comparison - **/ - function deepEquals(a, b, stackA, stackB) { - // exit early for identical values - if (a === b) { - // treat `+0` vs. `-0` as not equal - return a !== 0 || (1 / a == 1 / b); - } +function isTypedArray(value) { + return isObjectLike(value) && isLength(value.length) && !!typedArrayTags[objToString.call(value)]; +} + +var isArray = Array.isArray || function(value) { + return isObjectLike(value) && isLength(value.length) && objToString.call(value) === arrayTag; +}; - var type = typeof a, - otherType = typeof b; +function arraySome (array, predicate) { + var index = -1, + length = array.length; - // exit early for unlike primitive values - if (a === a && (a == null || b == null || - (type != 'function' && type != 'object' && otherType != 'function' && otherType != 'object'))) { - return false; + while (++index < length) { + if (predicate(array[index], index, array)) { + return true; } + } + return false; +} - // compare [[Class]] names - var className = toString.call(a), - otherClass = toString.call(b); +function equalArrays(array, other, equalFunc, isLoose, stackA, stackB) { + var index = -1, + arrLength = array.length, + othLength = other.length; - if (className == argsClass) { - className = objectClass; - } - if (otherClass == argsClass) { - otherClass = objectClass; - } - if (className != otherClass) { + if (arrLength !== othLength && !(isLoose && othLength > arrLength)) { + return false; + } + // Ignore non-index properties. + while (++index < arrLength) { + var arrValue = array[index], + othValue = other[index], + result; + + if (result !== undefined) { + if (result) { + continue; + } return false; } - switch (className) { - case boolClass: - case dateClass: - // coerce dates and booleans to numbers, dates to milliseconds and booleans - // to `1` or `0` treating invalid dates coerced to `NaN` as not equal - return +a == +b; - - case numberClass: - // treat `NaN` vs. `NaN` as equal - return (a != +a) ? - b != +b : - // but treat `-0` vs. `+0` as not equal - (a == 0 ? (1 / a == 1 / b) : a == +b); - - case regexpClass: - case stringClass: - // coerce regexes to strings (http://es5.github.io/#x15.10.6.4) - // treat string primitives and their corresponding object instances as equal - return a == String(b); - } - var isArr = className == arrayClass; - if (!isArr) { - - // exit for functions and DOM nodes - if (className != objectClass || (!support.nodeClass && (isNode(a) || isNode(b)))) { + // Recursively compare arrays (susceptible to call stack limits). + if (isLoose) { + if (!arraySome(other, function(othValue) { + return arrValue === othValue || equalFunc(arrValue, othValue, isLoose, stackA, stackB); + })) { return false; } - // in older versions of Opera, `arguments` objects have `Array` constructors - var ctorA = !support.argsObject && isArguments(a) ? Object : a.constructor, - ctorB = !support.argsObject && isArguments(b) ? Object : b.constructor; - - // non `Object` object instances with different constructors are not equal - if (ctorA != ctorB && - !(hasOwnProperty.call(a, 'constructor') && hasOwnProperty.call(b, 'constructor')) && - !(isFunction(ctorA) && ctorA instanceof ctorA && isFunction(ctorB) && ctorB instanceof ctorB) && - ('constructor' in a && 'constructor' in b) - ) { - return false; - } - } - // assume cyclic structures are equal - // the algorithm for detecting cyclic structures is adapted from ES 5.1 - // section 15.12.3, abstract operation `JO` (http://es5.github.io/#x15.12.3) - var initedStack = !stackA; - stackA || (stackA = []); - stackB || (stackB = []); - - var length = stackA.length; - while (length--) { - if (stackA[length] == a) { - return stackB[length] == b; - } + } else if (!(arrValue === othValue || equalFunc(arrValue, othValue, isLoose, stackA, stackB))) { + return false; } - var size = 0; - var result = true; + } + return true; +} - // add `a` and `b` to the stack of traversed objects - stackA.push(a); - stackB.push(b); +function baseIsEqualDeep(object, other, equalFunc, isLoose, stackA, stackB) { + var objIsArr = isArray(object), + othIsArr = isArray(other), + objTag = arrayTag, + othTag = arrayTag; - // recursively compare objects and arrays (susceptible to call stack limits) - if (isArr) { - // compare lengths to determine if a deep comparison is necessary - length = a.length; - size = b.length; - result = size == length; + if (!objIsArr) { + objTag = objToString.call(object); + if (objTag === argsTag) { + objTag = objectTag; + } else if (objTag !== objectTag) { + objIsArr = isTypedArray(object); + } + } + if (!othIsArr) { + othTag = objToString.call(other); + if (othTag === argsTag) { + othTag = objectTag; + } + } + var objIsObj = objTag === objectTag && !isHostObject(object), + othIsObj = othTag === objectTag && !isHostObject(other), + isSameTag = objTag === othTag; - if (result) { - // deep compare the contents, ignoring non-numeric properties - while (size--) { - var index = length, - value = b[size]; + if (isSameTag && !(objIsArr || objIsObj)) { + return equalByTag(object, other, objTag); + } + if (!isLoose) { + var objIsWrapped = objIsObj && hasOwnProperty.call(object, '__wrapped__'), + othIsWrapped = othIsObj && hasOwnProperty.call(other, '__wrapped__'); - if (!(result = deepEquals(a[size], value, stackA, stackB))) { - break; - } - } - } + if (objIsWrapped || othIsWrapped) { + return equalFunc(objIsWrapped ? object.value() : object, othIsWrapped ? other.value() : other, isLoose, stackA, stackB); } - else { - // deep compare objects using `forIn`, instead of `forOwn`, to avoid `Object.keys` - // which, in this case, is more costly - internalForIn(b, function(value, key, b) { - if (hasOwnProperty.call(b, key)) { - // count the number of properties. - size++; - // deep compare each property value. - return (result = hasOwnProperty.call(a, key) && deepEquals(a[key], value, stackA, stackB)); - } - }); + } + if (!isSameTag) { + return false; + } + // Assume cyclic values are equal. + // For more information on detecting circular references see https://es5.github.io/#JO. + stackA || (stackA = []); + stackB || (stackB = []); - if (result) { - // ensure both objects have the same number of properties - internalForIn(a, function(value, key, a) { - if (hasOwnProperty.call(a, key)) { - // `size` will be `-1` if `a` has more properties than `b` - return (result = --size > -1); - } - }); - } + var length = stackA.length; + while (length--) { + if (stackA[length] === object) { + return stackB[length] === other; } - stackA.pop(); - stackB.pop(); + } + // Add `object` and `other` to the stack of traversed objects. + stackA.push(object); + stackB.push(other); - return result; + var result = (objIsArr ? equalArrays : equalObjects)(object, other, equalFunc, isLoose, stackA, stackB); + + stackA.pop(); + stackB.pop(); + + return result; +} + +function baseIsEqual(value, other, isLoose, stackA, stackB) { + if (value === other) { + return true; } + if (value == null || other == null || (!isObject(value) && !isObjectLike(other))) { + return value !== value && other !== other; + } + return baseIsEqualDeep(value, other, baseIsEqual, isLoose, stackA, stackB); +} + +var isEqual = Rx.internals.isEqual = function (value, other) { + return baseIsEqual(value, other); +}; var hasProp = {}.hasOwnProperty, slice = Array.prototype.slice; @@ -562,7 +591,7 @@ // Rx Utils var addRef = Rx.internals.addRef = function (xs, r) { return new AnonymousObservable(function (observer) { - return new CompositeDisposable(r.getDisposable(), xs.subscribe(observer)); + return new BinaryDisposable(r.getDisposable(), xs.subscribe(observer)); }); }; @@ -582,15 +611,11 @@ var args = [], i, len; if (Array.isArray(arguments[0])) { args = arguments[0]; - len = args.length; } else { len = arguments.length; args = new Array(len); for(i = 0; i < len; i++) { args[i] = arguments[i]; } } - for(i = 0; i < len; i++) { - if (!isDisposable(args[i])) { throw new TypeError('Not a disposable'); } - } this.disposables = args; this.isDisposed = false; this.length = args.length; @@ -689,6 +714,10 @@ if (disposable.isDisposed) { throw new ObjectDisposedError(); } }; + var disposableFixup = Disposable._fixup = function (result) { + return isDisposable(result) ? result : disposableEmpty; + }; + // Single assignment var SingleAssignmentDisposable = Rx.SingleAssignmentDisposable = function () { this.isDisposed = false; @@ -708,8 +737,8 @@ this.isDisposed = true; var old = this.current; this.current = null; + old && old.dispose(); } - old && old.dispose(); }; // Multiple assignment disposable @@ -738,6 +767,39 @@ old && old.dispose(); }; + var BinaryDisposable = Rx.BinaryDisposable = function (first, second) { + this._first = first; + this._second = second; + this.isDisposed = false; + }; + + BinaryDisposable.prototype.dispose = function () { + if (!this.isDisposed) { + this.isDisposed = true; + var old1 = this._first; + this._first = null; + old1 && old1.dispose(); + var old2 = this._second; + this._second = null; + old2 && old2.dispose(); + } + }; + + var NAryDisposable = Rx.NAryDisposable = function (disposables) { + this._disposables = disposables; + this.isDisposed = false; + }; + + NAryDisposable.prototype.dispose = function () { + if (!this.isDisposed) { + this.isDisposed = true; + for (var i = 0, len = this._disposables.length; i < len; i++) { + this._disposables[i].dispose(); + } + this._disposables.length = 0; + } + }; + /** * Represents a disposable resource that only disposes its underlying disposable resource when all dependent disposable objects have been disposed. */ @@ -803,7 +865,7 @@ this.dueTime = dueTime; this.comparer = comparer || defaultSubComparer; this.disposable = new SingleAssignmentDisposable(); - } + }; ScheduledItem.prototype.invoke = function () { this.disposable.setDisposable(this.invokeCore()); @@ -818,95 +880,58 @@ }; ScheduledItem.prototype.invokeCore = function () { - return this.action(this.scheduler, this.state); + return disposableFixup(this.action(this.scheduler, this.state)); }; /** Provides a set of static properties to access commonly used schedulers. */ var Scheduler = Rx.Scheduler = (function () { - function Scheduler(now, schedule, scheduleRelative, scheduleAbsolute) { - this.now = now; - this._schedule = schedule; - this._scheduleRelative = scheduleRelative; - this._scheduleAbsolute = scheduleAbsolute; - } + function Scheduler() { } /** Determines whether the given object is a scheduler */ Scheduler.isScheduler = function (s) { return s instanceof Scheduler; - } - - function invokeAction(scheduler, action) { - action(); - return disposableEmpty; - } + }; var schedulerProto = Scheduler.prototype; /** - * Schedules an action to be executed. - * @param {Function} action Action to execute. - * @returns {Disposable} The disposable object used to cancel the scheduled action (best effort). - */ - schedulerProto.schedule = function (action) { - return this._schedule(action, invokeAction); - }; - - /** - * Schedules an action to be executed. - * @param state State passed to the action to be executed. - * @param {Function} action Action to be executed. - * @returns {Disposable} The disposable object used to cancel the scheduled action (best effort). - */ - schedulerProto.scheduleWithState = function (state, action) { - return this._schedule(state, action); + * Schedules an action to be executed. + * @param state State passed to the action to be executed. + * @param {Function} action Action to be executed. + * @returns {Disposable} The disposable object used to cancel the scheduled action (best effort). + */ + schedulerProto.schedule = function (state, action) { + throw new NotImplementedError(); }; - /** - * Schedules an action to be executed after the specified relative due time. - * @param {Function} action Action to execute. - * @param {Number} dueTime Relative time after which to execute the action. - * @returns {Disposable} The disposable object used to cancel the scheduled action (best effort). - */ - schedulerProto.scheduleWithRelative = function (dueTime, action) { - return this._scheduleRelative(action, dueTime, invokeAction); - }; + /** + * Schedules an action to be executed after dueTime. + * @param state State passed to the action to be executed. + * @param {Function} action Action to be executed. + * @param {Number} dueTime Relative time after which to execute the action. + * @returns {Disposable} The disposable object used to cancel the scheduled action (best effort). + */ + schedulerProto.scheduleFuture = function (state, dueTime, action) { + var dt = dueTime; + dt instanceof Date && (dt = dt - this.now()); + dt = Scheduler.normalize(dt); - /** - * Schedules an action to be executed after dueTime. - * @param state State passed to the action to be executed. - * @param {Function} action Action to be executed. - * @param {Number} dueTime Relative time after which to execute the action. - * @returns {Disposable} The disposable object used to cancel the scheduled action (best effort). - */ - schedulerProto.scheduleWithRelativeAndState = function (state, dueTime, action) { - return this._scheduleRelative(state, dueTime, action); - }; + if (dt === 0) { return this.schedule(state, action); } - /** - * Schedules an action to be executed at the specified absolute due time. - * @param {Function} action Action to execute. - * @param {Number} dueTime Absolute time at which to execute the action. - * @returns {Disposable} The disposable object used to cancel the scheduled action (best effort). - */ - schedulerProto.scheduleWithAbsolute = function (dueTime, action) { - return this._scheduleAbsolute(action, dueTime, invokeAction); + return this._scheduleFuture(state, dt, action); }; - /** - * Schedules an action to be executed at dueTime. - * @param {Mixed} state State passed to the action to be executed. - * @param {Function} action Action to be executed. - * @param {Number}dueTime Absolute time at which to execute the action. - * @returns {Disposable} The disposable object used to cancel the scheduled action (best effort). - */ - schedulerProto.scheduleWithAbsoluteAndState = function (state, dueTime, action) { - return this._scheduleAbsolute(state, dueTime, action); + schedulerProto._scheduleFuture = function (state, dueTime, action) { + throw new NotImplementedError(); }; /** Gets the current time according to the local machine's system clock. */ Scheduler.now = defaultNow; + /** Gets the current time according to the local machine's system clock. */ + Scheduler.prototype.now = defaultNow; + /** * Normalizes the specified TimeSpan value to a positive value. * @param {Number} timeSpan The time span value to normalize. @@ -932,7 +957,7 @@ function innerAction(state2) { var isAdded = false, isDone = false; - var d = scheduler.scheduleWithState(state2, scheduleWork); + var d = scheduler.schedule(state2, scheduleWork); if (!isDone) { group.add(d); isAdded = true; @@ -950,7 +975,7 @@ } } - function invokeRecDate(scheduler, pair, method) { + function invokeRecDate(scheduler, pair) { var state = pair[0], action = pair[1], group = new CompositeDisposable(); action(state, innerAction); return group; @@ -958,7 +983,7 @@ function innerAction(state2, dueTime1) { var isAdded = false, isDone = false; - var d = scheduler[method](state2, dueTime1, scheduleWork); + var d = scheduler.scheduleFuture(state2, dueTime1, scheduleWork); if (!isDone) { group.add(d); isAdded = true; @@ -976,100 +1001,39 @@ } } - function invokeRecDateRelative(s, p) { - return invokeRecDate(s, p, 'scheduleWithRelativeAndState'); - } - - function invokeRecDateAbsolute(s, p) { - return invokeRecDate(s, p, 'scheduleWithAbsoluteAndState'); - } - - function scheduleInnerRecursive(action, self) { - action(function(dt) { self(action, dt); }); - } - - /** - * Schedules an action to be executed recursively. - * @param {Function} action Action to execute recursively. The parameter passed to the action is used to trigger recursive scheduling of the action. - * @returns {Disposable} The disposable object used to cancel the scheduled action (best effort). - */ - schedulerProto.scheduleRecursive = function (action) { - return this.scheduleRecursiveWithState(action, scheduleInnerRecursive); - }; - /** * Schedules an action to be executed recursively. * @param {Mixed} state State passed to the action to be executed. * @param {Function} action Action to execute recursively. The last parameter passed to the action is used to trigger recursive scheduling of the action, passing in recursive invocation state. * @returns {Disposable} The disposable object used to cancel the scheduled action (best effort). */ - schedulerProto.scheduleRecursiveWithState = function (state, action) { - return this.scheduleWithState([state, action], invokeRecImmediate); - }; - - /** - * Schedules an action to be executed recursively after a specified relative due time. - * @param {Function} action Action to execute recursively. The parameter passed to the action is used to trigger recursive scheduling of the action at the specified relative time. - * @param {Number}dueTime Relative time after which to execute the action for the first time. - * @returns {Disposable} The disposable object used to cancel the scheduled action (best effort). - */ - schedulerProto.scheduleRecursiveWithRelative = function (dueTime, action) { - return this.scheduleRecursiveWithRelativeAndState(action, dueTime, scheduleInnerRecursive); + schedulerProto.scheduleRecursive = function (state, action) { + return this.schedule([state, action], invokeRecImmediate); }; /** - * Schedules an action to be executed recursively after a specified relative due time. + * Schedules an action to be executed recursively after a specified relative or absolute due time. * @param {Mixed} state State passed to the action to be executed. * @param {Function} action Action to execute recursively. The last parameter passed to the action is used to trigger recursive scheduling of the action, passing in the recursive due time and invocation state. - * @param {Number}dueTime Relative time after which to execute the action for the first time. + * @param {Number | Date} dueTime Relative or absolute time after which to execute the action for the first time. * @returns {Disposable} The disposable object used to cancel the scheduled action (best effort). */ - schedulerProto.scheduleRecursiveWithRelativeAndState = function (state, dueTime, action) { - return this._scheduleRelative([state, action], dueTime, invokeRecDateRelative); + schedulerProto.scheduleRecursiveFuture = function (state, dueTime, action) { + return this.scheduleFuture([state, action], dueTime, invokeRecDate); }; - /** - * Schedules an action to be executed recursively at a specified absolute due time. - * @param {Function} action Action to execute recursively. The parameter passed to the action is used to trigger recursive scheduling of the action at the specified absolute time. - * @param {Number}dueTime Absolute time at which to execute the action for the first time. - * @returns {Disposable} The disposable object used to cancel the scheduled action (best effort). - */ - schedulerProto.scheduleRecursiveWithAbsolute = function (dueTime, action) { - return this.scheduleRecursiveWithAbsoluteAndState(action, dueTime, scheduleInnerRecursive); - }; - - /** - * Schedules an action to be executed recursively at a specified absolute due time. - * @param {Mixed} state State passed to the action to be executed. - * @param {Function} action Action to execute recursively. The last parameter passed to the action is used to trigger recursive scheduling of the action, passing in the recursive due time and invocation state. - * @param {Number}dueTime Absolute time at which to execute the action for the first time. - * @returns {Disposable} The disposable object used to cancel the scheduled action (best effort). - */ - schedulerProto.scheduleRecursiveWithAbsoluteAndState = function (state, dueTime, action) { - return this._scheduleAbsolute([state, action], dueTime, invokeRecDateAbsolute); - }; }(Scheduler.prototype)); (function (schedulerProto) { /** * Schedules a periodic piece of work by dynamically discovering the scheduler's capabilities. The periodic task will be scheduled using window.setInterval for the base implementation. - * @param {Number} period Period for running the work periodically. - * @param {Function} action Action to be executed. - * @returns {Disposable} The disposable object used to cancel the scheduled recurring action (best effort). - */ - Scheduler.prototype.schedulePeriodic = function (period, action) { - return this.schedulePeriodicWithState(null, period, action); - }; - - /** - * Schedules a periodic piece of work by dynamically discovering the scheduler's capabilities. The periodic task will be scheduled using window.setInterval for the base implementation. * @param {Mixed} state Initial state passed to the action upon the first iteration. * @param {Number} period Period for running the work periodically. * @param {Function} action Action to be executed, potentially updating the state. * @returns {Disposable} The disposable object used to cancel the scheduled recurring action (best effort). */ - Scheduler.prototype.schedulePeriodicWithState = function(state, period, action) { + schedulerProto.schedulePeriodic = function(state, period, action) { if (typeof root.setInterval === 'undefined') { throw new NotSupportedError(); } period = normalizeTime(period); var s = state, id = root.setInterval(function () { s = action(s); }, period); @@ -1079,54 +1043,73 @@ }(Scheduler.prototype)); /** Gets a scheduler that schedules work immediately on the current thread. */ - var immediateScheduler = Scheduler.immediate = (function () { - function scheduleNow(state, action) { return action(this, state); } - return new Scheduler(defaultNow, scheduleNow, notSupported, notSupported); - }()); + var ImmediateScheduler = (function (__super__) { + inherits(ImmediateScheduler, __super__); + function ImmediateScheduler() { + __super__.call(this); + } + + ImmediateScheduler.prototype.schedule = function (state, action) { + return disposableFixup(action(this, state)); + }; + + return ImmediateScheduler; + }(Scheduler)); + + var immediateScheduler = Scheduler.immediate = new ImmediateScheduler(); /** * Gets a scheduler that schedules work as soon as possible on the current thread. */ - var currentThreadScheduler = Scheduler.currentThread = (function () { + var CurrentThreadScheduler = (function (__super__) { var queue; function runTrampoline () { while (queue.length > 0) { - var item = queue.shift(); + var item = queue.dequeue(); !item.isCancelled() && item.invoke(); } } - function scheduleNow(state, action) { + inherits(CurrentThreadScheduler, __super__); + function CurrentThreadScheduler() { + __super__.call(this); + } + + CurrentThreadScheduler.prototype.schedule = function (state, action) { var si = new ScheduledItem(this, state, action, this.now()); if (!queue) { - queue = [si]; + queue = new PriorityQueue(4); + queue.enqueue(si); var result = tryCatch(runTrampoline)(); queue = null; - if (result === errorObj) { return thrower(result.e); } + if (result === errorObj) { thrower(result.e); } } else { - queue.push(si); + queue.enqueue(si); } return si.disposable; - } + }; - var currentScheduler = new Scheduler(defaultNow, scheduleNow, notSupported, notSupported); - currentScheduler.scheduleRequired = function () { return !queue; }; + CurrentThreadScheduler.prototype.scheduleRequired = function () { return !queue; }; - return currentScheduler; - }()); + return CurrentThreadScheduler; + }(Scheduler)); + + var currentThreadScheduler = Scheduler.currentThread = new CurrentThreadScheduler(); var SchedulePeriodicRecursive = Rx.internals.SchedulePeriodicRecursive = (function () { - function tick(command, recurse) { - recurse(0, this._period); - try { - this._state = this._action(this._state); - } catch (e) { - this._cancel.dispose(); - throw e; - } + function createTick(self) { + return function tick(command, recurse) { + recurse(0, self._period); + var state = tryCatch(self._action)(self._state); + if (state === errorObj) { + self._cancel.dispose(); + thrower(state.e); + } + self._state = state; + }; } function SchedulePeriodicRecursive(scheduler, state, period, action) { @@ -1139,7 +1122,7 @@ SchedulePeriodicRecursive.prototype.start = function () { var d = new SingleAssignmentDisposable(); this._cancel = d; - d.setDisposable(this._scheduler.scheduleRecursiveWithRelativeAndState(0, this._period, tick.bind(this))); + d.setDisposable(this._scheduler.scheduleRecursiveFuture(0, this._period, createTick(this))); return d; }; @@ -1181,7 +1164,7 @@ function runTask(handle) { if (currentlyRunning) { - localSetTimeout(function () { runTask(handle) }, 0); + localSetTimeout(function () { runTask(handle); }, 0); } else { var task = tasksByHandle[handle]; if (task) { @@ -1189,12 +1172,12 @@ var result = tryCatch(task)(); clearMethod(handle); currentlyRunning = false; - if (result === errorObj) { return thrower(result.e); } + if (result === errorObj) { thrower(result.e); } } } } - var reNative = RegExp('^' + + var reNative = new RegExp('^' + String(toString) .replace(/[.*+?^${}()|[\]\\]/g, '\\$&') .replace(/toString| for [^\]]+/g, '.*?') + '$' @@ -1235,25 +1218,19 @@ } else if (postMessageSupported()) { var MSG_PREFIX = 'ms.rx.schedule' + Math.random(); - function onGlobalPostMessage(event) { + var onGlobalPostMessage = function (event) { // Only if we're a match to avoid any other global events if (typeof event.data === 'string' && event.data.substring(0, MSG_PREFIX.length) === MSG_PREFIX) { runTask(event.data.substring(MSG_PREFIX.length)); } - } + }; - if (root.addEventListener) { - root.addEventListener('message', onGlobalPostMessage, false); - } else if (root.attachEvent) { - root.attachEvent('onmessage', onGlobalPostMessage); - } else { - root.onmessage = onGlobalPostMessage; - } + root.addEventListener('message', onGlobalPostMessage, false); scheduleMethod = function (action) { var id = nextHandle++; tasksByHandle[id] = action; - root.postMessage(MSG_PREFIX + currentId, '*'); + root.postMessage(MSG_PREFIX + id, '*'); return id; }; } else if (!!root.MessageChannel) { @@ -1300,61 +1277,170 @@ /** * Gets a scheduler that schedules work via a timed callback based upon platform. */ - var timeoutScheduler = Scheduler.timeout = Scheduler['default'] = (function () { + var DefaultScheduler = (function (__super__) { + inherits(DefaultScheduler, __super__); + function DefaultScheduler() { + __super__.call(this); + } + + function scheduleAction(disposable, action, scheduler, state) { + return function schedule() { + disposable.setDisposable(Disposable._fixup(action(scheduler, state))); + }; + } + + function ClearDisposable(id) { + this._id = id; + this.isDisposed = false; + } + + ClearDisposable.prototype.dispose = function () { + if (!this.isDisposed) { + this.isDisposed = true; + clearMethod(this._id); + } + }; + + function LocalClearDisposable(id) { + this._id = id; + this.isDisposed = false; + } + + LocalClearDisposable.prototype.dispose = function () { + if (!this.isDisposed) { + this.isDisposed = true; + localClearTimeout(this._id); + } + }; + + DefaultScheduler.prototype.schedule = function (state, action) { + var disposable = new SingleAssignmentDisposable(), + id = scheduleMethod(scheduleAction(disposable, action, this, state)); + return new BinaryDisposable(disposable, new ClearDisposable(id)); + }; + + DefaultScheduler.prototype._scheduleFuture = function (state, dueTime, action) { + if (dueTime === 0) { return this.schedule(state, action); } + var disposable = new SingleAssignmentDisposable(), + id = localSetTimeout(scheduleAction(disposable, action, this, state), dueTime); + return new BinaryDisposable(disposable, new LocalClearDisposable(id)); + }; + + return DefaultScheduler; + }(Scheduler)); + + var defaultScheduler = Scheduler['default'] = Scheduler.async = new DefaultScheduler(); + + function IndexedItem(id, value) { + this.id = id; + this.value = value; + } - function scheduleNow(state, action) { - var scheduler = this, disposable = new SingleAssignmentDisposable(); - var id = scheduleMethod(function () { - !disposable.isDisposed && disposable.setDisposable(action(scheduler, state)); - }); - return new CompositeDisposable(disposable, disposableCreate(function () { - clearMethod(id); - })); - } + IndexedItem.prototype.compareTo = function (other) { + var c = this.value.compareTo(other.value); + c === 0 && (c = this.id - other.id); + return c; + }; - function scheduleRelative(state, dueTime, action) { - var scheduler = this, dt = Scheduler.normalize(dueTime), disposable = new SingleAssignmentDisposable(); - if (dt === 0) { return scheduler.scheduleWithState(state, action); } - var id = localSetTimeout(function () { - !disposable.isDisposed && disposable.setDisposable(action(scheduler, state)); - }, dt); - return new CompositeDisposable(disposable, disposableCreate(function () { - localClearTimeout(id); - })); + var PriorityQueue = Rx.internals.PriorityQueue = function (capacity) { + this.items = new Array(capacity); + this.length = 0; + }; + + var priorityProto = PriorityQueue.prototype; + priorityProto.isHigherPriority = function (left, right) { + return this.items[left].compareTo(this.items[right]) < 0; + }; + + priorityProto.percolate = function (index) { + if (index >= this.length || index < 0) { return; } + var parent = index - 1 >> 1; + if (parent < 0 || parent === index) { return; } + if (this.isHigherPriority(index, parent)) { + var temp = this.items[index]; + this.items[index] = this.items[parent]; + this.items[parent] = temp; + this.percolate(parent); } + }; - function scheduleAbsolute(state, dueTime, action) { - return this.scheduleWithRelativeAndState(state, dueTime - this.now(), action); + priorityProto.heapify = function (index) { + +index || (index = 0); + if (index >= this.length || index < 0) { return; } + var left = 2 * index + 1, + right = 2 * index + 2, + first = index; + if (left < this.length && this.isHigherPriority(left, first)) { + first = left; } + if (right < this.length && this.isHigherPriority(right, first)) { + first = right; + } + if (first !== index) { + var temp = this.items[index]; + this.items[index] = this.items[first]; + this.items[first] = temp; + this.heapify(first); + } + }; - return new Scheduler(defaultNow, scheduleNow, scheduleRelative, scheduleAbsolute); - })(); + priorityProto.peek = function () { return this.items[0].value; }; + + priorityProto.removeAt = function (index) { + this.items[index] = this.items[--this.length]; + this.items[this.length] = undefined; + this.heapify(); + }; + + priorityProto.dequeue = function () { + var result = this.peek(); + this.removeAt(0); + return result; + }; + + priorityProto.enqueue = function (item) { + var index = this.length++; + this.items[index] = new IndexedItem(PriorityQueue.count++, item); + this.percolate(index); + }; + + priorityProto.remove = function (item) { + for (var i = 0; i < this.length; i++) { + if (this.items[i].value === item) { + this.removeAt(i); + return true; + } + } + return false; + }; + PriorityQueue.count = 0; /** * Represents a notification to an observer. */ var Notification = Rx.Notification = (function () { - function Notification(kind, value, exception, accept, acceptObservable, toString) { - this.kind = kind; - this.value = value; - this.exception = exception; - this._accept = accept; - this._acceptObservable = acceptObservable; - this.toString = toString; + function Notification() { + } + Notification.prototype._accept = function (onNext, onError, onCompleted) { + throw new NotImplementedError(); + }; + + Notification.prototype._acceptObserver = function (onNext, onError, onCompleted) { + throw new NotImplementedError(); + }; + /** * Invokes the delegate corresponding to the notification or the observer's method corresponding to the notification and returns the produced result. - * - * @memberOf Notification - * @param {Any} observerOrOnNext Delegate to invoke for an OnNext notification or Observer to invoke the notification on.. - * @param {Function} onError Delegate to invoke for an OnError notification. - * @param {Function} onCompleted Delegate to invoke for an OnCompleted notification. + * @param {Function | Observer} observerOrOnNext Function to invoke for an OnNext notification or Observer to invoke the notification on.. + * @param {Function} onError Function to invoke for an OnError notification. + * @param {Function} onCompleted Function to invoke for an OnCompleted notification. * @returns {Any} Result produced by the observation. */ Notification.prototype.accept = function (observerOrOnNext, onError, onCompleted) { return observerOrOnNext && typeof observerOrOnNext === 'object' ? - this._acceptObservable(observerOrOnNext) : + this._acceptObserver(observerOrOnNext) : this._accept(observerOrOnNext, onError, onCompleted); }; @@ -1368,10 +1454,10 @@ Notification.prototype.toObservable = function (scheduler) { var self = this; isScheduler(scheduler) || (scheduler = immediateScheduler); - return new AnonymousObservable(function (observer) { - return scheduler.scheduleWithState(self, function (_, notification) { - notification._acceptObservable(observer); - notification.kind === 'N' && observer.onCompleted(); + return new AnonymousObservable(function (o) { + return scheduler.schedule(self, function (_, notification) { + notification._acceptObserver(o); + notification.kind === 'N' && o.onCompleted(); }); }); }; @@ -1379,49 +1465,96 @@ return Notification; })(); + var OnNextNotification = (function (__super__) { + inherits(OnNextNotification, __super__); + function OnNextNotification(value) { + this.value = value; + this.kind = 'N'; + } + + OnNextNotification.prototype._accept = function (onNext) { + return onNext(this.value); + }; + + OnNextNotification.prototype._acceptObserver = function (o) { + return o.onNext(this.value); + }; + + OnNextNotification.prototype.toString = function () { + return 'OnNext(' + this.value + ')'; + }; + + return OnNextNotification; + }(Notification)); + + var OnErrorNotification = (function (__super__) { + inherits(OnErrorNotification, __super__); + function OnErrorNotification(error) { + this.error = error; + this.kind = 'E'; + } + + OnErrorNotification.prototype._accept = function (onNext, onError) { + return onError(this.error); + }; + + OnErrorNotification.prototype._acceptObserver = function (o) { + return o.onError(this.error); + }; + + OnErrorNotification.prototype.toString = function () { + return 'OnError(' + this.error + ')'; + }; + + return OnErrorNotification; + }(Notification)); + + var OnCompletedNotification = (function (__super__) { + inherits(OnCompletedNotification, __super__); + function OnCompletedNotification() { + this.kind = 'C'; + } + + OnCompletedNotification.prototype._accept = function (onNext, onError, onCompleted) { + return onCompleted(); + }; + + OnCompletedNotification.prototype._acceptObserver = function (o) { + return o.onCompleted(); + }; + + OnCompletedNotification.prototype.toString = function () { + return 'OnCompleted()'; + }; + + return OnCompletedNotification; + }(Notification)); + /** * Creates an object that represents an OnNext notification to an observer. * @param {Any} value The value contained in the notification. * @returns {Notification} The OnNext notification containing the value. */ - var notificationCreateOnNext = Notification.createOnNext = (function () { - function _accept(onNext) { return onNext(this.value); } - function _acceptObservable(observer) { return observer.onNext(this.value); } - function toString() { return 'OnNext(' + this.value + ')'; } - - return function (value) { - return new Notification('N', value, null, _accept, _acceptObservable, toString); - }; - }()); + var notificationCreateOnNext = Notification.createOnNext = function (value) { + return new OnNextNotification(value); + }; /** * Creates an object that represents an OnError notification to an observer. * @param {Any} error The exception contained in the notification. * @returns {Notification} The OnError notification containing the exception. */ - var notificationCreateOnError = Notification.createOnError = (function () { - function _accept (onNext, onError) { return onError(this.exception); } - function _acceptObservable(observer) { return observer.onError(this.exception); } - function toString () { return 'OnError(' + this.exception + ')'; } - - return function (e) { - return new Notification('E', null, e, _accept, _acceptObservable, toString); - }; - }()); + var notificationCreateOnError = Notification.createOnError = function (error) { + return new OnErrorNotification(error); + }; /** * Creates an object that represents an OnCompleted notification to an observer. * @returns {Notification} The OnCompleted notification. */ - var notificationCreateOnCompleted = Notification.createOnCompleted = (function () { - function _accept (onNext, onError, onCompleted) { return onCompleted(); } - function _acceptObservable(observer) { return observer.onCompleted(); } - function toString () { return 'OnCompleted()'; } - - return function () { - return new Notification('C', null, null, _accept, _acceptObservable, toString); - }; - }()); + var notificationCreateOnCompleted = Notification.createOnCompleted = function () { + return new OnCompletedNotification(); + }; /** * Supports push-style iteration over an observable sequence. @@ -1572,13 +1705,12 @@ }; } - function Observable(subscribe) { + function Observable() { if (Rx.config.longStackSupport && hasStacks) { + var oldSubscribe = this._subscribe; var e = tryCatch(thrower)(new Error()).e; this.stack = e.stack.substring(e.stack.indexOf('\n') + 1); - this._subscribe = makeSubscribe(this, subscribe); - } else { - this._subscribe = subscribe; + this._subscribe = makeSubscribe(this, oldSubscribe); } } @@ -1591,7 +1723,7 @@ */ Observable.isObservable = function (o) { return o && isFunction(o.subscribe); - } + }; /** * Subscribes an o to the observable sequence. @@ -1652,45 +1784,48 @@ this.disposable = new SerialDisposable(); } - ScheduledObserver.prototype.next = function (value) { - var self = this; - this.queue.push(function () { self.observer.onNext(value); }); + function enqueueNext(observer, x) { return function () { observer.onNext(x); }; } + function enqueueError(observer, e) { return function () { observer.onError(e); }; } + function enqueueCompleted(observer) { return function () { observer.onCompleted(); }; } + + ScheduledObserver.prototype.next = function (x) { + this.queue.push(enqueueNext(this.observer, x)); }; ScheduledObserver.prototype.error = function (e) { - var self = this; - this.queue.push(function () { self.observer.onError(e); }); + this.queue.push(enqueueError(this.observer, e)); }; ScheduledObserver.prototype.completed = function () { - var self = this; - this.queue.push(function () { self.observer.onCompleted(); }); + this.queue.push(enqueueCompleted(this.observer)); }; + + function scheduleMethod(state, recurse) { + var work; + if (state.queue.length > 0) { + work = state.queue.shift(); + } else { + state.isAcquired = false; + return; + } + var res = tryCatch(work)(); + if (res === errorObj) { + state.queue = []; + state.hasFaulted = true; + return thrower(res.e); + } + recurse(state); + } + ScheduledObserver.prototype.ensureActive = function () { var isOwner = false; if (!this.hasFaulted && this.queue.length > 0) { isOwner = !this.isAcquired; this.isAcquired = true; } - if (isOwner) { - this.disposable.setDisposable(this.scheduler.scheduleRecursiveWithState(this, function (parent, self) { - var work; - if (parent.queue.length > 0) { - work = parent.queue.shift(); - } else { - parent.isAcquired = false; - return; - } - var res = tryCatch(work)(); - if (res === errorObj) { - parent.queue = []; - parent.hasFaulted = true; - return thrower(res.e); - } - self(parent); - })); - } + isOwner && + this.disposable.setDisposable(this.scheduler.scheduleRecursive(this, scheduleMethod)); }; ScheduledObserver.prototype.dispose = function () { @@ -1712,92 +1847,74 @@ function setDisposable(s, state) { var ado = state[0], self = state[1]; var sub = tryCatch(self.subscribeCore).call(self, ado); - - if (sub === errorObj) { - if(!ado.fail(errorObj.e)) { return thrower(errorObj.e); } - } + if (sub === errorObj && !ado.fail(errorObj.e)) { thrower(errorObj.e); } ado.setDisposable(fixSubscriber(sub)); } - function subscribe(observer) { - var ado = new AutoDetachObserver(observer), state = [ado, this]; + function ObservableBase() { + __super__.call(this); + } + + ObservableBase.prototype._subscribe = function (o) { + var ado = new AutoDetachObserver(o), state = [ado, this]; if (currentThreadScheduler.scheduleRequired()) { - currentThreadScheduler.scheduleWithState(state, setDisposable); + currentThreadScheduler.schedule(state, setDisposable); } else { setDisposable(null, state); } return ado; - } - - function ObservableBase() { - __super__.call(this, subscribe); - } + }; ObservableBase.prototype.subscribeCore = notImplemented; return ObservableBase; }(Observable)); -var FlatMapObservable = (function(__super__){ +var FlatMapObservable = Rx.FlatMapObservable = (function(__super__) { inherits(FlatMapObservable, __super__); function FlatMapObservable(source, selector, resultSelector, thisArg) { - this.resultSelector = Rx.helpers.isFunction(resultSelector) ? - resultSelector : null; - - this.selector = Rx.internals.bindCallback(Rx.helpers.isFunction(selector) ? selector : function() { return selector; }, thisArg, 3); - this.source = source; - - __super__.call(this); - + this.resultSelector = isFunction(resultSelector) ? resultSelector : null; + this.selector = bindCallback(isFunction(selector) ? selector : function() { return selector; }, thisArg, 3); + this.source = source; + __super__.call(this); } FlatMapObservable.prototype.subscribeCore = function(o) { - return this.source.subscribe(new InnerObserver(o, this.selector, this.resultSelector, this)); + return this.source.subscribe(new InnerObserver(o, this.selector, this.resultSelector, this)); }; + inherits(InnerObserver, AbstractObserver); function InnerObserver(observer, selector, resultSelector, source) { - this.i = 0; - this.selector = selector; - this.resultSelector = resultSelector; - this.source = source; - this.isStopped = false; - this.o = observer; + this.i = 0; + this.selector = selector; + this.resultSelector = resultSelector; + this.source = source; + this.o = observer; + AbstractObserver.call(this); } InnerObserver.prototype._wrapResult = function(result, x, i) { - return this.resultSelector ? - result.map(function(y, i2) { return this.resultSelector(x, y, i, i2); }, this) : - result; + return this.resultSelector ? + result.map(function(y, i2) { return this.resultSelector(x, y, i, i2); }, this) : + result; }; - InnerObserver.prototype.onNext = function(x) { - - if (this.isStopped) return; - - var i = this.i++; - var result = tryCatch(this.selector)(x, i, this.source); - - if (result === errorObj) { - return this.o.onError(result.e); - } - - Rx.helpers.isPromise(result) && (result = Rx.Observable.fromPromise(result)); - (Rx.helpers.isArrayLike(result) || Rx.helpers.isIterable(result)) && (result = Rx.Observable.from(result)); - - this.o.onNext(this._wrapResult(result, x, i)); + InnerObserver.prototype.next = function(x) { + var i = this.i++; + var result = tryCatch(this.selector)(x, i, this.source); + if (result === errorObj) { return this.o.onError(result.e); } + isPromise(result) && (result = observableFromPromise(result)); + (isArrayLike(result) || isIterable(result)) && (result = Observable.from(result)); + this.o.onNext(this._wrapResult(result, x, i)); }; - InnerObserver.prototype.onError = function(e) { - if(!this.isStopped) { this.isStopped = true; this.o.onError(e); } - }; + InnerObserver.prototype.error = function(e) { this.o.onError(e); }; - InnerObserver.prototype.onCompleted = function() { - if (!this.isStopped) {this.isStopped = true; this.o.onCompleted(); } - }; + InnerObserver.prototype.completed = function() { this.o.onCompleted(); }; return FlatMapObservable; @@ -1805,199 +1922,156 @@ var FlatMapObservable = (function(__super__){ var Enumerable = Rx.internals.Enumerable = function () { }; + function IsDisposedDisposable(state) { + this._s = state; + this.isDisposed = false; + } + + IsDisposedDisposable.prototype.dispose = function () { + if (!this.isDisposed) { + this.isDisposed = true; + this._s.isDisposed = true; + } + }; + var ConcatEnumerableObservable = (function(__super__) { inherits(ConcatEnumerableObservable, __super__); function ConcatEnumerableObservable(sources) { this.sources = sources; __super__.call(this); } - - ConcatEnumerableObservable.prototype.subscribeCore = function (o) { - var isDisposed, subscription = new SerialDisposable(); - var cancelable = immediateScheduler.scheduleRecursiveWithState(this.sources[$iterator$](), function (e, self) { - if (isDisposed) { return; } - var currentItem = tryCatch(e.next).call(e); - if (currentItem === errorObj) { return o.onError(currentItem.e); } - if (currentItem.done) { - return o.onCompleted(); - } + function scheduleMethod(state, recurse) { + if (state.isDisposed) { return; } + var currentItem = tryCatch(state.e.next).call(state.e); + if (currentItem === errorObj) { return state.o.onError(currentItem.e); } + if (currentItem.done) { return state.o.onCompleted(); } - // Check if promise - var currentValue = currentItem.value; - isPromise(currentValue) && (currentValue = observableFromPromise(currentValue)); + // Check if promise + var currentValue = currentItem.value; + isPromise(currentValue) && (currentValue = observableFromPromise(currentValue)); - var d = new SingleAssignmentDisposable(); - subscription.setDisposable(d); - d.setDisposable(currentValue.subscribe(new InnerObserver(o, self, e))); - }); + var d = new SingleAssignmentDisposable(); + state.subscription.setDisposable(d); + d.setDisposable(currentValue.subscribe(new InnerObserver(state, recurse))); + } - return new CompositeDisposable(subscription, cancelable, disposableCreate(function () { - isDisposed = true; - })); + ConcatEnumerableObservable.prototype.subscribeCore = function (o) { + var subscription = new SerialDisposable(); + var state = { + isDisposed: false, + o: o, + subscription: subscription, + e: this.sources[$iterator$]() + }; + + var cancelable = currentThreadScheduler.scheduleRecursive(state, scheduleMethod); + return new NAryDisposable([subscription, cancelable, new IsDisposedDisposable(state)]); }; - - function InnerObserver(o, s, e) { - this.o = o; - this.s = s; - this.e = e; - this.isStopped = false; + + function InnerObserver(state, recurse) { + this._state = state; + this._recurse = recurse; + AbstractObserver.call(this); } - InnerObserver.prototype.onNext = function (x) { if(!this.isStopped) { this.o.onNext(x); } }; - InnerObserver.prototype.onError = function (err) { - if (!this.isStopped) { - this.isStopped = true; - this.o.onError(err); - } - }; - InnerObserver.prototype.onCompleted = function () { - if (!this.isStopped) { - this.isStopped = true; - this.s(this.e); - } - }; - InnerObserver.prototype.dispose = function () { this.isStopped = true; }; - InnerObserver.prototype.fail = function (err) { - if (!this.isStopped) { - this.isStopped = true; - this.o.onError(err); - return true; - } - return false; - }; - + + inherits(InnerObserver, AbstractObserver); + + InnerObserver.prototype.next = function (x) { this._state.o.onNext(x); }; + InnerObserver.prototype.error = function (e) { this._state.o.onError(e); }; + InnerObserver.prototype.completed = function () { this._recurse(this._state); }; + return ConcatEnumerableObservable; }(ObservableBase)); Enumerable.prototype.concat = function () { return new ConcatEnumerableObservable(this); }; - + var CatchErrorObservable = (function(__super__) { - inherits(CatchErrorObservable, __super__); function CatchErrorObservable(sources) { this.sources = sources; __super__.call(this); } - - CatchErrorObservable.prototype.subscribeCore = function (o) { - var e = this.sources[$iterator$](); - var isDisposed, subscription = new SerialDisposable(); - var cancelable = immediateScheduler.scheduleRecursiveWithState(null, function (lastException, self) { - if (isDisposed) { return; } - var currentItem = tryCatch(e.next).call(e); - if (currentItem === errorObj) { return o.onError(currentItem.e); } + inherits(CatchErrorObservable, __super__); - if (currentItem.done) { - return lastException !== null ? o.onError(lastException) : o.onCompleted(); - } + function scheduleMethod(state, recurse) { + if (state.isDisposed) { return; } + var currentItem = tryCatch(state.e.next).call(state.e); + if (currentItem === errorObj) { return state.o.onError(currentItem.e); } + if (currentItem.done) { return state.lastError !== null ? state.o.onError(state.lastError) : state.o.onCompleted(); } - // Check if promise - var currentValue = currentItem.value; - isPromise(currentValue) && (currentValue = observableFromPromise(currentValue)); + var currentValue = currentItem.value; + isPromise(currentValue) && (currentValue = observableFromPromise(currentValue)); - var d = new SingleAssignmentDisposable(); - subscription.setDisposable(d); - d.setDisposable(currentValue.subscribe( - function(x) { o.onNext(x); }, - self, - function() { o.onCompleted(); })); - }); - return new CompositeDisposable(subscription, cancelable, disposableCreate(function () { - isDisposed = true; - })); - }; - - return CatchErrorObservable; - }(ObservableBase)); - - Enumerable.prototype.catchError = function () { - return new CatchErrorObservable(this); - }; - - Enumerable.prototype.catchErrorWhen = function (notificationHandler) { - var sources = this; - return new AnonymousObservable(function (o) { - var exceptions = new Subject(), - notifier = new Subject(), - handled = notificationHandler(exceptions), - notificationDisposable = handled.subscribe(notifier); + var d = new SingleAssignmentDisposable(); + state.subscription.setDisposable(d); + d.setDisposable(currentValue.subscribe(new InnerObserver(state, recurse))); + } - var e = sources[$iterator$](); + CatchErrorObservable.prototype.subscribeCore = function (o) { + var subscription = new SerialDisposable(); + var state = { + isDisposed: false, + e: this.sources[$iterator$](), + subscription: subscription, + lastError: null, + o: o + }; - var isDisposed, - lastException, - subscription = new SerialDisposable(); - var cancelable = immediateScheduler.scheduleRecursive(function (self) { - if (isDisposed) { return; } - var currentItem = tryCatch(e.next).call(e); - if (currentItem === errorObj) { return o.onError(currentItem.e); } + var cancelable = currentThreadScheduler.scheduleRecursive(state, scheduleMethod); + return new NAryDisposable([subscription, cancelable, new IsDisposedDisposable(state)]); + }; - if (currentItem.done) { - if (lastException) { - o.onError(lastException); - } else { - o.onCompleted(); - } - return; - } + function InnerObserver(state, recurse) { + this._state = state; + this._recurse = recurse; + AbstractObserver.call(this); + } - // Check if promise - var currentValue = currentItem.value; - isPromise(currentValue) && (currentValue = observableFromPromise(currentValue)); + inherits(InnerObserver, AbstractObserver); - var outer = new SingleAssignmentDisposable(); - var inner = new SingleAssignmentDisposable(); - subscription.setDisposable(new CompositeDisposable(inner, outer)); - outer.setDisposable(currentValue.subscribe( - function(x) { o.onNext(x); }, - function (exn) { - inner.setDisposable(notifier.subscribe(self, function(ex) { - o.onError(ex); - }, function() { - o.onCompleted(); - })); + InnerObserver.prototype.next = function (x) { this._state.o.onNext(x); }; + InnerObserver.prototype.error = function (e) { this._state.lastError = e; this._recurse(this._state); }; + InnerObserver.prototype.completed = function () { this._state.o.onCompleted(); }; - exceptions.onNext(exn); - }, - function() { o.onCompleted(); })); - }); + return CatchErrorObservable; + }(ObservableBase)); - return new CompositeDisposable(notificationDisposable, subscription, cancelable, disposableCreate(function () { - isDisposed = true; - })); - }); + Enumerable.prototype.catchError = function () { + return new CatchErrorObservable(this); }; - + var RepeatEnumerable = (function (__super__) { inherits(RepeatEnumerable, __super__); - function RepeatEnumerable(v, c) { this.v = v; this.c = c == null ? -1 : c; } + RepeatEnumerable.prototype[$iterator$] = function () { - return new RepeatEnumerator(this); + return new RepeatEnumerator(this); }; - + function RepeatEnumerator(p) { this.v = p.v; this.l = p.c; } + RepeatEnumerator.prototype.next = function () { if (this.l === 0) { return doneEnumerator; } if (this.l > 0) { this.l--; } - return { done: false, value: this.v }; + return { done: false, value: this.v }; }; - + return RepeatEnumerable; }(Enumerable)); var enumerableRepeat = Enumerable.repeat = function (value, repeatCount) { return new RepeatEnumerable(value, repeatCount); }; - + var OfEnumerable = (function(__super__) { inherits(OfEnumerable, __super__); function OfEnumerable(s, fn, thisArg) { @@ -2007,19 +2081,20 @@ var FlatMapObservable = (function(__super__){ OfEnumerable.prototype[$iterator$] = function () { return new OfEnumerator(this); }; - + function OfEnumerator(p) { this.i = -1; this.s = p.s; this.l = this.s.length; this.fn = p.fn; } + OfEnumerator.prototype.next = function () { return ++this.i < this.l ? { done: false, value: !this.fn ? this.s[this.i] : this.fn(this.s[this.i], this.i, this.s) } : - doneEnumerator; + doneEnumerator; }; - + return OfEnumerable; }(Enumerable)); @@ -2038,35 +2113,16 @@ var FlatMapObservable = (function(__super__){ return this.source.subscribe(new InnerObserver(o)); }; + inherits(InnerObserver, AbstractObserver); function InnerObserver(o) { this.o = o; this.a = []; - this.isStopped = false; + AbstractObserver.call(this); } - InnerObserver.prototype.onNext = function (x) { if(!this.isStopped) { this.a.push(x); } }; - InnerObserver.prototype.onError = function (e) { - if (!this.isStopped) { - this.isStopped = true; - this.o.onError(e); - } - }; - InnerObserver.prototype.onCompleted = function () { - if (!this.isStopped) { - this.isStopped = true; - this.o.onNext(this.a); - this.o.onCompleted(); - } - }; - InnerObserver.prototype.dispose = function () { this.isStopped = true; } - InnerObserver.prototype.fail = function (e) { - if (!this.isStopped) { - this.isStopped = true; - this.o.onError(e); - return true; - } - - return false; - }; + + InnerObserver.prototype.next = function (x) { this.a.push(x); }; + InnerObserver.prototype.error = function (e) { this.o.onError(e); }; + InnerObserver.prototype.completed = function () { this.o.onNext(this.a); this.o.onCompleted(); }; return ToArrayObservable; }(ObservableBase)); @@ -2092,6 +2148,23 @@ var FlatMapObservable = (function(__super__){ return new AnonymousObservable(subscribe, parent); }; + var Defer = (function(__super__) { + inherits(Defer, __super__); + function Defer(factory) { + this._f = factory; + __super__.call(this); + } + + Defer.prototype.subscribeCore = function (o) { + var result = tryCatch(this._f)(); + if (result === errorObj) { return observableThrow(result.e).subscribe(o);} + isPromise(result) && (result = observableFromPromise(result)); + return result.subscribe(o); + }; + + return Defer; + }(ObservableBase)); + /** * Returns an observable sequence that invokes the specified factory function whenever a new observer subscribes. * @@ -2101,16 +2174,7 @@ var FlatMapObservable = (function(__super__){ * @returns {Observable} An observable sequence whose observers trigger an invocation of the given observable factory function. */ var observableDefer = Observable.defer = function (observableFactory) { - return new AnonymousObservable(function (observer) { - var result; - try { - result = observableFactory(); - } catch (e) { - return observableThrow(e).subscribe(observer); - } - isPromise(result) && (result = observableFromPromise(result)); - return result.subscribe(observer); - }); + return new Defer(observableFactory); }; var EmptyObservable = (function(__super__) { @@ -2136,7 +2200,10 @@ var FlatMapObservable = (function(__super__){ } EmptySink.prototype.run = function () { - return this.scheduler.scheduleWithState(this.observer, scheduleItem); + var state = this.observer; + return this.scheduler === immediateScheduler ? + scheduleItem(null, state) : + this.scheduler.schedule(state, scheduleItem); }; return EmptyObservable; @@ -2160,54 +2227,40 @@ var FlatMapObservable = (function(__super__){ var FromObservable = (function(__super__) { inherits(FromObservable, __super__); - function FromObservable(iterable, mapper, scheduler) { - this.iterable = iterable; - this.mapper = mapper; - this.scheduler = scheduler; + function FromObservable(iterable, fn, scheduler) { + this._iterable = iterable; + this._fn = fn; + this._scheduler = scheduler; __super__.call(this); } - FromObservable.prototype.subscribeCore = function (o) { - var sink = new FromSink(o, this); - return sink.run(); - }; - - return FromObservable; - }(ObservableBase)); - - var FromSink = (function () { - function FromSink(o, parent) { - this.o = o; - this.parent = parent; - } - - FromSink.prototype.run = function () { - var list = Object(this.parent.iterable), - it = getIterable(list), - o = this.o, - mapper = this.parent.mapper; - - function loopRecursive(i, recurse) { + function createScheduleMethod(o, it, fn) { + return function loopRecursive(i, recurse) { var next = tryCatch(it.next).call(it); if (next === errorObj) { return o.onError(next.e); } if (next.done) { return o.onCompleted(); } var result = next.value; - if (isFunction(mapper)) { - result = tryCatch(mapper)(result, i); + if (isFunction(fn)) { + result = tryCatch(fn)(result, i); if (result === errorObj) { return o.onError(result.e); } } o.onNext(result); recurse(i + 1); - } + }; + } + + FromObservable.prototype.subscribeCore = function (o) { + var list = Object(this._iterable), + it = getIterable(list); - return this.parent.scheduler.scheduleRecursiveWithState(0, loopRecursive); + return this._scheduler.scheduleRecursive(0, createScheduleMethod(o, it, this._fn)); }; - return FromSink; - }()); + return FromObservable; + }(ObservableBase)); var maxSafeInteger = Math.pow(2, 53) - 1; @@ -2318,38 +2371,30 @@ var FlatMapObservable = (function(__super__){ var FromArrayObservable = (function(__super__) { inherits(FromArrayObservable, __super__); function FromArrayObservable(args, scheduler) { - this.args = args; - this.scheduler = scheduler; + this._args = args; + this._scheduler = scheduler; __super__.call(this); } - FromArrayObservable.prototype.subscribeCore = function (observer) { - var sink = new FromArraySink(observer, this); - return sink.run(); + function scheduleMethod(o, args) { + var len = args.length; + return function loopRecursive (i, recurse) { + if (i < len) { + o.onNext(args[i]); + recurse(i + 1); + } else { + o.onCompleted(); + } + }; + } + + FromArrayObservable.prototype.subscribeCore = function (o) { + return this._scheduler.scheduleRecursive(0, scheduleMethod(o, this._args)); }; return FromArrayObservable; }(ObservableBase)); - function FromArraySink(observer, parent) { - this.observer = observer; - this.parent = parent; - } - - FromArraySink.prototype.run = function () { - var observer = this.observer, args = this.parent.args, len = args.length; - function loopRecursive(i, recurse) { - if (i < len) { - observer.onNext(args[i]); - recurse(i + 1); - } else { - observer.onCompleted(); - } - } - - return this.parent.scheduler.scheduleRecursiveWithState(0, loopRecursive); - }; - /** * Converts an array to an observable sequence, using an optional scheduler to enumerate the array. * @deprecated use Observable.from or Observable.of @@ -2412,41 +2457,32 @@ var FlatMapObservable = (function(__super__){ var PairsObservable = (function(__super__) { inherits(PairsObservable, __super__); - function PairsObservable(obj, scheduler) { - this.obj = obj; - this.keys = Object.keys(obj); - this.scheduler = scheduler; + function PairsObservable(o, scheduler) { + this._o = o; + this._keys = Object.keys(o); + this._scheduler = scheduler; __super__.call(this); } - PairsObservable.prototype.subscribeCore = function (observer) { - var sink = new PairsSink(observer, this); - return sink.run(); + function scheduleMethod(o, obj, keys) { + return function loopRecursive(i, recurse) { + if (i < keys.length) { + var key = keys[i]; + o.onNext([key, obj[key]]); + recurse(i + 1); + } else { + o.onCompleted(); + } + }; + } + + PairsObservable.prototype.subscribeCore = function (o) { + return this._scheduler.scheduleRecursive(0, scheduleMethod(o, this._o, this._keys)); }; return PairsObservable; }(ObservableBase)); - function PairsSink(observer, parent) { - this.observer = observer; - this.parent = parent; - } - - PairsSink.prototype.run = function () { - var observer = this.observer, obj = this.parent.obj, keys = this.parent.keys, len = keys.length; - function loopRecursive(i, recurse) { - if (i < len) { - var key = keys[i]; - observer.onNext([key, obj[key]]); - recurse(i + 1); - } else { - observer.onCompleted(); - } - } - - return this.parent.scheduler.scheduleRecursiveWithState(0, loopRecursive); - }; - /** * Convert an object into an observable sequence of [key, value] pairs. * @param {Object} obj The object to inspect. @@ -2467,36 +2503,26 @@ var FlatMapObservable = (function(__super__){ __super__.call(this); } - RangeObservable.prototype.subscribeCore = function (observer) { - var sink = new RangeSink(observer, this); - return sink.run(); - }; - - return RangeObservable; - }(ObservableBase)); - - var RangeSink = (function () { - function RangeSink(observer, parent) { - this.observer = observer; - this.parent = parent; - } - - RangeSink.prototype.run = function () { - var start = this.parent.start, count = this.parent.rangeCount, observer = this.observer; - function loopRecursive(i, recurse) { + function loopRecursive(start, count, o) { + return function loop (i, recurse) { if (i < count) { - observer.onNext(start + i); + o.onNext(start + i); recurse(i + 1); } else { - observer.onCompleted(); + o.onCompleted(); } - } + }; + } - return this.parent.scheduler.scheduleRecursiveWithState(0, loopRecursive); + RangeObservable.prototype.subscribeCore = function (o) { + return this.scheduler.scheduleRecursive( + 0, + loopRecursive(this.start, this.rangeCount, o) + ); }; - return RangeSink; - }()); + return RangeObservable; + }(ObservableBase)); /** * Generates an observable sequence of integral numbers within a specified range, using the specified scheduler to send out observer messages. @@ -2543,7 +2569,7 @@ var FlatMapObservable = (function(__super__){ recurse(i); } - return this.parent.scheduler.scheduleRecursiveWithState(this.parent.repeatCount, loopRecursive); + return this.parent.scheduler.scheduleRecursive(this.parent.repeatCount, loopRecursive); }; /** @@ -2561,22 +2587,18 @@ var FlatMapObservable = (function(__super__){ var JustObservable = (function(__super__) { inherits(JustObservable, __super__); function JustObservable(value, scheduler) { - this.value = value; - this.scheduler = scheduler; + this._value = value; + this._scheduler = scheduler; __super__.call(this); } - JustObservable.prototype.subscribeCore = function (observer) { - var sink = new JustSink(observer, this.value, this.scheduler); - return sink.run(); + JustObservable.prototype.subscribeCore = function (o) { + var state = [this._value, o]; + return this._scheduler === immediateScheduler ? + scheduleItem(null, state) : + this._scheduler.schedule(state, scheduleItem); }; - function JustSink(observer, value, scheduler) { - this.observer = observer; - this.value = value; - this.scheduler = scheduler; - } - function scheduleItem(s, state) { var value = state[0], observer = state[1]; observer.onNext(value); @@ -2584,13 +2606,6 @@ var FlatMapObservable = (function(__super__){ return disposableEmpty; } - JustSink.prototype.run = function () { - var state = [this.value, this.observer]; - return this.scheduler === immediateScheduler ? - scheduleItem(null, state) : - this.scheduler.scheduleWithState(state, scheduleItem); - }; - return JustObservable; }(ObservableBase)); @@ -2609,30 +2624,24 @@ var FlatMapObservable = (function(__super__){ var ThrowObservable = (function(__super__) { inherits(ThrowObservable, __super__); function ThrowObservable(error, scheduler) { - this.error = error; - this.scheduler = scheduler; + this._error = error; + this._scheduler = scheduler; __super__.call(this); } ThrowObservable.prototype.subscribeCore = function (o) { - var sink = new ThrowSink(o, this); - return sink.run(); + var state = [this._error, o]; + return this._scheduler === immediateScheduler ? + scheduleItem(null, state) : + this._scheduler.schedule(state, scheduleItem); }; - function ThrowSink(o, p) { - this.o = o; - this.p = p; - } - function scheduleItem(s, state) { var e = state[0], o = state[1]; o.onError(e); + return disposableEmpty; } - ThrowSink.prototype.run = function () { - return this.p.scheduler.scheduleWithState([this.p.error, this.o], scheduleItem); - }; - return ThrowObservable; }(ObservableBase)); @@ -2648,6 +2657,24 @@ var FlatMapObservable = (function(__super__){ return new ThrowObservable(error, scheduler); }; + var CatchObservable = (function (__super__) { + inherits(CatchObservable, __super__); + function CatchObservable(source, fn) { + this.source = source; + this._fn = fn; + __super__.call(this); + } + + CatchObservable.prototype.subscribeCore = function (o) { + var d1 = new SingleAssignmentDisposable(), subscription = new SerialDisposable(); + subscription.setDisposable(d1); + d1.setDisposable(this.source.subscribe(new CatchObserver(o, subscription, this._fn))); + return subscription; + }; + + return CatchObservable; + }(ObservableBase)); + var CatchObserver = (function(__super__) { inherits(CatchObserver, __super__); function CatchObserver(o, s, fn) { @@ -2672,22 +2699,13 @@ var FlatMapObservable = (function(__super__){ return CatchObserver; }(AbstractObserver)); - function observableCatchHandler(source, handler) { - return new AnonymousObservable(function (o) { - var d1 = new SingleAssignmentDisposable(), subscription = new SerialDisposable(); - subscription.setDisposable(d1); - d1.setDisposable(source.subscribe(new CatchObserver(o, subscription, handler))); - return subscription; - }, source); - } - /** * Continues an observable sequence that is terminated by an exception with the next observable sequence. * @param {Mixed} handlerOrSecond Exception handler function that returns an observable sequence given the error that occurred in the first sequence, or a second observable sequence used to produce results when an error occurred in the first sequence. * @returns {Observable} An observable sequence containing the first sequence's elements, followed by the elements of the handler sequence in case an exception occurred. */ observableProto['catch'] = function (handlerOrSecond) { - return isFunction(handlerOrSecond) ? observableCatchHandler(this, handlerOrSecond) : observableCatch([this, handlerOrSecond]); + return isFunction(handlerOrSecond) ? new CatchObservable(this, handlerOrSecond) : observableCatch([this, handlerOrSecond]); }; /** @@ -2734,64 +2752,92 @@ var FlatMapObservable = (function(__super__){ return args; } - /** - * Merges the specified observable sequences into one observable sequence by using the selector function whenever any of the observable sequences or Promises produces an element. - * - * @example - * 1 - obs = Rx.Observable.combineLatest(obs1, obs2, obs3, function (o1, o2, o3) { return o1 + o2 + o3; }); - * 2 - obs = Rx.Observable.combineLatest([obs1, obs2, obs3], function (o1, o2, o3) { return o1 + o2 + o3; }); - * @returns {Observable} An observable sequence containing the result of combining elements of the sources using the specified result selector function. - */ - var combineLatest = Observable.combineLatest = function () { - var len = arguments.length, args = new Array(len); - for(var i = 0; i < len; i++) { args[i] = arguments[i]; } - var resultSelector = isFunction(args[len - 1]) ? args.pop() : argumentsToArray; - Array.isArray(args[0]) && (args = args[0]); + var CombineLatestObservable = (function(__super__) { + inherits(CombineLatestObservable, __super__); + function CombineLatestObservable(params, cb) { + this._params = params; + this._cb = cb; + __super__.call(this); + } - return new AnonymousObservable(function (o) { - var n = args.length, - hasValue = arrayInitialize(n, falseFactory), - hasValueAll = false, - isDone = arrayInitialize(n, falseFactory), - values = new Array(n); + CombineLatestObservable.prototype.subscribeCore = function(observer) { + var len = this._params.length, + subscriptions = new Array(len); - function next(i) { - hasValue[i] = true; - if (hasValueAll || (hasValueAll = hasValue.every(identity))) { - try { - var res = resultSelector.apply(null, values); - } catch (e) { - return o.onError(e); - } - o.onNext(res); - } else if (isDone.filter(function (x, j) { return j !== i; }).every(identity)) { - o.onCompleted(); - } - } + var state = { + hasValue: arrayInitialize(len, falseFactory), + hasValueAll: false, + isDone: arrayInitialize(len, falseFactory), + values: new Array(len) + }; - function done (i) { - isDone[i] = true; - isDone.every(identity) && o.onCompleted(); + for (var i = 0; i < len; i++) { + var source = this._params[i], sad = new SingleAssignmentDisposable(); + subscriptions[i] = sad; + isPromise(source) && (source = observableFromPromise(source)); + sad.setDisposable(source.subscribe(new CombineLatestObserver(observer, i, this._cb, state))); } - var subscriptions = new Array(n); - for (var idx = 0; idx < n; idx++) { - (function (i) { - var source = args[i], sad = new SingleAssignmentDisposable(); - isPromise(source) && (source = observableFromPromise(source)); - sad.setDisposable(source.subscribe(function (x) { - values[i] = x; - next(i); - }, - function(e) { o.onError(e); }, - function () { done(i); } - )); - subscriptions[i] = sad; - }(idx)); + return new NAryDisposable(subscriptions); + }; + + return CombineLatestObservable; + }(ObservableBase)); + + var CombineLatestObserver = (function (__super__) { + inherits(CombineLatestObserver, __super__); + function CombineLatestObserver(o, i, cb, state) { + this._o = o; + this._i = i; + this._cb = cb; + this._state = state; + __super__.call(this); + } + + function notTheSame(i) { + return function (x, j) { + return j !== i; + }; + } + + CombineLatestObserver.prototype.next = function (x) { + this._state.values[this._i] = x; + this._state.hasValue[this._i] = true; + if (this._state.hasValueAll || (this._state.hasValueAll = this._state.hasValue.every(identity))) { + var res = tryCatch(this._cb).apply(null, this._state.values); + if (res === errorObj) { return this._o.onError(res.e); } + this._o.onNext(res); + } else if (this._state.isDone.filter(notTheSame(this._i)).every(identity)) { + this._o.onCompleted(); } + }; - return new CompositeDisposable(subscriptions); - }, this); + CombineLatestObserver.prototype.error = function (e) { + this._o.onError(e); + }; + + CombineLatestObserver.prototype.completed = function () { + this._state.isDone[this._i] = true; + this._state.isDone.every(identity) && this._o.onCompleted(); + }; + + return CombineLatestObserver; + }(AbstractObserver)); + + /** + * Merges the specified observable sequences into one observable sequence by using the selector function whenever any of the observable sequences or Promises produces an element. + * + * @example + * 1 - obs = Rx.Observable.combineLatest(obs1, obs2, obs3, function (o1, o2, o3) { return o1 + o2 + o3; }); + * 2 - obs = Rx.Observable.combineLatest([obs1, obs2, obs3], function (o1, o2, o3) { return o1 + o2 + o3; }); + * @returns {Observable} An observable sequence containing the result of combining elements of the sources using the specified result selector function. + */ + var combineLatest = Observable.combineLatest = function () { + var len = arguments.length, args = new Array(len); + for(var i = 0; i < len; i++) { args[i] = arguments[i]; } + var resultSelector = isFunction(args[len - 1]) ? args.pop() : argumentsToArray; + Array.isArray(args[0]) && (args = args[0]); + return new CombineLatestObservable(args, resultSelector); }; /** @@ -2804,49 +2850,56 @@ var FlatMapObservable = (function(__super__){ return observableConcat.apply(null, args); }; + var ConcatObserver = (function(__super__) { + inherits(ConcatObserver, __super__); + function ConcatObserver(s, fn) { + this._s = s; + this._fn = fn; + __super__.call(this); + } + + ConcatObserver.prototype.next = function (x) { this._s.o.onNext(x); }; + ConcatObserver.prototype.error = function (e) { this._s.o.onError(e); }; + ConcatObserver.prototype.completed = function () { this._s.i++; this._fn(this._s); }; + + return ConcatObserver; + }(AbstractObserver)); + var ConcatObservable = (function(__super__) { inherits(ConcatObservable, __super__); function ConcatObservable(sources) { - this.sources = sources; + this._sources = sources; __super__.call(this); } - ConcatObservable.prototype.subscribeCore = function(o) { - var sink = new ConcatSink(this.sources, o); - return sink.run(); - }; + function scheduleRecursive (state, recurse) { + if (state.disposable.isDisposed) { return; } + if (state.i === state.sources.length) { return state.o.onCompleted(); } - function ConcatSink(sources, o) { - this.sources = sources; - this.o = o; - } - ConcatSink.prototype.run = function () { - var isDisposed, subscription = new SerialDisposable(), sources = this.sources, length = sources.length, o = this.o; - var cancelable = immediateScheduler.scheduleRecursiveWithState(0, function (i, self) { - if (isDisposed) { return; } - if (i === length) { - return o.onCompleted(); - } + // Check if promise + var currentValue = state.sources[state.i]; + isPromise(currentValue) && (currentValue = observableFromPromise(currentValue)); - // Check if promise - var currentValue = sources[i]; - isPromise(currentValue) && (currentValue = observableFromPromise(currentValue)); + var d = new SingleAssignmentDisposable(); + state.subscription.setDisposable(d); + d.setDisposable(currentValue.subscribe(new ConcatObserver(state, recurse))); + } - var d = new SingleAssignmentDisposable(); - subscription.setDisposable(d); - d.setDisposable(currentValue.subscribe( - function (x) { o.onNext(x); }, - function (e) { o.onError(e); }, - function () { self(i + 1); } - )); - }); + ConcatObservable.prototype.subscribeCore = function(o) { + var subscription = new SerialDisposable(); + var disposable = disposableCreate(noop); + var state = { + o: o, + i: 0, + subscription: subscription, + disposable: disposable, + sources: this._sources + }; - return new CompositeDisposable(subscription, cancelable, disposableCreate(function () { - isDisposed = true; - })); + var cancelable = immediateScheduler.scheduleRecursive(state, scheduleRecursive); + return new NAryDisposable([subscription, disposable, cancelable]); }; - return ConcatObservable; }(ObservableBase)); @@ -2893,7 +2946,7 @@ var FlatMapObservable = (function(__super__){ }(ObservableBase)); - var MergeObserver = (function () { + var MergeObserver = (function (__super__) { function MergeObserver(o, max, g) { this.o = o; this.max = max; @@ -2901,97 +2954,55 @@ var FlatMapObservable = (function(__super__){ this.done = false; this.q = []; this.activeCount = 0; - this.isStopped = false; + __super__.call(this); } + + inherits(MergeObserver, __super__); + MergeObserver.prototype.handleSubscribe = function (xs) { var sad = new SingleAssignmentDisposable(); this.g.add(sad); isPromise(xs) && (xs = observableFromPromise(xs)); sad.setDisposable(xs.subscribe(new InnerObserver(this, sad))); }; - MergeObserver.prototype.onNext = function (innerSource) { - if (this.isStopped) { return; } - if(this.activeCount < this.max) { - this.activeCount++; - this.handleSubscribe(innerSource); - } else { - this.q.push(innerSource); - } - }; - MergeObserver.prototype.onError = function (e) { - if (!this.isStopped) { - this.isStopped = true; - this.o.onError(e); - } - }; - MergeObserver.prototype.onCompleted = function () { - if (!this.isStopped) { - this.isStopped = true; - this.done = true; - this.activeCount === 0 && this.o.onCompleted(); - } - }; - MergeObserver.prototype.dispose = function() { this.isStopped = true; }; - MergeObserver.prototype.fail = function (e) { - if (!this.isStopped) { - this.isStopped = true; - this.o.onError(e); - return true; - } - - return false; - }; - function InnerObserver(parent, sad) { - this.parent = parent; - this.sad = sad; - this.isStopped = false; + MergeObserver.prototype.next = function (innerSource) { + if(this.activeCount < this.max) { + this.activeCount++; + this.handleSubscribe(innerSource); + } else { + this.q.push(innerSource); } - InnerObserver.prototype.onNext = function (x) { if(!this.isStopped) { this.parent.o.onNext(x); } }; - InnerObserver.prototype.onError = function (e) { - if (!this.isStopped) { - this.isStopped = true; - this.parent.o.onError(e); - } - }; - InnerObserver.prototype.onCompleted = function () { - if(!this.isStopped) { - this.isStopped = true; - var parent = this.parent; - parent.g.remove(this.sad); - if (parent.q.length > 0) { - parent.handleSubscribe(parent.q.shift()); - } else { - parent.activeCount--; - parent.done && parent.activeCount === 0 && parent.o.onCompleted(); - } - } - }; - InnerObserver.prototype.dispose = function() { this.isStopped = true; }; - InnerObserver.prototype.fail = function (e) { - if (!this.isStopped) { - this.isStopped = true; - this.parent.o.onError(e); - return true; - } - - return false; - }; - - return MergeObserver; - }()); + }; + MergeObserver.prototype.error = function (e) { this.o.onError(e); }; + MergeObserver.prototype.completed = function () { this.done = true; this.activeCount === 0 && this.o.onCompleted(); }; + function InnerObserver(parent, sad) { + this.parent = parent; + this.sad = sad; + __super__.call(this); + } + inherits(InnerObserver, __super__); + InnerObserver.prototype.next = function (x) { this.parent.o.onNext(x); }; + InnerObserver.prototype.error = function (e) { this.parent.o.onError(e); }; + InnerObserver.prototype.completed = function () { + this.parent.g.remove(this.sad); + if (this.parent.q.length > 0) { + this.parent.handleSubscribe(this.parent.q.shift()); + } else { + this.parent.activeCount--; + this.parent.done && this.parent.activeCount === 0 && this.parent.o.onCompleted(); + } + }; + return MergeObserver; + }(AbstractObserver)); /** * Merges an observable sequence of observable sequences into an observable sequence, limiting the number of concurrent subscriptions to inner sequences. * Or merges two observable sequences into a single observable sequence. - * - * @example - * 1 - merged = sources.merge(1); - * 2 - merged = source.merge(otherSource); * @param {Mixed} [maxConcurrentOrOther] Maximum number of inner observable sequences being subscribed to concurrently or the second observable sequence. * @returns {Observable} The observable sequence that merges the elements of the inner sequences. */ @@ -3025,12 +3036,93 @@ var FlatMapObservable = (function(__super__){ }; var CompositeError = Rx.CompositeError = function(errors) { - this.name = "NotImplementedError"; this.innerErrors = errors; this.message = 'This contains multiple errors. Check the innerErrors'; Error.call(this); - } - CompositeError.prototype = Error.prototype; + }; + CompositeError.prototype = Object.create(Error.prototype); + CompositeError.prototype.name = 'CompositeError'; + + var MergeDelayErrorObservable = (function(__super__) { + inherits(MergeDelayErrorObservable, __super__); + function MergeDelayErrorObservable(source) { + this.source = source; + __super__.call(this); + } + + MergeDelayErrorObservable.prototype.subscribeCore = function (o) { + var group = new CompositeDisposable(), + m = new SingleAssignmentDisposable(), + state = { isStopped: false, errors: [], o: o }; + + group.add(m); + m.setDisposable(this.source.subscribe(new MergeDelayErrorObserver(group, state))); + + return group; + }; + + return MergeDelayErrorObservable; + }(ObservableBase)); + + var MergeDelayErrorObserver = (function(__super__) { + inherits(MergeDelayErrorObserver, __super__); + function MergeDelayErrorObserver(group, state) { + this._group = group; + this._state = state; + __super__.call(this); + } + + function setCompletion(o, errors) { + if (errors.length === 0) { + o.onCompleted(); + } else if (errors.length === 1) { + o.onError(errors[0]); + } else { + o.onError(new CompositeError(errors)); + } + } + + MergeDelayErrorObserver.prototype.next = function (x) { + var inner = new SingleAssignmentDisposable(); + this._group.add(inner); + + // Check for promises support + isPromise(x) && (x = observableFromPromise(x)); + inner.setDisposable(x.subscribe(new InnerObserver(inner, this._group, this._state))); + }; + + MergeDelayErrorObserver.prototype.error = function (e) { + this._state.errors.push(e); + this._state.isStopped = true; + this._group.length === 1 && setCompletion(this._state.o, this._state.errors); + }; + + MergeDelayErrorObserver.prototype.completed = function () { + this._state.isStopped = true; + this._group.length === 1 && setCompletion(this._state.o, this._state.errors); + }; + + inherits(InnerObserver, __super__); + function InnerObserver(inner, group, state) { + this._inner = inner; + this._group = group; + this._state = state; + __super__.call(this); + } + + InnerObserver.prototype.next = function (x) { this._state.o.onNext(x); }; + InnerObserver.prototype.error = function (e) { + this._state.errors.push(e); + this._group.remove(this._inner); + this._state.isStopped && this._group.length === 1 && setCompletion(this._state.o, this._state.errors); + }; + InnerObserver.prototype.completed = function () { + this._group.remove(this._inner); + this._state.isStopped && this._group.length === 1 && setCompletion(this._state.o, this._state.errors); + }; + + return MergeDelayErrorObserver; + }(AbstractObserver)); /** * Flattens an Observable that emits Observables into one Observable, in a way that allows an Observer to @@ -3053,56 +3145,7 @@ var FlatMapObservable = (function(__super__){ for(var i = 0; i < len; i++) { args[i] = arguments[i]; } } var source = observableOf(null, args); - - return new AnonymousObservable(function (o) { - var group = new CompositeDisposable(), - m = new SingleAssignmentDisposable(), - isStopped = false, - errors = []; - - function setCompletion() { - if (errors.length === 0) { - o.onCompleted(); - } else if (errors.length === 1) { - o.onError(errors[0]); - } else { - o.onError(new CompositeError(errors)); - } - } - - group.add(m); - - m.setDisposable(source.subscribe( - function (innerSource) { - var innerSubscription = new SingleAssignmentDisposable(); - group.add(innerSubscription); - - // Check for promises support - isPromise(innerSource) && (innerSource = observableFromPromise(innerSource)); - - innerSubscription.setDisposable(innerSource.subscribe( - function (x) { o.onNext(x); }, - function (e) { - errors.push(e); - group.remove(innerSubscription); - isStopped && group.length === 1 && setCompletion(); - }, - function () { - group.remove(innerSubscription); - isStopped && group.length === 1 && setCompletion(); - })); - }, - function (e) { - errors.push(e); - isStopped = true; - group.length === 1 && setCompletion(); - }, - function () { - isStopped = true; - group.length === 1 && setCompletion(); - })); - return group; - }); + return new MergeDelayErrorObservable(source); }; var MergeAllObservable = (function (__super__) { @@ -3113,85 +3156,63 @@ var FlatMapObservable = (function(__super__){ __super__.call(this); } - MergeAllObservable.prototype.subscribeCore = function (observer) { + MergeAllObservable.prototype.subscribeCore = function (o) { var g = new CompositeDisposable(), m = new SingleAssignmentDisposable(); g.add(m); - m.setDisposable(this.source.subscribe(new MergeAllObserver(observer, g))); + m.setDisposable(this.source.subscribe(new MergeAllObserver(o, g))); return g; }; + return MergeAllObservable; + }(ObservableBase)); + + var MergeAllObserver = (function (__super__) { function MergeAllObserver(o, g) { this.o = o; this.g = g; - this.isStopped = false; this.done = false; + __super__.call(this); } - MergeAllObserver.prototype.onNext = function(innerSource) { - if(this.isStopped) { return; } + + inherits(MergeAllObserver, __super__); + + MergeAllObserver.prototype.next = function(innerSource) { var sad = new SingleAssignmentDisposable(); this.g.add(sad); - isPromise(innerSource) && (innerSource = observableFromPromise(innerSource)); - sad.setDisposable(innerSource.subscribe(new InnerObserver(this, sad))); }; - MergeAllObserver.prototype.onError = function (e) { - if(!this.isStopped) { - this.isStopped = true; - this.o.onError(e); - } - }; - MergeAllObserver.prototype.onCompleted = function () { - if(!this.isStopped) { - this.isStopped = true; - this.done = true; - this.g.length === 1 && this.o.onCompleted(); - } + + MergeAllObserver.prototype.error = function (e) { + this.o.onError(e); }; - MergeAllObserver.prototype.dispose = function() { this.isStopped = true; }; - MergeAllObserver.prototype.fail = function (e) { - if (!this.isStopped) { - this.isStopped = true; - this.o.onError(e); - return true; - } - return false; + MergeAllObserver.prototype.completed = function () { + this.done = true; + this.g.length === 1 && this.o.onCompleted(); }; function InnerObserver(parent, sad) { this.parent = parent; this.sad = sad; - this.isStopped = false; + __super__.call(this); } - InnerObserver.prototype.onNext = function (x) { if (!this.isStopped) { this.parent.o.onNext(x); } }; - InnerObserver.prototype.onError = function (e) { - if(!this.isStopped) { - this.isStopped = true; - this.parent.o.onError(e); - } + + inherits(InnerObserver, __super__); + + InnerObserver.prototype.next = function (x) { + this.parent.o.onNext(x); }; - InnerObserver.prototype.onCompleted = function () { - if(!this.isStopped) { - var parent = this.parent; - this.isStopped = true; - parent.g.remove(this.sad); - parent.done && parent.g.length === 1 && parent.o.onCompleted(); - } + InnerObserver.prototype.error = function (e) { + this.parent.o.onError(e); }; - InnerObserver.prototype.dispose = function() { this.isStopped = true; }; - InnerObserver.prototype.fail = function (e) { - if (!this.isStopped) { - this.isStopped = true; - this.parent.o.onError(e); - return true; - } - - return false; + InnerObserver.prototype.completed = function () { + this.parent.g.remove(this.sad); + this.parent.done && this.parent.g.length === 1 && this.parent.o.onCompleted(); }; - return MergeAllObservable; - }(ObservableBase)); + return MergeAllObserver; + }(AbstractObserver)); /** * Merges an observable sequence of observable sequences into an observable sequence. @@ -3201,34 +3222,86 @@ var FlatMapObservable = (function(__super__){ return new MergeAllObservable(this); }; + var SkipUntilObservable = (function(__super__) { + inherits(SkipUntilObservable, __super__); + + function SkipUntilObservable(source, other) { + this._s = source; + this._o = isPromise(other) ? observableFromPromise(other) : other; + this._open = false; + __super__.call(this); + } + + SkipUntilObservable.prototype.subscribeCore = function(o) { + var leftSubscription = new SingleAssignmentDisposable(); + leftSubscription.setDisposable(this._s.subscribe(new SkipUntilSourceObserver(o, this))); + + isPromise(this._o) && (this._o = observableFromPromise(this._o)); + + var rightSubscription = new SingleAssignmentDisposable(); + rightSubscription.setDisposable(this._o.subscribe(new SkipUntilOtherObserver(o, this, rightSubscription))); + + return new BinaryDisposable(leftSubscription, rightSubscription); + }; + + return SkipUntilObservable; + }(ObservableBase)); + + var SkipUntilSourceObserver = (function(__super__) { + inherits(SkipUntilSourceObserver, __super__); + function SkipUntilSourceObserver(o, p) { + this._o = o; + this._p = p; + __super__.call(this); + } + + SkipUntilSourceObserver.prototype.next = function (x) { + this._p._open && this._o.onNext(x); + }; + + SkipUntilSourceObserver.prototype.error = function (err) { + this._o.onError(err); + }; + + SkipUntilSourceObserver.prototype.onCompleted = function () { + this._p._open && this._o.onCompleted(); + }; + + return SkipUntilSourceObserver; + }(AbstractObserver)); + + var SkipUntilOtherObserver = (function(__super__) { + inherits(SkipUntilOtherObserver, __super__); + function SkipUntilOtherObserver(o, p, r) { + this._o = o; + this._p = p; + this._r = r; + __super__.call(this); + } + + SkipUntilOtherObserver.prototype.next = function () { + this._p._open = true; + this._r.dispose(); + }; + + SkipUntilOtherObserver.prototype.error = function (err) { + this._o.onError(err); + }; + + SkipUntilOtherObserver.prototype.onCompleted = function () { + this._r.dispose(); + }; + + return SkipUntilOtherObserver; + }(AbstractObserver)); + /** * Returns the values from the source observable sequence only after the other observable sequence produces a value. * @param {Observable | Promise} other The observable sequence or Promise that triggers propagation of elements of the source sequence. * @returns {Observable} An observable sequence containing the elements of the source sequence starting from the point the other sequence triggered propagation. */ observableProto.skipUntil = function (other) { - var source = this; - return new AnonymousObservable(function (o) { - var isOpen = false; - var disposables = new CompositeDisposable(source.subscribe(function (left) { - isOpen && o.onNext(left); - }, function (e) { o.onError(e); }, function () { - isOpen && o.onCompleted(); - })); - - isPromise(other) && (other = observableFromPromise(other)); - - var rightSubscription = new SingleAssignmentDisposable(); - disposables.add(rightSubscription); - rightSubscription.setDisposable(other.subscribe(function () { - isOpen = true; - rightSubscription.dispose(); - }, function (e) { o.onError(e); }, function () { - rightSubscription.dispose(); - })); - - return disposables; - }, source); + return new SkipUntilObservable(this, other); }; var SwitchObservable = (function(__super__) { @@ -3240,80 +3313,55 @@ var FlatMapObservable = (function(__super__){ SwitchObservable.prototype.subscribeCore = function (o) { var inner = new SerialDisposable(), s = this.source.subscribe(new SwitchObserver(o, inner)); - return new CompositeDisposable(s, inner); + return new BinaryDisposable(s, inner); }; + inherits(SwitchObserver, AbstractObserver); function SwitchObserver(o, inner) { this.o = o; this.inner = inner; this.stopped = false; this.latest = 0; this.hasLatest = false; - this.isStopped = false; + AbstractObserver.call(this); } - SwitchObserver.prototype.onNext = function (innerSource) { - if (this.isStopped) { return; } + + SwitchObserver.prototype.next = function (innerSource) { var d = new SingleAssignmentDisposable(), id = ++this.latest; this.hasLatest = true; this.inner.setDisposable(d); isPromise(innerSource) && (innerSource = observableFromPromise(innerSource)); d.setDisposable(innerSource.subscribe(new InnerObserver(this, id))); }; - SwitchObserver.prototype.onError = function (e) { - if (!this.isStopped) { - this.isStopped = true; - this.o.onError(e); - } - }; - SwitchObserver.prototype.onCompleted = function () { - if (!this.isStopped) { - this.isStopped = true; - this.stopped = true; - !this.hasLatest && this.o.onCompleted(); - } + + SwitchObserver.prototype.error = function (e) { + this.o.onError(e); }; - SwitchObserver.prototype.dispose = function () { this.isStopped = true; }; - SwitchObserver.prototype.fail = function (e) { - if(!this.isStopped) { - this.isStopped = true; - this.o.onError(e); - return true; - } - return false; + + SwitchObserver.prototype.completed = function () { + this.stopped = true; + !this.hasLatest && this.o.onCompleted(); }; + inherits(InnerObserver, AbstractObserver); function InnerObserver(parent, id) { this.parent = parent; this.id = id; - this.isStopped = false; + AbstractObserver.call(this); } - InnerObserver.prototype.onNext = function (x) { - if (this.isStopped) { return; } + InnerObserver.prototype.next = function (x) { this.parent.latest === this.id && this.parent.o.onNext(x); }; - InnerObserver.prototype.onError = function (e) { - if (!this.isStopped) { - this.isStopped = true; - this.parent.latest === this.id && this.parent.o.onError(e); - } - }; - InnerObserver.prototype.onCompleted = function () { - if (!this.isStopped) { - this.isStopped = true; - if (this.parent.latest === this.id) { - this.parent.hasLatest = false; - this.parent.isStopped && this.parent.o.onCompleted(); - } - } + + InnerObserver.prototype.error = function (e) { + this.parent.latest === this.id && this.parent.o.onError(e); }; - InnerObserver.prototype.dispose = function () { this.isStopped = true; } - InnerObserver.prototype.fail = function (e) { - if(!this.isStopped) { - this.isStopped = true; - this.parent.o.onError(e); - return true; + + InnerObserver.prototype.completed = function () { + if (this.parent.latest === this.id) { + this.parent.hasLatest = false; + this.parent.stopped && this.parent.o.onCompleted(); } - return false; }; return SwitchObservable; @@ -3337,41 +3385,34 @@ var FlatMapObservable = (function(__super__){ } TakeUntilObservable.prototype.subscribeCore = function(o) { - return new CompositeDisposable( + return new BinaryDisposable( this.source.subscribe(o), - this.other.subscribe(new InnerObserver(o)) + this.other.subscribe(new TakeUntilObserver(o)) ); }; - function InnerObserver(o) { - this.o = o; - this.isStopped = false; + return TakeUntilObservable; + }(ObservableBase)); + + var TakeUntilObserver = (function(__super__) { + inherits(TakeUntilObserver, __super__); + function TakeUntilObserver(o) { + this._o = o; + __super__.call(this); } - InnerObserver.prototype.onNext = function (x) { - if (this.isStopped) { return; } - this.o.onCompleted(); - }; - InnerObserver.prototype.onError = function (err) { - if (!this.isStopped) { - this.isStopped = true; - this.o.onError(err); - } - }; - InnerObserver.prototype.onCompleted = function () { - !this.isStopped && (this.isStopped = true); + + TakeUntilObserver.prototype.next = function () { + this._o.onCompleted(); }; - InnerObserver.prototype.dispose = function() { this.isStopped = true; }; - InnerObserver.prototype.fail = function (e) { - if (!this.isStopped) { - this.isStopped = true; - this.o.onError(e); - return true; - } - return false; + + TakeUntilObserver.prototype.error = function (err) { + this._o.onError(err); }; - return TakeUntilObservable; - }(ObservableBase)); + TakeUntilObserver.prototype.onCompleted = noop; + + return TakeUntilObserver; + }(AbstractObserver)); /** * Returns the values from the source observable sequence until the other observable sequence produces a value. @@ -3383,60 +3424,186 @@ var FlatMapObservable = (function(__super__){ }; function falseFactory() { return false; } + function argumentsToArray() { + var len = arguments.length, args = new Array(len); + for(var i = 0; i < len; i++) { args[i] = arguments[i]; } + return args; + } + + var WithLatestFromObservable = (function(__super__) { + inherits(WithLatestFromObservable, __super__); + function WithLatestFromObservable(source, sources, resultSelector) { + this._s = source; + this._ss = sources; + this._cb = resultSelector; + __super__.call(this); + } + + WithLatestFromObservable.prototype.subscribeCore = function (o) { + var len = this._ss.length; + var state = { + hasValue: arrayInitialize(len, falseFactory), + hasValueAll: false, + values: new Array(len) + }; + + var n = this._ss.length, subscriptions = new Array(n + 1); + for (var i = 0; i < n; i++) { + var other = this._ss[i], sad = new SingleAssignmentDisposable(); + isPromise(other) && (other = observableFromPromise(other)); + sad.setDisposable(other.subscribe(new WithLatestFromOtherObserver(o, i, state))); + subscriptions[i] = sad; + } + + var outerSad = new SingleAssignmentDisposable(); + outerSad.setDisposable(this._s.subscribe(new WithLatestFromSourceObserver(o, this._cb, state))); + subscriptions[n] = outerSad; + + return new NAryDisposable(subscriptions); + }; + + return WithLatestFromObservable; + }(ObservableBase)); + + var WithLatestFromOtherObserver = (function (__super__) { + inherits(WithLatestFromOtherObserver, __super__); + function WithLatestFromOtherObserver(o, i, state) { + this._o = o; + this._i = i; + this._state = state; + __super__.call(this); + } + + WithLatestFromOtherObserver.prototype.next = function (x) { + this._state.values[this._i] = x; + this._state.hasValue[this._i] = true; + this._state.hasValueAll = this._state.hasValue.every(identity); + }; + + WithLatestFromOtherObserver.prototype.error = function (e) { + this._o.onError(e); + }; + + WithLatestFromOtherObserver.prototype.completed = noop; + + return WithLatestFromOtherObserver; + }(AbstractObserver)); + + var WithLatestFromSourceObserver = (function (__super__) { + inherits(WithLatestFromSourceObserver, __super__); + function WithLatestFromSourceObserver(o, cb, state) { + this._o = o; + this._cb = cb; + this._state = state; + __super__.call(this); + } + + WithLatestFromSourceObserver.prototype.next = function (x) { + var allValues = [x].concat(this._state.values); + if (!this._state.hasValueAll) { return; } + var res = tryCatch(this._cb).apply(null, allValues); + if (res === errorObj) { return this._o.onError(res.e); } + this._o.onNext(res); + }; + + WithLatestFromSourceObserver.prototype.error = function (e) { + this._o.onError(e); + }; + + WithLatestFromSourceObserver.prototype.completed = function () { + this._o.onCompleted(); + }; + + return WithLatestFromSourceObserver; + }(AbstractObserver)); /** * Merges the specified observable sequences into one observable sequence by using the selector function only when the (first) source observable sequence produces an element. * @returns {Observable} An observable sequence containing the result of combining elements of the sources using the specified result selector function. */ observableProto.withLatestFrom = function () { - var len = arguments.length, args = new Array(len) + if (arguments.length === 0) { throw new Error('invalid arguments'); } + + var len = arguments.length, args = new Array(len); for(var i = 0; i < len; i++) { args[i] = arguments[i]; } - var resultSelector = args.pop(), source = this; + var resultSelector = isFunction(args[len - 1]) ? args.pop() : argumentsToArray; Array.isArray(args[0]) && (args = args[0]); - return new AnonymousObservable(function (observer) { - var n = args.length, - hasValue = arrayInitialize(n, falseFactory), - hasValueAll = false, - values = new Array(n); - - var subscriptions = new Array(n + 1); - for (var idx = 0; idx < n; idx++) { - (function (i) { - var other = args[i], sad = new SingleAssignmentDisposable(); - isPromise(other) && (other = observableFromPromise(other)); - sad.setDisposable(other.subscribe(function (x) { - values[i] = x; - hasValue[i] = true; - hasValueAll = hasValue.every(identity); - }, function (e) { observer.onError(e); }, noop)); - subscriptions[i] = sad; - }(idx)); - } - - var sad = new SingleAssignmentDisposable(); - sad.setDisposable(source.subscribe(function (x) { - var allValues = [x].concat(values); - if (!hasValueAll) { return; } - var res = tryCatch(resultSelector).apply(null, allValues); - if (res === errorObj) { return observer.onError(res.e); } - observer.onNext(res); - }, function (e) { observer.onError(e); }, function () { - observer.onCompleted(); - })); - subscriptions[n] = sad; - - return new CompositeDisposable(subscriptions); - }, this); + return new WithLatestFromObservable(this, args, resultSelector); }; function falseFactory() { return false; } function emptyArrayFactory() { return []; } - function argumentsToArray() { - var len = arguments.length, args = new Array(len); - for(var i = 0; i < len; i++) { args[i] = arguments[i]; } - return args; - } + + var ZipObservable = (function(__super__) { + inherits(ZipObservable, __super__); + function ZipObservable(sources, resultSelector) { + this._s = sources; + this._cb = resultSelector; + __super__.call(this); + } + + ZipObservable.prototype.subscribeCore = function(observer) { + var n = this._s.length, + subscriptions = new Array(n), + done = arrayInitialize(n, falseFactory), + q = arrayInitialize(n, emptyArrayFactory); + + for (var i = 0; i < n; i++) { + var source = this._s[i], sad = new SingleAssignmentDisposable(); + subscriptions[i] = sad; + isPromise(source) && (source = observableFromPromise(source)); + sad.setDisposable(source.subscribe(new ZipObserver(observer, i, this, q, done))); + } + + return new NAryDisposable(subscriptions); + }; + + return ZipObservable; + }(ObservableBase)); + + var ZipObserver = (function (__super__) { + inherits(ZipObserver, __super__); + function ZipObserver(o, i, p, q, d) { + this._o = o; + this._i = i; + this._p = p; + this._q = q; + this._d = d; + __super__.call(this); + } + + function notEmpty(x) { return x.length > 0; } + function shiftEach(x) { return x.shift(); } + function notTheSame(i) { + return function (x, j) { + return j !== i; + }; + } + + ZipObserver.prototype.next = function (x) { + this._q[this._i].push(x); + if (this._q.every(notEmpty)) { + var queuedValues = this._q.map(shiftEach); + var res = tryCatch(this._p._cb).apply(null, queuedValues); + if (res === errorObj) { return this._o.onError(res.e); } + this._o.onNext(res); + } else if (this._d.filter(notTheSame(this._i)).every(identity)) { + this._o.onCompleted(); + } + }; + + ZipObserver.prototype.error = function (e) { + this._o.onError(e); + }; + + ZipObserver.prototype.completed = function () { + this._d[this._i] = true; + this._d.every(identity) && this._o.onCompleted(); + }; + + return ZipObserver; + }(AbstractObserver)); /** * Merges the specified observable sequences into one observable sequence by using the selector function whenever all of the observable sequences or an array have produced an element at a corresponding index. @@ -3453,38 +3620,8 @@ var FlatMapObservable = (function(__super__){ var parent = this; args.unshift(parent); - return new AnonymousObservable(function (o) { - var n = args.length, - queues = arrayInitialize(n, emptyArrayFactory), - isDone = arrayInitialize(n, falseFactory); - - var subscriptions = new Array(n); - for (var idx = 0; idx < n; idx++) { - (function (i) { - var source = args[i], sad = new SingleAssignmentDisposable(); - - isPromise(source) && (source = observableFromPromise(source)); - - sad.setDisposable(source.subscribe(function (x) { - queues[i].push(x); - if (queues.every(function (x) { return x.length > 0; })) { - var queuedValues = queues.map(function (x) { return x.shift(); }), - res = tryCatch(resultSelector).apply(parent, queuedValues); - if (res === errorObj) { return o.onError(res.e); } - o.onNext(res); - } else if (isDone.filter(function (x, j) { return j !== i; }).every(identity)) { - o.onCompleted(); - } - }, function (e) { o.onError(e); }, function () { - isDone[i] = true; - isDone.every(identity) && o.onCompleted(); - })); - subscriptions[i] = sad; - })(idx); - } - return new CompositeDisposable(subscriptions); - }, parent); + return new ZipObservable(args, resultSelector); }; /** @@ -3511,6 +3648,78 @@ function argumentsToArray() { return args; } +var ZipIterableObservable = (function(__super__) { + inherits(ZipIterableObservable, __super__); + function ZipIterableObservable(sources, cb) { + this.sources = sources; + this._cb = cb; + __super__.call(this); + } + + ZipIterableObservable.prototype.subscribeCore = function (o) { + var sources = this.sources, len = sources.length, subscriptions = new Array(len); + + var state = { + q: arrayInitialize(len, emptyArrayFactory), + done: arrayInitialize(len, falseFactory), + cb: this._cb, + o: o + }; + + for (var i = 0; i < len; i++) { + (function (i) { + var source = sources[i], sad = new SingleAssignmentDisposable(); + (isArrayLike(source) || isIterable(source)) && (source = observableFrom(source)); + + subscriptions[i] = sad; + sad.setDisposable(source.subscribe(new ZipIterableObserver(state, i))); + }(i)); + } + + return new NAryDisposable(subscriptions); + }; + + return ZipIterableObservable; +}(ObservableBase)); + +var ZipIterableObserver = (function (__super__) { + inherits(ZipIterableObserver, __super__); + function ZipIterableObserver(s, i) { + this._s = s; + this._i = i; + __super__.call(this); + } + + function notEmpty(x) { return x.length > 0; } + function shiftEach(x) { return x.shift(); } + function notTheSame(i) { + return function (x, j) { + return j !== i; + }; + } + + ZipIterableObserver.prototype.next = function (x) { + this._s.q[this._i].push(x); + if (this._s.q.every(notEmpty)) { + var queuedValues = this._s.q.map(shiftEach), + res = tryCatch(this._s.cb).apply(null, queuedValues); + if (res === errorObj) { return this._s.o.onError(res.e); } + this._s.o.onNext(res); + } else if (this._s.done.filter(notTheSame(this._i)).every(identity)) { + this._s.o.onCompleted(); + } + }; + + ZipIterableObserver.prototype.error = function (e) { this._s.o.onError(e); }; + + ZipIterableObserver.prototype.completed = function () { + this._s.done[this._i] = true; + this._s.done.every(identity) && this._s.o.onCompleted(); + }; + + return ZipIterableObserver; +}(AbstractObserver)); + /** * Merges the specified observable sequences into one observable sequence by using the selector function whenever all of the observable sequences or an array have produced an element at a corresponding index. * The last element in the arguments must be a function to invoke for each series of elements at corresponding indexes in the args. @@ -3525,38 +3734,7 @@ observableProto.zipIterable = function () { var parent = this; args.unshift(parent); - return new AnonymousObservable(function (o) { - var n = args.length, - queues = arrayInitialize(n, emptyArrayFactory), - isDone = arrayInitialize(n, falseFactory); - - var subscriptions = new Array(n); - for (var idx = 0; idx < n; idx++) { - (function (i) { - var source = args[i], sad = new SingleAssignmentDisposable(); - - (isArrayLike(source) || isIterable(source)) && (source = observableFrom(source)); - - sad.setDisposable(source.subscribe(function (x) { - queues[i].push(x); - if (queues.every(function (x) { return x.length > 0; })) { - var queuedValues = queues.map(function (x) { return x.shift(); }), - res = tryCatch(resultSelector).apply(parent, queuedValues); - if (res === errorObj) { return o.onError(res.e); } - o.onNext(res); - } else if (isDone.filter(function (x, j) { return j !== i; }).every(identity)) { - o.onCompleted(); - } - }, function (e) { o.onError(e); }, function () { - isDone[i] = true; - isDone.every(identity) && o.onCompleted(); - })); - subscriptions[i] = sad; - })(idx); - } - - return new CompositeDisposable(subscriptions); - }, parent); + return new ZipIterableObservable(args, resultSelector); }; function asObservable(source) { @@ -3571,15 +3749,41 @@ observableProto.zipIterable = function () { return new AnonymousObservable(asObservable(this), this); }; + var DematerializeObservable = (function (__super__) { + inherits(DematerializeObservable, __super__); + function DematerializeObservable(source) { + this.source = source; + __super__.call(this); + } + + DematerializeObservable.prototype.subscribeCore = function (o) { + return this.source.subscribe(new DematerializeObserver(o)); + }; + + return DematerializeObservable; + }(ObservableBase)); + + var DematerializeObserver = (function (__super__) { + inherits(DematerializeObserver, __super__); + + function DematerializeObserver(o) { + this._o = o; + __super__.call(this); + } + + DematerializeObserver.prototype.next = function (x) { x.accept(this._o); }; + DematerializeObserver.prototype.error = function (e) { this._o.onError(e); }; + DematerializeObserver.prototype.completed = function () { this._o.onCompleted(); }; + + return DematerializeObserver; + }(AbstractObserver)); + /** * Dematerializes the explicit notification values of an observable sequence as implicit notifications. * @returns {Observable} An observable sequence exhibiting the behavior corresponding to the source sequence's notification values. */ observableProto.dematerialize = function () { - var source = this; - return new AnonymousObservable(function (o) { - return source.subscribe(function (x) { return x.accept(o); }, function(e) { o.onError(e); }, function () { o.onCompleted(); }); - }, this); + return new DematerializeObservable(this); }; var DistinctUntilChangedObservable = (function(__super__) { @@ -3660,43 +3864,29 @@ observableProto.zipIterable = function () { return this.source.subscribe(new InnerObserver(o, this)); }; + inherits(InnerObserver, AbstractObserver); function InnerObserver(o, p) { this.o = o; this.t = !p._oN || isFunction(p._oN) ? observerCreate(p._oN || noop, p._oE || noop, p._oC || noop) : p._oN; this.isStopped = false; + AbstractObserver.call(this); } - InnerObserver.prototype.onNext = function(x) { - if (this.isStopped) { return; } + InnerObserver.prototype.next = function(x) { var res = tryCatch(this.t.onNext).call(this.t, x); if (res === errorObj) { this.o.onError(res.e); } this.o.onNext(x); }; - InnerObserver.prototype.onError = function(err) { - if (!this.isStopped) { - this.isStopped = true; - var res = tryCatch(this.t.onError).call(this.t, err); - if (res === errorObj) { return this.o.onError(res.e); } - this.o.onError(err); - } - }; - InnerObserver.prototype.onCompleted = function() { - if (!this.isStopped) { - this.isStopped = true; - var res = tryCatch(this.t.onCompleted).call(this.t); - if (res === errorObj) { return this.o.onError(res.e); } - this.o.onCompleted(); - } + InnerObserver.prototype.error = function(err) { + var res = tryCatch(this.t.onError).call(this.t, err); + if (res === errorObj) { return this.o.onError(res.e); } + this.o.onError(err); }; - InnerObserver.prototype.dispose = function() { this.isStopped = true; }; - InnerObserver.prototype.fail = function (e) { - if (!this.isStopped) { - this.isStopped = true; - this.o.onError(e); - return true; - } - return false; + InnerObserver.prototype.completed = function() { + var res = tryCatch(this.t.onCompleted).call(this.t); + if (res === errorObj) { return this.o.onError(res.e); } + this.o.onCompleted(); }; return TapObservable; @@ -3747,25 +3937,48 @@ observableProto.zipIterable = function () { return this.tap(noop, null, typeof thisArg !== 'undefined' ? function () { onCompleted.call(thisArg); } : onCompleted); }; + var FinallyObservable = (function (__super__) { + inherits(FinallyObservable, __super__); + function FinallyObservable(source, fn, thisArg) { + this.source = source; + this._fn = bindCallback(fn, thisArg, 0); + __super__.call(this); + } + + FinallyObservable.prototype.subscribeCore = function (o) { + var d = tryCatch(this.source.subscribe).call(this.source, o); + if (d === errorObj) { + this._fn(); + thrower(d.e); + } + + return new FinallyDisposable(d, this._fn); + }; + + function FinallyDisposable(s, fn) { + this.isDisposed = false; + this._s = s; + this._fn = fn; + } + FinallyDisposable.prototype.dispose = function () { + if (!this.isDisposed) { + var res = tryCatch(this._s.dispose).call(this._s); + this._fn(); + res === errorObj && thrower(res.e); + } + }; + + return FinallyObservable; + + }(ObservableBase)); + /** * Invokes a specified action after the source observable sequence terminates gracefully or exceptionally. * @param {Function} finallyAction Action to invoke after the source observable sequence terminates. * @returns {Observable} Source sequence with the action-invoking termination behavior applied. */ - observableProto['finally'] = function (action) { - var source = this; - return new AnonymousObservable(function (observer) { - var subscription = tryCatch(source.subscribe).call(source, observer); - if (subscription === errorObj) { - action(); - return thrower(subscription.e); - } - return disposableCreate(function () { - var r = tryCatch(subscription.dispose).call(subscription); - action(); - r === errorObj && thrower(r.e); - }); - }, this); + observableProto['finally'] = function (action, thisArg) { + return new FinallyObservable(this, action, thisArg); }; var IgnoreElementsObservable = (function(__super__) { @@ -3819,23 +4032,41 @@ observableProto.zipIterable = function () { return new IgnoreElementsObservable(this); }; + var MaterializeObservable = (function (__super__) { + inherits(MaterializeObservable, __super__); + function MaterializeObservable(source, fn) { + this.source = source; + __super__.call(this); + } + + MaterializeObservable.prototype.subscribeCore = function (o) { + return this.source.subscribe(new MaterializeObserver(o)); + }; + + return MaterializeObservable; + }(ObservableBase)); + + var MaterializeObserver = (function (__super__) { + inherits(MaterializeObserver, __super__); + + function MaterializeObserver(o) { + this._o = o; + __super__.call(this); + } + + MaterializeObserver.prototype.next = function (x) { this._o.onNext(notificationCreateOnNext(x)) }; + MaterializeObserver.prototype.error = function (e) { this._o.onNext(notificationCreateOnError(e)); this._o.onCompleted(); }; + MaterializeObserver.prototype.completed = function () { this._o.onNext(notificationCreateOnCompleted()); this._o.onCompleted(); }; + + return MaterializeObserver; + }(AbstractObserver)); + /** * Materializes the implicit notifications of an observable sequence as explicit notification values. * @returns {Observable} An observable sequence containing the materialized notification values from the source sequence. */ observableProto.materialize = function () { - var source = this; - return new AnonymousObservable(function (observer) { - return source.subscribe(function (value) { - observer.onNext(notificationCreateOnNext(value)); - }, function (e) { - observer.onNext(notificationCreateOnError(e)); - observer.onCompleted(); - }, function () { - observer.onNext(notificationCreateOnCompleted()); - observer.onCompleted(); - }); - }, source); + return new MaterializeObservable(this); }; /** @@ -3861,19 +4092,184 @@ observableProto.zipIterable = function () { return enumerableRepeat(this, retryCount).catchError(); }; - /** - * Repeats the source observable sequence upon error each time the notifier emits or until it successfully terminates. - * if the notifier completes, the observable sequence completes. - * - * @example - * var timer = Observable.timer(500); - * var source = observable.retryWhen(timer); - * @param {Observable} [notifier] An observable that triggers the retries or completes the observable with onNext or onCompleted respectively. - * @returns {Observable} An observable sequence producing the elements of the given sequence repeatedly until it terminates successfully. - */ + function repeat(value) { + return { + '@@iterator': function () { + return { + next: function () { + return { done: false, value: value }; + } + }; + } + }; + } + + var RetryWhenObservable = (function(__super__) { + function createDisposable(state) { + return { + isDisposed: false, + dispose: function () { + if (!this.isDisposed) { + this.isDisposed = true; + state.isDisposed = true; + } + } + }; + } + + function RetryWhenObservable(source, notifier) { + this.source = source; + this._notifier = notifier; + __super__.call(this); + } + + inherits(RetryWhenObservable, __super__); + + RetryWhenObservable.prototype.subscribeCore = function (o) { + var exceptions = new Subject(), + notifier = new Subject(), + handled = this._notifier(exceptions), + notificationDisposable = handled.subscribe(notifier); + + var e = this.source['@@iterator'](); + + var state = { isDisposed: false }, + lastError, + subscription = new SerialDisposable(); + var cancelable = currentThreadScheduler.scheduleRecursive(null, function (_, recurse) { + if (state.isDisposed) { return; } + var currentItem = e.next(); + + if (currentItem.done) { + if (lastError) { + o.onError(lastError); + } else { + o.onCompleted(); + } + return; + } + + // Check if promise + var currentValue = currentItem.value; + isPromise(currentValue) && (currentValue = observableFromPromise(currentValue)); + + var outer = new SingleAssignmentDisposable(); + var inner = new SingleAssignmentDisposable(); + subscription.setDisposable(new BinaryDisposable(inner, outer)); + outer.setDisposable(currentValue.subscribe( + function(x) { o.onNext(x); }, + function (exn) { + inner.setDisposable(notifier.subscribe(recurse, function(ex) { + o.onError(ex); + }, function() { + o.onCompleted(); + })); + + exceptions.onNext(exn); + outer.dispose(); + }, + function() { o.onCompleted(); })); + }); + + return new NAryDisposable([notificationDisposable, subscription, cancelable, createDisposable(state)]); + }; + + return RetryWhenObservable; + }(ObservableBase)); + observableProto.retryWhen = function (notifier) { - return enumerableRepeat(this).catchErrorWhen(notifier); + return new RetryWhenObservable(repeat(this), notifier); + }; + + function repeat(value) { + return { + '@@iterator': function () { + return { + next: function () { + return { done: false, value: value }; + } + }; + } + }; + } + + var RepeatWhenObservable = (function(__super__) { + function createDisposable(state) { + return { + isDisposed: false, + dispose: function () { + if (!this.isDisposed) { + this.isDisposed = true; + state.isDisposed = true; + } + } + }; + } + + function RepeatWhenObservable(source, notifier) { + this.source = source; + this._notifier = notifier; + __super__.call(this); + } + + inherits(RepeatWhenObservable, __super__); + + RepeatWhenObservable.prototype.subscribeCore = function (o) { + var completions = new Subject(), + notifier = new Subject(), + handled = this._notifier(completions), + notificationDisposable = handled.subscribe(notifier); + + var e = this.source['@@iterator'](); + + var state = { isDisposed: false }, + lastError, + subscription = new SerialDisposable(); + var cancelable = currentThreadScheduler.scheduleRecursive(null, function (_, recurse) { + if (state.isDisposed) { return; } + var currentItem = e.next(); + + if (currentItem.done) { + if (lastError) { + o.onError(lastError); + } else { + o.onCompleted(); + } + return; + } + + // Check if promise + var currentValue = currentItem.value; + isPromise(currentValue) && (currentValue = observableFromPromise(currentValue)); + + var outer = new SingleAssignmentDisposable(); + var inner = new SingleAssignmentDisposable(); + subscription.setDisposable(new BinaryDisposable(inner, outer)); + outer.setDisposable(currentValue.subscribe( + function(x) { o.onNext(x); }, + function (exn) { o.onError(exn); }, + function() { + inner.setDisposable(notifier.subscribe(recurse, function(ex) { + o.onError(ex); + }, function() { + o.onCompleted(); + })); + + completions.onNext(null); + outer.dispose(); + })); + }); + + return new NAryDisposable([notificationDisposable, subscription, cancelable, createDisposable(state)]); + }; + + return RepeatWhenObservable; + }(ObservableBase)); + + observableProto.repeatWhen = function (notifier) { + return new RepeatWhenObservable(repeat(this), notifier); }; + var ScanObservable = (function(__super__) { inherits(ScanObservable, __super__); function ScanObservable(source, accumulator, hasSeed, seed) { @@ -3885,58 +4281,51 @@ observableProto.zipIterable = function () { } ScanObservable.prototype.subscribeCore = function(o) { - return this.source.subscribe(new InnerObserver(o,this)); + return this.source.subscribe(new ScanObserver(o,this)); }; return ScanObservable; }(ObservableBase)); - function InnerObserver(o, parent) { - this.o = o; - this.accumulator = parent.accumulator; - this.hasSeed = parent.hasSeed; - this.seed = parent.seed; - this.hasAccumulation = false; - this.accumulation = null; - this.hasValue = false; - this.isStopped = false; - } - InnerObserver.prototype = { - onNext: function (x) { - if (this.isStopped) { return; } - !this.hasValue && (this.hasValue = true); - if (this.hasAccumulation) { - this.accumulation = tryCatch(this.accumulator)(this.accumulation, x); + var ScanObserver = (function (__super__) { + inherits(ScanObserver, __super__); + function ScanObserver(o, parent) { + this._o = o; + this._p = parent; + this._fn = parent.accumulator; + this._hs = parent.hasSeed; + this._s = parent.seed; + this._ha = false; + this._a = null; + this._hv = false; + this._i = 0; + __super__.call(this); + } + + ScanObserver.prototype.next = function (x) { + !this._hv && (this._hv = true); + if (this._ha) { + this._a = tryCatch(this._fn)(this._a, x, this._i, this._p); } else { - this.accumulation = this.hasSeed ? tryCatch(this.accumulator)(this.seed, x) : x; - this.hasAccumulation = true; - } - if (this.accumulation === errorObj) { return this.o.onError(this.accumulation.e); } - this.o.onNext(this.accumulation); - }, - onError: function (e) { - if (!this.isStopped) { - this.isStopped = true; - this.o.onError(e); + this._a = this._hs ? tryCatch(this._fn)(this._s, x, this._i, this._p) : x; + this._ha = true; } - }, - onCompleted: function () { - if (!this.isStopped) { - this.isStopped = true; - !this.hasValue && this.hasSeed && this.o.onNext(this.seed); - this.o.onCompleted(); - } - }, - dispose: function() { this.isStopped = true; }, - fail: function (e) { - if (!this.isStopped) { - this.isStopped = true; - this.o.onError(e); - return true; - } - return false; - } - }; + if (this._a === errorObj) { return this._o.onError(this._a.e); } + this._o.onNext(this._a); + this._i++; + }; + + ScanObserver.prototype.error = function (e) { + this._o.onError(e); + }; + + ScanObserver.prototype.completed = function () { + !this._hv && this._hs && this._o.onNext(this._s); + this._o.onCompleted(); + }; + + return ScanObserver; + }(AbstractObserver)); /** * Applies an accumulator function over an observable sequence and returns each intermediate result. The optional seed value is used as the initial accumulator value. @@ -3954,6 +4343,46 @@ observableProto.zipIterable = function () { return new ScanObservable(this, accumulator, hasSeed, seed); }; + var SkipLastObservable = (function (__super__) { + inherits(SkipLastObservable, __super__); + function SkipLastObservable(source, c) { + this.source = source; + this._c = c; + __super__.call(this); + } + + SkipLastObservable.prototype.subscribeCore = function (o) { + return this.source.subscribe(new SkipLastObserver(o, this._c)); + }; + + return SkipLastObservable; + }(ObservableBase)); + + var SkipLastObserver = (function (__super__) { + inherits(SkipLastObserver, __super__); + function SkipLastObserver(o, c) { + this._o = o; + this._c = c; + this._q = []; + __super__.call(this); + } + + SkipLastObserver.prototype.next = function (x) { + this._q.push(x); + this._q.length > this._c && this._o.onNext(this._q.shift()); + }; + + SkipLastObserver.prototype.error = function (e) { + this._o.onError(e); + }; + + SkipLastObserver.prototype.completed = function () { + this._o.onCompleted(); + }; + + return SkipLastObserver; + }(AbstractObserver)); + /** * Bypasses a specified number of elements at the end of an observable sequence. * @description @@ -3964,14 +4393,7 @@ observableProto.zipIterable = function () { */ observableProto.skipLast = function (count) { if (count < 0) { throw new ArgumentOutOfRangeError(); } - var source = this; - return new AnonymousObservable(function (o) { - var q = []; - return source.subscribe(function (x) { - q.push(x); - q.length > count && o.onNext(q.shift()); - }, function (e) { o.onError(e); }, function () { o.onCompleted(); }); - }, source); + return new SkipLastObservable(this, count); }; /** @@ -3994,6 +4416,32 @@ observableProto.zipIterable = function () { return enumerableOf([observableFromArray(args, scheduler), this]).concat(); }; + var TakeLastObserver = (function (__super__) { + inherits(TakeLastObserver, __super__); + function TakeLastObserver(o, c) { + this._o = o; + this._c = c; + this._q = []; + __super__.call(this); + } + + TakeLastObserver.prototype.next = function (x) { + this._q.push(x); + this._q.length > this._c && this._q.shift(); + }; + + TakeLastObserver.prototype.error = function (e) { + this._o.onError(e); + }; + + TakeLastObserver.prototype.completed = function () { + while (this._q.length > 0) { this._o.onNext(this._q.shift()); } + this._o.onCompleted(); + }; + + return TakeLastObserver; + }(AbstractObserver)); + /** * Returns a specified number of contiguous elements from the end of an observable sequence. * @description @@ -4006,14 +4454,7 @@ observableProto.zipIterable = function () { if (count < 0) { throw new ArgumentOutOfRangeError(); } var source = this; return new AnonymousObservable(function (o) { - var q = []; - return source.subscribe(function (x) { - q.push(x); - q.length > count && q.shift(); - }, function (e) { o.onError(e); }, function () { - while (q.length > 0) { o.onNext(q.shift()); } - o.onCompleted(); - }); + return source.subscribe(new TakeLastObserver(o, count)); }, source); }; @@ -4030,7 +4471,7 @@ observableProto.flatMapConcat = observableProto.concatMap = function(selector, r } function innerMap(selector, self) { - return function (x, i, o) { return selector.call(this, self.selector(x, i, o), i, o); } + return function (x, i, o) { return selector.call(this, self.selector(x, i, o), i, o); }; } MapObservable.prototype.internalMap = function (selector, thisArg) { @@ -4041,35 +4482,27 @@ observableProto.flatMapConcat = observableProto.concatMap = function(selector, r return this.source.subscribe(new InnerObserver(o, this.selector, this)); }; + inherits(InnerObserver, AbstractObserver); function InnerObserver(o, selector, source) { this.o = o; this.selector = selector; this.source = source; this.i = 0; - this.isStopped = false; + AbstractObserver.call(this); } - InnerObserver.prototype.onNext = function(x) { - if (this.isStopped) { return; } + InnerObserver.prototype.next = function(x) { var result = tryCatch(this.selector)(x, this.i++, this.source); if (result === errorObj) { return this.o.onError(result.e); } this.o.onNext(result); }; - InnerObserver.prototype.onError = function (e) { - if(!this.isStopped) { this.isStopped = true; this.o.onError(e); } - }; - InnerObserver.prototype.onCompleted = function () { - if(!this.isStopped) { this.isStopped = true; this.o.onCompleted(); } + + InnerObserver.prototype.error = function (e) { + this.o.onError(e); }; - InnerObserver.prototype.dispose = function() { this.isStopped = true; }; - InnerObserver.prototype.fail = function (e) { - if (!this.isStopped) { - this.isStopped = true; - this.o.onError(e); - return true; - } - return false; + InnerObserver.prototype.completed = function () { + this.o.onCompleted(); }; return MapObservable; @@ -4101,7 +4534,7 @@ observableProto.flatMapConcat = observableProto.concatMap = function(selector, r } } return currentProp; - } + }; } /** @@ -4121,13 +4554,6 @@ observableProto.flatMap = observableProto.selectMany = function(selector, result return new FlatMapObservable(this, selector, resultSelector, thisArg).mergeAll(); }; - -// -//Rx.Observable.prototype.flatMapWithMaxConcurrent = function(limit, selector, resultSelector, thisArg) { -// return new FlatMapObservable(this, selector, resultSelector, thisArg).merge(limit); -//}; -// - Rx.Observable.prototype.flatMapLatest = function(selector, resultSelector, thisArg) { return new FlatMapObservable(this, selector, resultSelector, thisArg).switchLatest(); }; @@ -4135,47 +4561,35 @@ Rx.Observable.prototype.flatMapLatest = function(selector, resultSelector, thisA inherits(SkipObservable, __super__); function SkipObservable(source, count) { this.source = source; - this.skipCount = count; + this._count = count; __super__.call(this); } - + SkipObservable.prototype.subscribeCore = function (o) { - return this.source.subscribe(new InnerObserver(o, this.skipCount)); + return this.source.subscribe(new SkipObserver(o, this._count)); }; - - function InnerObserver(o, c) { - this.c = c; - this.r = c; - this.o = o; - this.isStopped = false; + + function SkipObserver(o, c) { + this._o = o; + this._r = c; + AbstractObserver.call(this); } - InnerObserver.prototype.onNext = function (x) { - if (this.isStopped) { return; } - if (this.r <= 0) { - this.o.onNext(x); + + inherits(SkipObserver, AbstractObserver); + + SkipObserver.prototype.next = function (x) { + if (this._r <= 0) { + this._o.onNext(x); } else { - this.r--; - } - }; - InnerObserver.prototype.onError = function(e) { - if (!this.isStopped) { this.isStopped = true; this.o.onError(e); } - }; - InnerObserver.prototype.onCompleted = function() { - if (!this.isStopped) { this.isStopped = true; this.o.onCompleted(); } - }; - InnerObserver.prototype.dispose = function() { this.isStopped = true; }; - InnerObserver.prototype.fail = function(e) { - if (!this.isStopped) { - this.isStopped = true; - this.o.onError(e); - return true; + this._r--; } - return false; }; - + SkipObserver.prototype.error = function(e) { this._o.onError(e); }; + SkipObserver.prototype.completed = function() { this._o.onCompleted(); }; + return SkipObservable; - }(ObservableBase)); - + }(ObservableBase)); + /** * Bypasses a specified number of elements in an observable sequence and then returns the remaining elements. * @param {Number} count The number of elements to skip before returning the remaining elements. @@ -4185,6 +4599,47 @@ Rx.Observable.prototype.flatMapLatest = function(selector, resultSelector, thisA if (count < 0) { throw new ArgumentOutOfRangeError(); } return new SkipObservable(this, count); }; + + var SkipWhileObservable = (function (__super__) { + inherits(SkipWhileObservable, __super__); + function SkipWhileObservable(source, fn) { + this.source = source; + this._fn = fn; + __super__.call(this); + } + + SkipWhileObservable.prototype.subscribeCore = function (o) { + return this.source.subscribe(new SkipWhileObserver(o, this)); + }; + + return SkipWhileObservable; + }(ObservableBase)); + + var SkipWhileObserver = (function (__super__) { + inherits(SkipWhileObserver, __super__); + + function SkipWhileObserver(o, p) { + this._o = o; + this._p = p; + this._i = 0; + this._r = false; + __super__.call(this); + } + + SkipWhileObserver.prototype.next = function (x) { + if (!this._r) { + var res = tryCatch(this._p._fn)(x, this._i++, this._p); + if (res === errorObj) { return this._o.onError(res.e); } + this._r = !res; + } + this._r && this._o.onNext(x); + }; + SkipWhileObserver.prototype.error = function (e) { this._o.onError(e); }; + SkipWhileObserver.prototype.completed = function () { this._o.onCompleted(); }; + + return SkipWhileObserver; + }(AbstractObserver)); + /** * Bypasses elements in an observable sequence as long as a specified condition is true and then returns the remaining elements. * The element's index is used in the logic of the predicate function. @@ -4196,29 +4651,46 @@ Rx.Observable.prototype.flatMapLatest = function(selector, resultSelector, thisA * @returns {Observable} An observable sequence that contains the elements from the input sequence starting at the first element in the linear series that does not pass the test specified by predicate. */ observableProto.skipWhile = function (predicate, thisArg) { - var source = this, - callback = bindCallback(predicate, thisArg, 3); - return new AnonymousObservable(function (o) { - var i = 0, running = false; - return source.subscribe(function (x) { - if (!running) { - try { - running = !callback(x, i++, source); - } catch (e) { - o.onError(e); - return; - } - } - running && o.onNext(x); - }, function (e) { o.onError(e); }, function () { o.onCompleted(); }); - }, source); + var fn = bindCallback(predicate, thisArg, 3); + return new SkipWhileObservable(this, fn); }; + var TakeObservable = (function(__super__) { + inherits(TakeObservable, __super__); + function TakeObservable(source, count) { + this.source = source; + this._count = count; + __super__.call(this); + } + + TakeObservable.prototype.subscribeCore = function (o) { + return this.source.subscribe(new TakeObserver(o, this._count)); + }; + + function TakeObserver(o, c) { + this._o = o; + this._c = c; + this._r = c; + AbstractObserver.call(this); + } + + inherits(TakeObserver, AbstractObserver); + + TakeObserver.prototype.next = function (x) { + if (this._r-- > 0) { + this._o.onNext(x); + this._r <= 0 && this._o.onCompleted(); + } + }; + + TakeObserver.prototype.error = function (e) { this._o.onError(e); }; + TakeObserver.prototype.completed = function () { this._o.onCompleted(); }; + + return TakeObservable; + }(ObservableBase)); + /** * Returns a specified number of contiguous elements from the start of an observable sequence, using the specified scheduler for the edge case of take(0). - * - * var res = source.take(5); - * var res = source.take(0, Rx.Scheduler.timeout); * @param {Number} count The number of elements to return. * @param {Scheduler} [scheduler] Scheduler used to produce an OnCompleted message in case <paramref name="count count</paramref> is set to 0. * @returns {Observable} An observable sequence that contains the specified number of elements from the start of the input sequence. @@ -4226,18 +4698,52 @@ Rx.Observable.prototype.flatMapLatest = function(selector, resultSelector, thisA observableProto.take = function (count, scheduler) { if (count < 0) { throw new ArgumentOutOfRangeError(); } if (count === 0) { return observableEmpty(scheduler); } - var source = this; - return new AnonymousObservable(function (o) { - var remaining = count; - return source.subscribe(function (x) { - if (remaining-- > 0) { - o.onNext(x); - remaining <= 0 && o.onCompleted(); - } - }, function (e) { o.onError(e); }, function () { o.onCompleted(); }); - }, source); + return new TakeObservable(this, count); }; + var TakeWhileObservable = (function (__super__) { + inherits(TakeWhileObservable, __super__); + function TakeWhileObservable(source, fn) { + this.source = source; + this._fn = fn; + __super__.call(this); + } + + TakeWhileObservable.prototype.subscribeCore = function (o) { + return this.source.subscribe(new TakeWhileObserver(o, this)); + }; + + return TakeWhileObservable; + }(ObservableBase)); + + var TakeWhileObserver = (function (__super__) { + inherits(TakeWhileObserver, __super__); + + function TakeWhileObserver(o, p) { + this._o = o; + this._p = p; + this._i = 0; + this._r = true; + __super__.call(this); + } + + TakeWhileObserver.prototype.next = function (x) { + if (this._r) { + this._r = tryCatch(this._p._fn)(x, this._i++, this._p); + if (this._r === errorObj) { return this._o.onError(this._r.e); } + } + if (this._r) { + this._o.onNext(x); + } else { + this._o.onCompleted(); + } + }; + TakeWhileObserver.prototype.error = function (e) { this._o.onError(e); }; + TakeWhileObserver.prototype.completed = function () { this._o.onCompleted(); }; + + return TakeWhileObserver; + }(AbstractObserver)); + /** * Returns elements from an observable sequence as long as a specified condition is true. * The element's index is used in the logic of the predicate function. @@ -4246,26 +4752,8 @@ Rx.Observable.prototype.flatMapLatest = function(selector, resultSelector, thisA * @returns {Observable} An observable sequence that contains the elements from the input sequence that occur before the element at which the test no longer passes. */ observableProto.takeWhile = function (predicate, thisArg) { - var source = this, - callback = bindCallback(predicate, thisArg, 3); - return new AnonymousObservable(function (o) { - var i = 0, running = true; - return source.subscribe(function (x) { - if (running) { - try { - running = callback(x, i++, source); - } catch (e) { - o.onError(e); - return; - } - if (running) { - o.onNext(x); - } else { - o.onCompleted(); - } - } - }, function (e) { o.onError(e); }, function () { o.onCompleted(); }); - }, source); + var fn = bindCallback(predicate, thisArg, 3); + return new TakeWhileObservable(this, fn); }; var FilterObservable = (function (__super__) { @@ -4280,7 +4768,7 @@ Rx.Observable.prototype.flatMapLatest = function(selector, resultSelector, thisA FilterObservable.prototype.subscribeCore = function (o) { return this.source.subscribe(new InnerObserver(o, this.predicate, this)); }; - + function innerPredicate(predicate, self) { return function(x, i, o) { return self.predicate(x, i, o) && predicate.call(this, x, i, o); } } @@ -4288,37 +4776,30 @@ Rx.Observable.prototype.flatMapLatest = function(selector, resultSelector, thisA FilterObservable.prototype.internalFilter = function(predicate, thisArg) { return new FilterObservable(this.source, innerPredicate(predicate, this), thisArg); }; - + + inherits(InnerObserver, AbstractObserver); function InnerObserver(o, predicate, source) { this.o = o; this.predicate = predicate; this.source = source; this.i = 0; - this.isStopped = false; + AbstractObserver.call(this); } - - InnerObserver.prototype.onNext = function(x) { - if (this.isStopped) { return; } + + InnerObserver.prototype.next = function(x) { var shouldYield = tryCatch(this.predicate)(x, this.i++, this.source); if (shouldYield === errorObj) { return this.o.onError(shouldYield.e); } shouldYield && this.o.onNext(x); }; - InnerObserver.prototype.onError = function (e) { - if(!this.isStopped) { this.isStopped = true; this.o.onError(e); } - }; - InnerObserver.prototype.onCompleted = function () { - if(!this.isStopped) { this.isStopped = true; this.o.onCompleted(); } + + InnerObserver.prototype.error = function (e) { + this.o.onError(e); }; - InnerObserver.prototype.dispose = function() { this.isStopped = true; }; - InnerObserver.prototype.fail = function (e) { - if (!this.isStopped) { - this.isStopped = true; - this.o.onError(e); - return true; - } - return false; + + InnerObserver.prototype.completed = function () { + this.o.onCompleted(); }; return FilterObservable; @@ -4433,6 +4914,16 @@ Observable.fromNodeCallback = function (fn, ctx, selector) { }; }; + function isNodeList(el) { + if (root.StaticNodeList) { + // IE8 Specific + // instanceof is slower than Object#toString, but Object#toString will not work as intended in IE8 + return el instanceof root.StaticNodeList || el instanceof root.NodeList; + } else { + return Object.prototype.toString.call(el) === '[object NodeList]'; + } + } + function ListenDisposable(e, n, fn) { this._e = e; this._n = n; @@ -4452,7 +4943,7 @@ Observable.fromNodeCallback = function (fn, ctx, selector) { // Asume NodeList or HTMLCollection var elemToString = Object.prototype.toString.call(el); - if (elemToString === '[object NodeList]' || elemToString === '[object HTMLCollection]') { + if (isNodeList(el) || elemToString === '[object HTMLCollection]') { for (var i = 0, len = el.length; i < len; i++) { disposables.add(createEventListener(el.item(i), eventName, handler)); } @@ -4468,16 +4959,35 @@ Observable.fromNodeCallback = function (fn, ctx, selector) { */ Rx.config.useNativeEvents = false; - function eventHandler(o, selector) { - return function handler () { - var results = arguments[0]; - if (isFunction(selector)) { - results = tryCatch(selector).apply(null, arguments); - if (results === errorObj) { return o.onError(results.e); } - } - o.onNext(results); + var EventObservable = (function(__super__) { + inherits(EventObservable, __super__); + function EventObservable(el, name, fn) { + this._el = el; + this._n = name; + this._fn = fn; + __super__.call(this); + } + + function createHandler(o, fn) { + return function handler () { + var results = arguments[0]; + if (isFunction(fn)) { + results = tryCatch(fn).apply(null, arguments); + if (results === errorObj) { return o.onError(results.e); } + } + o.onNext(results); + }; + } + + EventObservable.prototype.subscribeCore = function (o) { + return createEventListener( + this._el, + this._n, + createHandler(o, this._fn)); }; - } + + return EventObservable; + }(ObservableBase)); /** * Creates an observable sequence by adding an event listener to the matching DOMElement or each item in the NodeList. @@ -4506,54 +5016,93 @@ Observable.fromNodeCallback = function (fn, ctx, selector) { } } - return new AnonymousObservable(function (o) { - return createEventListener( - element, - eventName, - eventHandler(o, selector)); - }).publish().refCount(); + return new EventObservable(element, eventName, selector).publish().refCount(); }; + var EventPatternObservable = (function(__super__) { + inherits(EventPatternObservable, __super__); + function EventPatternObservable(add, del, fn) { + this._add = add; + this._del = del; + this._fn = fn; + __super__.call(this); + } + + function createHandler(o, fn) { + return function handler () { + var results = arguments[0]; + if (isFunction(fn)) { + results = tryCatch(fn).apply(null, arguments); + if (results === errorObj) { return o.onError(results.e); } + } + o.onNext(results); + }; + } + + EventPatternObservable.prototype.subscribeCore = function (o) { + var fn = createHandler(o, this._fn); + var returnValue = this._add(fn); + return new EventPatternDisposable(this._del, fn, returnValue); + }; + + function EventPatternDisposable(del, fn, ret) { + this._del = del; + this._fn = fn; + this._ret = ret; + this.isDisposed = false; + } + + EventPatternDisposable.prototype.dispose = function () { + if(!this.isDisposed) { + isFunction(this._del) && this._del(this._fn, this._ret); + this.isDisposed = true; + } + }; + + return EventPatternObservable; + }(ObservableBase)); + /** * Creates an observable sequence from an event emitter via an addHandler/removeHandler pair. * @param {Function} addHandler The function to add a handler to the emitter. * @param {Function} [removeHandler] The optional function to remove a handler from an emitter. * @param {Function} [selector] A selector which takes the arguments from the event handler to produce a single item to yield on next. - * @param {Scheduler} [scheduler] A scheduler used to schedule the remove handler. * @returns {Observable} An observable sequence which wraps an event from an event emitter */ - var fromEventPattern = Observable.fromEventPattern = function (addHandler, removeHandler, selector, scheduler) { - isScheduler(scheduler) || (scheduler = immediateScheduler); - return new AnonymousObservable(function (o) { - function innerHandler () { - var result = arguments[0]; - if (isFunction(selector)) { - result = tryCatch(selector).apply(null, arguments); - if (result === errorObj) { return o.onError(result.e); } - } - o.onNext(result); - } - - var returnValue = addHandler(innerHandler); - return disposableCreate(function () { - isFunction(removeHandler) && removeHandler(innerHandler, returnValue); - }); - }).publish().refCount(); + var fromEventPattern = Observable.fromEventPattern = function (addHandler, removeHandler, selector) { + return new EventPatternObservable(addHandler, removeHandler, selector).publish().refCount(); }; var FromPromiseObservable = (function(__super__) { inherits(FromPromiseObservable, __super__); - function FromPromiseObservable(p) { - this.p = p; + function FromPromiseObservable(p, s) { + this._p = p; + this._s = s; __super__.call(this); } + function scheduleNext(s, state) { + var o = state[0], data = state[1]; + o.onNext(data); + o.onCompleted(); + } + + function scheduleError(s, state) { + var o = state[0], err = state[1]; + o.onError(err); + } + FromPromiseObservable.prototype.subscribeCore = function(o) { - this.p.then(function (data) { - o.onNext(data); - o.onCompleted(); - }, function (err) { o.onError(err); }); - return disposableEmpty; + var sad = new SingleAssignmentDisposable(), self = this; + + this._p + .then(function (data) { + sad.setDisposable(self._s.schedule([o, data], scheduleNext)); + }, function (err) { + sad.setDisposable(self._s.schedule([o, err], scheduleError)); + }); + + return sad; }; return FromPromiseObservable; @@ -4564,9 +5113,11 @@ Observable.fromNodeCallback = function (fn, ctx, selector) { * @param {Promise} An ES6 Compliant promise. * @returns {Observable} An Observable sequence which wraps the existing promise success and failure. */ - var observableFromPromise = Observable.fromPromise = function (promise) { - return new FromPromiseObservable(promise); + var observableFromPromise = Observable.fromPromise = function (promise, scheduler) { + scheduler || (scheduler = defaultScheduler); + return new FromPromiseObservable(promise, scheduler); }; + /* * Converts an existing observable sequence to an ES6 Compatible Promise * @example @@ -4584,12 +5135,11 @@ Observable.fromNodeCallback = function (fn, ctx, selector) { var source = this; return new promiseCtor(function (resolve, reject) { // No cancellation can be done - var value, hasValue = false; + var value; source.subscribe(function (v) { value = v; - hasValue = true; }, reject, function () { - hasValue && resolve(value); + resolve(value); }); }); }; @@ -4600,14 +5150,27 @@ Observable.fromNodeCallback = function (fn, ctx, selector) { * @returns {Observable} An observable sequence exposing the function's result value, or an exception. */ Observable.startAsync = function (functionAsync) { - var promise; - try { - promise = functionAsync(); - } catch (e) { - return observableThrow(e); - } + var promise = tryCatch(functionAsync)(); + if (promise === errorObj) { return observableThrow(promise.e); } return observableFromPromise(promise); - } + }; + + var MulticastObservable = (function (__super__) { + inherits(MulticastObservable, __super__); + function MulticastObservable(source, fn1, fn2) { + this.source = source; + this._fn1 = fn1; + this._fn2 = fn2; + __super__.call(this); + } + + MulticastObservable.prototype.subscribeCore = function (o) { + var connectable = this.source.multicast(this._fn1()); + return new BinaryDisposable(this._fn2(connectable).subscribe(o), connectable.connect()); + }; + + return MulticastObservable; + }(ObservableBase)); /** * Multicasts the source sequence notifications through an instantiated subject into all uses of the sequence within a selector function. Each @@ -4627,13 +5190,9 @@ Observable.fromNodeCallback = function (fn, ctx, selector) { * @returns {Observable} An observable sequence that contains the elements of a sequence produced by multicasting the source sequence within a selector function. */ observableProto.multicast = function (subjectOrSubjectSelector, selector) { - var source = this; - return typeof subjectOrSubjectSelector === 'function' ? - new AnonymousObservable(function (observer) { - var connectable = source.multicast(subjectOrSubjectSelector()); - return new CompositeDisposable(selector(connectable).subscribe(observer), connectable.connect()); - }, source) : - new ConnectableObservable(source, subjectOrSubjectSelector); + return isFunction(subjectOrSubjectSelector) ? + new MulticastObservable(this, subjectOrSubjectSelector, selector) : + new ConnectableObservable(this, subjectOrSubjectSelector); }; /** @@ -4750,72 +5309,115 @@ Observable.fromNodeCallback = function (fn, ctx, selector) { return this.replay(null, bufferSize, windowSize, scheduler).refCount(); }; + var RefCountObservable = (function (__super__) { + inherits(RefCountObservable, __super__); + function RefCountObservable(source) { + this.source = source; + this._count = 0; + this._connectableSubscription = null; + __super__.call(this); + } + + RefCountObservable.prototype.subscribeCore = function (o) { + var subscription = this.source.subscribe(o); + ++this._count === 1 && (this._connectableSubscription = this.source.connect()); + return new RefCountDisposable(this, subscription); + }; + + function RefCountDisposable(p, s) { + this._p = p; + this._s = s; + this.isDisposed = false; + } + + RefCountDisposable.prototype.dispose = function () { + if (!this.isDisposed) { + this.isDisposed = true; + this._s.dispose(); + --this._p._count === 0 && this._p._connectableSubscription.dispose(); + } + }; + + return RefCountObservable; + }(ObservableBase)); + var ConnectableObservable = Rx.ConnectableObservable = (function (__super__) { inherits(ConnectableObservable, __super__); - function ConnectableObservable(source, subject) { - var hasSubscription = false, - subscription, - sourceObservable = source.asObservable(); - - this.connect = function () { - if (!hasSubscription) { - hasSubscription = true; - subscription = new CompositeDisposable(sourceObservable.subscribe(subject), disposableCreate(function () { - hasSubscription = false; - })); - } - return subscription; - }; + this.source = source; + this._connection = null; + this._source = source.asObservable(); + this._subject = subject; + __super__.call(this); + } - __super__.call(this, function (o) { return subject.subscribe(o); }); + function ConnectDisposable(parent, subscription) { + this._p = parent; + this._s = subscription; } + ConnectDisposable.prototype.dispose = function () { + if (this._s) { + this._s.dispose(); + this._s = null; + this._p._connection = null; + } + }; + + ConnectableObservable.prototype.connect = function () { + if (!this._connection) { + var subscription = this._source.subscribe(this._subject); + this._connection = new ConnectDisposable(this, subscription); + } + return this._connection; + }; + + ConnectableObservable.prototype._subscribe = function (o) { + return this._subject.subscribe(o); + }; + ConnectableObservable.prototype.refCount = function () { - var connectableSubscription, count = 0, source = this; - return new AnonymousObservable(function (observer) { - var shouldConnect = ++count === 1, - subscription = source.subscribe(observer); - shouldConnect && (connectableSubscription = source.connect()); - return function () { - subscription.dispose(); - --count === 0 && connectableSubscription.dispose(); - }; - }); + return new RefCountObservable(this); }; return ConnectableObservable; }(Observable)); - function observableTimerDate(dueTime, scheduler) { - return new AnonymousObservable(function (observer) { - return scheduler.scheduleWithAbsolute(dueTime, function () { - observer.onNext(0); - observer.onCompleted(); - }); - }); + var TimerObservable = (function(__super__) { + inherits(TimerObservable, __super__); + function TimerObservable(dt, s) { + this._dt = dt; + this._s = s; + __super__.call(this); + } + + TimerObservable.prototype.subscribeCore = function (o) { + return this._s.scheduleFuture(o, this._dt, scheduleMethod); + }; + + function scheduleMethod(s, o) { + o.onNext(0); + o.onCompleted(); + } + + return TimerObservable; + }(ObservableBase)); + + function _observableTimer(dueTime, scheduler) { + return new TimerObservable(dueTime, scheduler); } function observableTimerDateAndPeriod(dueTime, period, scheduler) { return new AnonymousObservable(function (observer) { var d = dueTime, p = normalizeTime(period); - return scheduler.scheduleRecursiveWithAbsoluteAndState(0, d, function (count, self) { + return scheduler.scheduleRecursiveFuture(0, d, function (count, self) { if (p > 0) { var now = scheduler.now(); - d = d + p; - d <= now && (d = now + p); + d = new Date(d.getTime() + p); + d.getTime() <= now && (d = new Date(now + p)); } observer.onNext(count); - self(count + 1, d); - }); - }); - } - - function observableTimerTimeSpan(dueTime, scheduler) { - return new AnonymousObservable(function (observer) { - return scheduler.scheduleWithRelative(normalizeTime(dueTime), function () { - observer.onNext(0); - observer.onCompleted(); + self(count + 1, new Date(d)); }); }); } @@ -4823,13 +5425,13 @@ Observable.fromNodeCallback = function (fn, ctx, selector) { function observableTimerTimeSpanAndPeriod(dueTime, period, scheduler) { return dueTime === period ? new AnonymousObservable(function (observer) { - return scheduler.schedulePeriodicWithState(0, period, function (count) { + return scheduler.schedulePeriodic(0, period, function (count) { observer.onNext(count); return count + 1; }); }) : observableDefer(function () { - return observableTimerDateAndPeriod(scheduler.now() + dueTime, period, scheduler); + return observableTimerDateAndPeriod(new Date(scheduler.now() + dueTime), period, scheduler); }); } @@ -4845,7 +5447,7 @@ Observable.fromNodeCallback = function (fn, ctx, selector) { * @returns {Observable} An observable sequence that produces a value after each period. */ var observableinterval = Observable.interval = function (period, scheduler) { - return observableTimerTimeSpanAndPeriod(period, period, isScheduler(scheduler) ? scheduler : timeoutScheduler); + return observableTimerTimeSpanAndPeriod(period, period, isScheduler(scheduler) ? scheduler : defaultScheduler); }; /** @@ -4857,21 +5459,19 @@ Observable.fromNodeCallback = function (fn, ctx, selector) { */ var observableTimer = Observable.timer = function (dueTime, periodOrScheduler, scheduler) { var period; - isScheduler(scheduler) || (scheduler = timeoutScheduler); + isScheduler(scheduler) || (scheduler = defaultScheduler); if (periodOrScheduler != null && typeof periodOrScheduler === 'number') { period = periodOrScheduler; } else if (isScheduler(periodOrScheduler)) { scheduler = periodOrScheduler; } - if (dueTime instanceof Date && period === undefined) { - return observableTimerDate(dueTime.getTime(), scheduler); + if ((dueTime instanceof Date || typeof dueTime === 'number') && period === undefined) { + return _observableTimer(dueTime, scheduler); } if (dueTime instanceof Date && period !== undefined) { - return observableTimerDateAndPeriod(dueTime.getTime(), periodOrScheduler, scheduler); + return observableTimerDateAndPeriod(dueTime, periodOrScheduler, scheduler); } - return period === undefined ? - observableTimerTimeSpan(dueTime, scheduler) : - observableTimerTimeSpanAndPeriod(dueTime, period, scheduler); + return observableTimerTimeSpanAndPeriod(dueTime, period, scheduler); }; function observableDelayRelative(source, dueTime, scheduler) { @@ -4887,7 +5487,7 @@ Observable.fromNodeCallback = function (fn, ctx, selector) { if (notification.value.kind === 'E') { q = []; q.push(notification); - exception = notification.value.exception; + exception = notification.value.error; shouldRun = !running; } else { q.push({ value: notification.value, timestamp: notification.timestamp + dueTime }); @@ -4900,7 +5500,7 @@ Observable.fromNodeCallback = function (fn, ctx, selector) { } else { d = new SingleAssignmentDisposable(); cancelable.setDisposable(d); - d.setDisposable(scheduler.scheduleRecursiveWithRelative(dueTime, function (self) { + d.setDisposable(scheduler.scheduleRecursiveFuture(null, dueTime, function (_, self) { var e, recurseDueTime, result, shouldRecurse; if (exception !== null) { return; @@ -4928,13 +5528,13 @@ Observable.fromNodeCallback = function (fn, ctx, selector) { if (e !== null) { o.onError(e); } else if (shouldRecurse) { - self(recurseDueTime); + self(null, recurseDueTime); } })); } } }); - return new CompositeDisposable(subscription, cancelable); + return new BinaryDisposable(subscription, cancelable); }, source); } @@ -4995,8 +5595,8 @@ Observable.fromNodeCallback = function (fn, ctx, selector) { subscription.setDisposable(subDelay.subscribe(start, function (e) { o.onError(e); }, start)); } - return new CompositeDisposable(subscription, delays); - }, this); + return new BinaryDisposable(subscription, delays); + }, source); } /** @@ -5008,52 +5608,86 @@ Observable.fromNodeCallback = function (fn, ctx, selector) { * @returns {Observable} Time-shifted sequence. */ observableProto.delay = function () { - if (typeof arguments[0] === 'number' || arguments[0] instanceof Date) { - var dueTime = arguments[0], scheduler = arguments[1]; - isScheduler(scheduler) || (scheduler = timeoutScheduler); + var firstArg = arguments[0]; + if (typeof firstArg === 'number' || firstArg instanceof Date) { + var dueTime = firstArg, scheduler = arguments[1]; + isScheduler(scheduler) || (scheduler = defaultScheduler); return dueTime instanceof Date ? observableDelayAbsolute(this, dueTime, scheduler) : observableDelayRelative(this, dueTime, scheduler); - } else if (isFunction(arguments[0])) { - return delayWithSelector(this, arguments[0], arguments[1]); + } else if (Observable.isObservable(firstArg) || isFunction(firstArg)) { + return delayWithSelector(this, firstArg, arguments[1]); } else { throw new Error('Invalid arguments'); } }; - function debounce(source, dueTime, scheduler) { - isScheduler(scheduler) || (scheduler = timeoutScheduler); - return new AnonymousObservable(function (observer) { - var cancelable = new SerialDisposable(), hasvalue = false, value, id = 0; - var subscription = source.subscribe( - function (x) { - hasvalue = true; - value = x; - id++; - var currentId = id, - d = new SingleAssignmentDisposable(); - cancelable.setDisposable(d); - d.setDisposable(scheduler.scheduleWithRelative(dueTime, function () { - hasvalue && id === currentId && observer.onNext(value); - hasvalue = false; - })); - }, - function (e) { - cancelable.dispose(); - observer.onError(e); - hasvalue = false; - id++; - }, - function () { - cancelable.dispose(); - hasvalue && observer.onNext(value); - observer.onCompleted(); - hasvalue = false; - id++; - }); - return new CompositeDisposable(subscription, cancelable); - }, this); - } + var DebounceObservable = (function (__super__) { + inherits(DebounceObservable, __super__); + function DebounceObservable(source, dt, s) { + isScheduler(s) || (s = defaultScheduler); + this.source = source; + this._dt = dt; + this._s = s; + __super__.call(this); + } + + DebounceObservable.prototype.subscribeCore = function (o) { + var cancelable = new SerialDisposable(); + return new BinaryDisposable( + this.source.subscribe(new DebounceObserver(o, this._dt, this._s, cancelable)), + cancelable); + }; + + return DebounceObservable; + }(ObservableBase)); + + var DebounceObserver = (function (__super__) { + inherits(DebounceObserver, __super__); + function DebounceObserver(observer, dueTime, scheduler, cancelable) { + this._o = observer; + this._d = dueTime; + this._scheduler = scheduler; + this._c = cancelable; + this._v = null; + this._hv = false; + this._id = 0; + __super__.call(this); + } + + function scheduleFuture(s, state) { + state.self._hv && state.self._id === state.currentId && state.self._o.onNext(state.x); + state.self._hv = false; + } + + DebounceObserver.prototype.next = function (x) { + this._hv = true; + this._v = x; + var currentId = ++this._id, d = new SingleAssignmentDisposable(); + this._c.setDisposable(d); + d.setDisposable(this._scheduler.scheduleFuture(this, this._d, function (_, self) { + self._hv && self._id === currentId && self._o.onNext(x); + self._hv = false; + })); + }; + + DebounceObserver.prototype.error = function (e) { + this._c.dispose(); + this._o.onError(e); + this._hv = false; + this._id++; + }; + + DebounceObserver.prototype.completed = function () { + this._c.dispose(); + this._hv && this._o.onNext(this._v); + this._o.onCompleted(); + this._hv = false; + this._id++; + }; + + return DebounceObserver; + }(AbstractObserver)); function debounceWithSelector(source, durationSelector) { return new AnonymousObservable(function (o) { @@ -5098,7 +5732,7 @@ Observable.fromNodeCallback = function (fn, ctx, selector) { id++; } ); - return new CompositeDisposable(subscription, cancelable); + return new BinaryDisposable(subscription, cancelable); }, source); } @@ -5106,12 +5740,50 @@ Observable.fromNodeCallback = function (fn, ctx, selector) { if (isFunction (arguments[0])) { return debounceWithSelector(this, arguments[0]); } else if (typeof arguments[0] === 'number') { - return debounce(this, arguments[0], arguments[1]); + return new DebounceObservable(this, arguments[0], arguments[1]); } else { throw new Error('Invalid arguments'); } }; + var TimestampObservable = (function (__super__) { + inherits(TimestampObservable, __super__); + function TimestampObservable(source, s) { + this.source = source; + this._s = s; + __super__.call(this); + } + + TimestampObservable.prototype.subscribeCore = function (o) { + return this.source.subscribe(new TimestampObserver(o, this._s)); + }; + + return TimestampObservable; + }(ObservableBase)); + + var TimestampObserver = (function (__super__) { + inherits(TimestampObserver, __super__); + function TimestampObserver(o, s) { + this._o = o; + this._s = s; + __super__.call(this); + } + + TimestampObserver.prototype.next = function (x) { + this._o.onNext({ value: x, timestamp: this._s.now() }); + }; + + TimestampObserver.prototype.error = function (e) { + this._o.onError(e); + }; + + TimestampObserver.prototype.completed = function () { + this._o.onCompleted(); + }; + + return TimestampObserver; + }(AbstractObserver)); + /** * Records the timestamp for each value in an observable sequence. * @@ -5123,43 +5795,78 @@ Observable.fromNodeCallback = function (fn, ctx, selector) { * @returns {Observable} An observable sequence with timestamp information on values. */ observableProto.timestamp = function (scheduler) { - isScheduler(scheduler) || (scheduler = timeoutScheduler); - return this.map(function (x) { - return { value: x, timestamp: scheduler.now() }; - }); + isScheduler(scheduler) || (scheduler = defaultScheduler); + return new TimestampObservable(this, scheduler); }; - function sampleObservable(source, sampler) { - return new AnonymousObservable(function (o) { - var atEnd = false, value, hasValue = false; + var SampleObservable = (function(__super__) { + inherits(SampleObservable, __super__); + function SampleObservable(source, sampler) { + this.source = source; + this._sampler = sampler; + __super__.call(this); + } - function sampleSubscribe() { - if (hasValue) { - hasValue = false; - o.onNext(value); - } - atEnd && o.onCompleted(); + SampleObservable.prototype.subscribeCore = function (o) { + var state = { + o: o, + atEnd: false, + value: null, + hasValue: false, + sourceSubscription: new SingleAssignmentDisposable() + }; + + state.sourceSubscription.setDisposable(this.source.subscribe(new SampleSourceObserver(state))); + return new BinaryDisposable( + state.sourceSubscription, + this._sampler.subscribe(new SamplerObserver(state)) + ); + }; + + return SampleObservable; + }(ObservableBase)); + + var SamplerObserver = (function(__super__) { + inherits(SamplerObserver, __super__); + function SamplerObserver(s) { + this._s = s; + __super__.call(this); + } + + SamplerObserver.prototype._handleMessage = function () { + if (this._s.hasValue) { + this._s.hasValue = false; + this._s.o.onNext(this._s.value); } + this._s.atEnd && this._s.o.onCompleted(); + }; - var sourceSubscription = new SingleAssignmentDisposable(); - sourceSubscription.setDisposable(source.subscribe( - function (newValue) { - hasValue = true; - value = newValue; - }, - function (e) { o.onError(e); }, - function () { - atEnd = true; - sourceSubscription.dispose(); - } - )); + SamplerObserver.prototype.next = function () { this._handleMessage(); }; + SamplerObserver.prototype.error = function (e) { this._s.onError(e); }; + SamplerObserver.prototype.completed = function () { this._handleMessage(); }; - return new CompositeDisposable( - sourceSubscription, - sampler.subscribe(sampleSubscribe, function (e) { o.onError(e); }, sampleSubscribe) - ); - }, source); - } + return SamplerObserver; + }(AbstractObserver)); + + var SampleSourceObserver = (function(__super__) { + inherits(SampleSourceObserver, __super__); + function SampleSourceObserver(s) { + this._s = s; + __super__.call(this); + } + + SampleSourceObserver.prototype.next = function (x) { + this._s.hasValue = true; + this._s.value = x; + }; + SampleSourceObserver.prototype.error = function (e) { this._s.o.onError(e); }; + SampleSourceObserver.prototype.completed = function () { + this._s.atEnd = true; + this._s.sourceSubscription.dispose(); + }; + + return SampleSourceObserver; + }(AbstractObserver)); /** * Samples the observable sequence at each interval. @@ -5173,11 +5880,11 @@ Observable.fromNodeCallback = function (fn, ctx, selector) { * @param {Scheduler} [scheduler] Scheduler to run the sampling timer on. If not specified, the timeout scheduler is used. * @returns {Observable} Sampled observable sequence. */ - observableProto.sample = observableProto.throttleLatest = function (intervalOrSampler, scheduler) { - isScheduler(scheduler) || (scheduler = timeoutScheduler); + observableProto.sample = function (intervalOrSampler, scheduler) { + isScheduler(scheduler) || (scheduler = defaultScheduler); return typeof intervalOrSampler === 'number' ? - sampleObservable(this, observableinterval(intervalOrSampler, scheduler)) : - sampleObservable(this, intervalOrSampler); + new SampleObservable(this, observableinterval(intervalOrSampler, scheduler)) : + new SampleObservable(this, intervalOrSampler); }; var TimeoutError = Rx.TimeoutError = function(message) { @@ -5193,9 +5900,11 @@ Observable.fromNodeCallback = function (fn, ctx, selector) { timeoutDurationSelector = firstTimeout; firstTimeout = observableNever(); } - other || (other = observableThrow(new TimeoutError())); + Observable.isObservable(other) || (other = observableThrow(new TimeoutError())); return new AnonymousObservable(function (o) { - var subscription = new SerialDisposable(), timer = new SerialDisposable(), original = new SingleAssignmentDisposable(); + var subscription = new SerialDisposable(), + timer = new SerialDisposable(), + original = new SingleAssignmentDisposable(); subscription.setDisposable(original); @@ -5203,14 +5912,20 @@ Observable.fromNodeCallback = function (fn, ctx, selector) { function setTimer(timeout) { var myId = id, d = new SingleAssignmentDisposable(); + + function timerWins() { + switched = (myId === id); + return switched; + } + timer.setDisposable(d); d.setDisposable(timeout.subscribe(function () { - id === myId && subscription.setDisposable(other.subscribe(o)); + timerWins() && subscription.setDisposable(other.subscribe(o)); d.dispose(); }, function (e) { - id === myId && o.onError(e); + timerWins() && o.onError(e); }, function () { - id === myId && subscription.setDisposable(other.subscribe(o)); + timerWins() && subscription.setDisposable(other.subscribe(o)); })); }; @@ -5234,23 +5949,18 @@ Observable.fromNodeCallback = function (fn, ctx, selector) { }, function () { oWins() && o.onCompleted(); })); - return new CompositeDisposable(subscription, timer); + return new BinaryDisposable(subscription, timer); }, source); } function timeout(source, dueTime, other, scheduler) { - if (other == null) { throw new Error('other or scheduler must be specified'); } if (isScheduler(other)) { scheduler = other; other = observableThrow(new TimeoutError()); } if (other instanceof Error) { other = observableThrow(other); } - isScheduler(scheduler) || (scheduler = timeoutScheduler); - - var schedulerMethod = dueTime instanceof Date ? - 'scheduleWithAbsolute' : - 'scheduleWithRelative'; - + isScheduler(scheduler) || (scheduler = defaultScheduler); + Observable.isObservable(other) || (other = observableThrow(new TimeoutError())); return new AnonymousObservable(function (o) { var id = 0, original = new SingleAssignmentDisposable(), @@ -5262,8 +5972,9 @@ Observable.fromNodeCallback = function (fn, ctx, selector) { function createTimer() { var myId = id; - timer.setDisposable(scheduler[schedulerMethod](dueTime, function () { - if (id === myId) { + timer.setDisposable(scheduler.scheduleFuture(null, dueTime, function () { + switched = id === myId; + if (switched) { isPromise(other) && (other = observableFromPromise(other)); subscription.setDisposable(other.subscribe(o)); } @@ -5289,7 +6000,7 @@ Observable.fromNodeCallback = function (fn, ctx, selector) { o.onCompleted(); } })); - return new CompositeDisposable(subscription, timer); + return new BinaryDisposable(subscription, timer); }, source); } @@ -5311,7 +6022,7 @@ Observable.fromNodeCallback = function (fn, ctx, selector) { * @returns {Observable} An Observable that performs the throttle operation. */ observableProto.throttle = function (windowDuration, scheduler) { - isScheduler(scheduler) || (scheduler = timeoutScheduler); + isScheduler(scheduler) || (scheduler = defaultScheduler); var duration = +windowDuration || 0; if (duration <= 0) { throw new RangeError('windowDuration cannot be less or equal zero.'); } var source = this; @@ -5330,12 +6041,23 @@ Observable.fromNodeCallback = function (fn, ctx, selector) { }; var PausableObservable = (function (__super__) { - inherits(PausableObservable, __super__); + function PausableObservable(source, pauser) { + this.source = source; + this.controller = new Subject(); + + if (pauser && pauser.subscribe) { + this.pauser = this.controller.merge(pauser); + } else { + this.pauser = this.controller; + } + + __super__.call(this); + } - function subscribe(observer) { + PausableObservable.prototype._subscribe = function (o) { var conn = this.source.publish(), - subscription = conn.subscribe(observer), + subscription = conn.subscribe(o), connection = disposableEmpty; var pausable = this.pauser.distinctUntilChanged().subscribe(function (b) { @@ -5347,21 +6069,8 @@ Observable.fromNodeCallback = function (fn, ctx, selector) { } }); - return new CompositeDisposable(subscription, connection, pausable); - } - - function PausableObservable(source, pauser) { - this.source = source; - this.controller = new Subject(); - - if (pauser && pauser.subscribe) { - this.pauser = this.controller.merge(pauser); - } else { - this.pauser = this.controller; - } - - __super__.call(this, subscribe, source); - } + return new NAryDisposable([subscription, connection, pausable]); + }; PausableObservable.prototype.pause = function () { this.controller.onNext(false); @@ -5407,7 +6116,7 @@ Observable.fromNodeCallback = function (fn, ctx, selector) { isDone && values[1] && o.onCompleted(); } - return new CompositeDisposable( + return new BinaryDisposable( source.subscribe( function (x) { next(x, 0); @@ -5437,10 +6146,21 @@ Observable.fromNodeCallback = function (fn, ctx, selector) { } var PausableBufferedObservable = (function (__super__) { - inherits(PausableBufferedObservable, __super__); + function PausableBufferedObservable(source, pauser) { + this.source = source; + this.controller = new Subject(); + + if (pauser && pauser.subscribe) { + this.pauser = this.controller.merge(pauser); + } else { + this.pauser = this.controller; + } - function subscribe(o) { + __super__.call(this); + } + + PausableBufferedObservable.prototype._subscribe = function (o) { var q = [], previousShouldFire; function drainQueue() { while (q.length > 0) { o.onNext(q.shift()); } } @@ -5454,7 +6174,7 @@ Observable.fromNodeCallback = function (fn, ctx, selector) { }) .subscribe( function (results) { - if (previousShouldFire !== undefined && results.shouldFire != previousShouldFire) { + if (previousShouldFire !== undefined && results.shouldFire !== previousShouldFire) { previousShouldFire = results.shouldFire; // change in shouldFire if (results.shouldFire) { drainQueue(); } @@ -5477,21 +6197,8 @@ Observable.fromNodeCallback = function (fn, ctx, selector) { o.onCompleted(); } ); - return subscription; - } - - function PausableBufferedObservable(source, pauser) { - this.source = source; - this.controller = new Subject(); - - if (pauser && pauser.subscribe) { - this.pauser = this.controller.merge(pauser); - } else { - this.pauser = this.controller; - } - - __super__.call(this, subscribe, source); - } + return subscription; + }; PausableBufferedObservable.prototype.pause = function () { this.controller.onNext(false); @@ -5514,151 +6221,146 @@ Observable.fromNodeCallback = function (fn, ctx, selector) { * @param {Observable} pauser The observable sequence used to pause the underlying sequence. * @returns {Observable} The observable sequence which is paused based upon the pauser. */ - observableProto.pausableBuffered = function (subject) { - return new PausableBufferedObservable(this, subject); + observableProto.pausableBuffered = function (pauser) { + return new PausableBufferedObservable(this, pauser); }; -var ControlledObservable = (function (__super__) { - - inherits(ControlledObservable, __super__); - - function subscribe (observer) { - return this.source.subscribe(observer); - } - - function ControlledObservable (source, enableQueue, scheduler) { - __super__.call(this, subscribe, source); - this.subject = new ControlledSubject(enableQueue, scheduler); - this.source = source.multicast(this.subject).refCount(); - } + var ControlledObservable = (function (__super__) { + inherits(ControlledObservable, __super__); + function ControlledObservable (source, enableQueue, scheduler) { + __super__.call(this); + this.subject = new ControlledSubject(enableQueue, scheduler); + this.source = source.multicast(this.subject).refCount(); + } - ControlledObservable.prototype.request = function (numberOfItems) { - return this.subject.request(numberOfItems == null ? -1 : numberOfItems); - }; + ControlledObservable.prototype._subscribe = function (o) { + return this.source.subscribe(o); + }; - return ControlledObservable; + ControlledObservable.prototype.request = function (numberOfItems) { + return this.subject.request(numberOfItems == null ? -1 : numberOfItems); + }; -}(Observable)); + return ControlledObservable; -var ControlledSubject = (function (__super__) { + }(Observable)); - function subscribe (observer) { - return this.subject.subscribe(observer); - } + var ControlledSubject = (function (__super__) { + inherits(ControlledSubject, __super__); + function ControlledSubject(enableQueue, scheduler) { + enableQueue == null && (enableQueue = true); - inherits(ControlledSubject, __super__); - - function ControlledSubject(enableQueue, scheduler) { - enableQueue == null && (enableQueue = true); - - __super__.call(this, subscribe); - this.subject = new Subject(); - this.enableQueue = enableQueue; - this.queue = enableQueue ? [] : null; - this.requestedCount = 0; - this.requestedDisposable = null; - this.error = null; - this.hasFailed = false; - this.hasCompleted = false; - this.scheduler = scheduler || currentThreadScheduler; - } + __super__.call(this); + this.subject = new Subject(); + this.enableQueue = enableQueue; + this.queue = enableQueue ? [] : null; + this.requestedCount = 0; + this.requestedDisposable = null; + this.error = null; + this.hasFailed = false; + this.hasCompleted = false; + this.scheduler = scheduler || currentThreadScheduler; + } - addProperties(ControlledSubject.prototype, Observer, { - onCompleted: function () { - this.hasCompleted = true; - if (!this.enableQueue || this.queue.length === 0) { - this.subject.onCompleted(); - this.disposeCurrentRequest() - } else { - this.queue.push(Notification.createOnCompleted()); - } - }, - onError: function (error) { - this.hasFailed = true; - this.error = error; - if (!this.enableQueue || this.queue.length === 0) { - this.subject.onError(error); - this.disposeCurrentRequest() - } else { - this.queue.push(Notification.createOnError(error)); - } - }, - onNext: function (value) { - if (this.requestedCount <= 0) { - this.enableQueue && this.queue.push(Notification.createOnNext(value)); - } else { - (this.requestedCount-- === 0) && this.disposeCurrentRequest(); - this.subject.onNext(value); - } - }, - _processRequest: function (numberOfItems) { - if (this.enableQueue) { - while (this.queue.length > 0 && (numberOfItems > 0 || this.queue[0].kind !== 'N')) { - var first = this.queue.shift(); - first.accept(this.subject); - if (first.kind === 'N') { - numberOfItems--; - } else { - this.disposeCurrentRequest(); - this.queue = []; + addProperties(ControlledSubject.prototype, Observer, { + _subscribe: function (o) { + return this.subject.subscribe(o); + }, + onCompleted: function () { + this.hasCompleted = true; + if (!this.enableQueue || this.queue.length === 0) { + this.subject.onCompleted(); + this.disposeCurrentRequest(); + } else { + this.queue.push(Notification.createOnCompleted()); + } + }, + onError: function (error) { + this.hasFailed = true; + this.error = error; + if (!this.enableQueue || this.queue.length === 0) { + this.subject.onError(error); + this.disposeCurrentRequest(); + } else { + this.queue.push(Notification.createOnError(error)); + } + }, + onNext: function (value) { + if (this.requestedCount <= 0) { + this.enableQueue && this.queue.push(Notification.createOnNext(value)); + } else { + (this.requestedCount-- === 0) && this.disposeCurrentRequest(); + this.subject.onNext(value); + } + }, + _processRequest: function (numberOfItems) { + if (this.enableQueue) { + while (this.queue.length > 0 && (numberOfItems > 0 || this.queue[0].kind !== 'N')) { + var first = this.queue.shift(); + first.accept(this.subject); + if (first.kind === 'N') { + numberOfItems--; + } else { + this.disposeCurrentRequest(); + this.queue = []; + } } } - } - return numberOfItems; - }, - request: function (number) { - this.disposeCurrentRequest(); - var self = this; + return numberOfItems; + }, + request: function (number) { + this.disposeCurrentRequest(); + var self = this; + + this.requestedDisposable = this.scheduler.schedule(number, + function(s, i) { + var remaining = self._processRequest(i); + var stopped = self.hasCompleted || self.hasFailed; + if (!stopped && remaining > 0) { + self.requestedCount = remaining; + + return disposableCreate(function () { + self.requestedCount = 0; + }); + // Scheduled item is still in progress. Return a new + // disposable to allow the request to be interrupted + // via dispose. + } + }); - this.requestedDisposable = this.scheduler.scheduleWithState(number, - function(s, i) { - var remaining = self._processRequest(i); - var stopped = self.hasCompleted || self.hasFailed - if (!stopped && remaining > 0) { - self.requestedCount = remaining; - - return disposableCreate(function () { - self.requestedCount = 0; - }); - // Scheduled item is still in progress. Return a new - // disposable to allow the request to be interrupted - // via dispose. + return this.requestedDisposable; + }, + disposeCurrentRequest: function () { + if (this.requestedDisposable) { + this.requestedDisposable.dispose(); + this.requestedDisposable = null; } - }); - - return this.requestedDisposable; - }, - disposeCurrentRequest: function () { - if (this.requestedDisposable) { - this.requestedDisposable.dispose(); - this.requestedDisposable = null; } - } - }); + }); - return ControlledSubject; -}(Observable)); + return ControlledSubject; + }(Observable)); -/** - * Attaches a controller to the observable sequence with the ability to queue. - * @example - * var source = Rx.Observable.interval(100).controlled(); - * source.request(3); // Reads 3 values - * @param {bool} enableQueue truthy value to determine if values should be queued pending the next request - * @param {Scheduler} scheduler determines how the requests will be scheduled - * @returns {Observable} The observable sequence which only propagates values on request. - */ -observableProto.controlled = function (enableQueue, scheduler) { + /** + * Attaches a controller to the observable sequence with the ability to queue. + * @example + * var source = Rx.Observable.interval(100).controlled(); + * source.request(3); // Reads 3 values + * @param {bool} enableQueue truthy value to determine if values should be queued pending the next request + * @param {Scheduler} scheduler determines how the requests will be scheduled + * @returns {Observable} The observable sequence which only propagates values on request. + */ + observableProto.controlled = function (enableQueue, scheduler) { - if (enableQueue && isScheduler(enableQueue)) { + if (enableQueue && isScheduler(enableQueue)) { scheduler = enableQueue; enableQueue = true; - } + } - if (enableQueue == null) { enableQueue = true; } - return new ControlledObservable(this, enableQueue, scheduler); -}; + if (enableQueue == null) { enableQueue = true; } + return new ControlledObservable(this, enableQueue, scheduler); + }; /** * Pipes the existing Observable sequence into a Node.js Stream. @@ -5692,6 +6394,42 @@ observableProto.controlled = function (enableQueue, scheduler) { return dest; }; + var TransduceObserver = (function (__super__) { + inherits(TransduceObserver, __super__); + function TransduceObserver(o, xform) { + this._o = o; + this._xform = xform; + __super__.call(this); + } + + TransduceObserver.prototype.next = function (x) { + var res = tryCatch(this._xform['@@transducer/step']).call(this._xform, this._o, x); + if (res === errorObj) { this._o.onError(res.e); } + }; + + TransduceObserver.prototype.error = function (e) { this._o.onError(e); }; + + TransduceObserver.prototype.completed = function () { + this._xform['@@transducer/result'](this._o); + }; + + return TransduceObserver; + }(AbstractObserver)); + + function transformForObserver(o) { + return { + '@@transducer/init': function() { + return o; + }, + '@@transducer/step': function(obs, input) { + return obs.onNext(input); + }, + '@@transducer/result': function(obs) { + return obs.onCompleted(); + } + }; + } + /** * Executes a transducer to transform the observable sequence * @param {Transducer} transducer A transducer to execute @@ -5699,31 +6437,9 @@ observableProto.controlled = function (enableQueue, scheduler) { */ observableProto.transduce = function(transducer) { var source = this; - - function transformForObserver(o) { - return { - '@@transducer/init': function() { - return o; - }, - '@@transducer/step': function(obs, input) { - return obs.onNext(input); - }, - '@@transducer/result': function(obs) { - return obs.onCompleted(); - } - }; - } - return new AnonymousObservable(function(o) { var xform = transducer(transformForObserver(o)); - return source.subscribe( - function(v) { - var res = tryCatch(xform['@@transducer/step']).call(xform, o, v); - if (res === errorObj) { o.onError(res.e); } - }, - function (e) { o.onError(e); }, - function() { xform['@@transducer/result'](o); } - ); + return source.subscribe(new TransduceObserver(o, xform)); }, source); }; @@ -5739,29 +6455,26 @@ observableProto.controlled = function (enableQueue, scheduler) { function setDisposable(s, state) { var ado = state[0], self = state[1]; var sub = tryCatch(self.__subscribe).call(self, ado); - - if (sub === errorObj) { - if(!ado.fail(errorObj.e)) { return thrower(errorObj.e); } - } + if (sub === errorObj && !ado.fail(errorObj.e)) { thrower(errorObj.e); } ado.setDisposable(fixSubscriber(sub)); } - function innerSubscribe(observer) { - var ado = new AutoDetachObserver(observer), state = [ado, this]; + function AnonymousObservable(subscribe, parent) { + this.source = parent; + this.__subscribe = subscribe; + __super__.call(this); + } + + AnonymousObservable.prototype._subscribe = function (o) { + var ado = new AutoDetachObserver(o), state = [ado, this]; if (currentThreadScheduler.scheduleRequired()) { - currentThreadScheduler.scheduleWithState(state, setDisposable); + currentThreadScheduler.schedule(state, setDisposable); } else { setDisposable(null, state); } return ado; - } - - function AnonymousObservable(subscribe, parent) { - this.source = parent; - this.__subscribe = subscribe; - __super__.call(this, innerSubscribe); - } + }; return AnonymousObservable; @@ -5809,16 +6522,16 @@ observableProto.controlled = function (enableQueue, scheduler) { return AutoDetachObserver; }(AbstractObserver)); - var InnerSubscription = function (subject, observer) { - this.subject = subject; - this.observer = observer; + var InnerSubscription = function (s, o) { + this._s = s; + this._o = o; }; InnerSubscription.prototype.dispose = function () { - if (!this.subject.isDisposed && this.observer !== null) { - var idx = this.subject.observers.indexOf(this.observer); - this.subject.observers.splice(idx, 1); - this.observer = null; + if (!this._s.isDisposed && this._o !== null) { + var idx = this._s.observers.indexOf(this._o); + this._s.observers.splice(idx, 1); + this._o = null; } }; @@ -5827,39 +6540,34 @@ observableProto.controlled = function (enableQueue, scheduler) { * Each notification is broadcasted to all subscribed observers. */ var Subject = Rx.Subject = (function (__super__) { - function subscribe(observer) { - checkDisposed(this); - if (!this.isStopped) { - this.observers.push(observer); - return new InnerSubscription(this, observer); - } - if (this.hasError) { - observer.onError(this.error); - return disposableEmpty; - } - observer.onCompleted(); - return disposableEmpty; - } - inherits(Subject, __super__); - - /** - * Creates a subject. - */ function Subject() { - __super__.call(this, subscribe); - this.isDisposed = false, - this.isStopped = false, + __super__.call(this); + this.isDisposed = false; + this.isStopped = false; this.observers = []; this.hasError = false; } addProperties(Subject.prototype, Observer.prototype, { + _subscribe: function (o) { + checkDisposed(this); + if (!this.isStopped) { + this.observers.push(o); + return new InnerSubscription(this, o); + } + if (this.hasError) { + o.onError(this.error); + return disposableEmpty; + } + o.onCompleted(); + return disposableEmpty; + }, /** * Indicates whether the subject has observers subscribed to it. * @returns {Boolean} Indicates whether the subject has observers subscribed to it. */ - hasObservers: function () { return this.observers.length > 0; }, + hasObservers: function () { checkDisposed(this); return this.observers.length > 0; }, /** * Notifies all subscribed observers about the end of the sequence. */ @@ -5930,27 +6638,6 @@ observableProto.controlled = function (enableQueue, scheduler) { * The last value before the OnCompleted notification, or the error received through OnError, is sent to all subscribed observers. */ var AsyncSubject = Rx.AsyncSubject = (function (__super__) { - - function subscribe(observer) { - checkDisposed(this); - - if (!this.isStopped) { - this.observers.push(observer); - return new InnerSubscription(this, observer); - } - - if (this.hasError) { - observer.onError(this.error); - } else if (this.hasValue) { - observer.onNext(this.value); - observer.onCompleted(); - } else { - observer.onCompleted(); - } - - return disposableEmpty; - } - inherits(AsyncSubject, __super__); /** @@ -5958,8 +6645,7 @@ observableProto.controlled = function (enableQueue, scheduler) { * @constructor */ function AsyncSubject() { - __super__.call(this, subscribe); - + __super__.call(this); this.isDisposed = false; this.isStopped = false; this.hasValue = false; @@ -5967,15 +6653,31 @@ observableProto.controlled = function (enableQueue, scheduler) { this.hasError = false; } - addProperties(AsyncSubject.prototype, Observer, { + addProperties(AsyncSubject.prototype, Observer.prototype, { + _subscribe: function (o) { + checkDisposed(this); + + if (!this.isStopped) { + this.observers.push(o); + return new InnerSubscription(this, o); + } + + if (this.hasError) { + o.onError(this.error); + } else if (this.hasValue) { + o.onNext(this.value); + o.onCompleted(); + } else { + o.onCompleted(); + } + + return disposableEmpty; + }, /** * Indicates whether the subject has observers subscribed to it. * @returns {Boolean} Indicates whether the subject has observers subscribed to it. */ - hasObservers: function () { - checkDisposed(this); - return this.observers.length > 0; - }, + hasObservers: function () { checkDisposed(this); return this.observers.length > 0; }, /** * Notifies all subscribed observers about the end of the sequence, also causing the last received value to be sent out (if any). */ @@ -6035,7 +6737,7 @@ observableProto.controlled = function (enableQueue, scheduler) { dispose: function () { this.isDisposed = true; this.observers = null; - this.exception = null; + this.error = null; this.value = null; } }); @@ -6045,18 +6747,16 @@ observableProto.controlled = function (enableQueue, scheduler) { var AnonymousSubject = Rx.AnonymousSubject = (function (__super__) { inherits(AnonymousSubject, __super__); - - function subscribe(observer) { - return this.observable.subscribe(observer); - } - function AnonymousSubject(observer, observable) { this.observer = observer; this.observable = observable; - __super__.call(this, subscribe); + __super__.call(this); } addProperties(AnonymousSubject.prototype, Observer.prototype, { + _subscribe: function (o) { + return this.observable.subscribe(o); + }, onCompleted: function () { this.observer.onCompleted(); }, @@ -6076,37 +6776,31 @@ observableProto.controlled = function (enableQueue, scheduler) { * Observers can subscribe to the subject to receive the last (or initial) value and all subsequent notifications. */ var BehaviorSubject = Rx.BehaviorSubject = (function (__super__) { - function subscribe(observer) { - checkDisposed(this); - if (!this.isStopped) { - this.observers.push(observer); - observer.onNext(this.value); - return new InnerSubscription(this, observer); - } - if (this.hasError) { - observer.onError(this.error); - } else { - observer.onCompleted(); - } - return disposableEmpty; - } - inherits(BehaviorSubject, __super__); - - /** - * Initializes a new instance of the BehaviorSubject class which creates a subject that caches its last value and starts with the specified value. - * @param {Mixed} value Initial value sent to observers when no other value has been received by the subject yet. - */ function BehaviorSubject(value) { - __super__.call(this, subscribe); - this.value = value, - this.observers = [], - this.isDisposed = false, - this.isStopped = false, + __super__.call(this); + this.value = value; + this.observers = []; + this.isDisposed = false; + this.isStopped = false; this.hasError = false; } - addProperties(BehaviorSubject.prototype, Observer, { + addProperties(BehaviorSubject.prototype, Observer.prototype, { + _subscribe: function (o) { + checkDisposed(this); + if (!this.isStopped) { + this.observers.push(o); + o.onNext(this.value); + return new InnerSubscription(this, o); + } + if (this.hasError) { + o.onError(this.error); + } else { + o.onCompleted(); + } + return disposableEmpty; + }, /** * Gets the current value or throws an exception. * Value is frozen after onCompleted is called. @@ -6115,17 +6809,15 @@ observableProto.controlled = function (enableQueue, scheduler) { * @returns {Mixed} The initial value passed to the constructor until onNext is called; after which, the last value passed to onNext. */ getValue: function () { - checkDisposed(this); - if (this.hasError) { - throw this.error; - } - return this.value; + checkDisposed(this); + if (this.hasError) { thrower(this.error); } + return this.value; }, /** * Indicates whether the subject has observers subscribed to it. * @returns {Boolean} Indicates whether the subject has observers subscribed to it. */ - hasObservers: function () { return this.observers.length > 0; }, + hasObservers: function () { checkDisposed(this); return this.observers.length > 0; }, /** * Notifies all subscribed observers about the end of the sequence. */ @@ -6175,7 +6867,7 @@ observableProto.controlled = function (enableQueue, scheduler) { this.isDisposed = true; this.observers = null; this.value = null; - this.exception = null; + this.error = null; } }); @@ -6197,27 +6889,6 @@ observableProto.controlled = function (enableQueue, scheduler) { }); } - function subscribe(observer) { - var so = new ScheduledObserver(this.scheduler, observer), - subscription = createRemovableDisposable(this, so); - checkDisposed(this); - this._trim(this.scheduler.now()); - this.observers.push(so); - - for (var i = 0, len = this.q.length; i < len; i++) { - so.onNext(this.q[i].value); - } - - if (this.hasError) { - so.onError(this.error); - } else if (this.isStopped) { - so.onCompleted(); - } - - so.ensureActive(); - return subscription; - } - inherits(ReplaySubject, __super__); /** @@ -6236,17 +6907,35 @@ observableProto.controlled = function (enableQueue, scheduler) { this.isDisposed = false; this.hasError = false; this.error = null; - __super__.call(this, subscribe); + __super__.call(this); } addProperties(ReplaySubject.prototype, Observer.prototype, { + _subscribe: function (o) { + checkDisposed(this); + var so = new ScheduledObserver(this.scheduler, o), subscription = createRemovableDisposable(this, so); + + this._trim(this.scheduler.now()); + this.observers.push(so); + + for (var i = 0, len = this.q.length; i < len; i++) { + so.onNext(this.q[i].value); + } + + if (this.hasError) { + so.onError(this.error); + } else if (this.isStopped) { + so.onCompleted(); + } + + so.ensureActive(); + return subscription; + }, /** * Indicates whether the subject has observers subscribed to it. * @returns {Boolean} Indicates whether the subject has observers subscribed to it. */ - hasObservers: function () { - return this.observers.length > 0; - }, + hasObservers: function () { checkDisposed(this); return this.observers.length > 0; }, _trim: function (now) { while (this.q.length > this.bufferSize) { this.q.shift(); @@ -6324,7 +7013,6 @@ observableProto.controlled = function (enableQueue, scheduler) { */ Rx.Pauser = (function (__super__) { inherits(Pauser, __super__); - function Pauser() { __super__.call(this); } |