You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
806 lines
29 KiB
806 lines
29 KiB
// Copyright (c) Microsoft, All rights reserved. See License.txt in the project root for license information.
|
|
|
;(function (factory) { |
|
// `typeof` results for values that can carry properties.
var objectTypes = {
  'function': true,
  'object': true
};

/**
 * Returns `value` if it looks like a global object, i.e. its `Object`
 * property is the very same `Object` constructor visible here; otherwise null.
 * @param {*} value Candidate global object.
 * @returns {Object|null} The candidate itself, or null when it does not qualify.
 */
function checkGlobal(value) {
  return (value && value.Object === Object) ? value : null;
}

// CommonJS-style `exports` / `module`, rejected when they carry a `nodeType`.
var freeExports = (objectTypes[typeof exports] && exports && !exports.nodeType) ? exports : null;
var freeModule = (objectTypes[typeof module] && module && !module.nodeType) ? module : null;

// `global` is only considered when both `exports` and `module` were found.
var freeGlobal = checkGlobal(freeExports && freeModule && typeof global === 'object' && global);
var freeSelf = checkGlobal(objectTypes[typeof self] && self);
var freeWindow = checkGlobal(objectTypes[typeof window] && window);

// Non-null only when `module.exports` and `exports` are the same object.
var moduleExports = (freeModule && freeModule.exports === freeExports) ? freeExports : null;
var thisGlobal = checkGlobal(objectTypes[typeof this] && this);

// Resolution order: global, window (only if it differs from this.window), self, this,
// and finally whatever `this` is inside a freshly compiled function.
var root = freeGlobal ||
  ((freeWindow !== (thisGlobal && thisGlobal.window)) && freeWindow) ||
  freeSelf ||
  thisGlobal ||
  Function('return this')();
|
|
|
// Because of build optimizers
// Hand the factory to whichever module system is present:
//   1. AMD      - depends on './rx.lite'
//   2. CommonJS - depends on the 'rx-lite' package
//   3. neither  - extends the existing `root.Rx` in place
if (typeof define === 'function' && define.amd) {
  define(['./rx.lite'], function (Rx, exports) {
    return factory(root, exports, Rx);
  });
} else if (typeof module === 'object' && module && module.exports === freeExports) {
  module.exports = factory(root, module.exports, require('rx-lite'));
} else {
  root.Rx = factory(root, {}, root.Rx);
}
|
}.call(this, function (root, exp, Rx, undefined) { |
|
|
|
// References
// Local aliases for the pieces of the core `Rx` object used throughout this module.
var Observable = Rx.Observable,
  observableProto = Observable.prototype,
  observableNever = Observable.never,
  observableThrow = Observable['throw'],
  AnonymousObservable = Rx.AnonymousObservable,
  ObservableBase = Rx.ObservableBase,
  AnonymousObserver = Rx.AnonymousObserver,
  notificationCreateOnNext = Rx.Notification.createOnNext,
  notificationCreateOnError = Rx.Notification.createOnError,
  notificationCreateOnCompleted = Rx.Notification.createOnCompleted,
  Observer = Rx.Observer,
  observerCreate = Observer.create,
  AbstractObserver = Rx.internals.AbstractObserver,
  Subject = Rx.Subject,
  internals = Rx.internals,
  helpers = Rx.helpers,
  ScheduledObserver = internals.ScheduledObserver,
  SerialDisposable = Rx.SerialDisposable,
  SingleAssignmentDisposable = Rx.SingleAssignmentDisposable,
  CompositeDisposable = Rx.CompositeDisposable,
  BinaryDisposable = Rx.BinaryDisposable,
  RefCountDisposable = Rx.RefCountDisposable,
  disposableEmpty = Rx.Disposable.empty,
  immediateScheduler = Rx.Scheduler.immediate,
  defaultKeySerializer = helpers.defaultKeySerializer,
  addRef = Rx.internals.addRef,
  identity = helpers.identity,
  isPromise = helpers.isPromise,
  isFunction = helpers.isFunction,
  inherits = internals.inherits,
  bindCallback = internals.bindCallback,
  noop = helpers.noop,
  isScheduler = Rx.Scheduler.isScheduler,
  observableFromPromise = Observable.fromPromise,
  ArgumentOutOfRangeError = Rx.ArgumentOutOfRangeError;
|
|
|
// Shared sentinel returned by wrapped functions when they throw; the caught
// exception is stored on its `e` property.
var errorObj = {e: {}};

/**
 * Wraps `tryCatchTarget` so that exceptions are captured instead of propagated.
 * @param {Function} tryCatchTarget Function to guard.
 * @returns {Function} A function that forwards `this` and all arguments to the
 *   target and returns its result, or returns `errorObj` (with `errorObj.e` set
 *   to the thrown value) if the target throws.
 */
function tryCatcherGen(tryCatchTarget) {
  return function tryCatcher() {
    try {
      return tryCatchTarget.apply(this, arguments);
    } catch (e) {
      errorObj.e = e;
      return errorObj;
    }
  };
}
|
|
|
/**
 * Validates that `fn` is a function and returns its exception-capturing wrapper.
 * Also published as `Rx.internals.tryCatch`.
 * @param {Function} fn Function to guard.
 * @returns {Function} Wrapper produced by tryCatcherGen.
 * @throws {TypeError} If `fn` is not a function.
 */
var tryCatch = Rx.internals.tryCatch = function tryCatch(fn) {
  if (!isFunction(fn)) { throw new TypeError('fn must be a function'); }
  return tryCatcherGen(fn);
};
|
|
|
/**
 * Throws the supplied value. Lets callers rethrow from within an expression.
 * @param {*} e Value to throw.
 */
function thrower(e) {
  throw e;
}
|
|
|
/**
 * Disposable whose underlying disposable is disposed via a scheduler.
 * @param {Scheduler} scheduler Scheduler on which the disposal is scheduled.
 * @param {Disposable} disposable Disposable to dispose.
 */
function ScheduledDisposable(scheduler, disposable) {
  this.scheduler = scheduler;
  this.disposable = disposable;
  this.isDisposed = false;
}

/**
 * Scheduled action: disposes the wrapped disposable at most once.
 * @param {Scheduler} s Scheduler invoking the action (unused).
 * @param {ScheduledDisposable} self Instance passed as the scheduled state.
 */
function scheduleItem(s, self) {
  if (!self.isDisposed) {
    self.isDisposed = true;
    self.disposable.dispose();
  }
}

/**
 * Schedules disposal of the wrapped disposable on the scheduler, passing this
 * instance as the scheduled state.
 */
ScheduledDisposable.prototype.dispose = function () {
  this.scheduler.schedule(this, scheduleItem);
};
|
|
|
/**
 * Observer wrapper that guards the wrapped observer against grammar violations:
 * a call made while another call is in progress, or after onError/onCompleted,
 * throws from the offending method.
 */
var CheckedObserver = (function (__super__) {
  inherits(CheckedObserver, __super__);

  /**
   * @param {Observer} observer Observer that receives the checked notifications.
   */
  function CheckedObserver(observer) {
    __super__.call(this);
    this._observer = observer;
    this._state = 0; // 0 - idle, 1 - busy, 2 - done
  }

  var CheckedObserverPrototype = CheckedObserver.prototype;

  CheckedObserverPrototype.onNext = function (value) {
    this.checkAccess();
    var res = tryCatch(this._observer.onNext).call(this._observer, value);
    // Return to idle before rethrowing so a failing handler does not leave us "busy".
    this._state = 0;
    res === errorObj && thrower(res.e);
  };

  CheckedObserverPrototype.onError = function (err) {
    this.checkAccess();
    var res = tryCatch(this._observer.onError).call(this._observer, err);
    this._state = 2;
    res === errorObj && thrower(res.e);
  };

  CheckedObserverPrototype.onCompleted = function () {
    this.checkAccess();
    var res = tryCatch(this._observer.onCompleted).call(this._observer);
    this._state = 2;
    res === errorObj && thrower(res.e);
  };

  /**
   * Marks the observer busy, or throws if it is already busy or done.
   * @throws {Error} 'Re-entrancy detected' when busy, 'Observer completed' when done.
   */
  CheckedObserverPrototype.checkAccess = function () {
    if (this._state === 1) { throw new Error('Re-entrancy detected'); }
    if (this._state === 2) { throw new Error('Observer completed'); }
    if (this._state === 0) { this._state = 1; }
  };

  return CheckedObserver;
}(Observer));
|
|
|
/**
 * ScheduledObserver subclass that calls `ensureActive()` after every
 * notification and disposes an optional `cancel` disposable on dispose.
 */
var ObserveOnObserver = (function (__super__) {
  inherits(ObserveOnObserver, __super__);

  /**
   * @param {Scheduler} scheduler Scheduler passed through to the base class.
   * @param {Observer} observer Downstream observer passed through to the base class.
   * @param {Disposable} [cancel] Disposable to dispose together with this observer.
   */
  function ObserveOnObserver(scheduler, observer, cancel) {
    __super__.call(this, scheduler, observer);
    this._cancel = cancel;
  }

  ObserveOnObserver.prototype.next = function (value) {
    __super__.prototype.next.call(this, value);
    this.ensureActive();
  };

  ObserveOnObserver.prototype.error = function (e) {
    __super__.prototype.error.call(this, e);
    this.ensureActive();
  };

  ObserveOnObserver.prototype.completed = function () {
    __super__.prototype.completed.call(this);
    this.ensureActive();
  };

  ObserveOnObserver.prototype.dispose = function () {
    __super__.prototype.dispose.call(this);
    this._cancel && this._cancel.dispose();
    // Drop the reference so repeated dispose calls do not dispose it again.
    this._cancel = null;
  };

  return ObserveOnObserver;
})(ScheduledObserver);
|
|
|
/**
 * Checks access to the observer for grammar violations. This includes checking for multiple OnError or OnCompleted calls, as well as reentrancy in any of the observer methods.
 * If a violation is detected, an Error is thrown from the offending observer method call.
 *
 * @returns An observer that checks callbacks invocations against the observer grammar and, if the checks pass, forwards those to the specified observer.
 */
Observer.prototype.checked = function () { return new CheckedObserver(this); };
|
|
|
/**
 * Schedules the invocation of observer methods on the given scheduler.
 *
 * The method wraps `this`, so it has to live on the prototype to be callable on
 * an observer instance. The original static `Observer.notifyOn` property is
 * kept as an alias so existing references to it keep resolving.
 *
 * @param {Scheduler} scheduler Scheduler to schedule observer messages on.
 * @returns {Observer} Observer whose messages are scheduled on the given scheduler.
 */
Observer.prototype.notifyOn = Observer.notifyOn = function (scheduler) {
  return new ObserveOnObserver(scheduler, this);
};
|
|
|
/**
 * Creates an observer from a notification callback.
 * @param {Function} handler Action that handles a notification.
 * @param {Any} [thisArg] Object to use as `this` when invoking the handler.
 * @returns The observer object that invokes the specified handler using a notification corresponding to each message it receives.
 */
Observer.fromNotifier = function (handler, thisArg) {
  var handlerFunc = bindCallback(handler, thisArg, 1);
  return new AnonymousObserver(function (x) {
    return handlerFunc(notificationCreateOnNext(x));
  }, function (e) {
    return handlerFunc(notificationCreateOnError(e));
  }, function () {
    return handlerFunc(notificationCreateOnCompleted());
  });
};
|
|
|
/**
 * Creates a notification callback from an observer.
 * @returns The action that forwards its input notification to the underlying observer.
 */
Observer.prototype.toNotifier = function () {
  var observer = this;
  return function (n) { return n.accept(observer); };
};
|
|
|
/**
 * Hides the identity of an observer.
 * @returns An observer that hides the identity of the specified observer.
 */
Observer.prototype.asObserver = function () {
  var source = this;
  return new AnonymousObserver(
    function (x) { source.onNext(x); },
    function (e) { source.onError(e); },
    function () { source.onCompleted(); }
  );
};
|
|
|
/**
 * Observable that subscribes to `source` through an ObserveOnObserver bound to
 * the given scheduler.
 */
var ObserveOnObservable = (function (__super__) {
  inherits(ObserveOnObservable, __super__);

  /**
   * @param {Observable} source Sequence to observe.
   * @param {Scheduler} s Scheduler handed to the ObserveOnObserver.
   */
  function ObserveOnObservable(source, s) {
    this.source = source;
    this._s = s;
    __super__.call(this);
  }

  ObserveOnObservable.prototype.subscribeCore = function (o) {
    return this.source.subscribe(new ObserveOnObserver(this._s, o));
  };

  return ObserveOnObservable;
}(ObservableBase));
|
|
|
/**
 * Wraps the source sequence in order to run its observer callbacks on the specified scheduler.
 *
 * This only invokes observer callbacks on a scheduler. In case the subscription and/or unsubscription actions have side-effects
 * that require to be run on a scheduler, use subscribeOn.
 *
 * @param {Scheduler} scheduler Scheduler to notify observers on.
 * @returns {Observable} The source sequence whose observations happen on the specified scheduler.
 */
observableProto.observeOn = function (scheduler) {
  return new ObserveOnObservable(this, scheduler);
};
|
|
|
/**
 * Observable that performs the subscription to `source` inside a scheduled
 * action and wraps the resulting subscription in a ScheduledDisposable.
 */
var SubscribeOnObservable = (function (__super__) {
  inherits(SubscribeOnObservable, __super__);

  /**
   * @param {Observable} source Sequence to subscribe to.
   * @param {Scheduler} s Scheduler on which the subscription is scheduled.
   */
  function SubscribeOnObservable(source, s) {
    this.source = source;
    this._s = s;
    __super__.call(this);
  }

  /**
   * Scheduled action: subscribes to the source and swaps the serial
   * disposable's content for the scheduler-bound subscription.
   * @param {Scheduler} scheduler Scheduler running the action.
   * @param {Array} state Tuple of [source, serialDisposable, observer].
   */
  function scheduleMethod(scheduler, state) {
    var source = state[0], d = state[1], o = state[2];
    d.setDisposable(new ScheduledDisposable(scheduler, source.subscribe(o)));
  }

  SubscribeOnObservable.prototype.subscribeCore = function (o) {
    var m = new SingleAssignmentDisposable(), d = new SerialDisposable();
    // `d` first holds the pending scheduled action (via `m`); scheduleMethod
    // later replaces it with the actual subscription.
    d.setDisposable(m);
    m.setDisposable(this._s.schedule([this.source, d, o], scheduleMethod));
    return d;
  };

  return SubscribeOnObservable;
}(ObservableBase));
|
|
|
/**
 * Wraps the source sequence in order to run its subscription and unsubscription logic on the specified scheduler. This operation is not commonly used;
 * see the remarks section for more information on the distinction between subscribeOn and observeOn.
 *
 * This only performs the side-effects of subscription and unsubscription on the specified scheduler. In order to invoke observer
 * callbacks on a scheduler, use observeOn.
 *
 * @param {Scheduler} scheduler Scheduler to perform subscription and unsubscription actions on.
 * @returns {Observable} The source sequence whose subscriptions and unsubscriptions happen on the specified scheduler.
 */
observableProto.subscribeOn = function (scheduler) {
  return new SubscribeOnObservable(this, scheduler);
};
|
|
|
/**
 * Observable that produces values from a state-driven loop run recursively on
 * a scheduler.
 */
var GenerateObservable = (function (__super__) {
  inherits(GenerateObservable, __super__);

  /**
   * @param {*} state Initial loop state.
   * @param {Function} cndFn Condition; the loop ends when it returns a falsy value.
   * @param {Function} itrFn Computes the next state from the current one.
   * @param {Function} resFn Maps a state to the value emitted.
   * @param {Scheduler} s Scheduler running the loop.
   */
  function GenerateObservable(state, cndFn, itrFn, resFn, s) {
    this._initialState = state;
    this._cndFn = cndFn;
    this._itrFn = itrFn;
    this._resFn = resFn;
    this._s = s;
    __super__.call(this);
  }

  /**
   * One loop step. Any exception thrown by the user functions is routed to
   * the observer's onError and ends the loop.
   */
  function scheduleRecursive(state, recurse) {
    if (state.first) {
      // The initial state is tested as-is; iteration starts on the second pass.
      state.first = false;
    } else {
      state.newState = tryCatch(state.self._itrFn)(state.newState);
      if (state.newState === errorObj) { return state.o.onError(state.newState.e); }
    }
    var hasResult = tryCatch(state.self._cndFn)(state.newState);
    if (hasResult === errorObj) { return state.o.onError(hasResult.e); }
    if (hasResult) {
      var result = tryCatch(state.self._resFn)(state.newState);
      if (result === errorObj) { return state.o.onError(result.e); }
      state.o.onNext(result);
      recurse(state);
    } else {
      state.o.onCompleted();
    }
  }

  GenerateObservable.prototype.subscribeCore = function (o) {
    var state = {
      o: o,
      self: this,
      first: true,
      newState: this._initialState
    };
    return this._s.scheduleRecursive(state, scheduleRecursive);
  };

  return GenerateObservable;
}(ObservableBase));
|
|
|
/**
 * Generates an observable sequence by running a state-driven loop producing the sequence's elements, using the specified scheduler to send out observer messages.
 *
 * @example
 *  var res = Rx.Observable.generate(0, function (x) { return x < 10; }, function (x) { return x + 1; }, function (x) { return x; });
 *  var res = Rx.Observable.generate(0, function (x) { return x < 10; }, function (x) { return x + 1; }, function (x) { return x; }, Rx.Scheduler.timeout);
 * @param {Mixed} initialState Initial state.
 * @param {Function} condition Condition to terminate generation (upon returning false).
 * @param {Function} iterate Iteration step function.
 * @param {Function} resultSelector Selector function for results produced in the sequence.
 * @param {Scheduler} [scheduler] Scheduler on which to run the generator loop. If not provided, defaults to Scheduler.currentThread.
 * @returns {Observable} The generated sequence.
 */
Observable.generate = function (initialState, condition, iterate, resultSelector, scheduler) {
  // No `currentThreadScheduler` alias is declared in this module, so the
  // default is read from Rx.Scheduler directly.
  isScheduler(scheduler) || (scheduler = Rx.Scheduler.currentThread);
  return new GenerateObservable(initialState, condition, iterate, resultSelector, scheduler);
};
|
|
|
/**
 * Observable that creates a resource per subscription and returns it as part
 * of the subscription's disposable, alongside the subscription itself.
 */
var UsingObservable = (function (__super__) {
  inherits(UsingObservable, __super__);

  /**
   * @param {Function} resFn Factory producing the resource (a disposable).
   * @param {Function} obsFn Factory producing the observable from the resource.
   */
  function UsingObservable(resFn, obsFn) {
    this._resFn = resFn;
    this._obsFn = obsFn;
    __super__.call(this);
  }

  UsingObservable.prototype.subscribeCore = function (o) {
    var disposable = disposableEmpty;
    var resource = tryCatch(this._resFn)();
    if (resource === errorObj) {
      return new BinaryDisposable(observableThrow(resource.e).subscribe(o), disposable);
    }
    // A falsy resource leaves the empty disposable in place.
    resource && (disposable = resource);
    var source = tryCatch(this._obsFn)(resource);
    if (source === errorObj) {
      // The resource is still part of the returned disposable when the observable factory throws.
      return new BinaryDisposable(observableThrow(source.e).subscribe(o), disposable);
    }
    return new BinaryDisposable(source.subscribe(o), disposable);
  };

  return UsingObservable;
}(ObservableBase));
|
|
|
/**
 * Constructs an observable sequence that depends on a resource object, whose lifetime is tied to the resulting observable sequence's lifetime.
 * @param {Function} resourceFactory Factory function to obtain a resource object.
 * @param {Function} observableFactory Factory function to obtain an observable sequence that depends on the obtained resource.
 * @returns {Observable} An observable sequence whose lifetime controls the lifetime of the dependent resource object.
 */
Observable.using = function (resourceFactory, observableFactory) {
  return new UsingObservable(resourceFactory, observableFactory);
};
|
|
|
/**
 * Propagates the observable sequence or Promise that reacts first.
 * @param {Observable} rightSource Second observable sequence or Promise.
 * @returns {Observable} An observable sequence that surfaces either of the given sequences, whichever reacted first.
 */
observableProto.amb = function (rightSource) {
  var leftSource = this;
  return new AnonymousObservable(function (observer) {
    var choice,
      leftChoice = 'L', rightChoice = 'R',
      leftSubscription = new SingleAssignmentDisposable(),
      rightSubscription = new SingleAssignmentDisposable();

    isPromise(rightSource) && (rightSource = observableFromPromise(rightSource));

    // The first side to produce any notification wins; the other side's
    // subscription is disposed at that moment.
    function choiceL() {
      if (!choice) {
        choice = leftChoice;
        rightSubscription.dispose();
      }
    }

    function choiceR() {
      if (!choice) {
        choice = rightChoice;
        leftSubscription.dispose();
      }
    }

    var leftSubscribe = observerCreate(
      function (left) {
        choiceL();
        choice === leftChoice && observer.onNext(left);
      },
      function (e) {
        choiceL();
        choice === leftChoice && observer.onError(e);
      },
      function () {
        choiceL();
        choice === leftChoice && observer.onCompleted();
      }
    );
    var rightSubscribe = observerCreate(
      function (right) {
        choiceR();
        choice === rightChoice && observer.onNext(right);
      },
      function (e) {
        choiceR();
        choice === rightChoice && observer.onError(e);
      },
      function () {
        choiceR();
        choice === rightChoice && observer.onCompleted();
      }
    );

    leftSubscription.setDisposable(leftSource.subscribe(leftSubscribe));
    rightSubscription.setDisposable(rightSource.subscribe(rightSubscribe));

    return new BinaryDisposable(leftSubscription, rightSubscription);
  });
};
|
|
|
/** Reducer step: combines the accumulated sequence `p` with candidate `c` via `p.amb(c)`. */
function amb(p, c) { return p.amb(c); }
|
|
|
/**
 * Propagates the observable sequence or Promise that reacts first.
 * Accepts either a single array of sources or the sources as individual arguments.
 * @returns {Observable} An observable sequence that surfaces any of the given sequences, whichever reacted first.
 */
Observable.amb = function () {
  var acc = observableNever(), items, i, len;
  if (Array.isArray(arguments[0])) {
    items = arguments[0];
  } else {
    len = arguments.length;
    // Size the copy from the argument count; sizing it from the still-undefined
    // `items` produced a one-element array, which broke the zero-argument call.
    items = new Array(len);
    for (i = 0; i < len; i++) { items[i] = arguments[i]; }
  }
  for (i = 0, len = items.length; i < len; i++) {
    acc = amb(acc, items[i]);
  }
  return acc;
};
|
|
|
/**
 * Continues an observable sequence that is terminated normally or by an exception with the next observable sequence.
 * @param {Observable} second Second observable sequence used to produce results after the first sequence terminates.
 * @returns {Observable} An observable sequence that concatenates the first and second sequence, even if the first sequence terminates exceptionally.
 * @throws {Error} If `second` is not supplied.
 */
observableProto.onErrorResumeNext = function (second) {
  if (!second) { throw new Error('Second observable is required'); }
  return onErrorResumeNext([this, second]);
};
|
|
|
/**
 * Observable that subscribes to each source in turn, moving on to the next one
 * whenever the current source completes or errors.
 */
var OnErrorResumeNextObservable = (function(__super__) {
  inherits(OnErrorResumeNextObservable, __super__);

  /**
   * @param {Array} sources Observables or Promises to run in order.
   */
  function OnErrorResumeNextObservable(sources) {
    this.sources = sources;
    __super__.call(this);
  }

  /**
   * Recursive step: subscribes to the next source, or completes the
   * downstream observer once every source has been consumed.
   */
  function scheduleMethod(state, recurse) {
    if (state.pos < state.sources.length) {
      var current = state.sources[state.pos++];
      isPromise(current) && (current = observableFromPromise(current));
      var d = new SingleAssignmentDisposable();
      state.subscription.setDisposable(d);
      d.setDisposable(current.subscribe(new OnErrorResumeNextObserver(state, recurse)));
    } else {
      state.o.onCompleted();
    }
  }

  OnErrorResumeNextObservable.prototype.subscribeCore = function (o) {
    var subscription = new SerialDisposable(),
      state = {pos: 0, subscription: subscription, o: o, sources: this.sources },
      cancellable = immediateScheduler.scheduleRecursive(state, scheduleMethod);

    return new BinaryDisposable(subscription, cancellable);
  };

  return OnErrorResumeNextObservable;
}(ObservableBase));
|
|
|
/**
 * Observer used by OnErrorResumeNextObservable: values are forwarded to the
 * downstream observer, while both error and completion trigger the next source.
 */
var OnErrorResumeNextObserver = (function(__super__) {
  inherits(OnErrorResumeNextObserver, __super__);

  /**
   * @param {Object} state Shared iteration state; `state.o` is the downstream observer.
   * @param {Function} recurse Callback that advances to the next source.
   */
  function OnErrorResumeNextObserver(state, recurse) {
    this._state = state;
    this._recurse = recurse;
    __super__.call(this);
  }

  OnErrorResumeNextObserver.prototype.next = function (x) { this._state.o.onNext(x); };
  // The error itself is intentionally not forwarded; the sequence just resumes.
  OnErrorResumeNextObserver.prototype.error = function () { this._recurse(this._state); };
  OnErrorResumeNextObserver.prototype.completed = function () { this._recurse(this._state); };

  return OnErrorResumeNextObserver;
}(AbstractObserver));
|
|
|
/**
 * Continues an observable sequence that is terminated normally or by an exception with the next observable sequence.
 * Accepts either a single array of sources or the sources as individual arguments.
 * @returns {Observable} An observable sequence that concatenates the source sequences, even if a sequence terminates exceptionally.
 */
var onErrorResumeNext = Observable.onErrorResumeNext = function () {
  var sources = [];
  if (Array.isArray(arguments[0])) {
    sources = arguments[0];
  } else {
    var len = arguments.length;
    sources = new Array(len);
    for(var i = 0; i < len; i++) { sources[i] = arguments[i]; }
  }
  return new OnErrorResumeNextObservable(sources);
};
|
|
|
/** Selector that calls `toArray()` on its argument. */
function toArray(x) { return x.toArray(); }
|
/** Predicate that is true when its argument's `length` is greater than zero. */
function notEmpty(x) { return x.length > 0; }
|
|
|
/**
 * Projects each element of an observable sequence into zero or more buffers which are produced based on element count information.
 * Also available as `bufferCount`.
 * @param {Number} count Length of each buffer.
 * @param {Number} [skip] Number of elements to skip between creation of consecutive buffers. If not provided, defaults to the count.
 * @returns {Observable} An observable sequence of buffers.
 */
observableProto.bufferWithCount = observableProto.bufferCount = function (count, skip) {
  typeof skip !== 'number' && (skip = count);
  // Each window is collected into an array; empty arrays are dropped.
  return this.windowWithCount(count, skip)
    .flatMap(toArray)
    .filter(notEmpty);
};
|
|
|
/**
 * Projects each element of an observable sequence into zero or more windows which are produced based on element count information.
 * Also available as `windowCount`.
 * @param {Number} count Length of each window.
 * @param {Number} [skip] Number of elements to skip between creation of consecutive windows. If not specified, defaults to the count.
 * @returns {Observable} An observable sequence of windows.
 * @throws {ArgumentOutOfRangeError} If count or skip is zero, negative, NaN or infinite.
 */
observableProto.windowWithCount = observableProto.windowCount = function (count, skip) {
  var source = this;
  // Values that coerce to 0/NaN, and infinities, are normalised to 0 and then rejected.
  +count || (count = 0);
  Math.abs(count) === Infinity && (count = 0);
  if (count <= 0) { throw new ArgumentOutOfRangeError(); }
  skip == null && (skip = count);
  +skip || (skip = 0);
  Math.abs(skip) === Infinity && (skip = 0);

  if (skip <= 0) { throw new ArgumentOutOfRangeError(); }
  return new AnonymousObservable(function (observer) {
    var m = new SingleAssignmentDisposable(),
      refCountDisposable = new RefCountDisposable(m),
      n = 0,   // number of source elements seen so far
      q = [];  // currently open windows, oldest first

    function createWindow () {
      var s = new Subject();
      q.push(s);
      observer.onNext(addRef(s, refCountDisposable));
    }

    // The first window is opened before any element arrives.
    createWindow();

    m.setDisposable(source.subscribe(
      function (x) {
        for (var i = 0, len = q.length; i < len; i++) { q[i].onNext(x); }
        // Close the oldest window once it has received `count` elements.
        var c = n - count + 1;
        c >= 0 && c % skip === 0 && q.shift().onCompleted();
        // Open a new window every `skip` elements.
        ++n % skip === 0 && createWindow();
      },
      function (e) {
        while (q.length > 0) { q.shift().onError(e); }
        observer.onError(e);
      },
      function () {
        while (q.length > 0) { q.shift().onCompleted(); }
        observer.onCompleted();
      }
    ));
    return refCountDisposable;
  }, source);
};
|
|
|
/**
 * Observer that keeps at most the last `c` values in a queue and emits that
 * queue as a single array when the source completes.
 */
var TakeLastBufferObserver = (function (__super__) {
  inherits(TakeLastBufferObserver, __super__);

  /**
   * @param {Observer} o Downstream observer.
   * @param {Number} c Maximum number of trailing elements to retain.
   */
  function TakeLastBufferObserver(o, c) {
    this._o = o;
    this._c = c;
    this._q = [];
    __super__.call(this);
  }

  TakeLastBufferObserver.prototype.next = function (x) {
    this._q.push(x);
    // Drop the oldest element once the queue grows past the limit.
    this._q.length > this._c && this._q.shift();
  };

  TakeLastBufferObserver.prototype.error = function (e) {
    this._o.onError(e);
  };

  TakeLastBufferObserver.prototype.completed = function () {
    this._o.onNext(this._q);
    this._o.onCompleted();
  };

  return TakeLastBufferObserver;
}(AbstractObserver));
|
|
|
/**
 * Returns an array with the specified number of contiguous elements from the end of an observable sequence.
 *
 * @description
 * This operator accumulates a buffer with a length enough to store count elements. Upon completion of the
 * source sequence, this buffer is produced on the result sequence.
 * @param {Number} count Number of elements to take from the end of the source sequence.
 * @returns {Observable} An observable sequence containing a single array with the specified number of elements from the end of the source sequence.
 * @throws {ArgumentOutOfRangeError} If count is negative.
 */
observableProto.takeLastBuffer = function (count) {
  if (count < 0) { throw new ArgumentOutOfRangeError(); }
  var source = this;
  return new AnonymousObservable(function (o) {
    return source.subscribe(new TakeLastBufferObserver(o, count));
  }, source);
};
|
|
|
/**
 * Observer that forwards every notification and, if the source completes
 * without having produced a value, emits a default value first.
 */
var DefaultIfEmptyObserver = (function (__super__) {
  inherits(DefaultIfEmptyObserver, __super__);

  /**
   * @param {Observer} o Downstream observer.
   * @param {*} d Default value emitted when the source is empty.
   */
  function DefaultIfEmptyObserver(o, d) {
    this._o = o;
    this._d = d;
    this._f = false; // becomes true once any value has been seen
    __super__.call(this);
  }

  DefaultIfEmptyObserver.prototype.next = function (x) {
    this._f = true;
    this._o.onNext(x);
  };

  DefaultIfEmptyObserver.prototype.error = function (e) {
    this._o.onError(e);
  };

  DefaultIfEmptyObserver.prototype.completed = function () {
    !this._f && this._o.onNext(this._d);
    this._o.onCompleted();
  };

  return DefaultIfEmptyObserver;
}(AbstractObserver));
|
|
|
/**
 * Returns the elements of the specified sequence or the specified value in a singleton sequence if the sequence is empty.
 *
 * @example
 *  var res = xs.defaultIfEmpty();
 *  var res = xs.defaultIfEmpty(false);
 *
 * @memberOf Observable#
 * @param defaultValue The value to return if the sequence is empty. If not provided, this defaults to null.
 * @returns {Observable} An observable sequence that contains the specified default value if the source is empty; otherwise, the elements of the source itself.
 */
observableProto.defaultIfEmpty = function (defaultValue) {
  var source = this;
  defaultValue === undefined && (defaultValue = null);
  return new AnonymousObservable(function (o) {
    return source.subscribe(new DefaultIfEmptyObserver(o, defaultValue));
  }, source);
};
|
|
|
// Swap out for Array.findIndex
/**
 * Finds the index of the first element that matches `item` according to `comparer`.
 * @param {Array} array Array to search.
 * @param {*} item Value to look for.
 * @param {Function} comparer Called as comparer(element, item); a truthy result is a match.
 * @returns {Number} Index of the first match, or -1 if there is none.
 */
function arrayIndexOfComparer(array, item, comparer) {
  for (var i = 0, len = array.length; i < len; i++) {
    if (comparer(array[i], item)) { return i; }
  }
  return -1;
}
|
|
|
/**
 * Array-backed set whose membership test uses a caller-supplied comparer
 * (a linear scan, despite the name).
 * @param {Function} comparer Equality function used to detect duplicates.
 */
function HashSet(comparer) {
  this.comparer = comparer;
  this.set = [];
}

/**
 * Adds `value` unless an equal element is already present.
 * @param {*} value Value to add.
 * @returns {Boolean} true if the value was added, false if it was a duplicate.
 */
HashSet.prototype.push = function(value) {
  var retValue = arrayIndexOfComparer(this.set, value, this.comparer) === -1;
  retValue && this.set.push(value);
  return retValue;
};
|
|
|
/**
 * Observable that subscribes to `source` through a DistinctObserver.
 */
var DistinctObservable = (function (__super__) {
  inherits(DistinctObservable, __super__);

  /**
   * @param {Observable} source Sequence to filter.
   * @param {Function} [keyFn] Computes the comparison key for each element.
   * @param {Function} cmpFn Compares two keys for equality.
   */
  function DistinctObservable(source, keyFn, cmpFn) {
    this.source = source;
    this._keyFn = keyFn;
    this._cmpFn = cmpFn;
    __super__.call(this);
  }

  DistinctObservable.prototype.subscribeCore = function (o) {
    return this.source.subscribe(new DistinctObserver(o, this._keyFn, this._cmpFn));
  };

  return DistinctObservable;
}(ObservableBase));
|
|
|
/**
 * Observer that forwards only elements whose key has not been seen before,
 * tracking the keys seen so far in a HashSet.
 */
var DistinctObserver = (function (__super__) {
  inherits(DistinctObserver, __super__);

  /**
   * @param {Observer} o Downstream observer.
   * @param {Function} [keyFn] Computes the comparison key; the element itself is used when absent.
   * @param {Function} cmpFn Compares two keys for equality.
   */
  function DistinctObserver(o, keyFn, cmpFn) {
    this._o = o;
    this._keyFn = keyFn;
    this._h = new HashSet(cmpFn);
    __super__.call(this);
  }

  DistinctObserver.prototype.next = function (x) {
    var key = x;
    if (isFunction(this._keyFn)) {
      key = tryCatch(this._keyFn)(x);
      // A throwing key selector is reported through onError.
      if (key === errorObj) { return this._o.onError(key.e); }
    }
    this._h.push(key) && this._o.onNext(x);
  };

  DistinctObserver.prototype.error = function (e) { this._o.onError(e); };
  DistinctObserver.prototype.completed = function () { this._o.onCompleted(); };

  return DistinctObserver;
}(AbstractObserver));
|
|
|
/**
 * Returns an observable sequence that contains only distinct elements according to the keySelector and the comparer.
 * Usage of this operator should be considered carefully due to the maintenance of an internal lookup structure which can grow large.
 *
 * @example
 *  var res = xs.distinct();
 *  var res = xs.distinct(function (x) { return x.id; });
 *  var res = xs.distinct(function (x) { return x.id; }, function (a,b) { return a === b; });
 * @param {Function} [keySelector] A function to compute the comparison key for each element.
 * @param {Function} [comparer] Used to compare items in the collection. Defaults to Rx.helpers.defaultComparer.
 * @returns {Observable} An observable sequence only containing the distinct elements, based on a computed key value, from the source sequence.
 */
observableProto.distinct = function (keySelector, comparer) {
  // No `defaultComparer` alias is declared in this module, so the default is
  // read from the helpers object.
  comparer || (comparer = helpers.defaultComparer);
  return new DistinctObservable(this, keySelector, comparer);
};
|
|
|
/**
 * Returns an observable sequence that shares a single subscription to the underlying sequence. This observable sequence
 * can be resubscribed to, even if all prior subscriptions have ended. (unlike `.publish().refCount()`)
 * @returns {Observable} An observable sequence that contains the elements of a sequence produced by multicasting the source.
 */
observableProto.singleInstance = function() {
  var source = this, hasObservable = false, observable;

  // Lazily builds the shared observable; the `finally` handler clears the flag
  // so that a fresh one is built on the next subscription.
  function getObservable() {
    if (!hasObservable) {
      hasObservable = true;
      observable = source['finally'](function() { hasObservable = false; }).publish().refCount();
    }
    return observable;
  }

  return new AnonymousObservable(function(o) {
    return getObservable().subscribe(o);
  });
};
|
|
|
return Rx; |
|
}));
|
|
|