You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
616 lines
19 KiB
616 lines
19 KiB
// Copyright (c) Microsoft, All rights reserved. See License.txt in the project root for license information. |
|
|
|
;(function (factory) { |
|
// `typeof` results that may denote a usable host object.
var objectTypes = {
  'function': true,
  'object': true
};

/**
 * Returns `value` when it looks like a global object, i.e. it is truthy and
 * its `Object` property is this realm's `Object` constructor.
 * @param {*} value Candidate global object.
 * @returns {*} `value` itself, or `null` when the check fails.
 */
function checkGlobal(value) {
  if (value && value.Object === Object) {
    return value;
  }
  return null;
}
|
|
|
// CommonJS `exports` object, unless it is missing or is a DOM node.
var freeExports = null;
if (objectTypes[typeof exports] && exports && !exports.nodeType) {
  freeExports = exports;
}

// CommonJS `module` object, unless it is missing or is a DOM node.
var freeModule = null;
if (objectTypes[typeof module] && module && !module.nodeType) {
  freeModule = module;
}

// Candidate global objects; each is null unless it passes checkGlobal.
var freeGlobal = checkGlobal(freeExports && freeModule && typeof global === 'object' && global);
var freeSelf = checkGlobal(objectTypes[typeof self] && self);
var freeWindow = checkGlobal(objectTypes[typeof window] && window);

// `exports` only when it is the very object `module.exports` points at.
var moduleExports = null;
if (freeModule && freeModule.exports === freeExports) {
  moduleExports = freeExports;
}

var thisGlobal = checkGlobal(objectTypes[typeof this] && this);

// First usable candidate wins; `Function('return this')()` is the last resort
// and is only evaluated when every other candidate is falsy.
var root = freeGlobal ||
  ((freeWindow !== (thisGlobal && thisGlobal.window)) && freeWindow) ||
  freeSelf ||
  thisGlobal ||
  Function('return this')();

// Because of build optimizers
if (typeof define === 'function' && define.amd) {
  // AMD loader: depend on './rx' and hand the loaded Rx to the factory.
  define(['./rx'], function (Rx, exports) {
    return factory(root, exports, Rx);
  });
} else if (typeof module === 'object' && module && module.exports === freeExports) {
  // CommonJS: load './rx' synchronously and export the factory's result.
  module.exports = factory(root, module.exports, require('./rx'));
} else {
  // Plain script: extend the Rx already attached to the root object.
  root.Rx = factory(root, {}, root.Rx);
}
|
}.call(this, function (root, exp, Rx, undefined) { |
|
|
|
// References |
|
// Observable types
var Observable = Rx.Observable;
var observableProto = Observable.prototype;
var AnonymousObservable = Rx.AnonymousObservable;

// Observers, disposables, notifications and subjects
var AbstractObserver = Rx.internals.AbstractObserver;
var CompositeDisposable = Rx.CompositeDisposable;
var BinaryDisposable = Rx.BinaryDisposable;
var NAryDisposable = Rx.NAryDisposable;
var Notification = Rx.Notification;
var Subject = Rx.Subject;
var Observer = Rx.Observer;
var disposableEmpty = Rx.Disposable.empty;
var disposableCreate = Rx.Disposable.create;

// Internal helpers
var inherits = Rx.internals.inherits;
var addProperties = Rx.internals.addProperties;

// Schedulers
var defaultScheduler = Rx.Scheduler['default'];
var currentThreadScheduler = Rx.Scheduler.currentThread;

// Misc helpers
var identity = Rx.helpers.identity;
var isScheduler = Rx.Scheduler.isScheduler;
var isFunction = Rx.helpers.isFunction;
var checkDisposed = Rx.Disposable.checkDisposed;
|
|
|
// Shared sentinel: a wrapped call that throws returns this object, with the
// thrown value stored on `e`.
var errorObj = { e: {} };

/**
 * Wraps `tryCatchTarget` so that it never throws.
 * @param {Function} tryCatchTarget Function to guard.
 * @returns {Function} Wrapper that forwards `this` and all arguments; it
 *   returns the target's result, or `errorObj` (with `errorObj.e` set to the
 *   thrown value) when the target throws.
 */
function tryCatcherGen(tryCatchTarget) {
  function tryCatcher() {
    var outcome;
    try {
      outcome = tryCatchTarget.apply(this, arguments);
    } catch (caught) {
      errorObj.e = caught;
      outcome = errorObj;
    }
    return outcome;
  }

  return tryCatcher;
}
|
|
|
/**
 * Returns a non-throwing wrapper around `fn` (see tryCatcherGen) and also
 * publishes this helper as `Rx.internals.tryCatch`.
 * @param {Function} fn Function to guard.
 * @returns {Function} Wrapper that returns `errorObj` instead of throwing.
 * @throws {TypeError} When `fn` is not a function.
 */
var tryCatch = Rx.internals.tryCatch = function tryCatch(fn) {
  if (isFunction(fn)) {
    return tryCatcherGen(fn);
  }
  throw new TypeError('fn must be a function');
};
|
|
|
/**
 * Rethrows the given value unchanged.
 * @param {*} e Value to throw.
 * @throws {*} Always throws `e`.
 */
function thrower(e) {
  throw e;
}
|
|
|
/**
 * Used to pause and resume streams.
 * Derives from the base class passed in below (Subject): `pause()` pushes
 * `false` and `resume()` pushes `true` through `onNext`.
 */
Rx.Pauser = (function (__super__) {
  function Pauser() {
    __super__.call(this);
  }

  inherits(Pauser, __super__);

  // Read the prototype only after `inherits` has run.
  var proto = Pauser.prototype;

  /**
   * Pauses the underlying sequence.
   */
  proto.pause = function () {
    this.onNext(false);
  };

  /**
   * Resumes the underlying sequence.
   */
  proto.resume = function () {
    this.onNext(true);
  };

  return Pauser;
}(Subject));
|
|
|
/**
 * Observable whose flow from `source` can be switched on and off.
 *
 * It starts paused. The on/off signal is an internal `controller` subject,
 * merged with the optional external `pauser` when one is supplied
 * (anything exposing `subscribe`).
 */
var PausableObservable = (function (__super__) {
  inherits(PausableObservable, __super__);

  /**
   * @param {Observable} source Sequence to pause and resume.
   * @param {Observable} [pauser] Optional sequence of true (resume) / false (pause).
   */
  function PausableObservable(source, pauser) {
    this.source = source;
    this.controller = new Subject();
    this.paused = true;

    if (pauser && pauser.subscribe) {
      this.pauser = this.controller.merge(pauser);
    } else {
      this.pauser = this.controller;
    }

    __super__.call(this);
  }

  /**
   * Subscribes `o` to a published view of the source and connects or
   * disconnects that view whenever the pause signal changes.
   * @param {Observer} o Downstream observer.
   * @returns {Disposable} Disposes the observer subscription, the live
   *   connection (if any) and the pause-signal subscription.
   */
  PausableObservable.prototype._subscribe = function (o) {
    var conn = this.source.publish(),
      subscription = conn.subscribe(o),
      connection = disposableEmpty;

    var pausable = this.pauser.startWith(!this.paused).distinctUntilChanged().subscribe(function (b) {
      if (b) {
        connection = conn.connect();
      } else {
        connection.dispose();
        connection = disposableEmpty;
      }
    });

    // `connection` is reassigned on every resume/pause. Putting its current
    // value straight into the result would capture only the initial
    // (usually empty) disposable and leave a connection opened by a later
    // resume() running after unsubscribe. Dispose whatever is current instead.
    var currentConnection = disposableCreate(function () {
      connection.dispose();
      connection = disposableEmpty;
    });

    return new NAryDisposable([subscription, currentConnection, pausable]);
  };

  /**
   * Pauses the sequence: records the paused state and signals `false`.
   */
  PausableObservable.prototype.pause = function () {
    this.paused = true;
    this.controller.onNext(false);
  };

  /**
   * Resumes the sequence: records the running state and signals `true`.
   */
  PausableObservable.prototype.resume = function () {
    this.paused = false;
    this.controller.onNext(true);
  };

  return PausableObservable;

}(Observable));
|
|
|
/**
 * Pauses the underlying observable sequence based upon the observable sequence which yields true/false.
 * The returned observable also exposes `pause()` and `resume()`.
 * @example
 * var pauser = new Rx.Subject();
 * var source = Rx.Observable.interval(100).pausable(pauser);
 * @param {Observable} pauser The observable sequence used to pause the underlying sequence.
 * @returns {Observable} The observable sequence which is paused based upon the pauser.
 */
observableProto.pausable = function (pauser) {
  var pausableSource = new PausableObservable(this, pauser);
  return pausableSource;
};
|
|
|
/**
 * Combines the latest value of `source` (slot 0) with the latest value of
 * `subject` (slot 1) through `resultSelector`, emitting once both slots have
 * produced a value.
 *
 * - A `source` error is forwarded at once when slot 1 holds a truthy value;
 *   otherwise it is held back and delivered on the next combined emission.
 * - A `subject` error is always forwarded immediately.
 * - Either side completing sets a shared "done" flag; completion is
 *   forwarded only while slot 1 is truthy. `subject` completing also pushes
 *   `true` into slot 1.
 *
 * @param {Observable} source Data sequence.
 * @param {Observable} subject Control sequence.
 * @param {Function} resultSelector Called as `resultSelector(sourceValue, subjectValue)`.
 * @returns {Observable} Sequence of selector results.
 */
function combineLatestSource(source, subject, resultSelector) {
  return new AnonymousObservable(function (o) {
    var seen = [false, false];
    var allSeen = false;
    var finished = false;
    var latest = new Array(2);
    var pendingError;

    function next(x, i) {
      latest[i] = x;
      seen[i] = true;

      if (!allSeen) {
        allSeen = seen.every(identity);
      }

      if (allSeen) {
        if (pendingError) {
          return o.onError(pendingError);
        }
        var res = tryCatch(resultSelector).apply(null, latest);
        if (res === errorObj) {
          return o.onError(res.e);
        }
        o.onNext(res);
      }

      if (finished && latest[1]) {
        o.onCompleted();
      }
    }

    // Subscribe to the data sequence first, then to the control sequence.
    var sourceSubscription = source.subscribe(
      function (x) {
        next(x, 0);
      },
      function (e) {
        if (latest[1]) {
          o.onError(e);
        } else {
          pendingError = e;
        }
      },
      function () {
        finished = true;
        if (latest[1]) {
          o.onCompleted();
        }
      });

    var subjectSubscription = subject.subscribe(
      function (x) {
        next(x, 1);
      },
      function (e) {
        o.onError(e);
      },
      function () {
        finished = true;
        next(true, 1);
      });

    return new BinaryDisposable(sourceSubscription, subjectSubscription);
  }, source);
}
|
|
|
/**
 * Like PausableObservable, but values that arrive while paused are kept in a
 * queue and replayed when the sequence is resumed. Starts paused.
 */
var PausableBufferedObservable = (function (__super__) {
  inherits(PausableBufferedObservable, __super__);

  /**
   * @param {Observable} source Sequence to buffer while paused.
   * @param {Observable} [pauser] Optional sequence of true (resume) / false (pause).
   */
  function PausableBufferedObservable(source, pauser) {
    this.source = source;
    this.controller = new Subject();
    this.paused = true;

    var hasExternalPauser = pauser && pauser.subscribe;
    this.pauser = hasExternalPauser ? this.controller.merge(pauser) : this.controller;

    __super__.call(this);
  }

  /**
   * Pairs every source value with the current pause flag, then forwards,
   * queues or flushes accordingly.
   * @param {Observer} o Downstream observer.
   * @returns {Disposable} Subscription to the combined sequence.
   */
  PausableBufferedObservable.prototype._subscribe = function (o) {
    var q = [];
    var previousShouldFire;

    // Flush everything buffered so far, oldest first.
    function drainQueue() {
      while (q.length > 0) {
        o.onNext(q.shift());
      }
    }

    var flags = this.pauser.startWith(!this.paused).distinctUntilChanged();

    var combined = combineLatestSource(this.source, flags, function (data, shouldFire) {
      return { data: data, shouldFire: shouldFire };
    });

    return combined.subscribe(
      function (results) {
        var fire = results.shouldFire;
        var flagChanged = previousShouldFire !== undefined && fire !== previousShouldFire;
        previousShouldFire = fire;

        if (flagChanged) {
          // change in shouldFire
          if (fire) {
            drainQueue();
          }
          return;
        }

        // new data
        if (fire) {
          o.onNext(results.data);
        } else {
          q.push(results.data);
        }
      },
      function (err) {
        // Deliver whatever is still buffered before the error.
        drainQueue();
        o.onError(err);
      },
      function () {
        // Deliver whatever is still buffered before completing.
        drainQueue();
        o.onCompleted();
      }
    );
  };

  /**
   * Pauses the sequence: records the paused state and signals `false`.
   */
  PausableBufferedObservable.prototype.pause = function () {
    this.paused = true;
    this.controller.onNext(false);
  };

  /**
   * Resumes the sequence: records the running state and signals `true`.
   */
  PausableBufferedObservable.prototype.resume = function () {
    this.paused = false;
    this.controller.onNext(true);
  };

  return PausableBufferedObservable;

}(Observable));
|
|
|
/**
 * Pauses the underlying observable sequence based upon the observable sequence which yields true/false,
 * and yields the values that were buffered while paused.
 * The returned observable also exposes `pause()` and `resume()`.
 * @example
 * var pauser = new Rx.Subject();
 * var source = Rx.Observable.interval(100).pausableBuffered(pauser);
 * @param {Observable} pauser The observable sequence used to pause the underlying sequence.
 * @returns {Observable} The observable sequence which is paused based upon the pauser.
 */
observableProto.pausableBuffered = function (pauser) {
  var bufferedSource = new PausableBufferedObservable(this, pauser);
  return bufferedSource;
};
|
|
|
/**
 * Observable that routes its source through a ControlledSubject, so that
 * values are released on `request(n)`.
 */
var ControlledObservable = (function (__super__) {
  inherits(ControlledObservable, __super__);

  /**
   * @param {Observable} source Upstream sequence.
   * @param {Boolean} enableQueue Passed through to the ControlledSubject.
   * @param {Scheduler} scheduler Passed through to the ControlledSubject.
   */
  function ControlledObservable (source, enableQueue, scheduler) {
    __super__.call(this);
    var gate = new ControlledSubject(enableQueue, scheduler);
    this.subject = gate;
    this.source = source.multicast(gate).refCount();
  }

  /**
   * Subscribes the observer to the multicast source.
   */
  ControlledObservable.prototype._subscribe = function (o) {
    return this.source.subscribe(o);
  };

  /**
   * Requests items from the controlled subject.
   * @param {Number} [numberOfItems] Item count; `null`/`undefined` is sent as -1.
   * @returns {Disposable} Whatever the subject's `request` returns.
   */
  ControlledObservable.prototype.request = function (numberOfItems) {
    var count = numberOfItems;
    if (count == null) {
      count = -1;
    }
    return this.subject.request(count);
  };

  return ControlledObservable;

}(Observable));
|
|
|
/**
 * Subject that forwards values to its inner `subject` only while there is an
 * outstanding request. With queueing enabled, values and terminal
 * notifications that arrive without a request are buffered as Notifications.
 */
var ControlledSubject = (function (__super__) {
  inherits(ControlledSubject, __super__);

  /**
   * @param {Boolean} [enableQueue=true] Buffer notifications that arrive while nothing is requested.
   * @param {Scheduler} [scheduler] Scheduler used to run requests; defaults to currentThreadScheduler.
   */
  function ControlledSubject(enableQueue, scheduler) {
    enableQueue == null && (enableQueue = true);

    __super__.call(this);
    this.subject = new Subject();
    this.enableQueue = enableQueue;
    this.queue = enableQueue ? [] : null;
    this.requestedCount = 0;
    this.requestedDisposable = null;
    this.error = null;
    this.hasFailed = false;
    this.hasCompleted = false;
    this.scheduler = scheduler || currentThreadScheduler;
  }

  addProperties(ControlledSubject.prototype, Observer, {
    // Observers attach to the inner subject.
    _subscribe: function (o) {
      return this.subject.subscribe(o);
    },
    // Completes at once when nothing is buffered; otherwise the completion is
    // queued behind the buffered notifications.
    onCompleted: function () {
      this.hasCompleted = true;
      if (!this.enableQueue || this.queue.length === 0) {
        this.subject.onCompleted();
        this.disposeCurrentRequest();
      } else {
        this.queue.push(Notification.createOnCompleted());
      }
    },
    // Same policy as onCompleted, for errors.
    onError: function (error) {
      this.hasFailed = true;
      this.error = error;
      if (!this.enableQueue || this.queue.length === 0) {
        this.subject.onError(error);
        this.disposeCurrentRequest();
      } else {
        this.queue.push(Notification.createOnError(error));
      }
    },
    // Forwards the value while a request is outstanding; otherwise queues it
    // (or drops it when queueing is disabled).
    onNext: function (value) {
      if (this.requestedCount <= 0) {
        this.enableQueue && this.queue.push(Notification.createOnNext(value));
      } else {
        // Pre-decrement: in this branch the count is > 0, so comparing the
        // post-decrement result with 0 could never be true and the finished
        // request was never released.
        (--this.requestedCount === 0) && this.disposeCurrentRequest();
        this.subject.onNext(value);
      }
    },
    // Replays queued notifications: up to `numberOfItems` values, plus any
    // terminal notification reached at the head of the queue. Returns how
    // many requested items are still unsatisfied.
    _processRequest: function (numberOfItems) {
      if (this.enableQueue) {
        while (this.queue.length > 0 && (numberOfItems > 0 || this.queue[0].kind !== 'N')) {
          var first = this.queue.shift();
          first.accept(this.subject);
          if (first.kind === 'N') {
            numberOfItems--;
          } else {
            // Terminal notification delivered: drop the request and the queue.
            this.disposeCurrentRequest();
            this.queue = [];
          }
        }
      }

      return numberOfItems;
    },
    // Replaces any current request with a new one for `number` items,
    // executed on the configured scheduler.
    request: function (number) {
      this.disposeCurrentRequest();
      var self = this;

      this.requestedDisposable = this.scheduler.schedule(number,
        function(s, i) {
          var remaining = self._processRequest(i);
          var stopped = self.hasCompleted || self.hasFailed;
          if (!stopped && remaining > 0) {
            self.requestedCount = remaining;

            // Scheduled item is still in progress. Return a new
            // disposable to allow the request to be interrupted
            // via dispose.
            return disposableCreate(function () {
              self.requestedCount = 0;
            });
          }
        });

      return this.requestedDisposable;
    },
    // Disposes and forgets the outstanding request, if any.
    disposeCurrentRequest: function () {
      if (this.requestedDisposable) {
        this.requestedDisposable.dispose();
        this.requestedDisposable = null;
      }
    }
  });

  return ControlledSubject;
}(Observable));
|
|
|
/**
 * Attaches a controller to the observable sequence with the ability to queue.
 * A scheduler may be passed as the first argument, in which case queueing is enabled.
 * @example
 * var source = Rx.Observable.interval(100).controlled();
 * source.request(3); // Reads 3 values
 * @param {bool} enableQueue truthy value to determine if values should be queued pending the next request
 * @param {Scheduler} scheduler determines how the requests will be scheduled
 * @returns {Observable} The observable sequence which only propagates values on request.
 */
observableProto.controlled = function (enableQueue, scheduler) {
  var queueing = enableQueue;
  var requestScheduler = scheduler;

  if (queueing && isScheduler(queueing)) {
    // Called as controlled(scheduler).
    requestScheduler = queueing;
    queueing = true;
  } else if (queueing == null) {
    queueing = true;
  }

  return new ControlledObservable(this, queueing, requestScheduler);
};
|
|
|
/**
 * Observable over a controlled source that requests one item at a time:
 * one item is requested on subscribe and another after each delivered value.
 */
var StopAndWaitObservable = (function (__super__) {
  inherits(StopAndWaitObservable, __super__);

  /**
   * @param {ControlledObservable} source Source exposing `request(n)`.
   */
  function StopAndWaitObservable (source) {
    __super__.call(this);
    this.source = source;
  }

  // Scheduler action: request the first item.
  function scheduleMethod(s, self) {
    return self.source.request(1);
  }

  /**
   * Subscribes through a StopAndWaitObserver and schedules the first request
   * on the default scheduler.
   * @param {Observer} o Downstream observer.
   * @returns {Disposable} Disposes the subscription and the scheduled request.
   */
  StopAndWaitObservable.prototype._subscribe = function (o) {
    this.subscription = this.source.subscribe(new StopAndWaitObserver(o, this, this.subscription));
    return new BinaryDisposable(
      this.subscription,
      defaultScheduler.schedule(this, scheduleMethod)
    );
  };

  var StopAndWaitObserver = (function (__sub__) {
    inherits(StopAndWaitObserver, __sub__);

    /**
     * @param {Observer} observer Downstream observer.
     * @param {StopAndWaitObservable} observable Owner, used to reach the controlled source.
     * @param {Disposable} cancel Disposable released when this observer is disposed (may be undefined).
     */
    function StopAndWaitObserver (observer, observable, cancel) {
      __sub__.call(this);
      this.observer = observer;
      this.observable = observable;
      this.cancel = cancel;
      this.scheduleDisposable = null;
    }

    StopAndWaitObserver.prototype.completed = function () {
      this.observer.onCompleted();
      this.dispose();
    };

    StopAndWaitObserver.prototype.error = function (error) {
      this.observer.onError(error);
      this.dispose();
    };

    // Scheduler action: request the next single item.
    function innerScheduleMethod(s, self) {
      return self.observable.source.request(1);
    }

    StopAndWaitObserver.prototype.next = function (value) {
      this.observer.onNext(value);
      this.scheduleDisposable = defaultScheduler.schedule(this, innerScheduleMethod);
    };

    // Defined on the prototype (as WindowedObserver does) so that the
    // `this.dispose()` calls in completed/error run this cleanup. Assigned
    // on the constructor function it would never be invoked.
    StopAndWaitObserver.prototype.dispose = function () {
      this.observer = null;
      if (this.cancel) {
        this.cancel.dispose();
        this.cancel = null;
      }
      if (this.scheduleDisposable) {
        this.scheduleDisposable.dispose();
        this.scheduleDisposable = null;
      }
      __sub__.prototype.dispose.call(this);
    };

    return StopAndWaitObserver;
  }(AbstractObserver));

  return StopAndWaitObservable;
}(Observable));
|
|
|
|
|
/**
 * Attaches a stop and wait observable to the current observable.
 * The result requests a single item from this controlled observable at a time.
 * @returns {Observable} A stop and wait observable.
 */
ControlledObservable.prototype.stopAndWait = function () {
  var stopAndWaitSource = new StopAndWaitObservable(this);
  return stopAndWaitSource;
};
|
|
|
/**
 * Observable over a controlled source that requests `windowSize` items at a
 * time: one window on subscribe, and another each time a full window of
 * values has been delivered.
 */
var WindowedObservable = (function (__super__) {
  inherits(WindowedObservable, __super__);

  /**
   * @param {ControlledObservable} source Source exposing `request(n)`.
   * @param {Number} windowSize Number of items per request.
   */
  function WindowedObservable(source, windowSize) {
    __super__.call(this);
    this.source = source;
    this.windowSize = windowSize;
  }

  // Scheduler action: request the first window.
  function requestFirstWindow(s, self) {
    return self.source.request(self.windowSize);
  }

  /**
   * Subscribes through a WindowedObserver and schedules the first window
   * request on the default scheduler.
   * @param {Observer} o Downstream observer.
   * @returns {Disposable} Disposes the subscription and the scheduled request.
   */
  WindowedObservable.prototype._subscribe = function (o) {
    var windowedObserver = new WindowedObserver(o, this, this.subscription);
    this.subscription = this.source.subscribe(windowedObserver);
    return new BinaryDisposable(
      this.subscription,
      defaultScheduler.schedule(this, requestFirstWindow)
    );
  };

  var WindowedObserver = (function (__sub__) {
    inherits(WindowedObserver, __sub__);

    /**
     * @param {Observer} observer Downstream observer.
     * @param {WindowedObservable} observable Owner, used to reach the source and window size.
     * @param {Disposable} cancel Disposable released when this observer is disposed (may be undefined).
     */
    function WindowedObserver(observer, observable, cancel) {
      this.observer = observer;
      this.observable = observable;
      this.cancel = cancel;
      this.received = 0;
      this.scheduleDisposable = null;
      __sub__.call(this);
    }

    var proto = WindowedObserver.prototype;

    proto.completed = function () {
      this.observer.onCompleted();
      this.dispose();
    };

    proto.error = function (error) {
      this.observer.onError(error);
      this.dispose();
    };

    // Scheduler action: request the next window.
    function requestNextWindow(s, self) {
      return self.observable.source.request(self.observable.windowSize);
    }

    proto.next = function (value) {
      this.observer.onNext(value);
      // Count delivered values modulo the window size; a wrap to zero means
      // the current window is exhausted.
      this.received = (this.received + 1) % this.observable.windowSize;
      if (this.received === 0) {
        this.scheduleDisposable = defaultScheduler.schedule(this, requestNextWindow);
      }
    };

    proto.dispose = function () {
      this.observer = null;
      if (this.cancel) {
        this.cancel.dispose();
        this.cancel = null;
      }
      if (this.scheduleDisposable) {
        this.scheduleDisposable.dispose();
        this.scheduleDisposable = null;
      }
      __sub__.prototype.dispose.call(this);
    };

    return WindowedObserver;
  }(AbstractObserver));

  return WindowedObservable;
}(Observable));
|
|
|
/**
 * Creates a sliding windowed observable based upon the window size.
 * The result requests `windowSize` items from this controlled observable at a time.
 * @param {Number} windowSize The number of items in the window
 * @returns {Observable} A windowed observable based upon the window size.
 */
ControlledObservable.prototype.windowed = function (windowSize) {
  var windowedSource = new WindowedObservable(this, windowSize);
  return windowedSource;
};
|
|
|
/**
 * Pipes the existing Observable sequence into a Node.js Stream.
 *
 * Values are written with `dest.write`; when a write returns a falsy value
 * the sequence is paused (buffering further values) until `dest` emits
 * 'drain'. Errors are re-emitted on `dest` as 'error'; completion ends
 * `dest` unless it is flagged as stdio.
 * @param {Stream} dest The destination Node.js stream.
 * @returns {Stream} The destination stream.
 */
observableProto.pipe = function (dest) {
  var source = this.pausableBuffered();

  function onDrain() {
    source.resume();
  }

  dest.addListener('drain', onDrain);

  source.subscribe(
    function (x) {
      !dest.write(x) && source.pause();
    },
    function (err) {
      // The sequence is finished, so detach the 'drain' listener here too;
      // otherwise it stays registered on `dest` and keeps `source` alive.
      // Done before emitting so it happens even if emitting 'error' throws.
      dest.removeListener('drain', onDrain);
      dest.emit('error', err);
    },
    function () {
      // Hack check because STDIO is not closable
      !dest._isStdio && dest.end();
      dest.removeListener('drain', onDrain);
    });

  // The buffered source starts paused; start the flow.
  source.resume();

  return dest;
};
|
|
|
return Rx; |
|
}));
|
|
|