You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
590 lines
21 KiB
590 lines
21 KiB
// Copyright (c) Microsoft, All rights reserved. See License.txt in the project root for license information. |
|
|
|
;(function (factory) { |
|
var objectTypes = { |
|
'function': true, |
|
'object': true |
|
}; |
|
|
|
function checkGlobal(value) { |
|
return (value && value.Object === Object) ? value : null; |
|
} |
|
|
|
var freeExports = (objectTypes[typeof exports] && exports && !exports.nodeType) ? exports : null; |
|
var freeModule = (objectTypes[typeof module] && module && !module.nodeType) ? module : null; |
|
var freeGlobal = checkGlobal(freeExports && freeModule && typeof global === 'object' && global); |
|
var freeSelf = checkGlobal(objectTypes[typeof self] && self); |
|
var freeWindow = checkGlobal(objectTypes[typeof window] && window); |
|
var moduleExports = (freeModule && freeModule.exports === freeExports) ? freeExports : null; |
|
var thisGlobal = checkGlobal(objectTypes[typeof this] && this); |
|
var root = freeGlobal || ((freeWindow !== (thisGlobal && thisGlobal.window)) && freeWindow) || freeSelf || thisGlobal || Function('return this')(); |
|
|
|
// Because of build optimizers |
|
if (typeof define === 'function' && define.amd) { |
|
define(['./rx'], function (Rx, exports) { |
|
return factory(root, exports, Rx); |
|
}); |
|
} else if (typeof module === 'object' && module && module.exports === freeExports) { |
|
module.exports = factory(root, module.exports, require('./rx')); |
|
} else { |
|
root.Rx = factory(root, {}, root.Rx); |
|
} |
|
}.call(this, function (root, exp, Rx, undefined) { |
|
|
|
// Aliases |
|
var Observable = Rx.Observable, |
|
observableProto = Observable.prototype, |
|
ObservableBase = Rx.ObservableBase, |
|
AbstractObserver = Rx.internals.AbstractObserver, |
|
FlatMapObservable = Rx.FlatMapObservable, |
|
observableConcat = Observable.concat, |
|
observableDefer = Observable.defer, |
|
observableEmpty = Observable.empty, |
|
disposableEmpty = Rx.Disposable.empty, |
|
CompositeDisposable = Rx.CompositeDisposable, |
|
SerialDisposable = Rx.SerialDisposable, |
|
SingleAssignmentDisposable = Rx.SingleAssignmentDisposable, |
|
Enumerable = Rx.internals.Enumerable, |
|
enumerableOf = Enumerable.of, |
|
currentThreadScheduler = Rx.Scheduler.currentThread, |
|
AsyncSubject = Rx.AsyncSubject, |
|
Observer = Rx.Observer, |
|
inherits = Rx.internals.inherits, |
|
addProperties = Rx.internals.addProperties, |
|
helpers = Rx.helpers, |
|
noop = helpers.noop, |
|
isPromise = helpers.isPromise, |
|
isFunction = helpers.isFunction, |
|
isIterable = Rx.helpers.isIterable, |
|
isArrayLike = Rx.helpers.isArrayLike, |
|
isScheduler = Rx.Scheduler.isScheduler, |
|
observableFromPromise = Observable.fromPromise; |
|
|
|
var errorObj = {e: {}}; |
|
|
|
function tryCatcherGen(tryCatchTarget) { |
|
return function tryCatcher() { |
|
try { |
|
return tryCatchTarget.apply(this, arguments); |
|
} catch (e) { |
|
errorObj.e = e; |
|
return errorObj; |
|
} |
|
}; |
|
} |
|
|
|
var tryCatch = Rx.internals.tryCatch = function tryCatch(fn) { |
|
if (!isFunction(fn)) { throw new TypeError('fn must be a function'); } |
|
return tryCatcherGen(fn); |
|
}; |
|
|
|
function thrower(e) { |
|
throw e; |
|
} |
|
|
|
// Shim in iterator support |
|
var $iterator$ = (typeof Symbol === 'function' && Symbol.iterator) || |
|
'_es6shim_iterator_'; |
|
// Bug for mozilla version |
|
if (root.Set && typeof new root.Set()['@@iterator'] === 'function') { |
|
$iterator$ = '@@iterator'; |
|
} |
|
|
|
var doneEnumerator = Rx.doneEnumerator = { done: true, value: undefined }; |
|
|
|
var isIterable = Rx.helpers.isIterable = function (o) { |
|
return o && o[$iterator$] !== undefined; |
|
}; |
|
|
|
var isArrayLike = Rx.helpers.isArrayLike = function (o) { |
|
return o && o.length !== undefined; |
|
}; |
|
|
|
Rx.helpers.iterator = $iterator$; |
|
|
|
var WhileEnumerable = (function(__super__) { |
|
inherits(WhileEnumerable, __super__); |
|
function WhileEnumerable(c, s) { |
|
this.c = c; |
|
this.s = s; |
|
} |
|
WhileEnumerable.prototype[$iterator$] = function () { |
|
var self = this; |
|
return { |
|
next: function () { |
|
return self.c() ? |
|
{ done: false, value: self.s } : |
|
{ done: true, value: void 0 }; |
|
} |
|
}; |
|
}; |
|
return WhileEnumerable; |
|
}(Enumerable)); |
|
|
|
function enumerableWhile(condition, source) { |
|
return new WhileEnumerable(condition, source); |
|
} |
|
|
|
/** |
|
* Returns an observable sequence that is the result of invoking the selector on the source sequence, without sharing subscriptions. |
|
* This operator allows for a fluent style of writing queries that use the same sequence multiple times. |
|
* |
|
* @param {Function} selector Selector function which can use the source sequence as many times as needed, without sharing subscriptions to the source sequence. |
|
* @returns {Observable} An observable sequence that contains the elements of a sequence produced by multicasting the source sequence within a selector function. |
|
*/ |
|
observableProto.letBind = observableProto['let'] = function (func) { |
|
return func(this); |
|
}; |
|
|
|
/** |
|
* Determines whether an observable collection contains values. |
|
* |
|
* @example |
|
* 1 - res = Rx.Observable.if(condition, obs1); |
|
* 2 - res = Rx.Observable.if(condition, obs1, obs2); |
|
* 3 - res = Rx.Observable.if(condition, obs1, scheduler); |
|
* @param {Function} condition The condition which determines if the thenSource or elseSource will be run. |
|
* @param {Observable} thenSource The observable sequence or Promise that will be run if the condition function returns true. |
|
* @param {Observable} [elseSource] The observable sequence or Promise that will be run if the condition function returns false. If this is not provided, it defaults to Rx.Observabe.Empty with the specified scheduler. |
|
* @returns {Observable} An observable sequence which is either the thenSource or elseSource. |
|
*/ |
|
Observable['if'] = function (condition, thenSource, elseSourceOrScheduler) { |
|
return observableDefer(function () { |
|
elseSourceOrScheduler || (elseSourceOrScheduler = observableEmpty()); |
|
|
|
isPromise(thenSource) && (thenSource = observableFromPromise(thenSource)); |
|
isPromise(elseSourceOrScheduler) && (elseSourceOrScheduler = observableFromPromise(elseSourceOrScheduler)); |
|
|
|
// Assume a scheduler for empty only |
|
typeof elseSourceOrScheduler.now === 'function' && (elseSourceOrScheduler = observableEmpty(elseSourceOrScheduler)); |
|
return condition() ? thenSource : elseSourceOrScheduler; |
|
}); |
|
}; |
|
|
|
/** |
|
* Concatenates the observable sequences obtained by running the specified result selector for each element in source. |
|
* There is an alias for this method called 'forIn' for browsers <IE9 |
|
* @param {Array} sources An array of values to turn into an observable sequence. |
|
* @param {Function} resultSelector A function to apply to each item in the sources array to turn it into an observable sequence. |
|
* @returns {Observable} An observable sequence from the concatenated observable sequences. |
|
*/ |
|
Observable['for'] = Observable.forIn = function (sources, resultSelector, thisArg) { |
|
return enumerableOf(sources, resultSelector, thisArg).concat(); |
|
}; |
|
|
|
/** |
|
* Repeats source as long as condition holds emulating a while loop. |
|
* There is an alias for this method called 'whileDo' for browsers <IE9 |
|
* |
|
* @param {Function} condition The condition which determines if the source will be repeated. |
|
* @param {Observable} source The observable sequence that will be run if the condition function returns true. |
|
* @returns {Observable} An observable sequence which is repeated as long as the condition holds. |
|
*/ |
|
var observableWhileDo = Observable['while'] = Observable.whileDo = function (condition, source) { |
|
isPromise(source) && (source = observableFromPromise(source)); |
|
return enumerableWhile(condition, source).concat(); |
|
}; |
|
|
|
/** |
|
* Repeats source as long as condition holds emulating a do while loop. |
|
* |
|
* @param {Function} condition The condition which determines if the source will be repeated. |
|
* @param {Observable} source The observable sequence that will be run if the condition function returns true. |
|
* @returns {Observable} An observable sequence which is repeated as long as the condition holds. |
|
*/ |
|
observableProto.doWhile = function (condition) { |
|
return observableConcat([this, observableWhileDo(condition, this)]); |
|
}; |
|
|
|
/** |
|
* Uses selector to determine which source in sources to use. |
|
* @param {Function} selector The function which extracts the value for to test in a case statement. |
|
* @param {Array} sources A object which has keys which correspond to the case statement labels. |
|
* @param {Observable} [elseSource] The observable sequence or Promise that will be run if the sources are not matched. If this is not provided, it defaults to Rx.Observabe.empty with the specified scheduler. |
|
* |
|
* @returns {Observable} An observable sequence which is determined by a case statement. |
|
*/ |
|
Observable['case'] = function (selector, sources, defaultSourceOrScheduler) { |
|
return observableDefer(function () { |
|
isPromise(defaultSourceOrScheduler) && (defaultSourceOrScheduler = observableFromPromise(defaultSourceOrScheduler)); |
|
defaultSourceOrScheduler || (defaultSourceOrScheduler = observableEmpty()); |
|
|
|
isScheduler(defaultSourceOrScheduler) && (defaultSourceOrScheduler = observableEmpty(defaultSourceOrScheduler)); |
|
|
|
var result = sources[selector()]; |
|
isPromise(result) && (result = observableFromPromise(result)); |
|
|
|
return result || defaultSourceOrScheduler; |
|
}); |
|
}; |
|
|
|
var ExpandObservable = (function(__super__) { |
|
inherits(ExpandObservable, __super__); |
|
function ExpandObservable(source, fn, scheduler) { |
|
this.source = source; |
|
this._fn = fn; |
|
this._scheduler = scheduler; |
|
__super__.call(this); |
|
} |
|
|
|
function scheduleRecursive(args, recurse) { |
|
var state = args[0], self = args[1]; |
|
var work; |
|
if (state.q.length > 0) { |
|
work = state.q.shift(); |
|
} else { |
|
state.isAcquired = false; |
|
return; |
|
} |
|
var m1 = new SingleAssignmentDisposable(); |
|
state.d.add(m1); |
|
m1.setDisposable(work.subscribe(new ExpandObserver(state, self, m1))); |
|
recurse([state, self]); |
|
} |
|
|
|
ExpandObservable.prototype._ensureActive = function (state) { |
|
var isOwner = false; |
|
if (state.q.length > 0) { |
|
isOwner = !state.isAcquired; |
|
state.isAcquired = true; |
|
} |
|
isOwner && state.m.setDisposable(this._scheduler.scheduleRecursive([state, this], scheduleRecursive)); |
|
}; |
|
|
|
ExpandObservable.prototype.subscribeCore = function (o) { |
|
var m = new SerialDisposable(), |
|
d = new CompositeDisposable(m), |
|
state = { |
|
q: [], |
|
m: m, |
|
d: d, |
|
activeCount: 0, |
|
isAcquired: false, |
|
o: o |
|
}; |
|
|
|
state.q.push(this.source); |
|
state.activeCount++; |
|
this._ensureActive(state); |
|
return d; |
|
}; |
|
|
|
return ExpandObservable; |
|
}(ObservableBase)); |
|
|
|
var ExpandObserver = (function(__super__) { |
|
inherits(ExpandObserver, __super__); |
|
function ExpandObserver(state, parent, m1) { |
|
this._s = state; |
|
this._p = parent; |
|
this._m1 = m1; |
|
__super__.call(this); |
|
} |
|
|
|
ExpandObserver.prototype.next = function (x) { |
|
this._s.o.onNext(x); |
|
var result = tryCatch(this._p._fn)(x); |
|
if (result === errorObj) { return this._s.o.onError(result.e); } |
|
this._s.q.push(result); |
|
this._s.activeCount++; |
|
this._p._ensureActive(this._s); |
|
}; |
|
|
|
ExpandObserver.prototype.error = function (e) { |
|
this._s.o.onError(e); |
|
}; |
|
|
|
ExpandObserver.prototype.completed = function () { |
|
this._s.d.remove(this._m1); |
|
this._s.activeCount--; |
|
this._s.activeCount === 0 && this._s.o.onCompleted(); |
|
}; |
|
|
|
return ExpandObserver; |
|
}(AbstractObserver)); |
|
|
|
/** |
|
* Expands an observable sequence by recursively invoking selector. |
|
* |
|
* @param {Function} selector Selector function to invoke for each produced element, resulting in another sequence to which the selector will be invoked recursively again. |
|
* @param {Scheduler} [scheduler] Scheduler on which to perform the expansion. If not provided, this defaults to the current thread scheduler. |
|
* @returns {Observable} An observable sequence containing all the elements produced by the recursive expansion. |
|
*/ |
|
observableProto.expand = function (selector, scheduler) { |
|
isScheduler(scheduler) || (scheduler = currentThreadScheduler); |
|
return new ExpandObservable(this, selector, scheduler); |
|
}; |
|
|
|
function argumentsToArray() { |
|
var len = arguments.length, args = new Array(len); |
|
for(var i = 0; i < len; i++) { args[i] = arguments[i]; } |
|
return args; |
|
} |
|
|
|
var ForkJoinObservable = (function (__super__) { |
|
inherits(ForkJoinObservable, __super__); |
|
function ForkJoinObservable(sources, cb) { |
|
this._sources = sources; |
|
this._cb = cb; |
|
__super__.call(this); |
|
} |
|
|
|
ForkJoinObservable.prototype.subscribeCore = function (o) { |
|
if (this._sources.length === 0) { |
|
o.onCompleted(); |
|
return disposableEmpty; |
|
} |
|
|
|
var count = this._sources.length; |
|
var state = { |
|
finished: false, |
|
hasResults: new Array(count), |
|
hasCompleted: new Array(count), |
|
results: new Array(count) |
|
}; |
|
|
|
var subscriptions = new CompositeDisposable(); |
|
for (var i = 0, len = this._sources.length; i < len; i++) { |
|
var source = this._sources[i]; |
|
isPromise(source) && (source = observableFromPromise(source)); |
|
subscriptions.add(source.subscribe(new ForkJoinObserver(o, state, i, this._cb, subscriptions))); |
|
} |
|
|
|
return subscriptions; |
|
}; |
|
|
|
return ForkJoinObservable; |
|
}(ObservableBase)); |
|
|
|
var ForkJoinObserver = (function(__super__) { |
|
inherits(ForkJoinObserver, __super__); |
|
function ForkJoinObserver(o, s, i, cb, subs) { |
|
this._o = o; |
|
this._s = s; |
|
this._i = i; |
|
this._cb = cb; |
|
this._subs = subs; |
|
__super__.call(this); |
|
} |
|
|
|
ForkJoinObserver.prototype.next = function (x) { |
|
if (!this._s.finished) { |
|
this._s.hasResults[this._i] = true; |
|
this._s.results[this._i] = x; |
|
} |
|
}; |
|
|
|
ForkJoinObserver.prototype.error = function (e) { |
|
this._s.finished = true; |
|
this._o.onError(e); |
|
this._subs.dispose(); |
|
}; |
|
|
|
ForkJoinObserver.prototype.completed = function () { |
|
if (!this._s.finished) { |
|
if (!this._s.hasResults[this._i]) { |
|
return this._o.onCompleted(); |
|
} |
|
this._s.hasCompleted[this._i] = true; |
|
for (var i = 0; i < this._s.results.length; i++) { |
|
if (!this._s.hasCompleted[i]) { return; } |
|
} |
|
this._s.finished = true; |
|
|
|
var res = tryCatch(this._cb).apply(null, this._s.results); |
|
if (res === errorObj) { return this._o.onError(res.e); } |
|
|
|
this._o.onNext(res); |
|
this._o.onCompleted(); |
|
} |
|
}; |
|
|
|
return ForkJoinObserver; |
|
}(AbstractObserver)); |
|
|
|
/** |
|
* Runs all observable sequences in parallel and collect their last elements. |
|
* |
|
* @example |
|
* 1 - res = Rx.Observable.forkJoin([obs1, obs2]); |
|
* 1 - res = Rx.Observable.forkJoin(obs1, obs2, ...); |
|
* @returns {Observable} An observable sequence with an array collecting the last elements of all the input sequences. |
|
*/ |
|
Observable.forkJoin = function () { |
|
var len = arguments.length, args = new Array(len); |
|
for(var i = 0; i < len; i++) { args[i] = arguments[i]; } |
|
var resultSelector = isFunction(args[len - 1]) ? args.pop() : argumentsToArray; |
|
Array.isArray(args[0]) && (args = args[0]); |
|
return new ForkJoinObservable(args, resultSelector); |
|
}; |
|
|
|
/** |
|
* Runs two observable sequences in parallel and combines their last elemenets. |
|
* @param {Observable} second Second observable sequence. |
|
* @param {Function} resultSelector Result selector function to invoke with the last elements of both sequences. |
|
* @returns {Observable} An observable sequence with the result of calling the selector function with the last elements of both input sequences. |
|
*/ |
|
observableProto.forkJoin = function () { |
|
var len = arguments.length, args = new Array(len); |
|
for(var i = 0; i < len; i++) { args[i] = arguments[i]; } |
|
if (Array.isArray(args[0])) { |
|
args[0].unshift(this); |
|
} else { |
|
args.unshift(this); |
|
} |
|
return Observable.forkJoin.apply(null, args); |
|
}; |
|
|
|
/** |
|
* Comonadic bind operator. |
|
* @param {Function} selector A transform function to apply to each element. |
|
* @param {Object} scheduler Scheduler used to execute the operation. If not specified, defaults to the ImmediateScheduler. |
|
* @returns {Observable} An observable sequence which results from the comonadic bind operation. |
|
*/ |
|
observableProto.manySelect = observableProto.extend = function (selector, scheduler) { |
|
isScheduler(scheduler) || (scheduler = Rx.Scheduler.immediate); |
|
var source = this; |
|
return observableDefer(function () { |
|
var chain; |
|
|
|
return source |
|
.map(function (x) { |
|
var curr = new ChainObservable(x); |
|
|
|
chain && chain.onNext(x); |
|
chain = curr; |
|
|
|
return curr; |
|
}) |
|
.tap( |
|
noop, |
|
function (e) { chain && chain.onError(e); }, |
|
function () { chain && chain.onCompleted(); } |
|
) |
|
.observeOn(scheduler) |
|
.map(selector); |
|
}, source); |
|
}; |
|
|
|
var ChainObservable = (function (__super__) { |
|
inherits(ChainObservable, __super__); |
|
function ChainObservable(head) { |
|
__super__.call(this); |
|
this.head = head; |
|
this.tail = new AsyncSubject(); |
|
} |
|
|
|
addProperties(ChainObservable.prototype, Observer, { |
|
_subscribe: function (o) { |
|
var g = new CompositeDisposable(); |
|
g.add(currentThreadScheduler.schedule(this, function (_, self) { |
|
o.onNext(self.head); |
|
g.add(self.tail.mergeAll().subscribe(o)); |
|
})); |
|
|
|
return g; |
|
}, |
|
onCompleted: function () { |
|
this.onNext(Observable.empty()); |
|
}, |
|
onError: function (e) { |
|
this.onNext(Observable['throw'](e)); |
|
}, |
|
onNext: function (v) { |
|
this.tail.onNext(v); |
|
this.tail.onCompleted(); |
|
} |
|
}); |
|
|
|
return ChainObservable; |
|
|
|
}(Observable)); |
|
|
|
var SwitchFirstObservable = (function (__super__) { |
|
inherits(SwitchFirstObservable, __super__); |
|
function SwitchFirstObservable(source) { |
|
this.source = source; |
|
__super__.call(this); |
|
} |
|
|
|
SwitchFirstObservable.prototype.subscribeCore = function (o) { |
|
var m = new SingleAssignmentDisposable(), |
|
g = new CompositeDisposable(), |
|
state = { |
|
hasCurrent: false, |
|
isStopped: false, |
|
o: o, |
|
g: g |
|
}; |
|
|
|
g.add(m); |
|
m.setDisposable(this.source.subscribe(new SwitchFirstObserver(state))); |
|
return g; |
|
}; |
|
|
|
return SwitchFirstObservable; |
|
}(ObservableBase)); |
|
|
|
var SwitchFirstObserver = (function(__super__) { |
|
inherits(SwitchFirstObserver, __super__); |
|
function SwitchFirstObserver(state) { |
|
this._s = state; |
|
__super__.call(this); |
|
} |
|
|
|
SwitchFirstObserver.prototype.next = function (x) { |
|
if (!this._s.hasCurrent) { |
|
this._s.hasCurrent = true; |
|
isPromise(x) && (x = observableFromPromise(x)); |
|
var inner = new SingleAssignmentDisposable(); |
|
this._s.g.add(inner); |
|
inner.setDisposable(x.subscribe(new InnerObserver(this._s, inner))); |
|
} |
|
}; |
|
|
|
SwitchFirstObserver.prototype.error = function (e) { |
|
this._s.o.onError(e); |
|
}; |
|
|
|
SwitchFirstObserver.prototype.completed = function () { |
|
this._s.isStopped = true; |
|
!this._s.hasCurrent && this._s.g.length === 1 && this._s.o.onCompleted(); |
|
}; |
|
|
|
inherits(InnerObserver, __super__); |
|
function InnerObserver(state, inner) { |
|
this._s = state; |
|
this._i = inner; |
|
__super__.call(this); |
|
} |
|
|
|
InnerObserver.prototype.next = function (x) { this._s.o.onNext(x); }; |
|
InnerObserver.prototype.error = function (e) { this._s.o.onError(e); }; |
|
InnerObserver.prototype.completed = function () { |
|
this._s.g.remove(this._i); |
|
this._s.hasCurrent = false; |
|
this._s.isStopped && this._s.g.length === 1 && this._s.o.onCompleted(); |
|
}; |
|
|
|
return SwitchFirstObserver; |
|
}(AbstractObserver)); |
|
|
|
/** |
|
* Performs a exclusive waiting for the first to finish before subscribing to another observable. |
|
* Observables that come in between subscriptions will be dropped on the floor. |
|
* @returns {Observable} A exclusive observable with only the results that happen when subscribed. |
|
*/ |
|
observableProto.switchFirst = function () { |
|
return new SwitchFirstObservable(this); |
|
}; |
|
|
|
observableProto.flatMapFirst = observableProto.exhaustMap = function(selector, resultSelector, thisArg) { |
|
return new FlatMapObservable(this, selector, resultSelector, thisArg).switchFirst(); |
|
}; |
|
|
|
observableProto.flatMapWithMaxConcurrent = observableProto.flatMapMaxConcurrent = function(limit, selector, resultSelector, thisArg) { |
|
return new FlatMapObservable(this, selector, resultSelector, thisArg).merge(limit); |
|
}; |
|
|
|
return Rx; |
|
}));
|
|
|