summaryrefslogtreecommitdiff
path: root/tools/eslint/node_modules/rx-lite/rx.lite.js
diff options
context:
space:
mode:
Diffstat (limited to 'tools/eslint/node_modules/rx-lite/rx.lite.js')
-rw-r--r--tools/eslint/node_modules/rx-lite/rx.lite.js5000
1 files changed, 2844 insertions, 2156 deletions
diff --git a/tools/eslint/node_modules/rx-lite/rx.lite.js b/tools/eslint/node_modules/rx-lite/rx.lite.js
index 21e9aa5b97..0603c30e2f 100644
--- a/tools/eslint/node_modules/rx-lite/rx.lite.js
+++ b/tools/eslint/node_modules/rx-lite/rx.lite.js
@@ -1,4 +1,4 @@
-// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
+// Copyright (c) Microsoft, All rights reserved. See License.txt in the project root for license information.
;(function (undefined) {
@@ -7,15 +7,18 @@
'object': true
};
- var
- freeExports = objectTypes[typeof exports] && exports && !exports.nodeType && exports,
- freeSelf = objectTypes[typeof self] && self.Object && self,
- freeWindow = objectTypes[typeof window] && window && window.Object && window,
- freeModule = objectTypes[typeof module] && module && !module.nodeType && module,
- moduleExports = freeModule && freeModule.exports === freeExports && freeExports,
- freeGlobal = freeExports && freeModule && typeof global == 'object' && global && global.Object && global;
+ function checkGlobal(value) {
+ return (value && value.Object === Object) ? value : null;
+ }
- var root = root = freeGlobal || ((freeWindow !== (this && this.window)) && freeWindow) || freeSelf || this;
+ var freeExports = (objectTypes[typeof exports] && exports && !exports.nodeType) ? exports : null;
+ var freeModule = (objectTypes[typeof module] && module && !module.nodeType) ? module : null;
+ var freeGlobal = checkGlobal(freeExports && freeModule && typeof global === 'object' && global);
+ var freeSelf = checkGlobal(objectTypes[typeof self] && self);
+ var freeWindow = checkGlobal(objectTypes[typeof window] && window);
+ var moduleExports = (freeModule && freeModule.exports === freeExports) ? freeExports : null;
+ var thisGlobal = checkGlobal(objectTypes[typeof this] && this);
+ var root = freeGlobal || ((freeWindow !== (thisGlobal && thisGlobal.window)) && freeWindow) || freeSelf || thisGlobal || Function('return this')();
var Rx = {
internals: {},
@@ -38,7 +41,7 @@
var isFn = function (value) {
return typeof value == 'function' || false;
- }
+ };
// fallback for older versions of Chrome and Safari
if (isFn(/x/)) {
@@ -57,6 +60,7 @@
}
var errorObj = {e: {}};
+
function tryCatcherGen(tryCatchTarget) {
return function tryCatcher() {
try {
@@ -65,12 +69,14 @@
errorObj.e = e;
return errorObj;
}
- }
+ };
}
+
var tryCatch = Rx.internals.tryCatch = function tryCatch(fn) {
if (!isFunction(fn)) { throw new TypeError('fn must be a function'); }
return tryCatcherGen(fn);
- }
+ };
+
function thrower(e) {
throw e;
}
@@ -168,38 +174,38 @@
var EmptyError = Rx.EmptyError = function() {
this.message = 'Sequence contains no elements.';
- this.name = 'EmptyError';
Error.call(this);
};
EmptyError.prototype = Object.create(Error.prototype);
+ EmptyError.prototype.name = 'EmptyError';
var ObjectDisposedError = Rx.ObjectDisposedError = function() {
this.message = 'Object has been disposed';
- this.name = 'ObjectDisposedError';
Error.call(this);
};
ObjectDisposedError.prototype = Object.create(Error.prototype);
+ ObjectDisposedError.prototype.name = 'ObjectDisposedError';
var ArgumentOutOfRangeError = Rx.ArgumentOutOfRangeError = function () {
this.message = 'Argument out of range';
- this.name = 'ArgumentOutOfRangeError';
Error.call(this);
};
ArgumentOutOfRangeError.prototype = Object.create(Error.prototype);
+ ArgumentOutOfRangeError.prototype.name = 'ArgumentOutOfRangeError';
var NotSupportedError = Rx.NotSupportedError = function (message) {
this.message = message || 'This operation is not supported';
- this.name = 'NotSupportedError';
Error.call(this);
};
NotSupportedError.prototype = Object.create(Error.prototype);
+ NotSupportedError.prototype.name = 'NotSupportedError';
var NotImplementedError = Rx.NotImplementedError = function (message) {
this.message = message || 'This operation is not implemented';
- this.name = 'NotImplementedError';
Error.call(this);
};
NotImplementedError.prototype = Object.create(Error.prototype);
+ NotImplementedError.prototype.name = 'NotImplementedError';
var notImplemented = Rx.helpers.notImplemented = function () {
throw new NotImplementedError();
@@ -220,12 +226,12 @@
var doneEnumerator = Rx.doneEnumerator = { done: true, value: undefined };
var isIterable = Rx.helpers.isIterable = function (o) {
- return o[$iterator$] !== undefined;
- }
+ return o && o[$iterator$] !== undefined;
+ };
var isArrayLike = Rx.helpers.isArrayLike = function (o) {
return o && o.length !== undefined;
- }
+ };
Rx.helpers.iterator = $iterator$;
@@ -239,7 +245,7 @@
case 1:
return function(arg) {
return func.call(thisArg, arg);
- }
+ };
case 2:
return function(value, index) {
return func.call(thisArg, value, index);
@@ -265,280 +271,303 @@
'constructor'],
dontEnumsLength = dontEnums.length;
- /** `Object#toString` result shortcuts */
- var argsClass = '[object Arguments]',
- arrayClass = '[object Array]',
- boolClass = '[object Boolean]',
- dateClass = '[object Date]',
- errorClass = '[object Error]',
- funcClass = '[object Function]',
- numberClass = '[object Number]',
- objectClass = '[object Object]',
- regexpClass = '[object RegExp]',
- stringClass = '[object String]';
-
- var toString = Object.prototype.toString,
- hasOwnProperty = Object.prototype.hasOwnProperty,
- supportsArgsClass = toString.call(arguments) == argsClass, // For less <IE9 && FF<4
- supportNodeClass,
- errorProto = Error.prototype,
- objectProto = Object.prototype,
- stringProto = String.prototype,
- propertyIsEnumerable = objectProto.propertyIsEnumerable;
-
- try {
- supportNodeClass = !(toString.call(document) == objectClass && !({ 'toString': 0 } + ''));
- } catch (e) {
- supportNodeClass = true;
- }
-
- var nonEnumProps = {};
- nonEnumProps[arrayClass] = nonEnumProps[dateClass] = nonEnumProps[numberClass] = { 'constructor': true, 'toLocaleString': true, 'toString': true, 'valueOf': true };
- nonEnumProps[boolClass] = nonEnumProps[stringClass] = { 'constructor': true, 'toString': true, 'valueOf': true };
- nonEnumProps[errorClass] = nonEnumProps[funcClass] = nonEnumProps[regexpClass] = { 'constructor': true, 'toString': true };
- nonEnumProps[objectClass] = { 'constructor': true };
-
- var support = {};
- (function () {
- var ctor = function() { this.x = 1; },
- props = [];
-
- ctor.prototype = { 'valueOf': 1, 'y': 1 };
- for (var key in new ctor) { props.push(key); }
- for (key in arguments) { }
-
- // Detect if `name` or `message` properties of `Error.prototype` are enumerable by default.
- support.enumErrorProps = propertyIsEnumerable.call(errorProto, 'message') || propertyIsEnumerable.call(errorProto, 'name');
+var argsTag = '[object Arguments]',
+ arrayTag = '[object Array]',
+ boolTag = '[object Boolean]',
+ dateTag = '[object Date]',
+ errorTag = '[object Error]',
+ funcTag = '[object Function]',
+ mapTag = '[object Map]',
+ numberTag = '[object Number]',
+ objectTag = '[object Object]',
+ regexpTag = '[object RegExp]',
+ setTag = '[object Set]',
+ stringTag = '[object String]',
+ weakMapTag = '[object WeakMap]';
+
+var arrayBufferTag = '[object ArrayBuffer]',
+ float32Tag = '[object Float32Array]',
+ float64Tag = '[object Float64Array]',
+ int8Tag = '[object Int8Array]',
+ int16Tag = '[object Int16Array]',
+ int32Tag = '[object Int32Array]',
+ uint8Tag = '[object Uint8Array]',
+ uint8ClampedTag = '[object Uint8ClampedArray]',
+ uint16Tag = '[object Uint16Array]',
+ uint32Tag = '[object Uint32Array]';
+
+var typedArrayTags = {};
+typedArrayTags[float32Tag] = typedArrayTags[float64Tag] =
+typedArrayTags[int8Tag] = typedArrayTags[int16Tag] =
+typedArrayTags[int32Tag] = typedArrayTags[uint8Tag] =
+typedArrayTags[uint8ClampedTag] = typedArrayTags[uint16Tag] =
+typedArrayTags[uint32Tag] = true;
+typedArrayTags[argsTag] = typedArrayTags[arrayTag] =
+typedArrayTags[arrayBufferTag] = typedArrayTags[boolTag] =
+typedArrayTags[dateTag] = typedArrayTags[errorTag] =
+typedArrayTags[funcTag] = typedArrayTags[mapTag] =
+typedArrayTags[numberTag] = typedArrayTags[objectTag] =
+typedArrayTags[regexpTag] = typedArrayTags[setTag] =
+typedArrayTags[stringTag] = typedArrayTags[weakMapTag] = false;
+
+var objectProto = Object.prototype,
+ hasOwnProperty = objectProto.hasOwnProperty,
+ objToString = objectProto.toString,
+ MAX_SAFE_INTEGER = Math.pow(2, 53) - 1;
+
+var keys = Object.keys || (function() {
+ var hasOwnProperty = Object.prototype.hasOwnProperty,
+ hasDontEnumBug = !({ toString: null }).propertyIsEnumerable('toString'),
+ dontEnums = [
+ 'toString',
+ 'toLocaleString',
+ 'valueOf',
+ 'hasOwnProperty',
+ 'isPrototypeOf',
+ 'propertyIsEnumerable',
+ 'constructor'
+ ],
+ dontEnumsLength = dontEnums.length;
+
+ return function(obj) {
+ if (typeof obj !== 'object' && (typeof obj !== 'function' || obj === null)) {
+ throw new TypeError('Object.keys called on non-object');
+ }
- // Detect if `prototype` properties are enumerable by default.
- support.enumPrototypes = propertyIsEnumerable.call(ctor, 'prototype');
+ var result = [], prop, i;
- // Detect if `arguments` object indexes are non-enumerable
- support.nonEnumArgs = key != 0;
+ for (prop in obj) {
+ if (hasOwnProperty.call(obj, prop)) {
+ result.push(prop);
+ }
+ }
- // Detect if properties shadowing those on `Object.prototype` are non-enumerable.
- support.nonEnumShadows = !/valueOf/.test(props);
- }(1));
+ if (hasDontEnumBug) {
+ for (i = 0; i < dontEnumsLength; i++) {
+ if (hasOwnProperty.call(obj, dontEnums[i])) {
+ result.push(dontEnums[i]);
+ }
+ }
+ }
+ return result;
+ };
+ }());
- var isObject = Rx.internals.isObject = function(value) {
- var type = typeof value;
- return value && (type == 'function' || type == 'object') || false;
- };
+function equalObjects(object, other, equalFunc, isLoose, stackA, stackB) {
+ var objProps = keys(object),
+ objLength = objProps.length,
+ othProps = keys(other),
+ othLength = othProps.length;
- function keysIn(object) {
- var result = [];
- if (!isObject(object)) {
- return result;
+ if (objLength !== othLength && !isLoose) {
+ return false;
+ }
+ var index = objLength, key;
+ while (index--) {
+ key = objProps[index];
+ if (!(isLoose ? key in other : hasOwnProperty.call(other, key))) {
+ return false;
}
- if (support.nonEnumArgs && object.length && isArguments(object)) {
- object = slice.call(object);
+ }
+ var skipCtor = isLoose;
+ while (++index < objLength) {
+ key = objProps[index];
+ var objValue = object[key],
+ othValue = other[key],
+ result;
+
+ if (!(result === undefined ? equalFunc(objValue, othValue, isLoose, stackA, stackB) : result)) {
+ return false;
}
- var skipProto = support.enumPrototypes && typeof object == 'function',
- skipErrorProps = support.enumErrorProps && (object === errorProto || object instanceof Error);
-
- for (var key in object) {
- if (!(skipProto && key == 'prototype') &&
- !(skipErrorProps && (key == 'message' || key == 'name'))) {
- result.push(key);
- }
+ skipCtor || (skipCtor = key === 'constructor');
+ }
+ if (!skipCtor) {
+ var objCtor = object.constructor,
+ othCtor = other.constructor;
+
+ if (objCtor !== othCtor &&
+ ('constructor' in object && 'constructor' in other) &&
+ !(typeof objCtor === 'function' && objCtor instanceof objCtor &&
+ typeof othCtor === 'function' && othCtor instanceof othCtor)) {
+ return false;
}
+ }
+ return true;
+}
- if (support.nonEnumShadows && object !== objectProto) {
- var ctor = object.constructor,
- index = -1,
- length = dontEnumsLength;
+function equalByTag(object, other, tag) {
+ switch (tag) {
+ case boolTag:
+ case dateTag:
+ return +object === +other;
- if (object === (ctor && ctor.prototype)) {
- var className = object === stringProto ? stringClass : object === errorProto ? errorClass : toString.call(object),
- nonEnum = nonEnumProps[className];
- }
- while (++index < length) {
- key = dontEnums[index];
- if (!(nonEnum && nonEnum[key]) && hasOwnProperty.call(object, key)) {
- result.push(key);
- }
- }
- }
- return result;
- }
+ case errorTag:
+ return object.name === other.name && object.message === other.message;
- function internalFor(object, callback, keysFunc) {
- var index = -1,
- props = keysFunc(object),
- length = props.length;
+ case numberTag:
+ return (object !== +object) ?
+ other !== +other :
+ object === +other;
- while (++index < length) {
- var key = props[index];
- if (callback(object[key], key, object) === false) {
- break;
- }
- }
- return object;
+ case regexpTag:
+ case stringTag:
+ return object === (other + '');
}
+ return false;
+}
- function internalForIn(object, callback) {
- return internalFor(object, callback, keysIn);
- }
+var isObject = Rx.internals.isObject = function(value) {
+ var type = typeof value;
+ return !!value && (type === 'object' || type === 'function');
+};
- function isNode(value) {
- // IE < 9 presents DOM nodes as `Object` objects except they have `toString`
- // methods that are `typeof` "string" and still can coerce nodes to strings
- return typeof value.toString != 'function' && typeof (value + '') == 'string';
- }
+function isObjectLike(value) {
+ return !!value && typeof value === 'object';
+}
- var isArguments = function(value) {
- return (value && typeof value == 'object') ? toString.call(value) == argsClass : false;
- }
+function isLength(value) {
+ return typeof value === 'number' && value > -1 && value % 1 === 0 && value <= MAX_SAFE_INTEGER;
+}
- // fallback for browsers that can't detect `arguments` objects by [[Class]]
- if (!supportsArgsClass) {
- isArguments = function(value) {
- return (value && typeof value == 'object') ? hasOwnProperty.call(value, 'callee') : false;
- };
+var isHostObject = (function() {
+ try {
+ Object({ 'toString': 0 } + '');
+ } catch(e) {
+ return function() { return false; };
}
-
- var isEqual = Rx.internals.isEqual = function (x, y) {
- return deepEquals(x, y, [], []);
+ return function(value) {
+ return typeof value.toString !== 'function' && typeof (value + '') === 'string';
};
+}());
- /** @private
- * Used for deep comparison
- **/
- function deepEquals(a, b, stackA, stackB) {
- // exit early for identical values
- if (a === b) {
- // treat `+0` vs. `-0` as not equal
- return a !== 0 || (1 / a == 1 / b);
- }
+function isTypedArray(value) {
+ return isObjectLike(value) && isLength(value.length) && !!typedArrayTags[objToString.call(value)];
+}
+
+var isArray = Array.isArray || function(value) {
+ return isObjectLike(value) && isLength(value.length) && objToString.call(value) === arrayTag;
+};
- var type = typeof a,
- otherType = typeof b;
+function arraySome (array, predicate) {
+ var index = -1,
+ length = array.length;
- // exit early for unlike primitive values
- if (a === a && (a == null || b == null ||
- (type != 'function' && type != 'object' && otherType != 'function' && otherType != 'object'))) {
- return false;
+ while (++index < length) {
+ if (predicate(array[index], index, array)) {
+ return true;
}
+ }
+ return false;
+}
- // compare [[Class]] names
- var className = toString.call(a),
- otherClass = toString.call(b);
+function equalArrays(array, other, equalFunc, isLoose, stackA, stackB) {
+ var index = -1,
+ arrLength = array.length,
+ othLength = other.length;
- if (className == argsClass) {
- className = objectClass;
- }
- if (otherClass == argsClass) {
- otherClass = objectClass;
- }
- if (className != otherClass) {
+ if (arrLength !== othLength && !(isLoose && othLength > arrLength)) {
+ return false;
+ }
+ // Ignore non-index properties.
+ while (++index < arrLength) {
+ var arrValue = array[index],
+ othValue = other[index],
+ result;
+
+ if (result !== undefined) {
+ if (result) {
+ continue;
+ }
return false;
}
- switch (className) {
- case boolClass:
- case dateClass:
- // coerce dates and booleans to numbers, dates to milliseconds and booleans
- // to `1` or `0` treating invalid dates coerced to `NaN` as not equal
- return +a == +b;
-
- case numberClass:
- // treat `NaN` vs. `NaN` as equal
- return (a != +a) ?
- b != +b :
- // but treat `-0` vs. `+0` as not equal
- (a == 0 ? (1 / a == 1 / b) : a == +b);
-
- case regexpClass:
- case stringClass:
- // coerce regexes to strings (http://es5.github.io/#x15.10.6.4)
- // treat string primitives and their corresponding object instances as equal
- return a == String(b);
- }
- var isArr = className == arrayClass;
- if (!isArr) {
-
- // exit for functions and DOM nodes
- if (className != objectClass || (!support.nodeClass && (isNode(a) || isNode(b)))) {
+ // Recursively compare arrays (susceptible to call stack limits).
+ if (isLoose) {
+ if (!arraySome(other, function(othValue) {
+ return arrValue === othValue || equalFunc(arrValue, othValue, isLoose, stackA, stackB);
+ })) {
return false;
}
- // in older versions of Opera, `arguments` objects have `Array` constructors
- var ctorA = !support.argsObject && isArguments(a) ? Object : a.constructor,
- ctorB = !support.argsObject && isArguments(b) ? Object : b.constructor;
-
- // non `Object` object instances with different constructors are not equal
- if (ctorA != ctorB &&
- !(hasOwnProperty.call(a, 'constructor') && hasOwnProperty.call(b, 'constructor')) &&
- !(isFunction(ctorA) && ctorA instanceof ctorA && isFunction(ctorB) && ctorB instanceof ctorB) &&
- ('constructor' in a && 'constructor' in b)
- ) {
- return false;
- }
- }
- // assume cyclic structures are equal
- // the algorithm for detecting cyclic structures is adapted from ES 5.1
- // section 15.12.3, abstract operation `JO` (http://es5.github.io/#x15.12.3)
- var initedStack = !stackA;
- stackA || (stackA = []);
- stackB || (stackB = []);
-
- var length = stackA.length;
- while (length--) {
- if (stackA[length] == a) {
- return stackB[length] == b;
- }
+ } else if (!(arrValue === othValue || equalFunc(arrValue, othValue, isLoose, stackA, stackB))) {
+ return false;
}
- var size = 0;
- var result = true;
+ }
+ return true;
+}
- // add `a` and `b` to the stack of traversed objects
- stackA.push(a);
- stackB.push(b);
+function baseIsEqualDeep(object, other, equalFunc, isLoose, stackA, stackB) {
+ var objIsArr = isArray(object),
+ othIsArr = isArray(other),
+ objTag = arrayTag,
+ othTag = arrayTag;
- // recursively compare objects and arrays (susceptible to call stack limits)
- if (isArr) {
- // compare lengths to determine if a deep comparison is necessary
- length = a.length;
- size = b.length;
- result = size == length;
+ if (!objIsArr) {
+ objTag = objToString.call(object);
+ if (objTag === argsTag) {
+ objTag = objectTag;
+ } else if (objTag !== objectTag) {
+ objIsArr = isTypedArray(object);
+ }
+ }
+ if (!othIsArr) {
+ othTag = objToString.call(other);
+ if (othTag === argsTag) {
+ othTag = objectTag;
+ }
+ }
+ var objIsObj = objTag === objectTag && !isHostObject(object),
+ othIsObj = othTag === objectTag && !isHostObject(other),
+ isSameTag = objTag === othTag;
- if (result) {
- // deep compare the contents, ignoring non-numeric properties
- while (size--) {
- var index = length,
- value = b[size];
+ if (isSameTag && !(objIsArr || objIsObj)) {
+ return equalByTag(object, other, objTag);
+ }
+ if (!isLoose) {
+ var objIsWrapped = objIsObj && hasOwnProperty.call(object, '__wrapped__'),
+ othIsWrapped = othIsObj && hasOwnProperty.call(other, '__wrapped__');
- if (!(result = deepEquals(a[size], value, stackA, stackB))) {
- break;
- }
- }
- }
+ if (objIsWrapped || othIsWrapped) {
+ return equalFunc(objIsWrapped ? object.value() : object, othIsWrapped ? other.value() : other, isLoose, stackA, stackB);
}
- else {
- // deep compare objects using `forIn`, instead of `forOwn`, to avoid `Object.keys`
- // which, in this case, is more costly
- internalForIn(b, function(value, key, b) {
- if (hasOwnProperty.call(b, key)) {
- // count the number of properties.
- size++;
- // deep compare each property value.
- return (result = hasOwnProperty.call(a, key) && deepEquals(a[key], value, stackA, stackB));
- }
- });
+ }
+ if (!isSameTag) {
+ return false;
+ }
+ // Assume cyclic values are equal.
+ // For more information on detecting circular references see https://es5.github.io/#JO.
+ stackA || (stackA = []);
+ stackB || (stackB = []);
- if (result) {
- // ensure both objects have the same number of properties
- internalForIn(a, function(value, key, a) {
- if (hasOwnProperty.call(a, key)) {
- // `size` will be `-1` if `a` has more properties than `b`
- return (result = --size > -1);
- }
- });
- }
+ var length = stackA.length;
+ while (length--) {
+ if (stackA[length] === object) {
+ return stackB[length] === other;
}
- stackA.pop();
- stackB.pop();
+ }
+ // Add `object` and `other` to the stack of traversed objects.
+ stackA.push(object);
+ stackB.push(other);
- return result;
+ var result = (objIsArr ? equalArrays : equalObjects)(object, other, equalFunc, isLoose, stackA, stackB);
+
+ stackA.pop();
+ stackB.pop();
+
+ return result;
+}
+
+function baseIsEqual(value, other, isLoose, stackA, stackB) {
+ if (value === other) {
+ return true;
}
+ if (value == null || other == null || (!isObject(value) && !isObjectLike(other))) {
+ return value !== value && other !== other;
+ }
+ return baseIsEqualDeep(value, other, baseIsEqual, isLoose, stackA, stackB);
+}
+
+var isEqual = Rx.internals.isEqual = function (value, other) {
+ return baseIsEqual(value, other);
+};
var hasProp = {}.hasOwnProperty,
slice = Array.prototype.slice;
@@ -562,7 +591,7 @@
// Rx Utils
var addRef = Rx.internals.addRef = function (xs, r) {
return new AnonymousObservable(function (observer) {
- return new CompositeDisposable(r.getDisposable(), xs.subscribe(observer));
+ return new BinaryDisposable(r.getDisposable(), xs.subscribe(observer));
});
};
@@ -582,15 +611,11 @@
var args = [], i, len;
if (Array.isArray(arguments[0])) {
args = arguments[0];
- len = args.length;
} else {
len = arguments.length;
args = new Array(len);
for(i = 0; i < len; i++) { args[i] = arguments[i]; }
}
- for(i = 0; i < len; i++) {
- if (!isDisposable(args[i])) { throw new TypeError('Not a disposable'); }
- }
this.disposables = args;
this.isDisposed = false;
this.length = args.length;
@@ -689,6 +714,10 @@
if (disposable.isDisposed) { throw new ObjectDisposedError(); }
};
+ var disposableFixup = Disposable._fixup = function (result) {
+ return isDisposable(result) ? result : disposableEmpty;
+ };
+
// Single assignment
var SingleAssignmentDisposable = Rx.SingleAssignmentDisposable = function () {
this.isDisposed = false;
@@ -708,8 +737,8 @@
this.isDisposed = true;
var old = this.current;
this.current = null;
+ old && old.dispose();
}
- old && old.dispose();
};
// Multiple assignment disposable
@@ -738,6 +767,39 @@
old && old.dispose();
};
+ var BinaryDisposable = Rx.BinaryDisposable = function (first, second) {
+ this._first = first;
+ this._second = second;
+ this.isDisposed = false;
+ };
+
+ BinaryDisposable.prototype.dispose = function () {
+ if (!this.isDisposed) {
+ this.isDisposed = true;
+ var old1 = this._first;
+ this._first = null;
+ old1 && old1.dispose();
+ var old2 = this._second;
+ this._second = null;
+ old2 && old2.dispose();
+ }
+ };
+
+ var NAryDisposable = Rx.NAryDisposable = function (disposables) {
+ this._disposables = disposables;
+ this.isDisposed = false;
+ };
+
+ NAryDisposable.prototype.dispose = function () {
+ if (!this.isDisposed) {
+ this.isDisposed = true;
+ for (var i = 0, len = this._disposables.length; i < len; i++) {
+ this._disposables[i].dispose();
+ }
+ this._disposables.length = 0;
+ }
+ };
+
/**
* Represents a disposable resource that only disposes its underlying disposable resource when all dependent disposable objects have been disposed.
*/
@@ -803,7 +865,7 @@
this.dueTime = dueTime;
this.comparer = comparer || defaultSubComparer;
this.disposable = new SingleAssignmentDisposable();
- }
+ };
ScheduledItem.prototype.invoke = function () {
this.disposable.setDisposable(this.invokeCore());
@@ -818,95 +880,58 @@
};
ScheduledItem.prototype.invokeCore = function () {
- return this.action(this.scheduler, this.state);
+ return disposableFixup(this.action(this.scheduler, this.state));
};
/** Provides a set of static properties to access commonly used schedulers. */
var Scheduler = Rx.Scheduler = (function () {
- function Scheduler(now, schedule, scheduleRelative, scheduleAbsolute) {
- this.now = now;
- this._schedule = schedule;
- this._scheduleRelative = scheduleRelative;
- this._scheduleAbsolute = scheduleAbsolute;
- }
+ function Scheduler() { }
/** Determines whether the given object is a scheduler */
Scheduler.isScheduler = function (s) {
return s instanceof Scheduler;
- }
-
- function invokeAction(scheduler, action) {
- action();
- return disposableEmpty;
- }
+ };
var schedulerProto = Scheduler.prototype;
/**
- * Schedules an action to be executed.
- * @param {Function} action Action to execute.
- * @returns {Disposable} The disposable object used to cancel the scheduled action (best effort).
- */
- schedulerProto.schedule = function (action) {
- return this._schedule(action, invokeAction);
- };
-
- /**
- * Schedules an action to be executed.
- * @param state State passed to the action to be executed.
- * @param {Function} action Action to be executed.
- * @returns {Disposable} The disposable object used to cancel the scheduled action (best effort).
- */
- schedulerProto.scheduleWithState = function (state, action) {
- return this._schedule(state, action);
+ * Schedules an action to be executed.
+ * @param state State passed to the action to be executed.
+ * @param {Function} action Action to be executed.
+ * @returns {Disposable} The disposable object used to cancel the scheduled action (best effort).
+ */
+ schedulerProto.schedule = function (state, action) {
+ throw new NotImplementedError();
};
- /**
- * Schedules an action to be executed after the specified relative due time.
- * @param {Function} action Action to execute.
- * @param {Number} dueTime Relative time after which to execute the action.
- * @returns {Disposable} The disposable object used to cancel the scheduled action (best effort).
- */
- schedulerProto.scheduleWithRelative = function (dueTime, action) {
- return this._scheduleRelative(action, dueTime, invokeAction);
- };
+ /**
+ * Schedules an action to be executed after dueTime.
+ * @param state State passed to the action to be executed.
+ * @param {Function} action Action to be executed.
+ * @param {Number} dueTime Relative time after which to execute the action.
+ * @returns {Disposable} The disposable object used to cancel the scheduled action (best effort).
+ */
+ schedulerProto.scheduleFuture = function (state, dueTime, action) {
+ var dt = dueTime;
+ dt instanceof Date && (dt = dt - this.now());
+ dt = Scheduler.normalize(dt);
- /**
- * Schedules an action to be executed after dueTime.
- * @param state State passed to the action to be executed.
- * @param {Function} action Action to be executed.
- * @param {Number} dueTime Relative time after which to execute the action.
- * @returns {Disposable} The disposable object used to cancel the scheduled action (best effort).
- */
- schedulerProto.scheduleWithRelativeAndState = function (state, dueTime, action) {
- return this._scheduleRelative(state, dueTime, action);
- };
+ if (dt === 0) { return this.schedule(state, action); }
- /**
- * Schedules an action to be executed at the specified absolute due time.
- * @param {Function} action Action to execute.
- * @param {Number} dueTime Absolute time at which to execute the action.
- * @returns {Disposable} The disposable object used to cancel the scheduled action (best effort).
- */
- schedulerProto.scheduleWithAbsolute = function (dueTime, action) {
- return this._scheduleAbsolute(action, dueTime, invokeAction);
+ return this._scheduleFuture(state, dt, action);
};
- /**
- * Schedules an action to be executed at dueTime.
- * @param {Mixed} state State passed to the action to be executed.
- * @param {Function} action Action to be executed.
- * @param {Number}dueTime Absolute time at which to execute the action.
- * @returns {Disposable} The disposable object used to cancel the scheduled action (best effort).
- */
- schedulerProto.scheduleWithAbsoluteAndState = function (state, dueTime, action) {
- return this._scheduleAbsolute(state, dueTime, action);
+ schedulerProto._scheduleFuture = function (state, dueTime, action) {
+ throw new NotImplementedError();
};
/** Gets the current time according to the local machine's system clock. */
Scheduler.now = defaultNow;
+ /** Gets the current time according to the local machine's system clock. */
+ Scheduler.prototype.now = defaultNow;
+
/**
* Normalizes the specified TimeSpan value to a positive value.
* @param {Number} timeSpan The time span value to normalize.
@@ -932,7 +957,7 @@
function innerAction(state2) {
var isAdded = false, isDone = false;
- var d = scheduler.scheduleWithState(state2, scheduleWork);
+ var d = scheduler.schedule(state2, scheduleWork);
if (!isDone) {
group.add(d);
isAdded = true;
@@ -950,7 +975,7 @@
}
}
- function invokeRecDate(scheduler, pair, method) {
+ function invokeRecDate(scheduler, pair) {
var state = pair[0], action = pair[1], group = new CompositeDisposable();
action(state, innerAction);
return group;
@@ -958,7 +983,7 @@
function innerAction(state2, dueTime1) {
var isAdded = false, isDone = false;
- var d = scheduler[method](state2, dueTime1, scheduleWork);
+ var d = scheduler.scheduleFuture(state2, dueTime1, scheduleWork);
if (!isDone) {
group.add(d);
isAdded = true;
@@ -976,100 +1001,39 @@
}
}
- function invokeRecDateRelative(s, p) {
- return invokeRecDate(s, p, 'scheduleWithRelativeAndState');
- }
-
- function invokeRecDateAbsolute(s, p) {
- return invokeRecDate(s, p, 'scheduleWithAbsoluteAndState');
- }
-
- function scheduleInnerRecursive(action, self) {
- action(function(dt) { self(action, dt); });
- }
-
- /**
- * Schedules an action to be executed recursively.
- * @param {Function} action Action to execute recursively. The parameter passed to the action is used to trigger recursive scheduling of the action.
- * @returns {Disposable} The disposable object used to cancel the scheduled action (best effort).
- */
- schedulerProto.scheduleRecursive = function (action) {
- return this.scheduleRecursiveWithState(action, scheduleInnerRecursive);
- };
-
/**
* Schedules an action to be executed recursively.
* @param {Mixed} state State passed to the action to be executed.
* @param {Function} action Action to execute recursively. The last parameter passed to the action is used to trigger recursive scheduling of the action, passing in recursive invocation state.
* @returns {Disposable} The disposable object used to cancel the scheduled action (best effort).
*/
- schedulerProto.scheduleRecursiveWithState = function (state, action) {
- return this.scheduleWithState([state, action], invokeRecImmediate);
- };
-
- /**
- * Schedules an action to be executed recursively after a specified relative due time.
- * @param {Function} action Action to execute recursively. The parameter passed to the action is used to trigger recursive scheduling of the action at the specified relative time.
- * @param {Number}dueTime Relative time after which to execute the action for the first time.
- * @returns {Disposable} The disposable object used to cancel the scheduled action (best effort).
- */
- schedulerProto.scheduleRecursiveWithRelative = function (dueTime, action) {
- return this.scheduleRecursiveWithRelativeAndState(action, dueTime, scheduleInnerRecursive);
+ schedulerProto.scheduleRecursive = function (state, action) {
+ return this.schedule([state, action], invokeRecImmediate);
};
/**
- * Schedules an action to be executed recursively after a specified relative due time.
+ * Schedules an action to be executed recursively after a specified relative or absolute due time.
* @param {Mixed} state State passed to the action to be executed.
* @param {Function} action Action to execute recursively. The last parameter passed to the action is used to trigger recursive scheduling of the action, passing in the recursive due time and invocation state.
- * @param {Number}dueTime Relative time after which to execute the action for the first time.
+ * @param {Number | Date} dueTime Relative or absolute time after which to execute the action for the first time.
* @returns {Disposable} The disposable object used to cancel the scheduled action (best effort).
*/
- schedulerProto.scheduleRecursiveWithRelativeAndState = function (state, dueTime, action) {
- return this._scheduleRelative([state, action], dueTime, invokeRecDateRelative);
+ schedulerProto.scheduleRecursiveFuture = function (state, dueTime, action) {
+ return this.scheduleFuture([state, action], dueTime, invokeRecDate);
};
- /**
- * Schedules an action to be executed recursively at a specified absolute due time.
- * @param {Function} action Action to execute recursively. The parameter passed to the action is used to trigger recursive scheduling of the action at the specified absolute time.
- * @param {Number}dueTime Absolute time at which to execute the action for the first time.
- * @returns {Disposable} The disposable object used to cancel the scheduled action (best effort).
- */
- schedulerProto.scheduleRecursiveWithAbsolute = function (dueTime, action) {
- return this.scheduleRecursiveWithAbsoluteAndState(action, dueTime, scheduleInnerRecursive);
- };
-
- /**
- * Schedules an action to be executed recursively at a specified absolute due time.
- * @param {Mixed} state State passed to the action to be executed.
- * @param {Function} action Action to execute recursively. The last parameter passed to the action is used to trigger recursive scheduling of the action, passing in the recursive due time and invocation state.
- * @param {Number}dueTime Absolute time at which to execute the action for the first time.
- * @returns {Disposable} The disposable object used to cancel the scheduled action (best effort).
- */
- schedulerProto.scheduleRecursiveWithAbsoluteAndState = function (state, dueTime, action) {
- return this._scheduleAbsolute([state, action], dueTime, invokeRecDateAbsolute);
- };
}(Scheduler.prototype));
(function (schedulerProto) {
/**
* Schedules a periodic piece of work by dynamically discovering the scheduler's capabilities. The periodic task will be scheduled using window.setInterval for the base implementation.
- * @param {Number} period Period for running the work periodically.
- * @param {Function} action Action to be executed.
- * @returns {Disposable} The disposable object used to cancel the scheduled recurring action (best effort).
- */
- Scheduler.prototype.schedulePeriodic = function (period, action) {
- return this.schedulePeriodicWithState(null, period, action);
- };
-
- /**
- * Schedules a periodic piece of work by dynamically discovering the scheduler's capabilities. The periodic task will be scheduled using window.setInterval for the base implementation.
* @param {Mixed} state Initial state passed to the action upon the first iteration.
* @param {Number} period Period for running the work periodically.
* @param {Function} action Action to be executed, potentially updating the state.
* @returns {Disposable} The disposable object used to cancel the scheduled recurring action (best effort).
*/
- Scheduler.prototype.schedulePeriodicWithState = function(state, period, action) {
+ schedulerProto.schedulePeriodic = function(state, period, action) {
if (typeof root.setInterval === 'undefined') { throw new NotSupportedError(); }
period = normalizeTime(period);
var s = state, id = root.setInterval(function () { s = action(s); }, period);
@@ -1079,54 +1043,73 @@
}(Scheduler.prototype));
/** Gets a scheduler that schedules work immediately on the current thread. */
- var immediateScheduler = Scheduler.immediate = (function () {
- function scheduleNow(state, action) { return action(this, state); }
- return new Scheduler(defaultNow, scheduleNow, notSupported, notSupported);
- }());
+ var ImmediateScheduler = (function (__super__) {
+ inherits(ImmediateScheduler, __super__);
+ function ImmediateScheduler() {
+ __super__.call(this);
+ }
+
+ ImmediateScheduler.prototype.schedule = function (state, action) {
+ return disposableFixup(action(this, state));
+ };
+
+ return ImmediateScheduler;
+ }(Scheduler));
+
+ var immediateScheduler = Scheduler.immediate = new ImmediateScheduler();
/**
* Gets a scheduler that schedules work as soon as possible on the current thread.
*/
- var currentThreadScheduler = Scheduler.currentThread = (function () {
+ var CurrentThreadScheduler = (function (__super__) {
var queue;
function runTrampoline () {
while (queue.length > 0) {
- var item = queue.shift();
+ var item = queue.dequeue();
!item.isCancelled() && item.invoke();
}
}
- function scheduleNow(state, action) {
+ inherits(CurrentThreadScheduler, __super__);
+ function CurrentThreadScheduler() {
+ __super__.call(this);
+ }
+
+ CurrentThreadScheduler.prototype.schedule = function (state, action) {
var si = new ScheduledItem(this, state, action, this.now());
if (!queue) {
- queue = [si];
+ queue = new PriorityQueue(4);
+ queue.enqueue(si);
var result = tryCatch(runTrampoline)();
queue = null;
- if (result === errorObj) { return thrower(result.e); }
+ if (result === errorObj) { thrower(result.e); }
} else {
- queue.push(si);
+ queue.enqueue(si);
}
return si.disposable;
- }
+ };
- var currentScheduler = new Scheduler(defaultNow, scheduleNow, notSupported, notSupported);
- currentScheduler.scheduleRequired = function () { return !queue; };
+ CurrentThreadScheduler.prototype.scheduleRequired = function () { return !queue; };
- return currentScheduler;
- }());
+ return CurrentThreadScheduler;
+ }(Scheduler));
+
+ var currentThreadScheduler = Scheduler.currentThread = new CurrentThreadScheduler();
var SchedulePeriodicRecursive = Rx.internals.SchedulePeriodicRecursive = (function () {
- function tick(command, recurse) {
- recurse(0, this._period);
- try {
- this._state = this._action(this._state);
- } catch (e) {
- this._cancel.dispose();
- throw e;
- }
+ function createTick(self) {
+ return function tick(command, recurse) {
+ recurse(0, self._period);
+ var state = tryCatch(self._action)(self._state);
+ if (state === errorObj) {
+ self._cancel.dispose();
+ thrower(state.e);
+ }
+ self._state = state;
+ };
}
function SchedulePeriodicRecursive(scheduler, state, period, action) {
@@ -1139,7 +1122,7 @@
SchedulePeriodicRecursive.prototype.start = function () {
var d = new SingleAssignmentDisposable();
this._cancel = d;
- d.setDisposable(this._scheduler.scheduleRecursiveWithRelativeAndState(0, this._period, tick.bind(this)));
+ d.setDisposable(this._scheduler.scheduleRecursiveFuture(0, this._period, createTick(this)));
return d;
};
@@ -1181,7 +1164,7 @@
function runTask(handle) {
if (currentlyRunning) {
- localSetTimeout(function () { runTask(handle) }, 0);
+ localSetTimeout(function () { runTask(handle); }, 0);
} else {
var task = tasksByHandle[handle];
if (task) {
@@ -1189,12 +1172,12 @@
var result = tryCatch(task)();
clearMethod(handle);
currentlyRunning = false;
- if (result === errorObj) { return thrower(result.e); }
+ if (result === errorObj) { thrower(result.e); }
}
}
}
- var reNative = RegExp('^' +
+ var reNative = new RegExp('^' +
String(toString)
.replace(/[.*+?^${}()|[\]\\]/g, '\\$&')
.replace(/toString| for [^\]]+/g, '.*?') + '$'
@@ -1235,25 +1218,19 @@
} else if (postMessageSupported()) {
var MSG_PREFIX = 'ms.rx.schedule' + Math.random();
- function onGlobalPostMessage(event) {
+ var onGlobalPostMessage = function (event) {
// Only if we're a match to avoid any other global events
if (typeof event.data === 'string' && event.data.substring(0, MSG_PREFIX.length) === MSG_PREFIX) {
runTask(event.data.substring(MSG_PREFIX.length));
}
- }
+ };
- if (root.addEventListener) {
- root.addEventListener('message', onGlobalPostMessage, false);
- } else if (root.attachEvent) {
- root.attachEvent('onmessage', onGlobalPostMessage);
- } else {
- root.onmessage = onGlobalPostMessage;
- }
+ root.addEventListener('message', onGlobalPostMessage, false);
scheduleMethod = function (action) {
var id = nextHandle++;
tasksByHandle[id] = action;
- root.postMessage(MSG_PREFIX + currentId, '*');
+ root.postMessage(MSG_PREFIX + id, '*');
return id;
};
} else if (!!root.MessageChannel) {
@@ -1300,61 +1277,170 @@
/**
* Gets a scheduler that schedules work via a timed callback based upon platform.
*/
- var timeoutScheduler = Scheduler.timeout = Scheduler['default'] = (function () {
+ var DefaultScheduler = (function (__super__) {
+ inherits(DefaultScheduler, __super__);
+ function DefaultScheduler() {
+ __super__.call(this);
+ }
+
+ function scheduleAction(disposable, action, scheduler, state) {
+ return function schedule() {
+ disposable.setDisposable(Disposable._fixup(action(scheduler, state)));
+ };
+ }
+
+ function ClearDisposable(id) {
+ this._id = id;
+ this.isDisposed = false;
+ }
+
+ ClearDisposable.prototype.dispose = function () {
+ if (!this.isDisposed) {
+ this.isDisposed = true;
+ clearMethod(this._id);
+ }
+ };
+
+ function LocalClearDisposable(id) {
+ this._id = id;
+ this.isDisposed = false;
+ }
+
+ LocalClearDisposable.prototype.dispose = function () {
+ if (!this.isDisposed) {
+ this.isDisposed = true;
+ localClearTimeout(this._id);
+ }
+ };
+
+ DefaultScheduler.prototype.schedule = function (state, action) {
+ var disposable = new SingleAssignmentDisposable(),
+ id = scheduleMethod(scheduleAction(disposable, action, this, state));
+ return new BinaryDisposable(disposable, new ClearDisposable(id));
+ };
+
+ DefaultScheduler.prototype._scheduleFuture = function (state, dueTime, action) {
+ if (dueTime === 0) { return this.schedule(state, action); }
+ var disposable = new SingleAssignmentDisposable(),
+ id = localSetTimeout(scheduleAction(disposable, action, this, state), dueTime);
+ return new BinaryDisposable(disposable, new LocalClearDisposable(id));
+ };
+
+ return DefaultScheduler;
+ }(Scheduler));
+
+ var defaultScheduler = Scheduler['default'] = Scheduler.async = new DefaultScheduler();
+
+ function IndexedItem(id, value) {
+ this.id = id;
+ this.value = value;
+ }
- function scheduleNow(state, action) {
- var scheduler = this, disposable = new SingleAssignmentDisposable();
- var id = scheduleMethod(function () {
- !disposable.isDisposed && disposable.setDisposable(action(scheduler, state));
- });
- return new CompositeDisposable(disposable, disposableCreate(function () {
- clearMethod(id);
- }));
- }
+ IndexedItem.prototype.compareTo = function (other) {
+ var c = this.value.compareTo(other.value);
+ c === 0 && (c = this.id - other.id);
+ return c;
+ };
- function scheduleRelative(state, dueTime, action) {
- var scheduler = this, dt = Scheduler.normalize(dueTime), disposable = new SingleAssignmentDisposable();
- if (dt === 0) { return scheduler.scheduleWithState(state, action); }
- var id = localSetTimeout(function () {
- !disposable.isDisposed && disposable.setDisposable(action(scheduler, state));
- }, dt);
- return new CompositeDisposable(disposable, disposableCreate(function () {
- localClearTimeout(id);
- }));
+ var PriorityQueue = Rx.internals.PriorityQueue = function (capacity) {
+ this.items = new Array(capacity);
+ this.length = 0;
+ };
+
+ var priorityProto = PriorityQueue.prototype;
+ priorityProto.isHigherPriority = function (left, right) {
+ return this.items[left].compareTo(this.items[right]) < 0;
+ };
+
+ priorityProto.percolate = function (index) {
+ if (index >= this.length || index < 0) { return; }
+ var parent = index - 1 >> 1;
+ if (parent < 0 || parent === index) { return; }
+ if (this.isHigherPriority(index, parent)) {
+ var temp = this.items[index];
+ this.items[index] = this.items[parent];
+ this.items[parent] = temp;
+ this.percolate(parent);
}
+ };
- function scheduleAbsolute(state, dueTime, action) {
- return this.scheduleWithRelativeAndState(state, dueTime - this.now(), action);
+ priorityProto.heapify = function (index) {
+ +index || (index = 0);
+ if (index >= this.length || index < 0) { return; }
+ var left = 2 * index + 1,
+ right = 2 * index + 2,
+ first = index;
+ if (left < this.length && this.isHigherPriority(left, first)) {
+ first = left;
}
+ if (right < this.length && this.isHigherPriority(right, first)) {
+ first = right;
+ }
+ if (first !== index) {
+ var temp = this.items[index];
+ this.items[index] = this.items[first];
+ this.items[first] = temp;
+ this.heapify(first);
+ }
+ };
- return new Scheduler(defaultNow, scheduleNow, scheduleRelative, scheduleAbsolute);
- })();
+ priorityProto.peek = function () { return this.items[0].value; };
+
+ priorityProto.removeAt = function (index) {
+ this.items[index] = this.items[--this.length];
+ this.items[this.length] = undefined;
+ this.heapify();
+ };
+
+ priorityProto.dequeue = function () {
+ var result = this.peek();
+ this.removeAt(0);
+ return result;
+ };
+
+ priorityProto.enqueue = function (item) {
+ var index = this.length++;
+ this.items[index] = new IndexedItem(PriorityQueue.count++, item);
+ this.percolate(index);
+ };
+
+ priorityProto.remove = function (item) {
+ for (var i = 0; i < this.length; i++) {
+ if (this.items[i].value === item) {
+ this.removeAt(i);
+ return true;
+ }
+ }
+ return false;
+ };
+ PriorityQueue.count = 0;
/**
* Represents a notification to an observer.
*/
var Notification = Rx.Notification = (function () {
- function Notification(kind, value, exception, accept, acceptObservable, toString) {
- this.kind = kind;
- this.value = value;
- this.exception = exception;
- this._accept = accept;
- this._acceptObservable = acceptObservable;
- this.toString = toString;
+ function Notification() {
+
}
+ Notification.prototype._accept = function (onNext, onError, onCompleted) {
+ throw new NotImplementedError();
+ };
+
+ Notification.prototype._acceptObserver = function (onNext, onError, onCompleted) {
+ throw new NotImplementedError();
+ };
+
/**
* Invokes the delegate corresponding to the notification or the observer's method corresponding to the notification and returns the produced result.
- *
- * @memberOf Notification
- * @param {Any} observerOrOnNext Delegate to invoke for an OnNext notification or Observer to invoke the notification on..
- * @param {Function} onError Delegate to invoke for an OnError notification.
- * @param {Function} onCompleted Delegate to invoke for an OnCompleted notification.
+ * @param {Function | Observer} observerOrOnNext Function to invoke for an OnNext notification or Observer to invoke the notification on..
+ * @param {Function} onError Function to invoke for an OnError notification.
+ * @param {Function} onCompleted Function to invoke for an OnCompleted notification.
* @returns {Any} Result produced by the observation.
*/
Notification.prototype.accept = function (observerOrOnNext, onError, onCompleted) {
return observerOrOnNext && typeof observerOrOnNext === 'object' ?
- this._acceptObservable(observerOrOnNext) :
+ this._acceptObserver(observerOrOnNext) :
this._accept(observerOrOnNext, onError, onCompleted);
};
@@ -1368,10 +1454,10 @@
Notification.prototype.toObservable = function (scheduler) {
var self = this;
isScheduler(scheduler) || (scheduler = immediateScheduler);
- return new AnonymousObservable(function (observer) {
- return scheduler.scheduleWithState(self, function (_, notification) {
- notification._acceptObservable(observer);
- notification.kind === 'N' && observer.onCompleted();
+ return new AnonymousObservable(function (o) {
+ return scheduler.schedule(self, function (_, notification) {
+ notification._acceptObserver(o);
+ notification.kind === 'N' && o.onCompleted();
});
});
};
@@ -1379,49 +1465,96 @@
return Notification;
})();
+ var OnNextNotification = (function (__super__) {
+ inherits(OnNextNotification, __super__);
+ function OnNextNotification(value) {
+ this.value = value;
+ this.kind = 'N';
+ }
+
+ OnNextNotification.prototype._accept = function (onNext) {
+ return onNext(this.value);
+ };
+
+ OnNextNotification.prototype._acceptObserver = function (o) {
+ return o.onNext(this.value);
+ };
+
+ OnNextNotification.prototype.toString = function () {
+ return 'OnNext(' + this.value + ')';
+ };
+
+ return OnNextNotification;
+ }(Notification));
+
+ var OnErrorNotification = (function (__super__) {
+ inherits(OnErrorNotification, __super__);
+ function OnErrorNotification(error) {
+ this.error = error;
+ this.kind = 'E';
+ }
+
+ OnErrorNotification.prototype._accept = function (onNext, onError) {
+ return onError(this.error);
+ };
+
+ OnErrorNotification.prototype._acceptObserver = function (o) {
+ return o.onError(this.error);
+ };
+
+ OnErrorNotification.prototype.toString = function () {
+ return 'OnError(' + this.error + ')';
+ };
+
+ return OnErrorNotification;
+ }(Notification));
+
+ var OnCompletedNotification = (function (__super__) {
+ inherits(OnCompletedNotification, __super__);
+ function OnCompletedNotification() {
+ this.kind = 'C';
+ }
+
+ OnCompletedNotification.prototype._accept = function (onNext, onError, onCompleted) {
+ return onCompleted();
+ };
+
+ OnCompletedNotification.prototype._acceptObserver = function (o) {
+ return o.onCompleted();
+ };
+
+ OnCompletedNotification.prototype.toString = function () {
+ return 'OnCompleted()';
+ };
+
+ return OnCompletedNotification;
+ }(Notification));
+
/**
* Creates an object that represents an OnNext notification to an observer.
* @param {Any} value The value contained in the notification.
* @returns {Notification} The OnNext notification containing the value.
*/
- var notificationCreateOnNext = Notification.createOnNext = (function () {
- function _accept(onNext) { return onNext(this.value); }
- function _acceptObservable(observer) { return observer.onNext(this.value); }
- function toString() { return 'OnNext(' + this.value + ')'; }
-
- return function (value) {
- return new Notification('N', value, null, _accept, _acceptObservable, toString);
- };
- }());
+ var notificationCreateOnNext = Notification.createOnNext = function (value) {
+ return new OnNextNotification(value);
+ };
/**
* Creates an object that represents an OnError notification to an observer.
* @param {Any} error The exception contained in the notification.
* @returns {Notification} The OnError notification containing the exception.
*/
- var notificationCreateOnError = Notification.createOnError = (function () {
- function _accept (onNext, onError) { return onError(this.exception); }
- function _acceptObservable(observer) { return observer.onError(this.exception); }
- function toString () { return 'OnError(' + this.exception + ')'; }
-
- return function (e) {
- return new Notification('E', null, e, _accept, _acceptObservable, toString);
- };
- }());
+ var notificationCreateOnError = Notification.createOnError = function (error) {
+ return new OnErrorNotification(error);
+ };
/**
* Creates an object that represents an OnCompleted notification to an observer.
* @returns {Notification} The OnCompleted notification.
*/
- var notificationCreateOnCompleted = Notification.createOnCompleted = (function () {
- function _accept (onNext, onError, onCompleted) { return onCompleted(); }
- function _acceptObservable(observer) { return observer.onCompleted(); }
- function toString () { return 'OnCompleted()'; }
-
- return function () {
- return new Notification('C', null, null, _accept, _acceptObservable, toString);
- };
- }());
+ var notificationCreateOnCompleted = Notification.createOnCompleted = function () {
+ return new OnCompletedNotification();
+ };
/**
* Supports push-style iteration over an observable sequence.
@@ -1572,13 +1705,12 @@
};
}
- function Observable(subscribe) {
+ function Observable() {
if (Rx.config.longStackSupport && hasStacks) {
+ var oldSubscribe = this._subscribe;
var e = tryCatch(thrower)(new Error()).e;
this.stack = e.stack.substring(e.stack.indexOf('\n') + 1);
- this._subscribe = makeSubscribe(this, subscribe);
- } else {
- this._subscribe = subscribe;
+ this._subscribe = makeSubscribe(this, oldSubscribe);
}
}
@@ -1591,7 +1723,7 @@
*/
Observable.isObservable = function (o) {
return o && isFunction(o.subscribe);
- }
+ };
/**
* Subscribes an o to the observable sequence.
@@ -1652,45 +1784,48 @@
this.disposable = new SerialDisposable();
}
- ScheduledObserver.prototype.next = function (value) {
- var self = this;
- this.queue.push(function () { self.observer.onNext(value); });
+ function enqueueNext(observer, x) { return function () { observer.onNext(x); }; }
+ function enqueueError(observer, e) { return function () { observer.onError(e); }; }
+ function enqueueCompleted(observer) { return function () { observer.onCompleted(); }; }
+
+ ScheduledObserver.prototype.next = function (x) {
+ this.queue.push(enqueueNext(this.observer, x));
};
ScheduledObserver.prototype.error = function (e) {
- var self = this;
- this.queue.push(function () { self.observer.onError(e); });
+ this.queue.push(enqueueError(this.observer, e));
};
ScheduledObserver.prototype.completed = function () {
- var self = this;
- this.queue.push(function () { self.observer.onCompleted(); });
+ this.queue.push(enqueueCompleted(this.observer));
};
+
+ function scheduleMethod(state, recurse) {
+ var work;
+ if (state.queue.length > 0) {
+ work = state.queue.shift();
+ } else {
+ state.isAcquired = false;
+ return;
+ }
+ var res = tryCatch(work)();
+ if (res === errorObj) {
+ state.queue = [];
+ state.hasFaulted = true;
+ return thrower(res.e);
+ }
+ recurse(state);
+ }
+
ScheduledObserver.prototype.ensureActive = function () {
var isOwner = false;
if (!this.hasFaulted && this.queue.length > 0) {
isOwner = !this.isAcquired;
this.isAcquired = true;
}
- if (isOwner) {
- this.disposable.setDisposable(this.scheduler.scheduleRecursiveWithState(this, function (parent, self) {
- var work;
- if (parent.queue.length > 0) {
- work = parent.queue.shift();
- } else {
- parent.isAcquired = false;
- return;
- }
- var res = tryCatch(work)();
- if (res === errorObj) {
- parent.queue = [];
- parent.hasFaulted = true;
- return thrower(res.e);
- }
- self(parent);
- }));
- }
+ isOwner &&
+ this.disposable.setDisposable(this.scheduler.scheduleRecursive(this, scheduleMethod));
};
ScheduledObserver.prototype.dispose = function () {
@@ -1712,92 +1847,74 @@
function setDisposable(s, state) {
var ado = state[0], self = state[1];
var sub = tryCatch(self.subscribeCore).call(self, ado);
-
- if (sub === errorObj) {
- if(!ado.fail(errorObj.e)) { return thrower(errorObj.e); }
- }
+ if (sub === errorObj && !ado.fail(errorObj.e)) { thrower(errorObj.e); }
ado.setDisposable(fixSubscriber(sub));
}
- function subscribe(observer) {
- var ado = new AutoDetachObserver(observer), state = [ado, this];
+ function ObservableBase() {
+ __super__.call(this);
+ }
+
+ ObservableBase.prototype._subscribe = function (o) {
+ var ado = new AutoDetachObserver(o), state = [ado, this];
if (currentThreadScheduler.scheduleRequired()) {
- currentThreadScheduler.scheduleWithState(state, setDisposable);
+ currentThreadScheduler.schedule(state, setDisposable);
} else {
setDisposable(null, state);
}
return ado;
- }
-
- function ObservableBase() {
- __super__.call(this, subscribe);
- }
+ };
ObservableBase.prototype.subscribeCore = notImplemented;
return ObservableBase;
}(Observable));
-var FlatMapObservable = (function(__super__){
+var FlatMapObservable = Rx.FlatMapObservable = (function(__super__) {
inherits(FlatMapObservable, __super__);
function FlatMapObservable(source, selector, resultSelector, thisArg) {
- this.resultSelector = Rx.helpers.isFunction(resultSelector) ?
- resultSelector : null;
-
- this.selector = Rx.internals.bindCallback(Rx.helpers.isFunction(selector) ? selector : function() { return selector; }, thisArg, 3);
- this.source = source;
-
- __super__.call(this);
-
+ this.resultSelector = isFunction(resultSelector) ? resultSelector : null;
+ this.selector = bindCallback(isFunction(selector) ? selector : function() { return selector; }, thisArg, 3);
+ this.source = source;
+ __super__.call(this);
}
FlatMapObservable.prototype.subscribeCore = function(o) {
- return this.source.subscribe(new InnerObserver(o, this.selector, this.resultSelector, this));
+ return this.source.subscribe(new InnerObserver(o, this.selector, this.resultSelector, this));
};
+ inherits(InnerObserver, AbstractObserver);
function InnerObserver(observer, selector, resultSelector, source) {
- this.i = 0;
- this.selector = selector;
- this.resultSelector = resultSelector;
- this.source = source;
- this.isStopped = false;
- this.o = observer;
+ this.i = 0;
+ this.selector = selector;
+ this.resultSelector = resultSelector;
+ this.source = source;
+ this.o = observer;
+ AbstractObserver.call(this);
}
InnerObserver.prototype._wrapResult = function(result, x, i) {
- return this.resultSelector ?
- result.map(function(y, i2) { return this.resultSelector(x, y, i, i2); }, this) :
- result;
+ return this.resultSelector ?
+ result.map(function(y, i2) { return this.resultSelector(x, y, i, i2); }, this) :
+ result;
};
- InnerObserver.prototype.onNext = function(x) {
-
- if (this.isStopped) return;
-
- var i = this.i++;
- var result = tryCatch(this.selector)(x, i, this.source);
-
- if (result === errorObj) {
- return this.o.onError(result.e);
- }
-
- Rx.helpers.isPromise(result) && (result = Rx.Observable.fromPromise(result));
- (Rx.helpers.isArrayLike(result) || Rx.helpers.isIterable(result)) && (result = Rx.Observable.from(result));
-
- this.o.onNext(this._wrapResult(result, x, i));
+ InnerObserver.prototype.next = function(x) {
+ var i = this.i++;
+ var result = tryCatch(this.selector)(x, i, this.source);
+ if (result === errorObj) { return this.o.onError(result.e); }
+ isPromise(result) && (result = observableFromPromise(result));
+ (isArrayLike(result) || isIterable(result)) && (result = Observable.from(result));
+ this.o.onNext(this._wrapResult(result, x, i));
};
- InnerObserver.prototype.onError = function(e) {
- if(!this.isStopped) { this.isStopped = true; this.o.onError(e); }
- };
+ InnerObserver.prototype.error = function(e) { this.o.onError(e); };
- InnerObserver.prototype.onCompleted = function() {
- if (!this.isStopped) {this.isStopped = true; this.o.onCompleted(); }
- };
+ InnerObserver.prototype.completed = function() { this.o.onCompleted(); };
return FlatMapObservable;
@@ -1805,199 +1922,156 @@ var FlatMapObservable = (function(__super__){
var Enumerable = Rx.internals.Enumerable = function () { };
+ function IsDisposedDisposable(state) {
+ this._s = state;
+ this.isDisposed = false;
+ }
+
+ IsDisposedDisposable.prototype.dispose = function () {
+ if (!this.isDisposed) {
+ this.isDisposed = true;
+ this._s.isDisposed = true;
+ }
+ };
+
var ConcatEnumerableObservable = (function(__super__) {
inherits(ConcatEnumerableObservable, __super__);
function ConcatEnumerableObservable(sources) {
this.sources = sources;
__super__.call(this);
}
-
- ConcatEnumerableObservable.prototype.subscribeCore = function (o) {
- var isDisposed, subscription = new SerialDisposable();
- var cancelable = immediateScheduler.scheduleRecursiveWithState(this.sources[$iterator$](), function (e, self) {
- if (isDisposed) { return; }
- var currentItem = tryCatch(e.next).call(e);
- if (currentItem === errorObj) { return o.onError(currentItem.e); }
- if (currentItem.done) {
- return o.onCompleted();
- }
+ function scheduleMethod(state, recurse) {
+ if (state.isDisposed) { return; }
+ var currentItem = tryCatch(state.e.next).call(state.e);
+ if (currentItem === errorObj) { return state.o.onError(currentItem.e); }
+ if (currentItem.done) { return state.o.onCompleted(); }
- // Check if promise
- var currentValue = currentItem.value;
- isPromise(currentValue) && (currentValue = observableFromPromise(currentValue));
+ // Check if promise
+ var currentValue = currentItem.value;
+ isPromise(currentValue) && (currentValue = observableFromPromise(currentValue));
- var d = new SingleAssignmentDisposable();
- subscription.setDisposable(d);
- d.setDisposable(currentValue.subscribe(new InnerObserver(o, self, e)));
- });
+ var d = new SingleAssignmentDisposable();
+ state.subscription.setDisposable(d);
+ d.setDisposable(currentValue.subscribe(new InnerObserver(state, recurse)));
+ }
- return new CompositeDisposable(subscription, cancelable, disposableCreate(function () {
- isDisposed = true;
- }));
+ ConcatEnumerableObservable.prototype.subscribeCore = function (o) {
+ var subscription = new SerialDisposable();
+ var state = {
+ isDisposed: false,
+ o: o,
+ subscription: subscription,
+ e: this.sources[$iterator$]()
+ };
+
+ var cancelable = currentThreadScheduler.scheduleRecursive(state, scheduleMethod);
+ return new NAryDisposable([subscription, cancelable, new IsDisposedDisposable(state)]);
};
-
- function InnerObserver(o, s, e) {
- this.o = o;
- this.s = s;
- this.e = e;
- this.isStopped = false;
+
+ function InnerObserver(state, recurse) {
+ this._state = state;
+ this._recurse = recurse;
+ AbstractObserver.call(this);
}
- InnerObserver.prototype.onNext = function (x) { if(!this.isStopped) { this.o.onNext(x); } };
- InnerObserver.prototype.onError = function (err) {
- if (!this.isStopped) {
- this.isStopped = true;
- this.o.onError(err);
- }
- };
- InnerObserver.prototype.onCompleted = function () {
- if (!this.isStopped) {
- this.isStopped = true;
- this.s(this.e);
- }
- };
- InnerObserver.prototype.dispose = function () { this.isStopped = true; };
- InnerObserver.prototype.fail = function (err) {
- if (!this.isStopped) {
- this.isStopped = true;
- this.o.onError(err);
- return true;
- }
- return false;
- };
-
+
+ inherits(InnerObserver, AbstractObserver);
+
+ InnerObserver.prototype.next = function (x) { this._state.o.onNext(x); };
+ InnerObserver.prototype.error = function (e) { this._state.o.onError(e); };
+ InnerObserver.prototype.completed = function () { this._recurse(this._state); };
+
return ConcatEnumerableObservable;
}(ObservableBase));
Enumerable.prototype.concat = function () {
return new ConcatEnumerableObservable(this);
};
-
+
var CatchErrorObservable = (function(__super__) {
- inherits(CatchErrorObservable, __super__);
function CatchErrorObservable(sources) {
this.sources = sources;
__super__.call(this);
}
-
- CatchErrorObservable.prototype.subscribeCore = function (o) {
- var e = this.sources[$iterator$]();
- var isDisposed, subscription = new SerialDisposable();
- var cancelable = immediateScheduler.scheduleRecursiveWithState(null, function (lastException, self) {
- if (isDisposed) { return; }
- var currentItem = tryCatch(e.next).call(e);
- if (currentItem === errorObj) { return o.onError(currentItem.e); }
+ inherits(CatchErrorObservable, __super__);
- if (currentItem.done) {
- return lastException !== null ? o.onError(lastException) : o.onCompleted();
- }
+ function scheduleMethod(state, recurse) {
+ if (state.isDisposed) { return; }
+ var currentItem = tryCatch(state.e.next).call(state.e);
+ if (currentItem === errorObj) { return state.o.onError(currentItem.e); }
+ if (currentItem.done) { return state.lastError !== null ? state.o.onError(state.lastError) : state.o.onCompleted(); }
- // Check if promise
- var currentValue = currentItem.value;
- isPromise(currentValue) && (currentValue = observableFromPromise(currentValue));
+ var currentValue = currentItem.value;
+ isPromise(currentValue) && (currentValue = observableFromPromise(currentValue));
- var d = new SingleAssignmentDisposable();
- subscription.setDisposable(d);
- d.setDisposable(currentValue.subscribe(
- function(x) { o.onNext(x); },
- self,
- function() { o.onCompleted(); }));
- });
- return new CompositeDisposable(subscription, cancelable, disposableCreate(function () {
- isDisposed = true;
- }));
- };
-
- return CatchErrorObservable;
- }(ObservableBase));
-
- Enumerable.prototype.catchError = function () {
- return new CatchErrorObservable(this);
- };
-
- Enumerable.prototype.catchErrorWhen = function (notificationHandler) {
- var sources = this;
- return new AnonymousObservable(function (o) {
- var exceptions = new Subject(),
- notifier = new Subject(),
- handled = notificationHandler(exceptions),
- notificationDisposable = handled.subscribe(notifier);
+ var d = new SingleAssignmentDisposable();
+ state.subscription.setDisposable(d);
+ d.setDisposable(currentValue.subscribe(new InnerObserver(state, recurse)));
+ }
- var e = sources[$iterator$]();
+ CatchErrorObservable.prototype.subscribeCore = function (o) {
+ var subscription = new SerialDisposable();
+ var state = {
+ isDisposed: false,
+ e: this.sources[$iterator$](),
+ subscription: subscription,
+ lastError: null,
+ o: o
+ };
- var isDisposed,
- lastException,
- subscription = new SerialDisposable();
- var cancelable = immediateScheduler.scheduleRecursive(function (self) {
- if (isDisposed) { return; }
- var currentItem = tryCatch(e.next).call(e);
- if (currentItem === errorObj) { return o.onError(currentItem.e); }
+ var cancelable = currentThreadScheduler.scheduleRecursive(state, scheduleMethod);
+ return new NAryDisposable([subscription, cancelable, new IsDisposedDisposable(state)]);
+ };
- if (currentItem.done) {
- if (lastException) {
- o.onError(lastException);
- } else {
- o.onCompleted();
- }
- return;
- }
+ function InnerObserver(state, recurse) {
+ this._state = state;
+ this._recurse = recurse;
+ AbstractObserver.call(this);
+ }
- // Check if promise
- var currentValue = currentItem.value;
- isPromise(currentValue) && (currentValue = observableFromPromise(currentValue));
+ inherits(InnerObserver, AbstractObserver);
- var outer = new SingleAssignmentDisposable();
- var inner = new SingleAssignmentDisposable();
- subscription.setDisposable(new CompositeDisposable(inner, outer));
- outer.setDisposable(currentValue.subscribe(
- function(x) { o.onNext(x); },
- function (exn) {
- inner.setDisposable(notifier.subscribe(self, function(ex) {
- o.onError(ex);
- }, function() {
- o.onCompleted();
- }));
+ InnerObserver.prototype.next = function (x) { this._state.o.onNext(x); };
+ InnerObserver.prototype.error = function (e) { this._state.lastError = e; this._recurse(this._state); };
+ InnerObserver.prototype.completed = function () { this._state.o.onCompleted(); };
- exceptions.onNext(exn);
- },
- function() { o.onCompleted(); }));
- });
+ return CatchErrorObservable;
+ }(ObservableBase));
- return new CompositeDisposable(notificationDisposable, subscription, cancelable, disposableCreate(function () {
- isDisposed = true;
- }));
- });
+ Enumerable.prototype.catchError = function () {
+ return new CatchErrorObservable(this);
};
-
+
var RepeatEnumerable = (function (__super__) {
inherits(RepeatEnumerable, __super__);
-
function RepeatEnumerable(v, c) {
this.v = v;
this.c = c == null ? -1 : c;
}
+
RepeatEnumerable.prototype[$iterator$] = function () {
- return new RepeatEnumerator(this);
+ return new RepeatEnumerator(this);
};
-
+
function RepeatEnumerator(p) {
this.v = p.v;
this.l = p.c;
}
+
RepeatEnumerator.prototype.next = function () {
if (this.l === 0) { return doneEnumerator; }
if (this.l > 0) { this.l--; }
- return { done: false, value: this.v };
+ return { done: false, value: this.v };
};
-
+
return RepeatEnumerable;
}(Enumerable));
var enumerableRepeat = Enumerable.repeat = function (value, repeatCount) {
return new RepeatEnumerable(value, repeatCount);
};
-
+
var OfEnumerable = (function(__super__) {
inherits(OfEnumerable, __super__);
function OfEnumerable(s, fn, thisArg) {
@@ -2007,19 +2081,20 @@ var FlatMapObservable = (function(__super__){
OfEnumerable.prototype[$iterator$] = function () {
return new OfEnumerator(this);
};
-
+
function OfEnumerator(p) {
this.i = -1;
this.s = p.s;
this.l = this.s.length;
this.fn = p.fn;
}
+
OfEnumerator.prototype.next = function () {
return ++this.i < this.l ?
{ done: false, value: !this.fn ? this.s[this.i] : this.fn(this.s[this.i], this.i, this.s) } :
- doneEnumerator;
+ doneEnumerator;
};
-
+
return OfEnumerable;
}(Enumerable));
@@ -2038,35 +2113,16 @@ var FlatMapObservable = (function(__super__){
return this.source.subscribe(new InnerObserver(o));
};
+ inherits(InnerObserver, AbstractObserver);
function InnerObserver(o) {
this.o = o;
this.a = [];
- this.isStopped = false;
+ AbstractObserver.call(this);
}
- InnerObserver.prototype.onNext = function (x) { if(!this.isStopped) { this.a.push(x); } };
- InnerObserver.prototype.onError = function (e) {
- if (!this.isStopped) {
- this.isStopped = true;
- this.o.onError(e);
- }
- };
- InnerObserver.prototype.onCompleted = function () {
- if (!this.isStopped) {
- this.isStopped = true;
- this.o.onNext(this.a);
- this.o.onCompleted();
- }
- };
- InnerObserver.prototype.dispose = function () { this.isStopped = true; }
- InnerObserver.prototype.fail = function (e) {
- if (!this.isStopped) {
- this.isStopped = true;
- this.o.onError(e);
- return true;
- }
-
- return false;
- };
+
+ InnerObserver.prototype.next = function (x) { this.a.push(x); };
+ InnerObserver.prototype.error = function (e) { this.o.onError(e); };
+ InnerObserver.prototype.completed = function () { this.o.onNext(this.a); this.o.onCompleted(); };
return ToArrayObservable;
}(ObservableBase));
@@ -2092,6 +2148,23 @@ var FlatMapObservable = (function(__super__){
return new AnonymousObservable(subscribe, parent);
};
+ var Defer = (function(__super__) {
+ inherits(Defer, __super__);
+ function Defer(factory) {
+ this._f = factory;
+ __super__.call(this);
+ }
+
+ Defer.prototype.subscribeCore = function (o) {
+ var result = tryCatch(this._f)();
+ if (result === errorObj) { return observableThrow(result.e).subscribe(o);}
+ isPromise(result) && (result = observableFromPromise(result));
+ return result.subscribe(o);
+ };
+
+ return Defer;
+ }(ObservableBase));
+
/**
* Returns an observable sequence that invokes the specified factory function whenever a new observer subscribes.
*
@@ -2101,16 +2174,7 @@ var FlatMapObservable = (function(__super__){
* @returns {Observable} An observable sequence whose observers trigger an invocation of the given observable factory function.
*/
var observableDefer = Observable.defer = function (observableFactory) {
- return new AnonymousObservable(function (observer) {
- var result;
- try {
- result = observableFactory();
- } catch (e) {
- return observableThrow(e).subscribe(observer);
- }
- isPromise(result) && (result = observableFromPromise(result));
- return result.subscribe(observer);
- });
+ return new Defer(observableFactory);
};
var EmptyObservable = (function(__super__) {
@@ -2136,7 +2200,10 @@ var FlatMapObservable = (function(__super__){
}
EmptySink.prototype.run = function () {
- return this.scheduler.scheduleWithState(this.observer, scheduleItem);
+ var state = this.observer;
+ return this.scheduler === immediateScheduler ?
+ scheduleItem(null, state) :
+ this.scheduler.schedule(state, scheduleItem);
};
return EmptyObservable;
@@ -2160,54 +2227,40 @@ var FlatMapObservable = (function(__super__){
var FromObservable = (function(__super__) {
inherits(FromObservable, __super__);
- function FromObservable(iterable, mapper, scheduler) {
- this.iterable = iterable;
- this.mapper = mapper;
- this.scheduler = scheduler;
+ function FromObservable(iterable, fn, scheduler) {
+ this._iterable = iterable;
+ this._fn = fn;
+ this._scheduler = scheduler;
__super__.call(this);
}
- FromObservable.prototype.subscribeCore = function (o) {
- var sink = new FromSink(o, this);
- return sink.run();
- };
-
- return FromObservable;
- }(ObservableBase));
-
- var FromSink = (function () {
- function FromSink(o, parent) {
- this.o = o;
- this.parent = parent;
- }
-
- FromSink.prototype.run = function () {
- var list = Object(this.parent.iterable),
- it = getIterable(list),
- o = this.o,
- mapper = this.parent.mapper;
-
- function loopRecursive(i, recurse) {
+ function createScheduleMethod(o, it, fn) {
+ return function loopRecursive(i, recurse) {
var next = tryCatch(it.next).call(it);
if (next === errorObj) { return o.onError(next.e); }
if (next.done) { return o.onCompleted(); }
var result = next.value;
- if (isFunction(mapper)) {
- result = tryCatch(mapper)(result, i);
+ if (isFunction(fn)) {
+ result = tryCatch(fn)(result, i);
if (result === errorObj) { return o.onError(result.e); }
}
o.onNext(result);
recurse(i + 1);
- }
+ };
+ }
+
+ FromObservable.prototype.subscribeCore = function (o) {
+ var list = Object(this._iterable),
+ it = getIterable(list);
- return this.parent.scheduler.scheduleRecursiveWithState(0, loopRecursive);
+ return this._scheduler.scheduleRecursive(0, createScheduleMethod(o, it, this._fn));
};
- return FromSink;
- }());
+ return FromObservable;
+ }(ObservableBase));
var maxSafeInteger = Math.pow(2, 53) - 1;
@@ -2318,38 +2371,30 @@ var FlatMapObservable = (function(__super__){
var FromArrayObservable = (function(__super__) {
inherits(FromArrayObservable, __super__);
function FromArrayObservable(args, scheduler) {
- this.args = args;
- this.scheduler = scheduler;
+ this._args = args;
+ this._scheduler = scheduler;
__super__.call(this);
}
- FromArrayObservable.prototype.subscribeCore = function (observer) {
- var sink = new FromArraySink(observer, this);
- return sink.run();
+ function scheduleMethod(o, args) {
+ var len = args.length;
+ return function loopRecursive (i, recurse) {
+ if (i < len) {
+ o.onNext(args[i]);
+ recurse(i + 1);
+ } else {
+ o.onCompleted();
+ }
+ };
+ }
+
+ FromArrayObservable.prototype.subscribeCore = function (o) {
+ return this._scheduler.scheduleRecursive(0, scheduleMethod(o, this._args));
};
return FromArrayObservable;
}(ObservableBase));
- function FromArraySink(observer, parent) {
- this.observer = observer;
- this.parent = parent;
- }
-
- FromArraySink.prototype.run = function () {
- var observer = this.observer, args = this.parent.args, len = args.length;
- function loopRecursive(i, recurse) {
- if (i < len) {
- observer.onNext(args[i]);
- recurse(i + 1);
- } else {
- observer.onCompleted();
- }
- }
-
- return this.parent.scheduler.scheduleRecursiveWithState(0, loopRecursive);
- };
-
/**
* Converts an array to an observable sequence, using an optional scheduler to enumerate the array.
* @deprecated use Observable.from or Observable.of
@@ -2412,41 +2457,32 @@ var FlatMapObservable = (function(__super__){
var PairsObservable = (function(__super__) {
inherits(PairsObservable, __super__);
- function PairsObservable(obj, scheduler) {
- this.obj = obj;
- this.keys = Object.keys(obj);
- this.scheduler = scheduler;
+ function PairsObservable(o, scheduler) {
+ this._o = o;
+ this._keys = Object.keys(o);
+ this._scheduler = scheduler;
__super__.call(this);
}
- PairsObservable.prototype.subscribeCore = function (observer) {
- var sink = new PairsSink(observer, this);
- return sink.run();
+ function scheduleMethod(o, obj, keys) {
+ return function loopRecursive(i, recurse) {
+ if (i < keys.length) {
+ var key = keys[i];
+ o.onNext([key, obj[key]]);
+ recurse(i + 1);
+ } else {
+ o.onCompleted();
+ }
+ };
+ }
+
+ PairsObservable.prototype.subscribeCore = function (o) {
+ return this._scheduler.scheduleRecursive(0, scheduleMethod(o, this._o, this._keys));
};
return PairsObservable;
}(ObservableBase));
- function PairsSink(observer, parent) {
- this.observer = observer;
- this.parent = parent;
- }
-
- PairsSink.prototype.run = function () {
- var observer = this.observer, obj = this.parent.obj, keys = this.parent.keys, len = keys.length;
- function loopRecursive(i, recurse) {
- if (i < len) {
- var key = keys[i];
- observer.onNext([key, obj[key]]);
- recurse(i + 1);
- } else {
- observer.onCompleted();
- }
- }
-
- return this.parent.scheduler.scheduleRecursiveWithState(0, loopRecursive);
- };
-
/**
* Convert an object into an observable sequence of [key, value] pairs.
* @param {Object} obj The object to inspect.
@@ -2467,36 +2503,26 @@ var FlatMapObservable = (function(__super__){
__super__.call(this);
}
- RangeObservable.prototype.subscribeCore = function (observer) {
- var sink = new RangeSink(observer, this);
- return sink.run();
- };
-
- return RangeObservable;
- }(ObservableBase));
-
- var RangeSink = (function () {
- function RangeSink(observer, parent) {
- this.observer = observer;
- this.parent = parent;
- }
-
- RangeSink.prototype.run = function () {
- var start = this.parent.start, count = this.parent.rangeCount, observer = this.observer;
- function loopRecursive(i, recurse) {
+ function loopRecursive(start, count, o) {
+ return function loop (i, recurse) {
if (i < count) {
- observer.onNext(start + i);
+ o.onNext(start + i);
recurse(i + 1);
} else {
- observer.onCompleted();
+ o.onCompleted();
}
- }
+ };
+ }
- return this.parent.scheduler.scheduleRecursiveWithState(0, loopRecursive);
+ RangeObservable.prototype.subscribeCore = function (o) {
+ return this.scheduler.scheduleRecursive(
+ 0,
+ loopRecursive(this.start, this.rangeCount, o)
+ );
};
- return RangeSink;
- }());
+ return RangeObservable;
+ }(ObservableBase));
/**
* Generates an observable sequence of integral numbers within a specified range, using the specified scheduler to send out observer messages.
@@ -2543,7 +2569,7 @@ var FlatMapObservable = (function(__super__){
recurse(i);
}
- return this.parent.scheduler.scheduleRecursiveWithState(this.parent.repeatCount, loopRecursive);
+ return this.parent.scheduler.scheduleRecursive(this.parent.repeatCount, loopRecursive);
};
/**
@@ -2561,22 +2587,18 @@ var FlatMapObservable = (function(__super__){
var JustObservable = (function(__super__) {
inherits(JustObservable, __super__);
function JustObservable(value, scheduler) {
- this.value = value;
- this.scheduler = scheduler;
+ this._value = value;
+ this._scheduler = scheduler;
__super__.call(this);
}
- JustObservable.prototype.subscribeCore = function (observer) {
- var sink = new JustSink(observer, this.value, this.scheduler);
- return sink.run();
+ JustObservable.prototype.subscribeCore = function (o) {
+ var state = [this._value, o];
+ return this._scheduler === immediateScheduler ?
+ scheduleItem(null, state) :
+ this._scheduler.schedule(state, scheduleItem);
};
- function JustSink(observer, value, scheduler) {
- this.observer = observer;
- this.value = value;
- this.scheduler = scheduler;
- }
-
function scheduleItem(s, state) {
var value = state[0], observer = state[1];
observer.onNext(value);
@@ -2584,13 +2606,6 @@ var FlatMapObservable = (function(__super__){
return disposableEmpty;
}
- JustSink.prototype.run = function () {
- var state = [this.value, this.observer];
- return this.scheduler === immediateScheduler ?
- scheduleItem(null, state) :
- this.scheduler.scheduleWithState(state, scheduleItem);
- };
-
return JustObservable;
}(ObservableBase));
@@ -2609,30 +2624,24 @@ var FlatMapObservable = (function(__super__){
var ThrowObservable = (function(__super__) {
inherits(ThrowObservable, __super__);
function ThrowObservable(error, scheduler) {
- this.error = error;
- this.scheduler = scheduler;
+ this._error = error;
+ this._scheduler = scheduler;
__super__.call(this);
}
ThrowObservable.prototype.subscribeCore = function (o) {
- var sink = new ThrowSink(o, this);
- return sink.run();
+ var state = [this._error, o];
+ return this._scheduler === immediateScheduler ?
+ scheduleItem(null, state) :
+ this._scheduler.schedule(state, scheduleItem);
};
- function ThrowSink(o, p) {
- this.o = o;
- this.p = p;
- }
-
function scheduleItem(s, state) {
var e = state[0], o = state[1];
o.onError(e);
+ return disposableEmpty;
}
- ThrowSink.prototype.run = function () {
- return this.p.scheduler.scheduleWithState([this.p.error, this.o], scheduleItem);
- };
-
return ThrowObservable;
}(ObservableBase));
@@ -2648,6 +2657,24 @@ var FlatMapObservable = (function(__super__){
return new ThrowObservable(error, scheduler);
};
+ var CatchObservable = (function (__super__) {
+ inherits(CatchObservable, __super__);
+ function CatchObservable(source, fn) {
+ this.source = source;
+ this._fn = fn;
+ __super__.call(this);
+ }
+
+ CatchObservable.prototype.subscribeCore = function (o) {
+ var d1 = new SingleAssignmentDisposable(), subscription = new SerialDisposable();
+ subscription.setDisposable(d1);
+ d1.setDisposable(this.source.subscribe(new CatchObserver(o, subscription, this._fn)));
+ return subscription;
+ };
+
+ return CatchObservable;
+ }(ObservableBase));
+
var CatchObserver = (function(__super__) {
inherits(CatchObserver, __super__);
function CatchObserver(o, s, fn) {
@@ -2672,22 +2699,13 @@ var FlatMapObservable = (function(__super__){
return CatchObserver;
}(AbstractObserver));
- function observableCatchHandler(source, handler) {
- return new AnonymousObservable(function (o) {
- var d1 = new SingleAssignmentDisposable(), subscription = new SerialDisposable();
- subscription.setDisposable(d1);
- d1.setDisposable(source.subscribe(new CatchObserver(o, subscription, handler)));
- return subscription;
- }, source);
- }
-
/**
* Continues an observable sequence that is terminated by an exception with the next observable sequence.
* @param {Mixed} handlerOrSecond Exception handler function that returns an observable sequence given the error that occurred in the first sequence, or a second observable sequence used to produce results when an error occurred in the first sequence.
* @returns {Observable} An observable sequence containing the first sequence's elements, followed by the elements of the handler sequence in case an exception occurred.
*/
observableProto['catch'] = function (handlerOrSecond) {
- return isFunction(handlerOrSecond) ? observableCatchHandler(this, handlerOrSecond) : observableCatch([this, handlerOrSecond]);
+ return isFunction(handlerOrSecond) ? new CatchObservable(this, handlerOrSecond) : observableCatch([this, handlerOrSecond]);
};
/**
@@ -2734,64 +2752,92 @@ var FlatMapObservable = (function(__super__){
return args;
}
- /**
- * Merges the specified observable sequences into one observable sequence by using the selector function whenever any of the observable sequences or Promises produces an element.
- *
- * @example
- * 1 - obs = Rx.Observable.combineLatest(obs1, obs2, obs3, function (o1, o2, o3) { return o1 + o2 + o3; });
- * 2 - obs = Rx.Observable.combineLatest([obs1, obs2, obs3], function (o1, o2, o3) { return o1 + o2 + o3; });
- * @returns {Observable} An observable sequence containing the result of combining elements of the sources using the specified result selector function.
- */
- var combineLatest = Observable.combineLatest = function () {
- var len = arguments.length, args = new Array(len);
- for(var i = 0; i < len; i++) { args[i] = arguments[i]; }
- var resultSelector = isFunction(args[len - 1]) ? args.pop() : argumentsToArray;
- Array.isArray(args[0]) && (args = args[0]);
+ var CombineLatestObservable = (function(__super__) {
+ inherits(CombineLatestObservable, __super__);
+ function CombineLatestObservable(params, cb) {
+ this._params = params;
+ this._cb = cb;
+ __super__.call(this);
+ }
- return new AnonymousObservable(function (o) {
- var n = args.length,
- hasValue = arrayInitialize(n, falseFactory),
- hasValueAll = false,
- isDone = arrayInitialize(n, falseFactory),
- values = new Array(n);
+ CombineLatestObservable.prototype.subscribeCore = function(observer) {
+ var len = this._params.length,
+ subscriptions = new Array(len);
- function next(i) {
- hasValue[i] = true;
- if (hasValueAll || (hasValueAll = hasValue.every(identity))) {
- try {
- var res = resultSelector.apply(null, values);
- } catch (e) {
- return o.onError(e);
- }
- o.onNext(res);
- } else if (isDone.filter(function (x, j) { return j !== i; }).every(identity)) {
- o.onCompleted();
- }
- }
+ var state = {
+ hasValue: arrayInitialize(len, falseFactory),
+ hasValueAll: false,
+ isDone: arrayInitialize(len, falseFactory),
+ values: new Array(len)
+ };
- function done (i) {
- isDone[i] = true;
- isDone.every(identity) && o.onCompleted();
+ for (var i = 0; i < len; i++) {
+ var source = this._params[i], sad = new SingleAssignmentDisposable();
+ subscriptions[i] = sad;
+ isPromise(source) && (source = observableFromPromise(source));
+ sad.setDisposable(source.subscribe(new CombineLatestObserver(observer, i, this._cb, state)));
}
- var subscriptions = new Array(n);
- for (var idx = 0; idx < n; idx++) {
- (function (i) {
- var source = args[i], sad = new SingleAssignmentDisposable();
- isPromise(source) && (source = observableFromPromise(source));
- sad.setDisposable(source.subscribe(function (x) {
- values[i] = x;
- next(i);
- },
- function(e) { o.onError(e); },
- function () { done(i); }
- ));
- subscriptions[i] = sad;
- }(idx));
+ return new NAryDisposable(subscriptions);
+ };
+
+ return CombineLatestObservable;
+ }(ObservableBase));
+
+ var CombineLatestObserver = (function (__super__) {
+ inherits(CombineLatestObserver, __super__);
+ function CombineLatestObserver(o, i, cb, state) {
+ this._o = o;
+ this._i = i;
+ this._cb = cb;
+ this._state = state;
+ __super__.call(this);
+ }
+
+ function notTheSame(i) {
+ return function (x, j) {
+ return j !== i;
+ };
+ }
+
+ CombineLatestObserver.prototype.next = function (x) {
+ this._state.values[this._i] = x;
+ this._state.hasValue[this._i] = true;
+ if (this._state.hasValueAll || (this._state.hasValueAll = this._state.hasValue.every(identity))) {
+ var res = tryCatch(this._cb).apply(null, this._state.values);
+ if (res === errorObj) { return this._o.onError(res.e); }
+ this._o.onNext(res);
+ } else if (this._state.isDone.filter(notTheSame(this._i)).every(identity)) {
+ this._o.onCompleted();
}
+ };
- return new CompositeDisposable(subscriptions);
- }, this);
+ CombineLatestObserver.prototype.error = function (e) {
+ this._o.onError(e);
+ };
+
+ CombineLatestObserver.prototype.completed = function () {
+ this._state.isDone[this._i] = true;
+ this._state.isDone.every(identity) && this._o.onCompleted();
+ };
+
+ return CombineLatestObserver;
+ }(AbstractObserver));
+
+ /**
+ * Merges the specified observable sequences into one observable sequence by using the selector function whenever any of the observable sequences or Promises produces an element.
+ *
+ * @example
+ * 1 - obs = Rx.Observable.combineLatest(obs1, obs2, obs3, function (o1, o2, o3) { return o1 + o2 + o3; });
+ * 2 - obs = Rx.Observable.combineLatest([obs1, obs2, obs3], function (o1, o2, o3) { return o1 + o2 + o3; });
+ * @returns {Observable} An observable sequence containing the result of combining elements of the sources using the specified result selector function.
+ */
+ var combineLatest = Observable.combineLatest = function () {
+ var len = arguments.length, args = new Array(len);
+ for(var i = 0; i < len; i++) { args[i] = arguments[i]; }
+ var resultSelector = isFunction(args[len - 1]) ? args.pop() : argumentsToArray;
+ Array.isArray(args[0]) && (args = args[0]);
+ return new CombineLatestObservable(args, resultSelector);
};
/**
@@ -2804,49 +2850,56 @@ var FlatMapObservable = (function(__super__){
return observableConcat.apply(null, args);
};
+ var ConcatObserver = (function(__super__) {
+ inherits(ConcatObserver, __super__);
+ function ConcatObserver(s, fn) {
+ this._s = s;
+ this._fn = fn;
+ __super__.call(this);
+ }
+
+ ConcatObserver.prototype.next = function (x) { this._s.o.onNext(x); };
+ ConcatObserver.prototype.error = function (e) { this._s.o.onError(e); };
+ ConcatObserver.prototype.completed = function () { this._s.i++; this._fn(this._s); };
+
+ return ConcatObserver;
+ }(AbstractObserver));
+
var ConcatObservable = (function(__super__) {
inherits(ConcatObservable, __super__);
function ConcatObservable(sources) {
- this.sources = sources;
+ this._sources = sources;
__super__.call(this);
}
- ConcatObservable.prototype.subscribeCore = function(o) {
- var sink = new ConcatSink(this.sources, o);
- return sink.run();
- };
+ function scheduleRecursive (state, recurse) {
+ if (state.disposable.isDisposed) { return; }
+ if (state.i === state.sources.length) { return state.o.onCompleted(); }
- function ConcatSink(sources, o) {
- this.sources = sources;
- this.o = o;
- }
- ConcatSink.prototype.run = function () {
- var isDisposed, subscription = new SerialDisposable(), sources = this.sources, length = sources.length, o = this.o;
- var cancelable = immediateScheduler.scheduleRecursiveWithState(0, function (i, self) {
- if (isDisposed) { return; }
- if (i === length) {
- return o.onCompleted();
- }
+ // Check if promise
+ var currentValue = state.sources[state.i];
+ isPromise(currentValue) && (currentValue = observableFromPromise(currentValue));
- // Check if promise
- var currentValue = sources[i];
- isPromise(currentValue) && (currentValue = observableFromPromise(currentValue));
+ var d = new SingleAssignmentDisposable();
+ state.subscription.setDisposable(d);
+ d.setDisposable(currentValue.subscribe(new ConcatObserver(state, recurse)));
+ }
- var d = new SingleAssignmentDisposable();
- subscription.setDisposable(d);
- d.setDisposable(currentValue.subscribe(
- function (x) { o.onNext(x); },
- function (e) { o.onError(e); },
- function () { self(i + 1); }
- ));
- });
+ ConcatObservable.prototype.subscribeCore = function(o) {
+ var subscription = new SerialDisposable();
+ var disposable = disposableCreate(noop);
+ var state = {
+ o: o,
+ i: 0,
+ subscription: subscription,
+ disposable: disposable,
+ sources: this._sources
+ };
- return new CompositeDisposable(subscription, cancelable, disposableCreate(function () {
- isDisposed = true;
- }));
+ var cancelable = immediateScheduler.scheduleRecursive(state, scheduleRecursive);
+ return new NAryDisposable([subscription, disposable, cancelable]);
};
-
return ConcatObservable;
}(ObservableBase));
@@ -2893,7 +2946,7 @@ var FlatMapObservable = (function(__super__){
}(ObservableBase));
- var MergeObserver = (function () {
+ var MergeObserver = (function (__super__) {
function MergeObserver(o, max, g) {
this.o = o;
this.max = max;
@@ -2901,97 +2954,55 @@ var FlatMapObservable = (function(__super__){
this.done = false;
this.q = [];
this.activeCount = 0;
- this.isStopped = false;
+ __super__.call(this);
}
+
+ inherits(MergeObserver, __super__);
+
MergeObserver.prototype.handleSubscribe = function (xs) {
var sad = new SingleAssignmentDisposable();
this.g.add(sad);
isPromise(xs) && (xs = observableFromPromise(xs));
sad.setDisposable(xs.subscribe(new InnerObserver(this, sad)));
};
- MergeObserver.prototype.onNext = function (innerSource) {
- if (this.isStopped) { return; }
- if(this.activeCount < this.max) {
- this.activeCount++;
- this.handleSubscribe(innerSource);
- } else {
- this.q.push(innerSource);
- }
- };
- MergeObserver.prototype.onError = function (e) {
- if (!this.isStopped) {
- this.isStopped = true;
- this.o.onError(e);
- }
- };
- MergeObserver.prototype.onCompleted = function () {
- if (!this.isStopped) {
- this.isStopped = true;
- this.done = true;
- this.activeCount === 0 && this.o.onCompleted();
- }
- };
- MergeObserver.prototype.dispose = function() { this.isStopped = true; };
- MergeObserver.prototype.fail = function (e) {
- if (!this.isStopped) {
- this.isStopped = true;
- this.o.onError(e);
- return true;
- }
-
- return false;
- };
- function InnerObserver(parent, sad) {
- this.parent = parent;
- this.sad = sad;
- this.isStopped = false;
+ MergeObserver.prototype.next = function (innerSource) {
+ if(this.activeCount < this.max) {
+ this.activeCount++;
+ this.handleSubscribe(innerSource);
+ } else {
+ this.q.push(innerSource);
}
- InnerObserver.prototype.onNext = function (x) { if(!this.isStopped) { this.parent.o.onNext(x); } };
- InnerObserver.prototype.onError = function (e) {
- if (!this.isStopped) {
- this.isStopped = true;
- this.parent.o.onError(e);
- }
- };
- InnerObserver.prototype.onCompleted = function () {
- if(!this.isStopped) {
- this.isStopped = true;
- var parent = this.parent;
- parent.g.remove(this.sad);
- if (parent.q.length > 0) {
- parent.handleSubscribe(parent.q.shift());
- } else {
- parent.activeCount--;
- parent.done && parent.activeCount === 0 && parent.o.onCompleted();
- }
- }
- };
- InnerObserver.prototype.dispose = function() { this.isStopped = true; };
- InnerObserver.prototype.fail = function (e) {
- if (!this.isStopped) {
- this.isStopped = true;
- this.parent.o.onError(e);
- return true;
- }
-
- return false;
- };
-
- return MergeObserver;
- }());
+ };
+ MergeObserver.prototype.error = function (e) { this.o.onError(e); };
+ MergeObserver.prototype.completed = function () { this.done = true; this.activeCount === 0 && this.o.onCompleted(); };
+ function InnerObserver(parent, sad) {
+ this.parent = parent;
+ this.sad = sad;
+ __super__.call(this);
+ }
+ inherits(InnerObserver, __super__);
+ InnerObserver.prototype.next = function (x) { this.parent.o.onNext(x); };
+ InnerObserver.prototype.error = function (e) { this.parent.o.onError(e); };
+ InnerObserver.prototype.completed = function () {
+ this.parent.g.remove(this.sad);
+ if (this.parent.q.length > 0) {
+ this.parent.handleSubscribe(this.parent.q.shift());
+ } else {
+ this.parent.activeCount--;
+ this.parent.done && this.parent.activeCount === 0 && this.parent.o.onCompleted();
+ }
+ };
+ return MergeObserver;
+ }(AbstractObserver));
/**
* Merges an observable sequence of observable sequences into an observable sequence, limiting the number of concurrent subscriptions to inner sequences.
* Or merges two observable sequences into a single observable sequence.
- *
- * @example
- * 1 - merged = sources.merge(1);
- * 2 - merged = source.merge(otherSource);
* @param {Mixed} [maxConcurrentOrOther] Maximum number of inner observable sequences being subscribed to concurrently or the second observable sequence.
* @returns {Observable} The observable sequence that merges the elements of the inner sequences.
*/
@@ -3025,12 +3036,93 @@ var FlatMapObservable = (function(__super__){
};
var CompositeError = Rx.CompositeError = function(errors) {
- this.name = "NotImplementedError";
this.innerErrors = errors;
this.message = 'This contains multiple errors. Check the innerErrors';
Error.call(this);
- }
- CompositeError.prototype = Error.prototype;
+ };
+ CompositeError.prototype = Object.create(Error.prototype);
+ CompositeError.prototype.name = 'CompositeError';
+
+ var MergeDelayErrorObservable = (function(__super__) {
+ inherits(MergeDelayErrorObservable, __super__);
+ function MergeDelayErrorObservable(source) {
+ this.source = source;
+ __super__.call(this);
+ }
+
+ MergeDelayErrorObservable.prototype.subscribeCore = function (o) {
+ var group = new CompositeDisposable(),
+ m = new SingleAssignmentDisposable(),
+ state = { isStopped: false, errors: [], o: o };
+
+ group.add(m);
+ m.setDisposable(this.source.subscribe(new MergeDelayErrorObserver(group, state)));
+
+ return group;
+ };
+
+ return MergeDelayErrorObservable;
+ }(ObservableBase));
+
+ var MergeDelayErrorObserver = (function(__super__) {
+ inherits(MergeDelayErrorObserver, __super__);
+ function MergeDelayErrorObserver(group, state) {
+ this._group = group;
+ this._state = state;
+ __super__.call(this);
+ }
+
+ function setCompletion(o, errors) {
+ if (errors.length === 0) {
+ o.onCompleted();
+ } else if (errors.length === 1) {
+ o.onError(errors[0]);
+ } else {
+ o.onError(new CompositeError(errors));
+ }
+ }
+
+ MergeDelayErrorObserver.prototype.next = function (x) {
+ var inner = new SingleAssignmentDisposable();
+ this._group.add(inner);
+
+ // Check for promises support
+ isPromise(x) && (x = observableFromPromise(x));
+ inner.setDisposable(x.subscribe(new InnerObserver(inner, this._group, this._state)));
+ };
+
+ MergeDelayErrorObserver.prototype.error = function (e) {
+ this._state.errors.push(e);
+ this._state.isStopped = true;
+ this._group.length === 1 && setCompletion(this._state.o, this._state.errors);
+ };
+
+ MergeDelayErrorObserver.prototype.completed = function () {
+ this._state.isStopped = true;
+ this._group.length === 1 && setCompletion(this._state.o, this._state.errors);
+ };
+
+ inherits(InnerObserver, __super__);
+ function InnerObserver(inner, group, state) {
+ this._inner = inner;
+ this._group = group;
+ this._state = state;
+ __super__.call(this);
+ }
+
+ InnerObserver.prototype.next = function (x) { this._state.o.onNext(x); };
+ InnerObserver.prototype.error = function (e) {
+ this._state.errors.push(e);
+ this._group.remove(this._inner);
+ this._state.isStopped && this._group.length === 1 && setCompletion(this._state.o, this._state.errors);
+ };
+ InnerObserver.prototype.completed = function () {
+ this._group.remove(this._inner);
+ this._state.isStopped && this._group.length === 1 && setCompletion(this._state.o, this._state.errors);
+ };
+
+ return MergeDelayErrorObserver;
+ }(AbstractObserver));
/**
* Flattens an Observable that emits Observables into one Observable, in a way that allows an Observer to
@@ -3053,56 +3145,7 @@ var FlatMapObservable = (function(__super__){
for(var i = 0; i < len; i++) { args[i] = arguments[i]; }
}
var source = observableOf(null, args);
-
- return new AnonymousObservable(function (o) {
- var group = new CompositeDisposable(),
- m = new SingleAssignmentDisposable(),
- isStopped = false,
- errors = [];
-
- function setCompletion() {
- if (errors.length === 0) {
- o.onCompleted();
- } else if (errors.length === 1) {
- o.onError(errors[0]);
- } else {
- o.onError(new CompositeError(errors));
- }
- }
-
- group.add(m);
-
- m.setDisposable(source.subscribe(
- function (innerSource) {
- var innerSubscription = new SingleAssignmentDisposable();
- group.add(innerSubscription);
-
- // Check for promises support
- isPromise(innerSource) && (innerSource = observableFromPromise(innerSource));
-
- innerSubscription.setDisposable(innerSource.subscribe(
- function (x) { o.onNext(x); },
- function (e) {
- errors.push(e);
- group.remove(innerSubscription);
- isStopped && group.length === 1 && setCompletion();
- },
- function () {
- group.remove(innerSubscription);
- isStopped && group.length === 1 && setCompletion();
- }));
- },
- function (e) {
- errors.push(e);
- isStopped = true;
- group.length === 1 && setCompletion();
- },
- function () {
- isStopped = true;
- group.length === 1 && setCompletion();
- }));
- return group;
- });
+ return new MergeDelayErrorObservable(source);
};
var MergeAllObservable = (function (__super__) {
@@ -3113,85 +3156,63 @@ var FlatMapObservable = (function(__super__){
__super__.call(this);
}
- MergeAllObservable.prototype.subscribeCore = function (observer) {
+ MergeAllObservable.prototype.subscribeCore = function (o) {
var g = new CompositeDisposable(), m = new SingleAssignmentDisposable();
g.add(m);
- m.setDisposable(this.source.subscribe(new MergeAllObserver(observer, g)));
+ m.setDisposable(this.source.subscribe(new MergeAllObserver(o, g)));
return g;
};
+ return MergeAllObservable;
+ }(ObservableBase));
+
+ var MergeAllObserver = (function (__super__) {
function MergeAllObserver(o, g) {
this.o = o;
this.g = g;
- this.isStopped = false;
this.done = false;
+ __super__.call(this);
}
- MergeAllObserver.prototype.onNext = function(innerSource) {
- if(this.isStopped) { return; }
+
+ inherits(MergeAllObserver, __super__);
+
+ MergeAllObserver.prototype.next = function(innerSource) {
var sad = new SingleAssignmentDisposable();
this.g.add(sad);
-
isPromise(innerSource) && (innerSource = observableFromPromise(innerSource));
-
sad.setDisposable(innerSource.subscribe(new InnerObserver(this, sad)));
};
- MergeAllObserver.prototype.onError = function (e) {
- if(!this.isStopped) {
- this.isStopped = true;
- this.o.onError(e);
- }
- };
- MergeAllObserver.prototype.onCompleted = function () {
- if(!this.isStopped) {
- this.isStopped = true;
- this.done = true;
- this.g.length === 1 && this.o.onCompleted();
- }
+
+ MergeAllObserver.prototype.error = function (e) {
+ this.o.onError(e);
};
- MergeAllObserver.prototype.dispose = function() { this.isStopped = true; };
- MergeAllObserver.prototype.fail = function (e) {
- if (!this.isStopped) {
- this.isStopped = true;
- this.o.onError(e);
- return true;
- }
- return false;
+ MergeAllObserver.prototype.completed = function () {
+ this.done = true;
+ this.g.length === 1 && this.o.onCompleted();
};
function InnerObserver(parent, sad) {
this.parent = parent;
this.sad = sad;
- this.isStopped = false;
+ __super__.call(this);
}
- InnerObserver.prototype.onNext = function (x) { if (!this.isStopped) { this.parent.o.onNext(x); } };
- InnerObserver.prototype.onError = function (e) {
- if(!this.isStopped) {
- this.isStopped = true;
- this.parent.o.onError(e);
- }
+
+ inherits(InnerObserver, __super__);
+
+ InnerObserver.prototype.next = function (x) {
+ this.parent.o.onNext(x);
};
- InnerObserver.prototype.onCompleted = function () {
- if(!this.isStopped) {
- var parent = this.parent;
- this.isStopped = true;
- parent.g.remove(this.sad);
- parent.done && parent.g.length === 1 && parent.o.onCompleted();
- }
+ InnerObserver.prototype.error = function (e) {
+ this.parent.o.onError(e);
};
- InnerObserver.prototype.dispose = function() { this.isStopped = true; };
- InnerObserver.prototype.fail = function (e) {
- if (!this.isStopped) {
- this.isStopped = true;
- this.parent.o.onError(e);
- return true;
- }
-
- return false;
+ InnerObserver.prototype.completed = function () {
+ this.parent.g.remove(this.sad);
+ this.parent.done && this.parent.g.length === 1 && this.parent.o.onCompleted();
};
- return MergeAllObservable;
- }(ObservableBase));
+ return MergeAllObserver;
+ }(AbstractObserver));
/**
* Merges an observable sequence of observable sequences into an observable sequence.
@@ -3201,34 +3222,86 @@ var FlatMapObservable = (function(__super__){
return new MergeAllObservable(this);
};
+ var SkipUntilObservable = (function(__super__) {
+ inherits(SkipUntilObservable, __super__);
+
+ function SkipUntilObservable(source, other) {
+ this._s = source;
+ this._o = isPromise(other) ? observableFromPromise(other) : other;
+ this._open = false;
+ __super__.call(this);
+ }
+
+ SkipUntilObservable.prototype.subscribeCore = function(o) {
+ var leftSubscription = new SingleAssignmentDisposable();
+ leftSubscription.setDisposable(this._s.subscribe(new SkipUntilSourceObserver(o, this)));
+
+ isPromise(this._o) && (this._o = observableFromPromise(this._o));
+
+ var rightSubscription = new SingleAssignmentDisposable();
+ rightSubscription.setDisposable(this._o.subscribe(new SkipUntilOtherObserver(o, this, rightSubscription)));
+
+ return new BinaryDisposable(leftSubscription, rightSubscription);
+ };
+
+ return SkipUntilObservable;
+ }(ObservableBase));
+
+ var SkipUntilSourceObserver = (function(__super__) {
+ inherits(SkipUntilSourceObserver, __super__);
+ function SkipUntilSourceObserver(o, p) {
+ this._o = o;
+ this._p = p;
+ __super__.call(this);
+ }
+
+ SkipUntilSourceObserver.prototype.next = function (x) {
+ this._p._open && this._o.onNext(x);
+ };
+
+ SkipUntilSourceObserver.prototype.error = function (err) {
+ this._o.onError(err);
+ };
+
+ SkipUntilSourceObserver.prototype.onCompleted = function () {
+ this._p._open && this._o.onCompleted();
+ };
+
+ return SkipUntilSourceObserver;
+ }(AbstractObserver));
+
+ var SkipUntilOtherObserver = (function(__super__) {
+ inherits(SkipUntilOtherObserver, __super__);
+ function SkipUntilOtherObserver(o, p, r) {
+ this._o = o;
+ this._p = p;
+ this._r = r;
+ __super__.call(this);
+ }
+
+ SkipUntilOtherObserver.prototype.next = function () {
+ this._p._open = true;
+ this._r.dispose();
+ };
+
+ SkipUntilOtherObserver.prototype.error = function (err) {
+ this._o.onError(err);
+ };
+
+ SkipUntilOtherObserver.prototype.onCompleted = function () {
+ this._r.dispose();
+ };
+
+ return SkipUntilOtherObserver;
+ }(AbstractObserver));
+
/**
* Returns the values from the source observable sequence only after the other observable sequence produces a value.
* @param {Observable | Promise} other The observable sequence or Promise that triggers propagation of elements of the source sequence.
* @returns {Observable} An observable sequence containing the elements of the source sequence starting from the point the other sequence triggered propagation.
*/
observableProto.skipUntil = function (other) {
- var source = this;
- return new AnonymousObservable(function (o) {
- var isOpen = false;
- var disposables = new CompositeDisposable(source.subscribe(function (left) {
- isOpen && o.onNext(left);
- }, function (e) { o.onError(e); }, function () {
- isOpen && o.onCompleted();
- }));
-
- isPromise(other) && (other = observableFromPromise(other));
-
- var rightSubscription = new SingleAssignmentDisposable();
- disposables.add(rightSubscription);
- rightSubscription.setDisposable(other.subscribe(function () {
- isOpen = true;
- rightSubscription.dispose();
- }, function (e) { o.onError(e); }, function () {
- rightSubscription.dispose();
- }));
-
- return disposables;
- }, source);
+ return new SkipUntilObservable(this, other);
};
var SwitchObservable = (function(__super__) {
@@ -3240,80 +3313,55 @@ var FlatMapObservable = (function(__super__){
SwitchObservable.prototype.subscribeCore = function (o) {
var inner = new SerialDisposable(), s = this.source.subscribe(new SwitchObserver(o, inner));
- return new CompositeDisposable(s, inner);
+ return new BinaryDisposable(s, inner);
};
+ inherits(SwitchObserver, AbstractObserver);
function SwitchObserver(o, inner) {
this.o = o;
this.inner = inner;
this.stopped = false;
this.latest = 0;
this.hasLatest = false;
- this.isStopped = false;
+ AbstractObserver.call(this);
}
- SwitchObserver.prototype.onNext = function (innerSource) {
- if (this.isStopped) { return; }
+
+ SwitchObserver.prototype.next = function (innerSource) {
var d = new SingleAssignmentDisposable(), id = ++this.latest;
this.hasLatest = true;
this.inner.setDisposable(d);
isPromise(innerSource) && (innerSource = observableFromPromise(innerSource));
d.setDisposable(innerSource.subscribe(new InnerObserver(this, id)));
};
- SwitchObserver.prototype.onError = function (e) {
- if (!this.isStopped) {
- this.isStopped = true;
- this.o.onError(e);
- }
- };
- SwitchObserver.prototype.onCompleted = function () {
- if (!this.isStopped) {
- this.isStopped = true;
- this.stopped = true;
- !this.hasLatest && this.o.onCompleted();
- }
+
+ SwitchObserver.prototype.error = function (e) {
+ this.o.onError(e);
};
- SwitchObserver.prototype.dispose = function () { this.isStopped = true; };
- SwitchObserver.prototype.fail = function (e) {
- if(!this.isStopped) {
- this.isStopped = true;
- this.o.onError(e);
- return true;
- }
- return false;
+
+ SwitchObserver.prototype.completed = function () {
+ this.stopped = true;
+ !this.hasLatest && this.o.onCompleted();
};
+ inherits(InnerObserver, AbstractObserver);
function InnerObserver(parent, id) {
this.parent = parent;
this.id = id;
- this.isStopped = false;
+ AbstractObserver.call(this);
}
- InnerObserver.prototype.onNext = function (x) {
- if (this.isStopped) { return; }
+ InnerObserver.prototype.next = function (x) {
this.parent.latest === this.id && this.parent.o.onNext(x);
};
- InnerObserver.prototype.onError = function (e) {
- if (!this.isStopped) {
- this.isStopped = true;
- this.parent.latest === this.id && this.parent.o.onError(e);
- }
- };
- InnerObserver.prototype.onCompleted = function () {
- if (!this.isStopped) {
- this.isStopped = true;
- if (this.parent.latest === this.id) {
- this.parent.hasLatest = false;
- this.parent.isStopped && this.parent.o.onCompleted();
- }
- }
+
+ InnerObserver.prototype.error = function (e) {
+ this.parent.latest === this.id && this.parent.o.onError(e);
};
- InnerObserver.prototype.dispose = function () { this.isStopped = true; }
- InnerObserver.prototype.fail = function (e) {
- if(!this.isStopped) {
- this.isStopped = true;
- this.parent.o.onError(e);
- return true;
+
+ InnerObserver.prototype.completed = function () {
+ if (this.parent.latest === this.id) {
+ this.parent.hasLatest = false;
+ this.parent.stopped && this.parent.o.onCompleted();
}
- return false;
};
return SwitchObservable;
@@ -3337,41 +3385,34 @@ var FlatMapObservable = (function(__super__){
}
TakeUntilObservable.prototype.subscribeCore = function(o) {
- return new CompositeDisposable(
+ return new BinaryDisposable(
this.source.subscribe(o),
- this.other.subscribe(new InnerObserver(o))
+ this.other.subscribe(new TakeUntilObserver(o))
);
};
- function InnerObserver(o) {
- this.o = o;
- this.isStopped = false;
+ return TakeUntilObservable;
+ }(ObservableBase));
+
+ var TakeUntilObserver = (function(__super__) {
+ inherits(TakeUntilObserver, __super__);
+ function TakeUntilObserver(o) {
+ this._o = o;
+ __super__.call(this);
}
- InnerObserver.prototype.onNext = function (x) {
- if (this.isStopped) { return; }
- this.o.onCompleted();
- };
- InnerObserver.prototype.onError = function (err) {
- if (!this.isStopped) {
- this.isStopped = true;
- this.o.onError(err);
- }
- };
- InnerObserver.prototype.onCompleted = function () {
- !this.isStopped && (this.isStopped = true);
+
+ TakeUntilObserver.prototype.next = function () {
+ this._o.onCompleted();
};
- InnerObserver.prototype.dispose = function() { this.isStopped = true; };
- InnerObserver.prototype.fail = function (e) {
- if (!this.isStopped) {
- this.isStopped = true;
- this.o.onError(e);
- return true;
- }
- return false;
+
+ TakeUntilObserver.prototype.error = function (err) {
+ this._o.onError(err);
};
- return TakeUntilObservable;
- }(ObservableBase));
+ TakeUntilObserver.prototype.onCompleted = noop;
+
+ return TakeUntilObserver;
+ }(AbstractObserver));
/**
* Returns the values from the source observable sequence until the other observable sequence produces a value.
@@ -3383,60 +3424,186 @@ var FlatMapObservable = (function(__super__){
};
function falseFactory() { return false; }
+ function argumentsToArray() {
+ var len = arguments.length, args = new Array(len);
+ for(var i = 0; i < len; i++) { args[i] = arguments[i]; }
+ return args;
+ }
+
+ var WithLatestFromObservable = (function(__super__) {
+ inherits(WithLatestFromObservable, __super__);
+ function WithLatestFromObservable(source, sources, resultSelector) {
+ this._s = source;
+ this._ss = sources;
+ this._cb = resultSelector;
+ __super__.call(this);
+ }
+
+ WithLatestFromObservable.prototype.subscribeCore = function (o) {
+ var len = this._ss.length;
+ var state = {
+ hasValue: arrayInitialize(len, falseFactory),
+ hasValueAll: false,
+ values: new Array(len)
+ };
+
+ var n = this._ss.length, subscriptions = new Array(n + 1);
+ for (var i = 0; i < n; i++) {
+ var other = this._ss[i], sad = new SingleAssignmentDisposable();
+ isPromise(other) && (other = observableFromPromise(other));
+ sad.setDisposable(other.subscribe(new WithLatestFromOtherObserver(o, i, state)));
+ subscriptions[i] = sad;
+ }
+
+ var outerSad = new SingleAssignmentDisposable();
+ outerSad.setDisposable(this._s.subscribe(new WithLatestFromSourceObserver(o, this._cb, state)));
+ subscriptions[n] = outerSad;
+
+ return new NAryDisposable(subscriptions);
+ };
+
+ return WithLatestFromObservable;
+ }(ObservableBase));
+
+ var WithLatestFromOtherObserver = (function (__super__) {
+ inherits(WithLatestFromOtherObserver, __super__);
+ function WithLatestFromOtherObserver(o, i, state) {
+ this._o = o;
+ this._i = i;
+ this._state = state;
+ __super__.call(this);
+ }
+
+ WithLatestFromOtherObserver.prototype.next = function (x) {
+ this._state.values[this._i] = x;
+ this._state.hasValue[this._i] = true;
+ this._state.hasValueAll = this._state.hasValue.every(identity);
+ };
+
+ WithLatestFromOtherObserver.prototype.error = function (e) {
+ this._o.onError(e);
+ };
+
+ WithLatestFromOtherObserver.prototype.completed = noop;
+
+ return WithLatestFromOtherObserver;
+ }(AbstractObserver));
+
+ var WithLatestFromSourceObserver = (function (__super__) {
+ inherits(WithLatestFromSourceObserver, __super__);
+ function WithLatestFromSourceObserver(o, cb, state) {
+ this._o = o;
+ this._cb = cb;
+ this._state = state;
+ __super__.call(this);
+ }
+
+ WithLatestFromSourceObserver.prototype.next = function (x) {
+ var allValues = [x].concat(this._state.values);
+ if (!this._state.hasValueAll) { return; }
+ var res = tryCatch(this._cb).apply(null, allValues);
+ if (res === errorObj) { return this._o.onError(res.e); }
+ this._o.onNext(res);
+ };
+
+ WithLatestFromSourceObserver.prototype.error = function (e) {
+ this._o.onError(e);
+ };
+
+ WithLatestFromSourceObserver.prototype.completed = function () {
+ this._o.onCompleted();
+ };
+
+ return WithLatestFromSourceObserver;
+ }(AbstractObserver));
/**
* Merges the specified observable sequences into one observable sequence by using the selector function only when the (first) source observable sequence produces an element.
* @returns {Observable} An observable sequence containing the result of combining elements of the sources using the specified result selector function.
*/
observableProto.withLatestFrom = function () {
- var len = arguments.length, args = new Array(len)
+ if (arguments.length === 0) { throw new Error('invalid arguments'); }
+
+ var len = arguments.length, args = new Array(len);
for(var i = 0; i < len; i++) { args[i] = arguments[i]; }
- var resultSelector = args.pop(), source = this;
+ var resultSelector = isFunction(args[len - 1]) ? args.pop() : argumentsToArray;
Array.isArray(args[0]) && (args = args[0]);
- return new AnonymousObservable(function (observer) {
- var n = args.length,
- hasValue = arrayInitialize(n, falseFactory),
- hasValueAll = false,
- values = new Array(n);
-
- var subscriptions = new Array(n + 1);
- for (var idx = 0; idx < n; idx++) {
- (function (i) {
- var other = args[i], sad = new SingleAssignmentDisposable();
- isPromise(other) && (other = observableFromPromise(other));
- sad.setDisposable(other.subscribe(function (x) {
- values[i] = x;
- hasValue[i] = true;
- hasValueAll = hasValue.every(identity);
- }, function (e) { observer.onError(e); }, noop));
- subscriptions[i] = sad;
- }(idx));
- }
-
- var sad = new SingleAssignmentDisposable();
- sad.setDisposable(source.subscribe(function (x) {
- var allValues = [x].concat(values);
- if (!hasValueAll) { return; }
- var res = tryCatch(resultSelector).apply(null, allValues);
- if (res === errorObj) { return observer.onError(res.e); }
- observer.onNext(res);
- }, function (e) { observer.onError(e); }, function () {
- observer.onCompleted();
- }));
- subscriptions[n] = sad;
-
- return new CompositeDisposable(subscriptions);
- }, this);
+ return new WithLatestFromObservable(this, args, resultSelector);
};
function falseFactory() { return false; }
function emptyArrayFactory() { return []; }
- function argumentsToArray() {
- var len = arguments.length, args = new Array(len);
- for(var i = 0; i < len; i++) { args[i] = arguments[i]; }
- return args;
- }
+
+ var ZipObservable = (function(__super__) {
+ inherits(ZipObservable, __super__);
+ function ZipObservable(sources, resultSelector) {
+ this._s = sources;
+ this._cb = resultSelector;
+ __super__.call(this);
+ }
+
+ ZipObservable.prototype.subscribeCore = function(observer) {
+ var n = this._s.length,
+ subscriptions = new Array(n),
+ done = arrayInitialize(n, falseFactory),
+ q = arrayInitialize(n, emptyArrayFactory);
+
+ for (var i = 0; i < n; i++) {
+ var source = this._s[i], sad = new SingleAssignmentDisposable();
+ subscriptions[i] = sad;
+ isPromise(source) && (source = observableFromPromise(source));
+ sad.setDisposable(source.subscribe(new ZipObserver(observer, i, this, q, done)));
+ }
+
+ return new NAryDisposable(subscriptions);
+ };
+
+ return ZipObservable;
+ }(ObservableBase));
+
+ var ZipObserver = (function (__super__) {
+ inherits(ZipObserver, __super__);
+ function ZipObserver(o, i, p, q, d) {
+ this._o = o;
+ this._i = i;
+ this._p = p;
+ this._q = q;
+ this._d = d;
+ __super__.call(this);
+ }
+
+ function notEmpty(x) { return x.length > 0; }
+ function shiftEach(x) { return x.shift(); }
+ function notTheSame(i) {
+ return function (x, j) {
+ return j !== i;
+ };
+ }
+
+ ZipObserver.prototype.next = function (x) {
+ this._q[this._i].push(x);
+ if (this._q.every(notEmpty)) {
+ var queuedValues = this._q.map(shiftEach);
+ var res = tryCatch(this._p._cb).apply(null, queuedValues);
+ if (res === errorObj) { return this._o.onError(res.e); }
+ this._o.onNext(res);
+ } else if (this._d.filter(notTheSame(this._i)).every(identity)) {
+ this._o.onCompleted();
+ }
+ };
+
+ ZipObserver.prototype.error = function (e) {
+ this._o.onError(e);
+ };
+
+ ZipObserver.prototype.completed = function () {
+ this._d[this._i] = true;
+ this._d.every(identity) && this._o.onCompleted();
+ };
+
+ return ZipObserver;
+ }(AbstractObserver));
/**
* Merges the specified observable sequences into one observable sequence by using the selector function whenever all of the observable sequences or an array have produced an element at a corresponding index.
@@ -3453,38 +3620,8 @@ var FlatMapObservable = (function(__super__){
var parent = this;
args.unshift(parent);
- return new AnonymousObservable(function (o) {
- var n = args.length,
- queues = arrayInitialize(n, emptyArrayFactory),
- isDone = arrayInitialize(n, falseFactory);
-
- var subscriptions = new Array(n);
- for (var idx = 0; idx < n; idx++) {
- (function (i) {
- var source = args[i], sad = new SingleAssignmentDisposable();
-
- isPromise(source) && (source = observableFromPromise(source));
-
- sad.setDisposable(source.subscribe(function (x) {
- queues[i].push(x);
- if (queues.every(function (x) { return x.length > 0; })) {
- var queuedValues = queues.map(function (x) { return x.shift(); }),
- res = tryCatch(resultSelector).apply(parent, queuedValues);
- if (res === errorObj) { return o.onError(res.e); }
- o.onNext(res);
- } else if (isDone.filter(function (x, j) { return j !== i; }).every(identity)) {
- o.onCompleted();
- }
- }, function (e) { o.onError(e); }, function () {
- isDone[i] = true;
- isDone.every(identity) && o.onCompleted();
- }));
- subscriptions[i] = sad;
- })(idx);
- }
- return new CompositeDisposable(subscriptions);
- }, parent);
+ return new ZipObservable(args, resultSelector);
};
/**
@@ -3511,6 +3648,78 @@ function argumentsToArray() {
return args;
}
+var ZipIterableObservable = (function(__super__) {
+ inherits(ZipIterableObservable, __super__);
+ function ZipIterableObservable(sources, cb) {
+ this.sources = sources;
+ this._cb = cb;
+ __super__.call(this);
+ }
+
+ ZipIterableObservable.prototype.subscribeCore = function (o) {
+ var sources = this.sources, len = sources.length, subscriptions = new Array(len);
+
+ var state = {
+ q: arrayInitialize(len, emptyArrayFactory),
+ done: arrayInitialize(len, falseFactory),
+ cb: this._cb,
+ o: o
+ };
+
+ for (var i = 0; i < len; i++) {
+ (function (i) {
+ var source = sources[i], sad = new SingleAssignmentDisposable();
+ (isArrayLike(source) || isIterable(source)) && (source = observableFrom(source));
+
+ subscriptions[i] = sad;
+ sad.setDisposable(source.subscribe(new ZipIterableObserver(state, i)));
+ }(i));
+ }
+
+ return new NAryDisposable(subscriptions);
+ };
+
+ return ZipIterableObservable;
+}(ObservableBase));
+
+var ZipIterableObserver = (function (__super__) {
+ inherits(ZipIterableObserver, __super__);
+ function ZipIterableObserver(s, i) {
+ this._s = s;
+ this._i = i;
+ __super__.call(this);
+ }
+
+ function notEmpty(x) { return x.length > 0; }
+ function shiftEach(x) { return x.shift(); }
+ function notTheSame(i) {
+ return function (x, j) {
+ return j !== i;
+ };
+ }
+
+ ZipIterableObserver.prototype.next = function (x) {
+ this._s.q[this._i].push(x);
+ if (this._s.q.every(notEmpty)) {
+ var queuedValues = this._s.q.map(shiftEach),
+ res = tryCatch(this._s.cb).apply(null, queuedValues);
+ if (res === errorObj) { return this._s.o.onError(res.e); }
+ this._s.o.onNext(res);
+ } else if (this._s.done.filter(notTheSame(this._i)).every(identity)) {
+ this._s.o.onCompleted();
+ }
+ };
+
+ ZipIterableObserver.prototype.error = function (e) { this._s.o.onError(e); };
+
+ ZipIterableObserver.prototype.completed = function () {
+ this._s.done[this._i] = true;
+ this._s.done.every(identity) && this._s.o.onCompleted();
+ };
+
+ return ZipIterableObserver;
+}(AbstractObserver));
+
/**
* Merges the specified observable sequences into one observable sequence by using the selector function whenever all of the observable sequences or an array have produced an element at a corresponding index.
* The last element in the arguments must be a function to invoke for each series of elements at corresponding indexes in the args.
@@ -3525,38 +3734,7 @@ observableProto.zipIterable = function () {
var parent = this;
args.unshift(parent);
- return new AnonymousObservable(function (o) {
- var n = args.length,
- queues = arrayInitialize(n, emptyArrayFactory),
- isDone = arrayInitialize(n, falseFactory);
-
- var subscriptions = new Array(n);
- for (var idx = 0; idx < n; idx++) {
- (function (i) {
- var source = args[i], sad = new SingleAssignmentDisposable();
-
- (isArrayLike(source) || isIterable(source)) && (source = observableFrom(source));
-
- sad.setDisposable(source.subscribe(function (x) {
- queues[i].push(x);
- if (queues.every(function (x) { return x.length > 0; })) {
- var queuedValues = queues.map(function (x) { return x.shift(); }),
- res = tryCatch(resultSelector).apply(parent, queuedValues);
- if (res === errorObj) { return o.onError(res.e); }
- o.onNext(res);
- } else if (isDone.filter(function (x, j) { return j !== i; }).every(identity)) {
- o.onCompleted();
- }
- }, function (e) { o.onError(e); }, function () {
- isDone[i] = true;
- isDone.every(identity) && o.onCompleted();
- }));
- subscriptions[i] = sad;
- })(idx);
- }
-
- return new CompositeDisposable(subscriptions);
- }, parent);
+ return new ZipIterableObservable(args, resultSelector);
};
function asObservable(source) {
@@ -3571,15 +3749,41 @@ observableProto.zipIterable = function () {
return new AnonymousObservable(asObservable(this), this);
};
+ var DematerializeObservable = (function (__super__) {
+ inherits(DematerializeObservable, __super__);
+ function DematerializeObservable(source) {
+ this.source = source;
+ __super__.call(this);
+ }
+
+ DematerializeObservable.prototype.subscribeCore = function (o) {
+ return this.source.subscribe(new DematerializeObserver(o));
+ };
+
+ return DematerializeObservable;
+ }(ObservableBase));
+
+ var DematerializeObserver = (function (__super__) {
+ inherits(DematerializeObserver, __super__);
+
+ function DematerializeObserver(o) {
+ this._o = o;
+ __super__.call(this);
+ }
+
+ DematerializeObserver.prototype.next = function (x) { x.accept(this._o); };
+ DematerializeObserver.prototype.error = function (e) { this._o.onError(e); };
+ DematerializeObserver.prototype.completed = function () { this._o.onCompleted(); };
+
+ return DematerializeObserver;
+ }(AbstractObserver));
+
/**
* Dematerializes the explicit notification values of an observable sequence as implicit notifications.
* @returns {Observable} An observable sequence exhibiting the behavior corresponding to the source sequence's notification values.
*/
observableProto.dematerialize = function () {
- var source = this;
- return new AnonymousObservable(function (o) {
- return source.subscribe(function (x) { return x.accept(o); }, function(e) { o.onError(e); }, function () { o.onCompleted(); });
- }, this);
+ return new DematerializeObservable(this);
};
var DistinctUntilChangedObservable = (function(__super__) {
@@ -3660,43 +3864,29 @@ observableProto.zipIterable = function () {
return this.source.subscribe(new InnerObserver(o, this));
};
+ inherits(InnerObserver, AbstractObserver);
function InnerObserver(o, p) {
this.o = o;
this.t = !p._oN || isFunction(p._oN) ?
observerCreate(p._oN || noop, p._oE || noop, p._oC || noop) :
p._oN;
this.isStopped = false;
+ AbstractObserver.call(this);
}
- InnerObserver.prototype.onNext = function(x) {
- if (this.isStopped) { return; }
+ InnerObserver.prototype.next = function(x) {
var res = tryCatch(this.t.onNext).call(this.t, x);
if (res === errorObj) { this.o.onError(res.e); }
this.o.onNext(x);
};
- InnerObserver.prototype.onError = function(err) {
- if (!this.isStopped) {
- this.isStopped = true;
- var res = tryCatch(this.t.onError).call(this.t, err);
- if (res === errorObj) { return this.o.onError(res.e); }
- this.o.onError(err);
- }
- };
- InnerObserver.prototype.onCompleted = function() {
- if (!this.isStopped) {
- this.isStopped = true;
- var res = tryCatch(this.t.onCompleted).call(this.t);
- if (res === errorObj) { return this.o.onError(res.e); }
- this.o.onCompleted();
- }
+ InnerObserver.prototype.error = function(err) {
+ var res = tryCatch(this.t.onError).call(this.t, err);
+ if (res === errorObj) { return this.o.onError(res.e); }
+ this.o.onError(err);
};
- InnerObserver.prototype.dispose = function() { this.isStopped = true; };
- InnerObserver.prototype.fail = function (e) {
- if (!this.isStopped) {
- this.isStopped = true;
- this.o.onError(e);
- return true;
- }
- return false;
+ InnerObserver.prototype.completed = function() {
+ var res = tryCatch(this.t.onCompleted).call(this.t);
+ if (res === errorObj) { return this.o.onError(res.e); }
+ this.o.onCompleted();
};
return TapObservable;
@@ -3747,25 +3937,48 @@ observableProto.zipIterable = function () {
return this.tap(noop, null, typeof thisArg !== 'undefined' ? function () { onCompleted.call(thisArg); } : onCompleted);
};
+ var FinallyObservable = (function (__super__) {
+ inherits(FinallyObservable, __super__);
+ function FinallyObservable(source, fn, thisArg) {
+ this.source = source;
+ this._fn = bindCallback(fn, thisArg, 0);
+ __super__.call(this);
+ }
+
+ FinallyObservable.prototype.subscribeCore = function (o) {
+ var d = tryCatch(this.source.subscribe).call(this.source, o);
+ if (d === errorObj) {
+ this._fn();
+ thrower(d.e);
+ }
+
+ return new FinallyDisposable(d, this._fn);
+ };
+
+ function FinallyDisposable(s, fn) {
+ this.isDisposed = false;
+ this._s = s;
+ this._fn = fn;
+ }
+ FinallyDisposable.prototype.dispose = function () {
+ if (!this.isDisposed) {
+ var res = tryCatch(this._s.dispose).call(this._s);
+ this._fn();
+ res === errorObj && thrower(res.e);
+ }
+ };
+
+ return FinallyObservable;
+
+ }(ObservableBase));
+
/**
* Invokes a specified action after the source observable sequence terminates gracefully or exceptionally.
* @param {Function} finallyAction Action to invoke after the source observable sequence terminates.
* @returns {Observable} Source sequence with the action-invoking termination behavior applied.
*/
- observableProto['finally'] = function (action) {
- var source = this;
- return new AnonymousObservable(function (observer) {
- var subscription = tryCatch(source.subscribe).call(source, observer);
- if (subscription === errorObj) {
- action();
- return thrower(subscription.e);
- }
- return disposableCreate(function () {
- var r = tryCatch(subscription.dispose).call(subscription);
- action();
- r === errorObj && thrower(r.e);
- });
- }, this);
+ observableProto['finally'] = function (action, thisArg) {
+ return new FinallyObservable(this, action, thisArg);
};
var IgnoreElementsObservable = (function(__super__) {
@@ -3819,23 +4032,41 @@ observableProto.zipIterable = function () {
return new IgnoreElementsObservable(this);
};
+ var MaterializeObservable = (function (__super__) {
+ inherits(MaterializeObservable, __super__);
+ function MaterializeObservable(source, fn) {
+ this.source = source;
+ __super__.call(this);
+ }
+
+ MaterializeObservable.prototype.subscribeCore = function (o) {
+ return this.source.subscribe(new MaterializeObserver(o));
+ };
+
+ return MaterializeObservable;
+ }(ObservableBase));
+
+ var MaterializeObserver = (function (__super__) {
+ inherits(MaterializeObserver, __super__);
+
+ function MaterializeObserver(o) {
+ this._o = o;
+ __super__.call(this);
+ }
+
+ MaterializeObserver.prototype.next = function (x) { this._o.onNext(notificationCreateOnNext(x)) };
+ MaterializeObserver.prototype.error = function (e) { this._o.onNext(notificationCreateOnError(e)); this._o.onCompleted(); };
+ MaterializeObserver.prototype.completed = function () { this._o.onNext(notificationCreateOnCompleted()); this._o.onCompleted(); };
+
+ return MaterializeObserver;
+ }(AbstractObserver));
+
/**
* Materializes the implicit notifications of an observable sequence as explicit notification values.
* @returns {Observable} An observable sequence containing the materialized notification values from the source sequence.
*/
observableProto.materialize = function () {
- var source = this;
- return new AnonymousObservable(function (observer) {
- return source.subscribe(function (value) {
- observer.onNext(notificationCreateOnNext(value));
- }, function (e) {
- observer.onNext(notificationCreateOnError(e));
- observer.onCompleted();
- }, function () {
- observer.onNext(notificationCreateOnCompleted());
- observer.onCompleted();
- });
- }, source);
+ return new MaterializeObservable(this);
};
/**
@@ -3861,19 +4092,184 @@ observableProto.zipIterable = function () {
return enumerableRepeat(this, retryCount).catchError();
};
- /**
- * Repeats the source observable sequence upon error each time the notifier emits or until it successfully terminates.
- * if the notifier completes, the observable sequence completes.
- *
- * @example
- * var timer = Observable.timer(500);
- * var source = observable.retryWhen(timer);
- * @param {Observable} [notifier] An observable that triggers the retries or completes the observable with onNext or onCompleted respectively.
- * @returns {Observable} An observable sequence producing the elements of the given sequence repeatedly until it terminates successfully.
- */
+ function repeat(value) {
+ return {
+ '@@iterator': function () {
+ return {
+ next: function () {
+ return { done: false, value: value };
+ }
+ };
+ }
+ };
+ }
+
+ var RetryWhenObservable = (function(__super__) {
+ function createDisposable(state) {
+ return {
+ isDisposed: false,
+ dispose: function () {
+ if (!this.isDisposed) {
+ this.isDisposed = true;
+ state.isDisposed = true;
+ }
+ }
+ };
+ }
+
+ function RetryWhenObservable(source, notifier) {
+ this.source = source;
+ this._notifier = notifier;
+ __super__.call(this);
+ }
+
+ inherits(RetryWhenObservable, __super__);
+
+ RetryWhenObservable.prototype.subscribeCore = function (o) {
+ var exceptions = new Subject(),
+ notifier = new Subject(),
+ handled = this._notifier(exceptions),
+ notificationDisposable = handled.subscribe(notifier);
+
+ var e = this.source['@@iterator']();
+
+ var state = { isDisposed: false },
+ lastError,
+ subscription = new SerialDisposable();
+ var cancelable = currentThreadScheduler.scheduleRecursive(null, function (_, recurse) {
+ if (state.isDisposed) { return; }
+ var currentItem = e.next();
+
+ if (currentItem.done) {
+ if (lastError) {
+ o.onError(lastError);
+ } else {
+ o.onCompleted();
+ }
+ return;
+ }
+
+ // Check if promise
+ var currentValue = currentItem.value;
+ isPromise(currentValue) && (currentValue = observableFromPromise(currentValue));
+
+ var outer = new SingleAssignmentDisposable();
+ var inner = new SingleAssignmentDisposable();
+ subscription.setDisposable(new BinaryDisposable(inner, outer));
+ outer.setDisposable(currentValue.subscribe(
+ function(x) { o.onNext(x); },
+ function (exn) {
+ inner.setDisposable(notifier.subscribe(recurse, function(ex) {
+ o.onError(ex);
+ }, function() {
+ o.onCompleted();
+ }));
+
+ exceptions.onNext(exn);
+ outer.dispose();
+ },
+ function() { o.onCompleted(); }));
+ });
+
+ return new NAryDisposable([notificationDisposable, subscription, cancelable, createDisposable(state)]);
+ };
+
+ return RetryWhenObservable;
+ }(ObservableBase));
+
observableProto.retryWhen = function (notifier) {
- return enumerableRepeat(this).catchErrorWhen(notifier);
+ return new RetryWhenObservable(repeat(this), notifier);
+ };
+
+ function repeat(value) {
+ return {
+ '@@iterator': function () {
+ return {
+ next: function () {
+ return { done: false, value: value };
+ }
+ };
+ }
+ };
+ }
+
+ var RepeatWhenObservable = (function(__super__) {
+ function createDisposable(state) {
+ return {
+ isDisposed: false,
+ dispose: function () {
+ if (!this.isDisposed) {
+ this.isDisposed = true;
+ state.isDisposed = true;
+ }
+ }
+ };
+ }
+
+ function RepeatWhenObservable(source, notifier) {
+ this.source = source;
+ this._notifier = notifier;
+ __super__.call(this);
+ }
+
+ inherits(RepeatWhenObservable, __super__);
+
+ RepeatWhenObservable.prototype.subscribeCore = function (o) {
+ var completions = new Subject(),
+ notifier = new Subject(),
+ handled = this._notifier(completions),
+ notificationDisposable = handled.subscribe(notifier);
+
+ var e = this.source['@@iterator']();
+
+ var state = { isDisposed: false },
+ lastError,
+ subscription = new SerialDisposable();
+ var cancelable = currentThreadScheduler.scheduleRecursive(null, function (_, recurse) {
+ if (state.isDisposed) { return; }
+ var currentItem = e.next();
+
+ if (currentItem.done) {
+ if (lastError) {
+ o.onError(lastError);
+ } else {
+ o.onCompleted();
+ }
+ return;
+ }
+
+ // Check if promise
+ var currentValue = currentItem.value;
+ isPromise(currentValue) && (currentValue = observableFromPromise(currentValue));
+
+ var outer = new SingleAssignmentDisposable();
+ var inner = new SingleAssignmentDisposable();
+ subscription.setDisposable(new BinaryDisposable(inner, outer));
+ outer.setDisposable(currentValue.subscribe(
+ function(x) { o.onNext(x); },
+ function (exn) { o.onError(exn); },
+ function() {
+ inner.setDisposable(notifier.subscribe(recurse, function(ex) {
+ o.onError(ex);
+ }, function() {
+ o.onCompleted();
+ }));
+
+ completions.onNext(null);
+ outer.dispose();
+ }));
+ });
+
+ return new NAryDisposable([notificationDisposable, subscription, cancelable, createDisposable(state)]);
+ };
+
+ return RepeatWhenObservable;
+ }(ObservableBase));
+
+ observableProto.repeatWhen = function (notifier) {
+ return new RepeatWhenObservable(repeat(this), notifier);
};
+
var ScanObservable = (function(__super__) {
inherits(ScanObservable, __super__);
function ScanObservable(source, accumulator, hasSeed, seed) {
@@ -3885,58 +4281,51 @@ observableProto.zipIterable = function () {
}
ScanObservable.prototype.subscribeCore = function(o) {
- return this.source.subscribe(new InnerObserver(o,this));
+ return this.source.subscribe(new ScanObserver(o,this));
};
return ScanObservable;
}(ObservableBase));
- function InnerObserver(o, parent) {
- this.o = o;
- this.accumulator = parent.accumulator;
- this.hasSeed = parent.hasSeed;
- this.seed = parent.seed;
- this.hasAccumulation = false;
- this.accumulation = null;
- this.hasValue = false;
- this.isStopped = false;
- }
- InnerObserver.prototype = {
- onNext: function (x) {
- if (this.isStopped) { return; }
- !this.hasValue && (this.hasValue = true);
- if (this.hasAccumulation) {
- this.accumulation = tryCatch(this.accumulator)(this.accumulation, x);
+ var ScanObserver = (function (__super__) {
+ inherits(ScanObserver, __super__);
+ function ScanObserver(o, parent) {
+ this._o = o;
+ this._p = parent;
+ this._fn = parent.accumulator;
+ this._hs = parent.hasSeed;
+ this._s = parent.seed;
+ this._ha = false;
+ this._a = null;
+ this._hv = false;
+ this._i = 0;
+ __super__.call(this);
+ }
+
+ ScanObserver.prototype.next = function (x) {
+ !this._hv && (this._hv = true);
+ if (this._ha) {
+ this._a = tryCatch(this._fn)(this._a, x, this._i, this._p);
} else {
- this.accumulation = this.hasSeed ? tryCatch(this.accumulator)(this.seed, x) : x;
- this.hasAccumulation = true;
- }
- if (this.accumulation === errorObj) { return this.o.onError(this.accumulation.e); }
- this.o.onNext(this.accumulation);
- },
- onError: function (e) {
- if (!this.isStopped) {
- this.isStopped = true;
- this.o.onError(e);
+ this._a = this._hs ? tryCatch(this._fn)(this._s, x, this._i, this._p) : x;
+ this._ha = true;
}
- },
- onCompleted: function () {
- if (!this.isStopped) {
- this.isStopped = true;
- !this.hasValue && this.hasSeed && this.o.onNext(this.seed);
- this.o.onCompleted();
- }
- },
- dispose: function() { this.isStopped = true; },
- fail: function (e) {
- if (!this.isStopped) {
- this.isStopped = true;
- this.o.onError(e);
- return true;
- }
- return false;
- }
- };
+ if (this._a === errorObj) { return this._o.onError(this._a.e); }
+ this._o.onNext(this._a);
+ this._i++;
+ };
+
+ ScanObserver.prototype.error = function (e) {
+ this._o.onError(e);
+ };
+
+ ScanObserver.prototype.completed = function () {
+ !this._hv && this._hs && this._o.onNext(this._s);
+ this._o.onCompleted();
+ };
+
+ return ScanObserver;
+ }(AbstractObserver));
/**
* Applies an accumulator function over an observable sequence and returns each intermediate result. The optional seed value is used as the initial accumulator value.
@@ -3954,6 +4343,46 @@ observableProto.zipIterable = function () {
return new ScanObservable(this, accumulator, hasSeed, seed);
};
+ var SkipLastObservable = (function (__super__) {
+ inherits(SkipLastObservable, __super__);
+ function SkipLastObservable(source, c) {
+ this.source = source;
+ this._c = c;
+ __super__.call(this);
+ }
+
+ SkipLastObservable.prototype.subscribeCore = function (o) {
+ return this.source.subscribe(new SkipLastObserver(o, this._c));
+ };
+
+ return SkipLastObservable;
+ }(ObservableBase));
+
+ var SkipLastObserver = (function (__super__) {
+ inherits(SkipLastObserver, __super__);
+ function SkipLastObserver(o, c) {
+ this._o = o;
+ this._c = c;
+ this._q = [];
+ __super__.call(this);
+ }
+
+ SkipLastObserver.prototype.next = function (x) {
+ this._q.push(x);
+ this._q.length > this._c && this._o.onNext(this._q.shift());
+ };
+
+ SkipLastObserver.prototype.error = function (e) {
+ this._o.onError(e);
+ };
+
+ SkipLastObserver.prototype.completed = function () {
+ this._o.onCompleted();
+ };
+
+ return SkipLastObserver;
+ }(AbstractObserver));
+
/**
* Bypasses a specified number of elements at the end of an observable sequence.
* @description
@@ -3964,14 +4393,7 @@ observableProto.zipIterable = function () {
*/
observableProto.skipLast = function (count) {
if (count < 0) { throw new ArgumentOutOfRangeError(); }
- var source = this;
- return new AnonymousObservable(function (o) {
- var q = [];
- return source.subscribe(function (x) {
- q.push(x);
- q.length > count && o.onNext(q.shift());
- }, function (e) { o.onError(e); }, function () { o.onCompleted(); });
- }, source);
+ return new SkipLastObservable(this, count);
};
/**
@@ -3994,6 +4416,32 @@ observableProto.zipIterable = function () {
return enumerableOf([observableFromArray(args, scheduler), this]).concat();
};
+ var TakeLastObserver = (function (__super__) {
+ inherits(TakeLastObserver, __super__);
+ function TakeLastObserver(o, c) {
+ this._o = o;
+ this._c = c;
+ this._q = [];
+ __super__.call(this);
+ }
+
+ TakeLastObserver.prototype.next = function (x) {
+ this._q.push(x);
+ this._q.length > this._c && this._q.shift();
+ };
+
+ TakeLastObserver.prototype.error = function (e) {
+ this._o.onError(e);
+ };
+
+ TakeLastObserver.prototype.completed = function () {
+ while (this._q.length > 0) { this._o.onNext(this._q.shift()); }
+ this._o.onCompleted();
+ };
+
+ return TakeLastObserver;
+ }(AbstractObserver));
+
/**
* Returns a specified number of contiguous elements from the end of an observable sequence.
* @description
@@ -4006,14 +4454,7 @@ observableProto.zipIterable = function () {
if (count < 0) { throw new ArgumentOutOfRangeError(); }
var source = this;
return new AnonymousObservable(function (o) {
- var q = [];
- return source.subscribe(function (x) {
- q.push(x);
- q.length > count && q.shift();
- }, function (e) { o.onError(e); }, function () {
- while (q.length > 0) { o.onNext(q.shift()); }
- o.onCompleted();
- });
+ return source.subscribe(new TakeLastObserver(o, count));
}, source);
};
@@ -4030,7 +4471,7 @@ observableProto.flatMapConcat = observableProto.concatMap = function(selector, r
}
function innerMap(selector, self) {
- return function (x, i, o) { return selector.call(this, self.selector(x, i, o), i, o); }
+ return function (x, i, o) { return selector.call(this, self.selector(x, i, o), i, o); };
}
MapObservable.prototype.internalMap = function (selector, thisArg) {
@@ -4041,35 +4482,27 @@ observableProto.flatMapConcat = observableProto.concatMap = function(selector, r
return this.source.subscribe(new InnerObserver(o, this.selector, this));
};
+ inherits(InnerObserver, AbstractObserver);
function InnerObserver(o, selector, source) {
this.o = o;
this.selector = selector;
this.source = source;
this.i = 0;
- this.isStopped = false;
+ AbstractObserver.call(this);
}
- InnerObserver.prototype.onNext = function(x) {
- if (this.isStopped) { return; }
+ InnerObserver.prototype.next = function(x) {
var result = tryCatch(this.selector)(x, this.i++, this.source);
if (result === errorObj) { return this.o.onError(result.e); }
this.o.onNext(result);
};
- InnerObserver.prototype.onError = function (e) {
- if(!this.isStopped) { this.isStopped = true; this.o.onError(e); }
- };
- InnerObserver.prototype.onCompleted = function () {
- if(!this.isStopped) { this.isStopped = true; this.o.onCompleted(); }
+
+ InnerObserver.prototype.error = function (e) {
+ this.o.onError(e);
};
- InnerObserver.prototype.dispose = function() { this.isStopped = true; };
- InnerObserver.prototype.fail = function (e) {
- if (!this.isStopped) {
- this.isStopped = true;
- this.o.onError(e);
- return true;
- }
- return false;
+ InnerObserver.prototype.completed = function () {
+ this.o.onCompleted();
};
return MapObservable;
@@ -4101,7 +4534,7 @@ observableProto.flatMapConcat = observableProto.concatMap = function(selector, r
}
}
return currentProp;
- }
+ };
}
/**
@@ -4121,13 +4554,6 @@ observableProto.flatMap = observableProto.selectMany = function(selector, result
return new FlatMapObservable(this, selector, resultSelector, thisArg).mergeAll();
};
-
-//
-//Rx.Observable.prototype.flatMapWithMaxConcurrent = function(limit, selector, resultSelector, thisArg) {
-// return new FlatMapObservable(this, selector, resultSelector, thisArg).merge(limit);
-//};
-//
-
Rx.Observable.prototype.flatMapLatest = function(selector, resultSelector, thisArg) {
return new FlatMapObservable(this, selector, resultSelector, thisArg).switchLatest();
};
@@ -4135,47 +4561,35 @@ Rx.Observable.prototype.flatMapLatest = function(selector, resultSelector, thisA
inherits(SkipObservable, __super__);
function SkipObservable(source, count) {
this.source = source;
- this.skipCount = count;
+ this._count = count;
__super__.call(this);
}
-
+
SkipObservable.prototype.subscribeCore = function (o) {
- return this.source.subscribe(new InnerObserver(o, this.skipCount));
+ return this.source.subscribe(new SkipObserver(o, this._count));
};
-
- function InnerObserver(o, c) {
- this.c = c;
- this.r = c;
- this.o = o;
- this.isStopped = false;
+
+ function SkipObserver(o, c) {
+ this._o = o;
+ this._r = c;
+ AbstractObserver.call(this);
}
- InnerObserver.prototype.onNext = function (x) {
- if (this.isStopped) { return; }
- if (this.r <= 0) {
- this.o.onNext(x);
+
+ inherits(SkipObserver, AbstractObserver);
+
+ SkipObserver.prototype.next = function (x) {
+ if (this._r <= 0) {
+ this._o.onNext(x);
} else {
- this.r--;
- }
- };
- InnerObserver.prototype.onError = function(e) {
- if (!this.isStopped) { this.isStopped = true; this.o.onError(e); }
- };
- InnerObserver.prototype.onCompleted = function() {
- if (!this.isStopped) { this.isStopped = true; this.o.onCompleted(); }
- };
- InnerObserver.prototype.dispose = function() { this.isStopped = true; };
- InnerObserver.prototype.fail = function(e) {
- if (!this.isStopped) {
- this.isStopped = true;
- this.o.onError(e);
- return true;
+ this._r--;
}
- return false;
};
-
+ SkipObserver.prototype.error = function(e) { this._o.onError(e); };
+ SkipObserver.prototype.completed = function() { this._o.onCompleted(); };
+
return SkipObservable;
- }(ObservableBase));
-
+ }(ObservableBase));
+
/**
* Bypasses a specified number of elements in an observable sequence and then returns the remaining elements.
* @param {Number} count The number of elements to skip before returning the remaining elements.
@@ -4185,6 +4599,47 @@ Rx.Observable.prototype.flatMapLatest = function(selector, resultSelector, thisA
if (count < 0) { throw new ArgumentOutOfRangeError(); }
return new SkipObservable(this, count);
};
+
+ var SkipWhileObservable = (function (__super__) {
+ inherits(SkipWhileObservable, __super__);
+ function SkipWhileObservable(source, fn) {
+ this.source = source;
+ this._fn = fn;
+ __super__.call(this);
+ }
+
+ SkipWhileObservable.prototype.subscribeCore = function (o) {
+ return this.source.subscribe(new SkipWhileObserver(o, this));
+ };
+
+ return SkipWhileObservable;
+ }(ObservableBase));
+
+ var SkipWhileObserver = (function (__super__) {
+ inherits(SkipWhileObserver, __super__);
+
+ function SkipWhileObserver(o, p) {
+ this._o = o;
+ this._p = p;
+ this._i = 0;
+ this._r = false;
+ __super__.call(this);
+ }
+
+ SkipWhileObserver.prototype.next = function (x) {
+ if (!this._r) {
+ var res = tryCatch(this._p._fn)(x, this._i++, this._p);
+ if (res === errorObj) { return this._o.onError(res.e); }
+ this._r = !res;
+ }
+ this._r && this._o.onNext(x);
+ };
+ SkipWhileObserver.prototype.error = function (e) { this._o.onError(e); };
+ SkipWhileObserver.prototype.completed = function () { this._o.onCompleted(); };
+
+ return SkipWhileObserver;
+ }(AbstractObserver));
+
/**
* Bypasses elements in an observable sequence as long as a specified condition is true and then returns the remaining elements.
* The element's index is used in the logic of the predicate function.
@@ -4196,29 +4651,46 @@ Rx.Observable.prototype.flatMapLatest = function(selector, resultSelector, thisA
* @returns {Observable} An observable sequence that contains the elements from the input sequence starting at the first element in the linear series that does not pass the test specified by predicate.
*/
observableProto.skipWhile = function (predicate, thisArg) {
- var source = this,
- callback = bindCallback(predicate, thisArg, 3);
- return new AnonymousObservable(function (o) {
- var i = 0, running = false;
- return source.subscribe(function (x) {
- if (!running) {
- try {
- running = !callback(x, i++, source);
- } catch (e) {
- o.onError(e);
- return;
- }
- }
- running && o.onNext(x);
- }, function (e) { o.onError(e); }, function () { o.onCompleted(); });
- }, source);
+ var fn = bindCallback(predicate, thisArg, 3);
+ return new SkipWhileObservable(this, fn);
};
+ var TakeObservable = (function(__super__) {
+ inherits(TakeObservable, __super__);
+ function TakeObservable(source, count) {
+ this.source = source;
+ this._count = count;
+ __super__.call(this);
+ }
+
+ TakeObservable.prototype.subscribeCore = function (o) {
+ return this.source.subscribe(new TakeObserver(o, this._count));
+ };
+
+ function TakeObserver(o, c) {
+ this._o = o;
+ this._c = c;
+ this._r = c;
+ AbstractObserver.call(this);
+ }
+
+ inherits(TakeObserver, AbstractObserver);
+
+ TakeObserver.prototype.next = function (x) {
+ if (this._r-- > 0) {
+ this._o.onNext(x);
+ this._r <= 0 && this._o.onCompleted();
+ }
+ };
+
+ TakeObserver.prototype.error = function (e) { this._o.onError(e); };
+ TakeObserver.prototype.completed = function () { this._o.onCompleted(); };
+
+ return TakeObservable;
+ }(ObservableBase));
+
/**
* Returns a specified number of contiguous elements from the start of an observable sequence, using the specified scheduler for the edge case of take(0).
- *
- * var res = source.take(5);
- * var res = source.take(0, Rx.Scheduler.timeout);
* @param {Number} count The number of elements to return.
* @param {Scheduler} [scheduler] Scheduler used to produce an OnCompleted message in case <paramref name="count count</paramref> is set to 0.
* @returns {Observable} An observable sequence that contains the specified number of elements from the start of the input sequence.
@@ -4226,18 +4698,52 @@ Rx.Observable.prototype.flatMapLatest = function(selector, resultSelector, thisA
observableProto.take = function (count, scheduler) {
if (count < 0) { throw new ArgumentOutOfRangeError(); }
if (count === 0) { return observableEmpty(scheduler); }
- var source = this;
- return new AnonymousObservable(function (o) {
- var remaining = count;
- return source.subscribe(function (x) {
- if (remaining-- > 0) {
- o.onNext(x);
- remaining <= 0 && o.onCompleted();
- }
- }, function (e) { o.onError(e); }, function () { o.onCompleted(); });
- }, source);
+ return new TakeObservable(this, count);
};
+ var TakeWhileObservable = (function (__super__) {
+ inherits(TakeWhileObservable, __super__);
+ function TakeWhileObservable(source, fn) {
+ this.source = source;
+ this._fn = fn;
+ __super__.call(this);
+ }
+
+ TakeWhileObservable.prototype.subscribeCore = function (o) {
+ return this.source.subscribe(new TakeWhileObserver(o, this));
+ };
+
+ return TakeWhileObservable;
+ }(ObservableBase));
+
+ var TakeWhileObserver = (function (__super__) {
+ inherits(TakeWhileObserver, __super__);
+
+ function TakeWhileObserver(o, p) {
+ this._o = o;
+ this._p = p;
+ this._i = 0;
+ this._r = true;
+ __super__.call(this);
+ }
+
+ TakeWhileObserver.prototype.next = function (x) {
+ if (this._r) {
+ this._r = tryCatch(this._p._fn)(x, this._i++, this._p);
+ if (this._r === errorObj) { return this._o.onError(this._r.e); }
+ }
+ if (this._r) {
+ this._o.onNext(x);
+ } else {
+ this._o.onCompleted();
+ }
+ };
+ TakeWhileObserver.prototype.error = function (e) { this._o.onError(e); };
+ TakeWhileObserver.prototype.completed = function () { this._o.onCompleted(); };
+
+ return TakeWhileObserver;
+ }(AbstractObserver));
+
/**
* Returns elements from an observable sequence as long as a specified condition is true.
* The element's index is used in the logic of the predicate function.
@@ -4246,26 +4752,8 @@ Rx.Observable.prototype.flatMapLatest = function(selector, resultSelector, thisA
* @returns {Observable} An observable sequence that contains the elements from the input sequence that occur before the element at which the test no longer passes.
*/
observableProto.takeWhile = function (predicate, thisArg) {
- var source = this,
- callback = bindCallback(predicate, thisArg, 3);
- return new AnonymousObservable(function (o) {
- var i = 0, running = true;
- return source.subscribe(function (x) {
- if (running) {
- try {
- running = callback(x, i++, source);
- } catch (e) {
- o.onError(e);
- return;
- }
- if (running) {
- o.onNext(x);
- } else {
- o.onCompleted();
- }
- }
- }, function (e) { o.onError(e); }, function () { o.onCompleted(); });
- }, source);
+ var fn = bindCallback(predicate, thisArg, 3);
+ return new TakeWhileObservable(this, fn);
};
var FilterObservable = (function (__super__) {
@@ -4280,7 +4768,7 @@ Rx.Observable.prototype.flatMapLatest = function(selector, resultSelector, thisA
FilterObservable.prototype.subscribeCore = function (o) {
return this.source.subscribe(new InnerObserver(o, this.predicate, this));
};
-
+
function innerPredicate(predicate, self) {
return function(x, i, o) { return self.predicate(x, i, o) && predicate.call(this, x, i, o); }
}
@@ -4288,37 +4776,30 @@ Rx.Observable.prototype.flatMapLatest = function(selector, resultSelector, thisA
FilterObservable.prototype.internalFilter = function(predicate, thisArg) {
return new FilterObservable(this.source, innerPredicate(predicate, this), thisArg);
};
-
+
+ inherits(InnerObserver, AbstractObserver);
function InnerObserver(o, predicate, source) {
this.o = o;
this.predicate = predicate;
this.source = source;
this.i = 0;
- this.isStopped = false;
+ AbstractObserver.call(this);
}
-
- InnerObserver.prototype.onNext = function(x) {
- if (this.isStopped) { return; }
+
+ InnerObserver.prototype.next = function(x) {
var shouldYield = tryCatch(this.predicate)(x, this.i++, this.source);
if (shouldYield === errorObj) {
return this.o.onError(shouldYield.e);
}
shouldYield && this.o.onNext(x);
};
- InnerObserver.prototype.onError = function (e) {
- if(!this.isStopped) { this.isStopped = true; this.o.onError(e); }
- };
- InnerObserver.prototype.onCompleted = function () {
- if(!this.isStopped) { this.isStopped = true; this.o.onCompleted(); }
+
+ InnerObserver.prototype.error = function (e) {
+ this.o.onError(e);
};
- InnerObserver.prototype.dispose = function() { this.isStopped = true; };
- InnerObserver.prototype.fail = function (e) {
- if (!this.isStopped) {
- this.isStopped = true;
- this.o.onError(e);
- return true;
- }
- return false;
+
+ InnerObserver.prototype.completed = function () {
+ this.o.onCompleted();
};
return FilterObservable;
@@ -4433,6 +4914,16 @@ Observable.fromNodeCallback = function (fn, ctx, selector) {
};
};
+ function isNodeList(el) {
+ if (root.StaticNodeList) {
+ // IE8 Specific
+ // instanceof is slower than Object#toString, but Object#toString will not work as intended in IE8
+ return el instanceof root.StaticNodeList || el instanceof root.NodeList;
+ } else {
+ return Object.prototype.toString.call(el) === '[object NodeList]';
+ }
+ }
+
function ListenDisposable(e, n, fn) {
this._e = e;
this._n = n;
@@ -4452,7 +4943,7 @@ Observable.fromNodeCallback = function (fn, ctx, selector) {
// Asume NodeList or HTMLCollection
var elemToString = Object.prototype.toString.call(el);
- if (elemToString === '[object NodeList]' || elemToString === '[object HTMLCollection]') {
+ if (isNodeList(el) || elemToString === '[object HTMLCollection]') {
for (var i = 0, len = el.length; i < len; i++) {
disposables.add(createEventListener(el.item(i), eventName, handler));
}
@@ -4468,16 +4959,35 @@ Observable.fromNodeCallback = function (fn, ctx, selector) {
*/
Rx.config.useNativeEvents = false;
- function eventHandler(o, selector) {
- return function handler () {
- var results = arguments[0];
- if (isFunction(selector)) {
- results = tryCatch(selector).apply(null, arguments);
- if (results === errorObj) { return o.onError(results.e); }
- }
- o.onNext(results);
+ var EventObservable = (function(__super__) {
+ inherits(EventObservable, __super__);
+ function EventObservable(el, name, fn) {
+ this._el = el;
+ this._n = name;
+ this._fn = fn;
+ __super__.call(this);
+ }
+
+ function createHandler(o, fn) {
+ return function handler () {
+ var results = arguments[0];
+ if (isFunction(fn)) {
+ results = tryCatch(fn).apply(null, arguments);
+ if (results === errorObj) { return o.onError(results.e); }
+ }
+ o.onNext(results);
+ };
+ }
+
+ EventObservable.prototype.subscribeCore = function (o) {
+ return createEventListener(
+ this._el,
+ this._n,
+ createHandler(o, this._fn));
};
- }
+
+ return EventObservable;
+ }(ObservableBase));
/**
* Creates an observable sequence by adding an event listener to the matching DOMElement or each item in the NodeList.
@@ -4506,54 +5016,93 @@ Observable.fromNodeCallback = function (fn, ctx, selector) {
}
}
- return new AnonymousObservable(function (o) {
- return createEventListener(
- element,
- eventName,
- eventHandler(o, selector));
- }).publish().refCount();
+ return new EventObservable(element, eventName, selector).publish().refCount();
};
+ var EventPatternObservable = (function(__super__) {
+ inherits(EventPatternObservable, __super__);
+ function EventPatternObservable(add, del, fn) {
+ this._add = add;
+ this._del = del;
+ this._fn = fn;
+ __super__.call(this);
+ }
+
+ function createHandler(o, fn) {
+ return function handler () {
+ var results = arguments[0];
+ if (isFunction(fn)) {
+ results = tryCatch(fn).apply(null, arguments);
+ if (results === errorObj) { return o.onError(results.e); }
+ }
+ o.onNext(results);
+ };
+ }
+
+ EventPatternObservable.prototype.subscribeCore = function (o) {
+ var fn = createHandler(o, this._fn);
+ var returnValue = this._add(fn);
+ return new EventPatternDisposable(this._del, fn, returnValue);
+ };
+
+ function EventPatternDisposable(del, fn, ret) {
+ this._del = del;
+ this._fn = fn;
+ this._ret = ret;
+ this.isDisposed = false;
+ }
+
+ EventPatternDisposable.prototype.dispose = function () {
+ if(!this.isDisposed) {
+ isFunction(this._del) && this._del(this._fn, this._ret);
+ this.isDisposed = true;
+ }
+ };
+
+ return EventPatternObservable;
+ }(ObservableBase));
+
/**
* Creates an observable sequence from an event emitter via an addHandler/removeHandler pair.
* @param {Function} addHandler The function to add a handler to the emitter.
* @param {Function} [removeHandler] The optional function to remove a handler from an emitter.
* @param {Function} [selector] A selector which takes the arguments from the event handler to produce a single item to yield on next.
- * @param {Scheduler} [scheduler] A scheduler used to schedule the remove handler.
* @returns {Observable} An observable sequence which wraps an event from an event emitter
*/
- var fromEventPattern = Observable.fromEventPattern = function (addHandler, removeHandler, selector, scheduler) {
- isScheduler(scheduler) || (scheduler = immediateScheduler);
- return new AnonymousObservable(function (o) {
- function innerHandler () {
- var result = arguments[0];
- if (isFunction(selector)) {
- result = tryCatch(selector).apply(null, arguments);
- if (result === errorObj) { return o.onError(result.e); }
- }
- o.onNext(result);
- }
-
- var returnValue = addHandler(innerHandler);
- return disposableCreate(function () {
- isFunction(removeHandler) && removeHandler(innerHandler, returnValue);
- });
- }).publish().refCount();
+ var fromEventPattern = Observable.fromEventPattern = function (addHandler, removeHandler, selector) {
+ return new EventPatternObservable(addHandler, removeHandler, selector).publish().refCount();
};
var FromPromiseObservable = (function(__super__) {
inherits(FromPromiseObservable, __super__);
- function FromPromiseObservable(p) {
- this.p = p;
+ function FromPromiseObservable(p, s) {
+ this._p = p;
+ this._s = s;
__super__.call(this);
}
+ function scheduleNext(s, state) {
+ var o = state[0], data = state[1];
+ o.onNext(data);
+ o.onCompleted();
+ }
+
+ function scheduleError(s, state) {
+ var o = state[0], err = state[1];
+ o.onError(err);
+ }
+
FromPromiseObservable.prototype.subscribeCore = function(o) {
- this.p.then(function (data) {
- o.onNext(data);
- o.onCompleted();
- }, function (err) { o.onError(err); });
- return disposableEmpty;
+ var sad = new SingleAssignmentDisposable(), self = this;
+
+ this._p
+ .then(function (data) {
+ sad.setDisposable(self._s.schedule([o, data], scheduleNext));
+ }, function (err) {
+ sad.setDisposable(self._s.schedule([o, err], scheduleError));
+ });
+
+ return sad;
};
return FromPromiseObservable;
@@ -4564,9 +5113,11 @@ Observable.fromNodeCallback = function (fn, ctx, selector) {
* @param {Promise} An ES6 Compliant promise.
* @returns {Observable} An Observable sequence which wraps the existing promise success and failure.
*/
- var observableFromPromise = Observable.fromPromise = function (promise) {
- return new FromPromiseObservable(promise);
+ var observableFromPromise = Observable.fromPromise = function (promise, scheduler) {
+ scheduler || (scheduler = defaultScheduler);
+ return new FromPromiseObservable(promise, scheduler);
};
+
/*
* Converts an existing observable sequence to an ES6 Compatible Promise
* @example
@@ -4584,12 +5135,11 @@ Observable.fromNodeCallback = function (fn, ctx, selector) {
var source = this;
return new promiseCtor(function (resolve, reject) {
// No cancellation can be done
- var value, hasValue = false;
+ var value;
source.subscribe(function (v) {
value = v;
- hasValue = true;
}, reject, function () {
- hasValue && resolve(value);
+ resolve(value);
});
});
};
@@ -4600,14 +5150,27 @@ Observable.fromNodeCallback = function (fn, ctx, selector) {
* @returns {Observable} An observable sequence exposing the function's result value, or an exception.
*/
Observable.startAsync = function (functionAsync) {
- var promise;
- try {
- promise = functionAsync();
- } catch (e) {
- return observableThrow(e);
- }
+ var promise = tryCatch(functionAsync)();
+ if (promise === errorObj) { return observableThrow(promise.e); }
return observableFromPromise(promise);
- }
+ };
+
+ var MulticastObservable = (function (__super__) {
+ inherits(MulticastObservable, __super__);
+ function MulticastObservable(source, fn1, fn2) {
+ this.source = source;
+ this._fn1 = fn1;
+ this._fn2 = fn2;
+ __super__.call(this);
+ }
+
+ MulticastObservable.prototype.subscribeCore = function (o) {
+ var connectable = this.source.multicast(this._fn1());
+ return new BinaryDisposable(this._fn2(connectable).subscribe(o), connectable.connect());
+ };
+
+ return MulticastObservable;
+ }(ObservableBase));
/**
* Multicasts the source sequence notifications through an instantiated subject into all uses of the sequence within a selector function. Each
@@ -4627,13 +5190,9 @@ Observable.fromNodeCallback = function (fn, ctx, selector) {
* @returns {Observable} An observable sequence that contains the elements of a sequence produced by multicasting the source sequence within a selector function.
*/
observableProto.multicast = function (subjectOrSubjectSelector, selector) {
- var source = this;
- return typeof subjectOrSubjectSelector === 'function' ?
- new AnonymousObservable(function (observer) {
- var connectable = source.multicast(subjectOrSubjectSelector());
- return new CompositeDisposable(selector(connectable).subscribe(observer), connectable.connect());
- }, source) :
- new ConnectableObservable(source, subjectOrSubjectSelector);
+ return isFunction(subjectOrSubjectSelector) ?
+ new MulticastObservable(this, subjectOrSubjectSelector, selector) :
+ new ConnectableObservable(this, subjectOrSubjectSelector);
};
/**
@@ -4750,72 +5309,115 @@ Observable.fromNodeCallback = function (fn, ctx, selector) {
return this.replay(null, bufferSize, windowSize, scheduler).refCount();
};
+ var RefCountObservable = (function (__super__) {
+ inherits(RefCountObservable, __super__);
+ function RefCountObservable(source) {
+ this.source = source;
+ this._count = 0;
+ this._connectableSubscription = null;
+ __super__.call(this);
+ }
+
+ RefCountObservable.prototype.subscribeCore = function (o) {
+ var subscription = this.source.subscribe(o);
+ ++this._count === 1 && (this._connectableSubscription = this.source.connect());
+ return new RefCountDisposable(this, subscription);
+ };
+
+ function RefCountDisposable(p, s) {
+ this._p = p;
+ this._s = s;
+ this.isDisposed = false;
+ }
+
+ RefCountDisposable.prototype.dispose = function () {
+ if (!this.isDisposed) {
+ this.isDisposed = true;
+ this._s.dispose();
+ --this._p._count === 0 && this._p._connectableSubscription.dispose();
+ }
+ };
+
+ return RefCountObservable;
+ }(ObservableBase));
+
var ConnectableObservable = Rx.ConnectableObservable = (function (__super__) {
inherits(ConnectableObservable, __super__);
-
function ConnectableObservable(source, subject) {
- var hasSubscription = false,
- subscription,
- sourceObservable = source.asObservable();
-
- this.connect = function () {
- if (!hasSubscription) {
- hasSubscription = true;
- subscription = new CompositeDisposable(sourceObservable.subscribe(subject), disposableCreate(function () {
- hasSubscription = false;
- }));
- }
- return subscription;
- };
+ this.source = source;
+ this._connection = null;
+ this._source = source.asObservable();
+ this._subject = subject;
+ __super__.call(this);
+ }
- __super__.call(this, function (o) { return subject.subscribe(o); });
+ function ConnectDisposable(parent, subscription) {
+ this._p = parent;
+ this._s = subscription;
}
+ ConnectDisposable.prototype.dispose = function () {
+ if (this._s) {
+ this._s.dispose();
+ this._s = null;
+ this._p._connection = null;
+ }
+ };
+
+ ConnectableObservable.prototype.connect = function () {
+ if (!this._connection) {
+ var subscription = this._source.subscribe(this._subject);
+ this._connection = new ConnectDisposable(this, subscription);
+ }
+ return this._connection;
+ };
+
+ ConnectableObservable.prototype._subscribe = function (o) {
+ return this._subject.subscribe(o);
+ };
+
ConnectableObservable.prototype.refCount = function () {
- var connectableSubscription, count = 0, source = this;
- return new AnonymousObservable(function (observer) {
- var shouldConnect = ++count === 1,
- subscription = source.subscribe(observer);
- shouldConnect && (connectableSubscription = source.connect());
- return function () {
- subscription.dispose();
- --count === 0 && connectableSubscription.dispose();
- };
- });
+ return new RefCountObservable(this);
};
return ConnectableObservable;
}(Observable));
- function observableTimerDate(dueTime, scheduler) {
- return new AnonymousObservable(function (observer) {
- return scheduler.scheduleWithAbsolute(dueTime, function () {
- observer.onNext(0);
- observer.onCompleted();
- });
- });
+ var TimerObservable = (function(__super__) {
+ inherits(TimerObservable, __super__);
+ function TimerObservable(dt, s) {
+ this._dt = dt;
+ this._s = s;
+ __super__.call(this);
+ }
+
+ TimerObservable.prototype.subscribeCore = function (o) {
+ return this._s.scheduleFuture(o, this._dt, scheduleMethod);
+ };
+
+ function scheduleMethod(s, o) {
+ o.onNext(0);
+ o.onCompleted();
+ }
+
+ return TimerObservable;
+ }(ObservableBase));
+
+ function _observableTimer(dueTime, scheduler) {
+ return new TimerObservable(dueTime, scheduler);
}
function observableTimerDateAndPeriod(dueTime, period, scheduler) {
return new AnonymousObservable(function (observer) {
var d = dueTime, p = normalizeTime(period);
- return scheduler.scheduleRecursiveWithAbsoluteAndState(0, d, function (count, self) {
+ return scheduler.scheduleRecursiveFuture(0, d, function (count, self) {
if (p > 0) {
var now = scheduler.now();
- d = d + p;
- d <= now && (d = now + p);
+ d = new Date(d.getTime() + p);
+ d.getTime() <= now && (d = new Date(now + p));
}
observer.onNext(count);
- self(count + 1, d);
- });
- });
- }
-
- function observableTimerTimeSpan(dueTime, scheduler) {
- return new AnonymousObservable(function (observer) {
- return scheduler.scheduleWithRelative(normalizeTime(dueTime), function () {
- observer.onNext(0);
- observer.onCompleted();
+ self(count + 1, new Date(d));
});
});
}
@@ -4823,13 +5425,13 @@ Observable.fromNodeCallback = function (fn, ctx, selector) {
function observableTimerTimeSpanAndPeriod(dueTime, period, scheduler) {
return dueTime === period ?
new AnonymousObservable(function (observer) {
- return scheduler.schedulePeriodicWithState(0, period, function (count) {
+ return scheduler.schedulePeriodic(0, period, function (count) {
observer.onNext(count);
return count + 1;
});
}) :
observableDefer(function () {
- return observableTimerDateAndPeriod(scheduler.now() + dueTime, period, scheduler);
+ return observableTimerDateAndPeriod(new Date(scheduler.now() + dueTime), period, scheduler);
});
}
@@ -4845,7 +5447,7 @@ Observable.fromNodeCallback = function (fn, ctx, selector) {
* @returns {Observable} An observable sequence that produces a value after each period.
*/
var observableinterval = Observable.interval = function (period, scheduler) {
- return observableTimerTimeSpanAndPeriod(period, period, isScheduler(scheduler) ? scheduler : timeoutScheduler);
+ return observableTimerTimeSpanAndPeriod(period, period, isScheduler(scheduler) ? scheduler : defaultScheduler);
};
/**
@@ -4857,21 +5459,19 @@ Observable.fromNodeCallback = function (fn, ctx, selector) {
*/
var observableTimer = Observable.timer = function (dueTime, periodOrScheduler, scheduler) {
var period;
- isScheduler(scheduler) || (scheduler = timeoutScheduler);
+ isScheduler(scheduler) || (scheduler = defaultScheduler);
if (periodOrScheduler != null && typeof periodOrScheduler === 'number') {
period = periodOrScheduler;
} else if (isScheduler(periodOrScheduler)) {
scheduler = periodOrScheduler;
}
- if (dueTime instanceof Date && period === undefined) {
- return observableTimerDate(dueTime.getTime(), scheduler);
+ if ((dueTime instanceof Date || typeof dueTime === 'number') && period === undefined) {
+ return _observableTimer(dueTime, scheduler);
}
if (dueTime instanceof Date && period !== undefined) {
- return observableTimerDateAndPeriod(dueTime.getTime(), periodOrScheduler, scheduler);
+ return observableTimerDateAndPeriod(dueTime, periodOrScheduler, scheduler);
}
- return period === undefined ?
- observableTimerTimeSpan(dueTime, scheduler) :
- observableTimerTimeSpanAndPeriod(dueTime, period, scheduler);
+ return observableTimerTimeSpanAndPeriod(dueTime, period, scheduler);
};
function observableDelayRelative(source, dueTime, scheduler) {
@@ -4887,7 +5487,7 @@ Observable.fromNodeCallback = function (fn, ctx, selector) {
if (notification.value.kind === 'E') {
q = [];
q.push(notification);
- exception = notification.value.exception;
+ exception = notification.value.error;
shouldRun = !running;
} else {
q.push({ value: notification.value, timestamp: notification.timestamp + dueTime });
@@ -4900,7 +5500,7 @@ Observable.fromNodeCallback = function (fn, ctx, selector) {
} else {
d = new SingleAssignmentDisposable();
cancelable.setDisposable(d);
- d.setDisposable(scheduler.scheduleRecursiveWithRelative(dueTime, function (self) {
+ d.setDisposable(scheduler.scheduleRecursiveFuture(null, dueTime, function (_, self) {
var e, recurseDueTime, result, shouldRecurse;
if (exception !== null) {
return;
@@ -4928,13 +5528,13 @@ Observable.fromNodeCallback = function (fn, ctx, selector) {
if (e !== null) {
o.onError(e);
} else if (shouldRecurse) {
- self(recurseDueTime);
+ self(null, recurseDueTime);
}
}));
}
}
});
- return new CompositeDisposable(subscription, cancelable);
+ return new BinaryDisposable(subscription, cancelable);
}, source);
}
@@ -4995,8 +5595,8 @@ Observable.fromNodeCallback = function (fn, ctx, selector) {
subscription.setDisposable(subDelay.subscribe(start, function (e) { o.onError(e); }, start));
}
- return new CompositeDisposable(subscription, delays);
- }, this);
+ return new BinaryDisposable(subscription, delays);
+ }, source);
}
/**
@@ -5008,52 +5608,86 @@ Observable.fromNodeCallback = function (fn, ctx, selector) {
* @returns {Observable} Time-shifted sequence.
*/
observableProto.delay = function () {
- if (typeof arguments[0] === 'number' || arguments[0] instanceof Date) {
- var dueTime = arguments[0], scheduler = arguments[1];
- isScheduler(scheduler) || (scheduler = timeoutScheduler);
+ var firstArg = arguments[0];
+ if (typeof firstArg === 'number' || firstArg instanceof Date) {
+ var dueTime = firstArg, scheduler = arguments[1];
+ isScheduler(scheduler) || (scheduler = defaultScheduler);
return dueTime instanceof Date ?
observableDelayAbsolute(this, dueTime, scheduler) :
observableDelayRelative(this, dueTime, scheduler);
- } else if (isFunction(arguments[0])) {
- return delayWithSelector(this, arguments[0], arguments[1]);
+ } else if (Observable.isObservable(firstArg) || isFunction(firstArg)) {
+ return delayWithSelector(this, firstArg, arguments[1]);
} else {
throw new Error('Invalid arguments');
}
};
- function debounce(source, dueTime, scheduler) {
- isScheduler(scheduler) || (scheduler = timeoutScheduler);
- return new AnonymousObservable(function (observer) {
- var cancelable = new SerialDisposable(), hasvalue = false, value, id = 0;
- var subscription = source.subscribe(
- function (x) {
- hasvalue = true;
- value = x;
- id++;
- var currentId = id,
- d = new SingleAssignmentDisposable();
- cancelable.setDisposable(d);
- d.setDisposable(scheduler.scheduleWithRelative(dueTime, function () {
- hasvalue && id === currentId && observer.onNext(value);
- hasvalue = false;
- }));
- },
- function (e) {
- cancelable.dispose();
- observer.onError(e);
- hasvalue = false;
- id++;
- },
- function () {
- cancelable.dispose();
- hasvalue && observer.onNext(value);
- observer.onCompleted();
- hasvalue = false;
- id++;
- });
- return new CompositeDisposable(subscription, cancelable);
- }, this);
- }
+ var DebounceObservable = (function (__super__) {
+ inherits(DebounceObservable, __super__);
+ function DebounceObservable(source, dt, s) {
+ isScheduler(s) || (s = defaultScheduler);
+ this.source = source;
+ this._dt = dt;
+ this._s = s;
+ __super__.call(this);
+ }
+
+ DebounceObservable.prototype.subscribeCore = function (o) {
+ var cancelable = new SerialDisposable();
+ return new BinaryDisposable(
+ this.source.subscribe(new DebounceObserver(o, this._dt, this._s, cancelable)),
+ cancelable);
+ };
+
+ return DebounceObservable;
+ }(ObservableBase));
+
+ var DebounceObserver = (function (__super__) {
+ inherits(DebounceObserver, __super__);
+ function DebounceObserver(observer, dueTime, scheduler, cancelable) {
+ this._o = observer;
+ this._d = dueTime;
+ this._scheduler = scheduler;
+ this._c = cancelable;
+ this._v = null;
+ this._hv = false;
+ this._id = 0;
+ __super__.call(this);
+ }
+
+ function scheduleFuture(s, state) {
+ state.self._hv && state.self._id === state.currentId && state.self._o.onNext(state.x);
+ state.self._hv = false;
+ }
+
+ DebounceObserver.prototype.next = function (x) {
+ this._hv = true;
+ this._v = x;
+ var currentId = ++this._id, d = new SingleAssignmentDisposable();
+ this._c.setDisposable(d);
+ d.setDisposable(this._scheduler.scheduleFuture(this, this._d, function (_, self) {
+ self._hv && self._id === currentId && self._o.onNext(x);
+ self._hv = false;
+ }));
+ };
+
+ DebounceObserver.prototype.error = function (e) {
+ this._c.dispose();
+ this._o.onError(e);
+ this._hv = false;
+ this._id++;
+ };
+
+ DebounceObserver.prototype.completed = function () {
+ this._c.dispose();
+ this._hv && this._o.onNext(this._v);
+ this._o.onCompleted();
+ this._hv = false;
+ this._id++;
+ };
+
+ return DebounceObserver;
+ }(AbstractObserver));
function debounceWithSelector(source, durationSelector) {
return new AnonymousObservable(function (o) {
@@ -5098,7 +5732,7 @@ Observable.fromNodeCallback = function (fn, ctx, selector) {
id++;
}
);
- return new CompositeDisposable(subscription, cancelable);
+ return new BinaryDisposable(subscription, cancelable);
}, source);
}
@@ -5106,12 +5740,50 @@ Observable.fromNodeCallback = function (fn, ctx, selector) {
if (isFunction (arguments[0])) {
return debounceWithSelector(this, arguments[0]);
} else if (typeof arguments[0] === 'number') {
- return debounce(this, arguments[0], arguments[1]);
+ return new DebounceObservable(this, arguments[0], arguments[1]);
} else {
throw new Error('Invalid arguments');
}
};
+ var TimestampObservable = (function (__super__) {
+ inherits(TimestampObservable, __super__);
+ function TimestampObservable(source, s) {
+ this.source = source;
+ this._s = s;
+ __super__.call(this);
+ }
+
+ TimestampObservable.prototype.subscribeCore = function (o) {
+ return this.source.subscribe(new TimestampObserver(o, this._s));
+ };
+
+ return TimestampObservable;
+ }(ObservableBase));
+
+ var TimestampObserver = (function (__super__) {
+ inherits(TimestampObserver, __super__);
+ function TimestampObserver(o, s) {
+ this._o = o;
+ this._s = s;
+ __super__.call(this);
+ }
+
+ TimestampObserver.prototype.next = function (x) {
+ this._o.onNext({ value: x, timestamp: this._s.now() });
+ };
+
+ TimestampObserver.prototype.error = function (e) {
+ this._o.onError(e);
+ };
+
+ TimestampObserver.prototype.completed = function () {
+ this._o.onCompleted();
+ };
+
+ return TimestampObserver;
+ }(AbstractObserver));
+
/**
* Records the timestamp for each value in an observable sequence.
*
@@ -5123,43 +5795,78 @@ Observable.fromNodeCallback = function (fn, ctx, selector) {
* @returns {Observable} An observable sequence with timestamp information on values.
*/
observableProto.timestamp = function (scheduler) {
- isScheduler(scheduler) || (scheduler = timeoutScheduler);
- return this.map(function (x) {
- return { value: x, timestamp: scheduler.now() };
- });
+ isScheduler(scheduler) || (scheduler = defaultScheduler);
+ return new TimestampObservable(this, scheduler);
};
- function sampleObservable(source, sampler) {
- return new AnonymousObservable(function (o) {
- var atEnd = false, value, hasValue = false;
+ var SampleObservable = (function(__super__) {
+ inherits(SampleObservable, __super__);
+ function SampleObservable(source, sampler) {
+ this.source = source;
+ this._sampler = sampler;
+ __super__.call(this);
+ }
- function sampleSubscribe() {
- if (hasValue) {
- hasValue = false;
- o.onNext(value);
- }
- atEnd && o.onCompleted();
+ SampleObservable.prototype.subscribeCore = function (o) {
+ var state = {
+ o: o,
+ atEnd: false,
+ value: null,
+ hasValue: false,
+ sourceSubscription: new SingleAssignmentDisposable()
+ };
+
+ state.sourceSubscription.setDisposable(this.source.subscribe(new SampleSourceObserver(state)));
+ return new BinaryDisposable(
+ state.sourceSubscription,
+ this._sampler.subscribe(new SamplerObserver(state))
+ );
+ };
+
+ return SampleObservable;
+ }(ObservableBase));
+
+ var SamplerObserver = (function(__super__) {
+ inherits(SamplerObserver, __super__);
+ function SamplerObserver(s) {
+ this._s = s;
+ __super__.call(this);
+ }
+
+ SamplerObserver.prototype._handleMessage = function () {
+ if (this._s.hasValue) {
+ this._s.hasValue = false;
+ this._s.o.onNext(this._s.value);
}
+ this._s.atEnd && this._s.o.onCompleted();
+ };
- var sourceSubscription = new SingleAssignmentDisposable();
- sourceSubscription.setDisposable(source.subscribe(
- function (newValue) {
- hasValue = true;
- value = newValue;
- },
- function (e) { o.onError(e); },
- function () {
- atEnd = true;
- sourceSubscription.dispose();
- }
- ));
+ SamplerObserver.prototype.next = function () { this._handleMessage(); };
+ SamplerObserver.prototype.error = function (e) { this._s.onError(e); };
+ SamplerObserver.prototype.completed = function () { this._handleMessage(); };
- return new CompositeDisposable(
- sourceSubscription,
- sampler.subscribe(sampleSubscribe, function (e) { o.onError(e); }, sampleSubscribe)
- );
- }, source);
- }
+ return SamplerObserver;
+ }(AbstractObserver));
+
+ var SampleSourceObserver = (function(__super__) {
+ inherits(SampleSourceObserver, __super__);
+ function SampleSourceObserver(s) {
+ this._s = s;
+ __super__.call(this);
+ }
+
+ SampleSourceObserver.prototype.next = function (x) {
+ this._s.hasValue = true;
+ this._s.value = x;
+ };
+ SampleSourceObserver.prototype.error = function (e) { this._s.o.onError(e); };
+ SampleSourceObserver.prototype.completed = function () {
+ this._s.atEnd = true;
+ this._s.sourceSubscription.dispose();
+ };
+
+ return SampleSourceObserver;
+ }(AbstractObserver));
/**
* Samples the observable sequence at each interval.
@@ -5173,11 +5880,11 @@ Observable.fromNodeCallback = function (fn, ctx, selector) {
* @param {Scheduler} [scheduler] Scheduler to run the sampling timer on. If not specified, the timeout scheduler is used.
* @returns {Observable} Sampled observable sequence.
*/
- observableProto.sample = observableProto.throttleLatest = function (intervalOrSampler, scheduler) {
- isScheduler(scheduler) || (scheduler = timeoutScheduler);
+ observableProto.sample = function (intervalOrSampler, scheduler) {
+ isScheduler(scheduler) || (scheduler = defaultScheduler);
return typeof intervalOrSampler === 'number' ?
- sampleObservable(this, observableinterval(intervalOrSampler, scheduler)) :
- sampleObservable(this, intervalOrSampler);
+ new SampleObservable(this, observableinterval(intervalOrSampler, scheduler)) :
+ new SampleObservable(this, intervalOrSampler);
};
var TimeoutError = Rx.TimeoutError = function(message) {
@@ -5193,9 +5900,11 @@ Observable.fromNodeCallback = function (fn, ctx, selector) {
timeoutDurationSelector = firstTimeout;
firstTimeout = observableNever();
}
- other || (other = observableThrow(new TimeoutError()));
+ Observable.isObservable(other) || (other = observableThrow(new TimeoutError()));
return new AnonymousObservable(function (o) {
- var subscription = new SerialDisposable(), timer = new SerialDisposable(), original = new SingleAssignmentDisposable();
+ var subscription = new SerialDisposable(),
+ timer = new SerialDisposable(),
+ original = new SingleAssignmentDisposable();
subscription.setDisposable(original);
@@ -5203,14 +5912,20 @@ Observable.fromNodeCallback = function (fn, ctx, selector) {
function setTimer(timeout) {
var myId = id, d = new SingleAssignmentDisposable();
+
+ function timerWins() {
+ switched = (myId === id);
+ return switched;
+ }
+
timer.setDisposable(d);
d.setDisposable(timeout.subscribe(function () {
- id === myId && subscription.setDisposable(other.subscribe(o));
+ timerWins() && subscription.setDisposable(other.subscribe(o));
d.dispose();
}, function (e) {
- id === myId && o.onError(e);
+ timerWins() && o.onError(e);
}, function () {
- id === myId && subscription.setDisposable(other.subscribe(o));
+ timerWins() && subscription.setDisposable(other.subscribe(o));
}));
};
@@ -5234,23 +5949,18 @@ Observable.fromNodeCallback = function (fn, ctx, selector) {
}, function () {
oWins() && o.onCompleted();
}));
- return new CompositeDisposable(subscription, timer);
+ return new BinaryDisposable(subscription, timer);
}, source);
}
function timeout(source, dueTime, other, scheduler) {
- if (other == null) { throw new Error('other or scheduler must be specified'); }
if (isScheduler(other)) {
scheduler = other;
other = observableThrow(new TimeoutError());
}
if (other instanceof Error) { other = observableThrow(other); }
- isScheduler(scheduler) || (scheduler = timeoutScheduler);
-
- var schedulerMethod = dueTime instanceof Date ?
- 'scheduleWithAbsolute' :
- 'scheduleWithRelative';
-
+ isScheduler(scheduler) || (scheduler = defaultScheduler);
+ Observable.isObservable(other) || (other = observableThrow(new TimeoutError()));
return new AnonymousObservable(function (o) {
var id = 0,
original = new SingleAssignmentDisposable(),
@@ -5262,8 +5972,9 @@ Observable.fromNodeCallback = function (fn, ctx, selector) {
function createTimer() {
var myId = id;
- timer.setDisposable(scheduler[schedulerMethod](dueTime, function () {
- if (id === myId) {
+ timer.setDisposable(scheduler.scheduleFuture(null, dueTime, function () {
+ switched = id === myId;
+ if (switched) {
isPromise(other) && (other = observableFromPromise(other));
subscription.setDisposable(other.subscribe(o));
}
@@ -5289,7 +6000,7 @@ Observable.fromNodeCallback = function (fn, ctx, selector) {
o.onCompleted();
}
}));
- return new CompositeDisposable(subscription, timer);
+ return new BinaryDisposable(subscription, timer);
}, source);
}
@@ -5311,7 +6022,7 @@ Observable.fromNodeCallback = function (fn, ctx, selector) {
* @returns {Observable} An Observable that performs the throttle operation.
*/
observableProto.throttle = function (windowDuration, scheduler) {
- isScheduler(scheduler) || (scheduler = timeoutScheduler);
+ isScheduler(scheduler) || (scheduler = defaultScheduler);
var duration = +windowDuration || 0;
if (duration <= 0) { throw new RangeError('windowDuration cannot be less or equal zero.'); }
var source = this;
@@ -5330,12 +6041,23 @@ Observable.fromNodeCallback = function (fn, ctx, selector) {
};
var PausableObservable = (function (__super__) {
-
inherits(PausableObservable, __super__);
+ function PausableObservable(source, pauser) {
+ this.source = source;
+ this.controller = new Subject();
+
+ if (pauser && pauser.subscribe) {
+ this.pauser = this.controller.merge(pauser);
+ } else {
+ this.pauser = this.controller;
+ }
+
+ __super__.call(this);
+ }
- function subscribe(observer) {
+ PausableObservable.prototype._subscribe = function (o) {
var conn = this.source.publish(),
- subscription = conn.subscribe(observer),
+ subscription = conn.subscribe(o),
connection = disposableEmpty;
var pausable = this.pauser.distinctUntilChanged().subscribe(function (b) {
@@ -5347,21 +6069,8 @@ Observable.fromNodeCallback = function (fn, ctx, selector) {
}
});
- return new CompositeDisposable(subscription, connection, pausable);
- }
-
- function PausableObservable(source, pauser) {
- this.source = source;
- this.controller = new Subject();
-
- if (pauser && pauser.subscribe) {
- this.pauser = this.controller.merge(pauser);
- } else {
- this.pauser = this.controller;
- }
-
- __super__.call(this, subscribe, source);
- }
+ return new NAryDisposable([subscription, connection, pausable]);
+ };
PausableObservable.prototype.pause = function () {
this.controller.onNext(false);
@@ -5407,7 +6116,7 @@ Observable.fromNodeCallback = function (fn, ctx, selector) {
isDone && values[1] && o.onCompleted();
}
- return new CompositeDisposable(
+ return new BinaryDisposable(
source.subscribe(
function (x) {
next(x, 0);
@@ -5437,10 +6146,21 @@ Observable.fromNodeCallback = function (fn, ctx, selector) {
}
var PausableBufferedObservable = (function (__super__) {
-
inherits(PausableBufferedObservable, __super__);
+ function PausableBufferedObservable(source, pauser) {
+ this.source = source;
+ this.controller = new Subject();
+
+ if (pauser && pauser.subscribe) {
+ this.pauser = this.controller.merge(pauser);
+ } else {
+ this.pauser = this.controller;
+ }
- function subscribe(o) {
+ __super__.call(this);
+ }
+
+ PausableBufferedObservable.prototype._subscribe = function (o) {
var q = [], previousShouldFire;
function drainQueue() { while (q.length > 0) { o.onNext(q.shift()); } }
@@ -5454,7 +6174,7 @@ Observable.fromNodeCallback = function (fn, ctx, selector) {
})
.subscribe(
function (results) {
- if (previousShouldFire !== undefined && results.shouldFire != previousShouldFire) {
+ if (previousShouldFire !== undefined && results.shouldFire !== previousShouldFire) {
previousShouldFire = results.shouldFire;
// change in shouldFire
if (results.shouldFire) { drainQueue(); }
@@ -5477,21 +6197,8 @@ Observable.fromNodeCallback = function (fn, ctx, selector) {
o.onCompleted();
}
);
- return subscription;
- }
-
- function PausableBufferedObservable(source, pauser) {
- this.source = source;
- this.controller = new Subject();
-
- if (pauser && pauser.subscribe) {
- this.pauser = this.controller.merge(pauser);
- } else {
- this.pauser = this.controller;
- }
-
- __super__.call(this, subscribe, source);
- }
+ return subscription;
+ };
PausableBufferedObservable.prototype.pause = function () {
this.controller.onNext(false);
@@ -5514,151 +6221,146 @@ Observable.fromNodeCallback = function (fn, ctx, selector) {
* @param {Observable} pauser The observable sequence used to pause the underlying sequence.
* @returns {Observable} The observable sequence which is paused based upon the pauser.
*/
- observableProto.pausableBuffered = function (subject) {
- return new PausableBufferedObservable(this, subject);
+ observableProto.pausableBuffered = function (pauser) {
+ return new PausableBufferedObservable(this, pauser);
};
-var ControlledObservable = (function (__super__) {
-
- inherits(ControlledObservable, __super__);
-
- function subscribe (observer) {
- return this.source.subscribe(observer);
- }
-
- function ControlledObservable (source, enableQueue, scheduler) {
- __super__.call(this, subscribe, source);
- this.subject = new ControlledSubject(enableQueue, scheduler);
- this.source = source.multicast(this.subject).refCount();
- }
+ var ControlledObservable = (function (__super__) {
+ inherits(ControlledObservable, __super__);
+ function ControlledObservable (source, enableQueue, scheduler) {
+ __super__.call(this);
+ this.subject = new ControlledSubject(enableQueue, scheduler);
+ this.source = source.multicast(this.subject).refCount();
+ }
- ControlledObservable.prototype.request = function (numberOfItems) {
- return this.subject.request(numberOfItems == null ? -1 : numberOfItems);
- };
+ ControlledObservable.prototype._subscribe = function (o) {
+ return this.source.subscribe(o);
+ };
- return ControlledObservable;
+ ControlledObservable.prototype.request = function (numberOfItems) {
+ return this.subject.request(numberOfItems == null ? -1 : numberOfItems);
+ };
-}(Observable));
+ return ControlledObservable;
-var ControlledSubject = (function (__super__) {
+ }(Observable));
- function subscribe (observer) {
- return this.subject.subscribe(observer);
- }
+ var ControlledSubject = (function (__super__) {
+ inherits(ControlledSubject, __super__);
+ function ControlledSubject(enableQueue, scheduler) {
+ enableQueue == null && (enableQueue = true);
- inherits(ControlledSubject, __super__);
-
- function ControlledSubject(enableQueue, scheduler) {
- enableQueue == null && (enableQueue = true);
-
- __super__.call(this, subscribe);
- this.subject = new Subject();
- this.enableQueue = enableQueue;
- this.queue = enableQueue ? [] : null;
- this.requestedCount = 0;
- this.requestedDisposable = null;
- this.error = null;
- this.hasFailed = false;
- this.hasCompleted = false;
- this.scheduler = scheduler || currentThreadScheduler;
- }
+ __super__.call(this);
+ this.subject = new Subject();
+ this.enableQueue = enableQueue;
+ this.queue = enableQueue ? [] : null;
+ this.requestedCount = 0;
+ this.requestedDisposable = null;
+ this.error = null;
+ this.hasFailed = false;
+ this.hasCompleted = false;
+ this.scheduler = scheduler || currentThreadScheduler;
+ }
- addProperties(ControlledSubject.prototype, Observer, {
- onCompleted: function () {
- this.hasCompleted = true;
- if (!this.enableQueue || this.queue.length === 0) {
- this.subject.onCompleted();
- this.disposeCurrentRequest()
- } else {
- this.queue.push(Notification.createOnCompleted());
- }
- },
- onError: function (error) {
- this.hasFailed = true;
- this.error = error;
- if (!this.enableQueue || this.queue.length === 0) {
- this.subject.onError(error);
- this.disposeCurrentRequest()
- } else {
- this.queue.push(Notification.createOnError(error));
- }
- },
- onNext: function (value) {
- if (this.requestedCount <= 0) {
- this.enableQueue && this.queue.push(Notification.createOnNext(value));
- } else {
- (this.requestedCount-- === 0) && this.disposeCurrentRequest();
- this.subject.onNext(value);
- }
- },
- _processRequest: function (numberOfItems) {
- if (this.enableQueue) {
- while (this.queue.length > 0 && (numberOfItems > 0 || this.queue[0].kind !== 'N')) {
- var first = this.queue.shift();
- first.accept(this.subject);
- if (first.kind === 'N') {
- numberOfItems--;
- } else {
- this.disposeCurrentRequest();
- this.queue = [];
+ addProperties(ControlledSubject.prototype, Observer, {
+ _subscribe: function (o) {
+ return this.subject.subscribe(o);
+ },
+ onCompleted: function () {
+ this.hasCompleted = true;
+ if (!this.enableQueue || this.queue.length === 0) {
+ this.subject.onCompleted();
+ this.disposeCurrentRequest();
+ } else {
+ this.queue.push(Notification.createOnCompleted());
+ }
+ },
+ onError: function (error) {
+ this.hasFailed = true;
+ this.error = error;
+ if (!this.enableQueue || this.queue.length === 0) {
+ this.subject.onError(error);
+ this.disposeCurrentRequest();
+ } else {
+ this.queue.push(Notification.createOnError(error));
+ }
+ },
+ onNext: function (value) {
+ if (this.requestedCount <= 0) {
+ this.enableQueue && this.queue.push(Notification.createOnNext(value));
+ } else {
+ (this.requestedCount-- === 0) && this.disposeCurrentRequest();
+ this.subject.onNext(value);
+ }
+ },
+ _processRequest: function (numberOfItems) {
+ if (this.enableQueue) {
+ while (this.queue.length > 0 && (numberOfItems > 0 || this.queue[0].kind !== 'N')) {
+ var first = this.queue.shift();
+ first.accept(this.subject);
+ if (first.kind === 'N') {
+ numberOfItems--;
+ } else {
+ this.disposeCurrentRequest();
+ this.queue = [];
+ }
}
}
- }
- return numberOfItems;
- },
- request: function (number) {
- this.disposeCurrentRequest();
- var self = this;
+ return numberOfItems;
+ },
+ request: function (number) {
+ this.disposeCurrentRequest();
+ var self = this;
+
+ this.requestedDisposable = this.scheduler.schedule(number,
+ function(s, i) {
+ var remaining = self._processRequest(i);
+ var stopped = self.hasCompleted || self.hasFailed;
+ if (!stopped && remaining > 0) {
+ self.requestedCount = remaining;
+
+ return disposableCreate(function () {
+ self.requestedCount = 0;
+ });
+ // Scheduled item is still in progress. Return a new
+ // disposable to allow the request to be interrupted
+ // via dispose.
+ }
+ });
- this.requestedDisposable = this.scheduler.scheduleWithState(number,
- function(s, i) {
- var remaining = self._processRequest(i);
- var stopped = self.hasCompleted || self.hasFailed
- if (!stopped && remaining > 0) {
- self.requestedCount = remaining;
-
- return disposableCreate(function () {
- self.requestedCount = 0;
- });
- // Scheduled item is still in progress. Return a new
- // disposable to allow the request to be interrupted
- // via dispose.
+ return this.requestedDisposable;
+ },
+ disposeCurrentRequest: function () {
+ if (this.requestedDisposable) {
+ this.requestedDisposable.dispose();
+ this.requestedDisposable = null;
}
- });
-
- return this.requestedDisposable;
- },
- disposeCurrentRequest: function () {
- if (this.requestedDisposable) {
- this.requestedDisposable.dispose();
- this.requestedDisposable = null;
}
- }
- });
+ });
- return ControlledSubject;
-}(Observable));
+ return ControlledSubject;
+ }(Observable));
-/**
- * Attaches a controller to the observable sequence with the ability to queue.
- * @example
- * var source = Rx.Observable.interval(100).controlled();
- * source.request(3); // Reads 3 values
- * @param {bool} enableQueue truthy value to determine if values should be queued pending the next request
- * @param {Scheduler} scheduler determines how the requests will be scheduled
- * @returns {Observable} The observable sequence which only propagates values on request.
- */
-observableProto.controlled = function (enableQueue, scheduler) {
+ /**
+ * Attaches a controller to the observable sequence with the ability to queue.
+ * @example
+ * var source = Rx.Observable.interval(100).controlled();
+ * source.request(3); // Reads 3 values
+ * @param {bool} enableQueue truthy value to determine if values should be queued pending the next request
+ * @param {Scheduler} scheduler determines how the requests will be scheduled
+ * @returns {Observable} The observable sequence which only propagates values on request.
+ */
+ observableProto.controlled = function (enableQueue, scheduler) {
- if (enableQueue && isScheduler(enableQueue)) {
+ if (enableQueue && isScheduler(enableQueue)) {
scheduler = enableQueue;
enableQueue = true;
- }
+ }
- if (enableQueue == null) { enableQueue = true; }
- return new ControlledObservable(this, enableQueue, scheduler);
-};
+ if (enableQueue == null) { enableQueue = true; }
+ return new ControlledObservable(this, enableQueue, scheduler);
+ };
/**
* Pipes the existing Observable sequence into a Node.js Stream.
@@ -5692,6 +6394,42 @@ observableProto.controlled = function (enableQueue, scheduler) {
return dest;
};
+ var TransduceObserver = (function (__super__) {
+ inherits(TransduceObserver, __super__);
+ function TransduceObserver(o, xform) {
+ this._o = o;
+ this._xform = xform;
+ __super__.call(this);
+ }
+
+ TransduceObserver.prototype.next = function (x) {
+ var res = tryCatch(this._xform['@@transducer/step']).call(this._xform, this._o, x);
+ if (res === errorObj) { this._o.onError(res.e); }
+ };
+
+ TransduceObserver.prototype.error = function (e) { this._o.onError(e); };
+
+ TransduceObserver.prototype.completed = function () {
+ this._xform['@@transducer/result'](this._o);
+ };
+
+ return TransduceObserver;
+ }(AbstractObserver));
+
+ function transformForObserver(o) {
+ return {
+ '@@transducer/init': function() {
+ return o;
+ },
+ '@@transducer/step': function(obs, input) {
+ return obs.onNext(input);
+ },
+ '@@transducer/result': function(obs) {
+ return obs.onCompleted();
+ }
+ };
+ }
+
/**
* Executes a transducer to transform the observable sequence
* @param {Transducer} transducer A transducer to execute
@@ -5699,31 +6437,9 @@ observableProto.controlled = function (enableQueue, scheduler) {
*/
observableProto.transduce = function(transducer) {
var source = this;
-
- function transformForObserver(o) {
- return {
- '@@transducer/init': function() {
- return o;
- },
- '@@transducer/step': function(obs, input) {
- return obs.onNext(input);
- },
- '@@transducer/result': function(obs) {
- return obs.onCompleted();
- }
- };
- }
-
return new AnonymousObservable(function(o) {
var xform = transducer(transformForObserver(o));
- return source.subscribe(
- function(v) {
- var res = tryCatch(xform['@@transducer/step']).call(xform, o, v);
- if (res === errorObj) { o.onError(res.e); }
- },
- function (e) { o.onError(e); },
- function() { xform['@@transducer/result'](o); }
- );
+ return source.subscribe(new TransduceObserver(o, xform));
}, source);
};
@@ -5739,29 +6455,26 @@ observableProto.controlled = function (enableQueue, scheduler) {
function setDisposable(s, state) {
var ado = state[0], self = state[1];
var sub = tryCatch(self.__subscribe).call(self, ado);
-
- if (sub === errorObj) {
- if(!ado.fail(errorObj.e)) { return thrower(errorObj.e); }
- }
+ if (sub === errorObj && !ado.fail(errorObj.e)) { thrower(errorObj.e); }
ado.setDisposable(fixSubscriber(sub));
}
- function innerSubscribe(observer) {
- var ado = new AutoDetachObserver(observer), state = [ado, this];
+ function AnonymousObservable(subscribe, parent) {
+ this.source = parent;
+ this.__subscribe = subscribe;
+ __super__.call(this);
+ }
+
+ AnonymousObservable.prototype._subscribe = function (o) {
+ var ado = new AutoDetachObserver(o), state = [ado, this];
if (currentThreadScheduler.scheduleRequired()) {
- currentThreadScheduler.scheduleWithState(state, setDisposable);
+ currentThreadScheduler.schedule(state, setDisposable);
} else {
setDisposable(null, state);
}
return ado;
- }
-
- function AnonymousObservable(subscribe, parent) {
- this.source = parent;
- this.__subscribe = subscribe;
- __super__.call(this, innerSubscribe);
- }
+ };
return AnonymousObservable;
@@ -5809,16 +6522,16 @@ observableProto.controlled = function (enableQueue, scheduler) {
return AutoDetachObserver;
}(AbstractObserver));
- var InnerSubscription = function (subject, observer) {
- this.subject = subject;
- this.observer = observer;
+ var InnerSubscription = function (s, o) {
+ this._s = s;
+ this._o = o;
};
InnerSubscription.prototype.dispose = function () {
- if (!this.subject.isDisposed && this.observer !== null) {
- var idx = this.subject.observers.indexOf(this.observer);
- this.subject.observers.splice(idx, 1);
- this.observer = null;
+ if (!this._s.isDisposed && this._o !== null) {
+ var idx = this._s.observers.indexOf(this._o);
+ this._s.observers.splice(idx, 1);
+ this._o = null;
}
};
@@ -5827,39 +6540,34 @@ observableProto.controlled = function (enableQueue, scheduler) {
* Each notification is broadcasted to all subscribed observers.
*/
var Subject = Rx.Subject = (function (__super__) {
- function subscribe(observer) {
- checkDisposed(this);
- if (!this.isStopped) {
- this.observers.push(observer);
- return new InnerSubscription(this, observer);
- }
- if (this.hasError) {
- observer.onError(this.error);
- return disposableEmpty;
- }
- observer.onCompleted();
- return disposableEmpty;
- }
-
inherits(Subject, __super__);
-
- /**
- * Creates a subject.
- */
function Subject() {
- __super__.call(this, subscribe);
- this.isDisposed = false,
- this.isStopped = false,
+ __super__.call(this);
+ this.isDisposed = false;
+ this.isStopped = false;
this.observers = [];
this.hasError = false;
}
addProperties(Subject.prototype, Observer.prototype, {
+ _subscribe: function (o) {
+ checkDisposed(this);
+ if (!this.isStopped) {
+ this.observers.push(o);
+ return new InnerSubscription(this, o);
+ }
+ if (this.hasError) {
+ o.onError(this.error);
+ return disposableEmpty;
+ }
+ o.onCompleted();
+ return disposableEmpty;
+ },
/**
* Indicates whether the subject has observers subscribed to it.
* @returns {Boolean} Indicates whether the subject has observers subscribed to it.
*/
- hasObservers: function () { return this.observers.length > 0; },
+ hasObservers: function () { checkDisposed(this); return this.observers.length > 0; },
/**
* Notifies all subscribed observers about the end of the sequence.
*/
@@ -5930,27 +6638,6 @@ observableProto.controlled = function (enableQueue, scheduler) {
* The last value before the OnCompleted notification, or the error received through OnError, is sent to all subscribed observers.
*/
var AsyncSubject = Rx.AsyncSubject = (function (__super__) {
-
- function subscribe(observer) {
- checkDisposed(this);
-
- if (!this.isStopped) {
- this.observers.push(observer);
- return new InnerSubscription(this, observer);
- }
-
- if (this.hasError) {
- observer.onError(this.error);
- } else if (this.hasValue) {
- observer.onNext(this.value);
- observer.onCompleted();
- } else {
- observer.onCompleted();
- }
-
- return disposableEmpty;
- }
-
inherits(AsyncSubject, __super__);
/**
@@ -5958,8 +6645,7 @@ observableProto.controlled = function (enableQueue, scheduler) {
* @constructor
*/
function AsyncSubject() {
- __super__.call(this, subscribe);
-
+ __super__.call(this);
this.isDisposed = false;
this.isStopped = false;
this.hasValue = false;
@@ -5967,15 +6653,31 @@ observableProto.controlled = function (enableQueue, scheduler) {
this.hasError = false;
}
- addProperties(AsyncSubject.prototype, Observer, {
+ addProperties(AsyncSubject.prototype, Observer.prototype, {
+ _subscribe: function (o) {
+ checkDisposed(this);
+
+ if (!this.isStopped) {
+ this.observers.push(o);
+ return new InnerSubscription(this, o);
+ }
+
+ if (this.hasError) {
+ o.onError(this.error);
+ } else if (this.hasValue) {
+ o.onNext(this.value);
+ o.onCompleted();
+ } else {
+ o.onCompleted();
+ }
+
+ return disposableEmpty;
+ },
/**
* Indicates whether the subject has observers subscribed to it.
* @returns {Boolean} Indicates whether the subject has observers subscribed to it.
*/
- hasObservers: function () {
- checkDisposed(this);
- return this.observers.length > 0;
- },
+ hasObservers: function () { checkDisposed(this); return this.observers.length > 0; },
/**
* Notifies all subscribed observers about the end of the sequence, also causing the last received value to be sent out (if any).
*/
@@ -6035,7 +6737,7 @@ observableProto.controlled = function (enableQueue, scheduler) {
dispose: function () {
this.isDisposed = true;
this.observers = null;
- this.exception = null;
+ this.error = null;
this.value = null;
}
});
@@ -6045,18 +6747,16 @@ observableProto.controlled = function (enableQueue, scheduler) {
var AnonymousSubject = Rx.AnonymousSubject = (function (__super__) {
inherits(AnonymousSubject, __super__);
-
- function subscribe(observer) {
- return this.observable.subscribe(observer);
- }
-
function AnonymousSubject(observer, observable) {
this.observer = observer;
this.observable = observable;
- __super__.call(this, subscribe);
+ __super__.call(this);
}
addProperties(AnonymousSubject.prototype, Observer.prototype, {
+ _subscribe: function (o) {
+ return this.observable.subscribe(o);
+ },
onCompleted: function () {
this.observer.onCompleted();
},
@@ -6076,37 +6776,31 @@ observableProto.controlled = function (enableQueue, scheduler) {
* Observers can subscribe to the subject to receive the last (or initial) value and all subsequent notifications.
*/
var BehaviorSubject = Rx.BehaviorSubject = (function (__super__) {
- function subscribe(observer) {
- checkDisposed(this);
- if (!this.isStopped) {
- this.observers.push(observer);
- observer.onNext(this.value);
- return new InnerSubscription(this, observer);
- }
- if (this.hasError) {
- observer.onError(this.error);
- } else {
- observer.onCompleted();
- }
- return disposableEmpty;
- }
-
inherits(BehaviorSubject, __super__);
-
- /**
- * Initializes a new instance of the BehaviorSubject class which creates a subject that caches its last value and starts with the specified value.
- * @param {Mixed} value Initial value sent to observers when no other value has been received by the subject yet.
- */
function BehaviorSubject(value) {
- __super__.call(this, subscribe);
- this.value = value,
- this.observers = [],
- this.isDisposed = false,
- this.isStopped = false,
+ __super__.call(this);
+ this.value = value;
+ this.observers = [];
+ this.isDisposed = false;
+ this.isStopped = false;
this.hasError = false;
}
- addProperties(BehaviorSubject.prototype, Observer, {
+ addProperties(BehaviorSubject.prototype, Observer.prototype, {
+ _subscribe: function (o) {
+ checkDisposed(this);
+ if (!this.isStopped) {
+ this.observers.push(o);
+ o.onNext(this.value);
+ return new InnerSubscription(this, o);
+ }
+ if (this.hasError) {
+ o.onError(this.error);
+ } else {
+ o.onCompleted();
+ }
+ return disposableEmpty;
+ },
/**
* Gets the current value or throws an exception.
* Value is frozen after onCompleted is called.
@@ -6115,17 +6809,15 @@ observableProto.controlled = function (enableQueue, scheduler) {
* @returns {Mixed} The initial value passed to the constructor until onNext is called; after which, the last value passed to onNext.
*/
getValue: function () {
- checkDisposed(this);
- if (this.hasError) {
- throw this.error;
- }
- return this.value;
+ checkDisposed(this);
+ if (this.hasError) { thrower(this.error); }
+ return this.value;
},
/**
* Indicates whether the subject has observers subscribed to it.
* @returns {Boolean} Indicates whether the subject has observers subscribed to it.
*/
- hasObservers: function () { return this.observers.length > 0; },
+ hasObservers: function () { checkDisposed(this); return this.observers.length > 0; },
/**
* Notifies all subscribed observers about the end of the sequence.
*/
@@ -6175,7 +6867,7 @@ observableProto.controlled = function (enableQueue, scheduler) {
this.isDisposed = true;
this.observers = null;
this.value = null;
- this.exception = null;
+ this.error = null;
}
});
@@ -6197,27 +6889,6 @@ observableProto.controlled = function (enableQueue, scheduler) {
});
}
- function subscribe(observer) {
- var so = new ScheduledObserver(this.scheduler, observer),
- subscription = createRemovableDisposable(this, so);
- checkDisposed(this);
- this._trim(this.scheduler.now());
- this.observers.push(so);
-
- for (var i = 0, len = this.q.length; i < len; i++) {
- so.onNext(this.q[i].value);
- }
-
- if (this.hasError) {
- so.onError(this.error);
- } else if (this.isStopped) {
- so.onCompleted();
- }
-
- so.ensureActive();
- return subscription;
- }
-
inherits(ReplaySubject, __super__);
/**
@@ -6236,17 +6907,35 @@ observableProto.controlled = function (enableQueue, scheduler) {
this.isDisposed = false;
this.hasError = false;
this.error = null;
- __super__.call(this, subscribe);
+ __super__.call(this);
}
addProperties(ReplaySubject.prototype, Observer.prototype, {
+ _subscribe: function (o) {
+ checkDisposed(this);
+ var so = new ScheduledObserver(this.scheduler, o), subscription = createRemovableDisposable(this, so);
+
+ this._trim(this.scheduler.now());
+ this.observers.push(so);
+
+ for (var i = 0, len = this.q.length; i < len; i++) {
+ so.onNext(this.q[i].value);
+ }
+
+ if (this.hasError) {
+ so.onError(this.error);
+ } else if (this.isStopped) {
+ so.onCompleted();
+ }
+
+ so.ensureActive();
+ return subscription;
+ },
/**
* Indicates whether the subject has observers subscribed to it.
* @returns {Boolean} Indicates whether the subject has observers subscribed to it.
*/
- hasObservers: function () {
- return this.observers.length > 0;
- },
+ hasObservers: function () { checkDisposed(this); return this.observers.length > 0; },
_trim: function (now) {
while (this.q.length > this.bufferSize) {
this.q.shift();
@@ -6324,7 +7013,6 @@ observableProto.controlled = function (enableQueue, scheduler) {
*/
Rx.Pauser = (function (__super__) {
inherits(Pauser, __super__);
-
function Pauser() {
__super__.call(this);
}