You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
153 lines
3.2 KiB
153 lines
3.2 KiB
'use strict' |
|
|
|
var PassThrough = require('readable-stream').PassThrough |
|
var inherits = require('inherits') |
|
var p = require('process-nextick-args') |
|
|
|
/**
 * Wraps a readable stream so that it can be cloned before it starts flowing.
 *
 * Works both as a constructor and as a plain factory call (without `new`).
 * The wrapper is a PassThrough whose `objectMode` always mirrors the source
 * stream's readable side, whatever the caller passed in `opts`.
 *
 * @param {Object} stream - source stream; must expose `_readableState.objectMode`.
 * @param {Object} [opts] - extra options handed to the PassThrough constructor.
 *   The object is copied, never modified.
 * @returns {Cloneable}
 */
function Cloneable (stream, opts) {
  if (!(this instanceof Cloneable)) {
    return new Cloneable(stream, opts)
  }

  var objectMode = stream._readableState.objectMode
  this._original = stream
  // The wrapper itself counts as one pending consumer; clone() adds more.
  this._clonesCount = 1

  // Build a private options object instead of writing `objectMode` onto the
  // caller's one, so a shared/reused `opts` is left exactly as it was given.
  var options = {}
  for (var key in opts) {
    options[key] = opts[key]
  }
  options.objectMode = objectMode

  PassThrough.call(this, options)

  forwardDestroy(stream, this)

  // Either of these signals that a consumer attached to the wrapper.
  this.on('newListener', onData)
  this.once('resume', onResume)

  // True while the 'newListener' hook above is still expected to be installed
  // (clone() consults it to decide whether to re-install the hook).
  this._hasListener = true
}
|
|
|
// Make Cloneable a subclass of PassThrough (wired up through the `inherits` helper required above).
inherits(Cloneable, PassThrough) |
|
|
|
/**
 * 'newListener' hook of a Cloneable (called with the Cloneable as `this`).
 *
 * A 'data' or 'readable' listener means a consumer attached to the wrapper:
 * both start-up hooks are removed and clonePiped is scheduled for it.
 * Any other event name is ignored.
 *
 * @param {string} event - name of the event a listener is being added for.
 * @param {Function} listener - the listener being added (unused).
 */
function onData (event, listener) {
  var isConsumerEvent = event === 'data' || event === 'readable'
  if (!isConsumerEvent) {
    return
  }

  this._hasListener = false
  this.removeListener('newListener', onData)
  this.removeListener('resume', onResume)
  p.nextTick(clonePiped, this)
}
|
|
|
/**
 * 'resume' hook of a Cloneable (called with the Cloneable as `this`).
 *
 * Clears the `_hasListener` flag, drops the 'newListener' hook and schedules
 * clonePiped for the wrapper on the next tick.
 */
function onResume () {
  var wrapper = this

  wrapper._hasListener = false
  wrapper.removeListener('newListener', onData)
  p.nextTick(clonePiped, wrapper)
}
|
|
|
/**
 * Creates a new Clone fed by this wrapper.
 *
 * @returns {Clone} the new clone.
 * @throws {Error} 'already started' once `_original` has been cleared.
 */
Cloneable.prototype.clone = function () {
  var wrapper = this

  if (!wrapper._original) {
    throw new Error('already started')
  }

  // one more consumer has to attach before the source is piped
  wrapper._clonesCount += 1

  // Listeners registered on the wrapper while the clone is being built must
  // not be mistaken for a consumer, so the hook is lifted during construction
  // and put back afterwards if it was still wanted.
  wrapper.removeListener('newListener', onData)
  var copy = new Clone(wrapper)
  if (wrapper._hasListener) {
    wrapper.on('newListener', onData)
  }

  return copy
}
|
|
|
/**
 * Destroy hook of the wrapper.
 *
 * Without an error the stream is finished on both sides (`push(null)`,
 * `end()`) and 'close' is emitted. In every case `cb` is invoked with `err`
 * on the next tick.
 *
 * @param {Error|null|undefined} err - reason for the destruction, if any.
 * @param {Function} cb - completion callback.
 */
Cloneable.prototype._destroy = function (err, cb) {
  if (err) {
    p.nextTick(cb, err)
    return
  }

  this.push(null)
  this.end()
  this.emit('close')

  p.nextTick(cb, err)
}
|
|
|
/**
 * Propagates the termination of `src` to `dest`.
 *
 * - 'error' on `src`: the 'close' forwarding is removed and `dest` is
 *   destroyed with the same error.
 * - 'close' on `src`: `dest` is ended.
 *
 * @param {Object} src - emitter to observe (`on`, `removeListener`).
 * @param {Object} dest - target exposing `end()` and `destroy(err)`.
 */
function forwardDestroy (src, dest) {
  var endDest = function () {
    dest.end()
  }

  var destroyDest = function (err) {
    // after an error the close event must no longer end the destination
    src.removeListener('close', endDest)
    dest.destroy(err)
  }

  src.on('error', destroyDest)
  src.on('close', endDest)
}
|
|
|
/**
 * Records that one more consumer of `that` is ready.
 *
 * Decrements `that._clonesCount`; when it reaches exactly zero and the
 * readable side has not been destroyed, the original source is piped into
 * `that` and the reference to it is dropped.
 *
 * @param {Object} that - the Cloneable whose pending count is updated.
 */
function clonePiped (that) {
  that._clonesCount -= 1

  if (that._clonesCount !== 0) {
    return
  }
  if (that._readableState.destroyed) {
    return
  }

  var source = that._original
  source.pipe(that)
  that._original = undefined
}
|
|
|
/**
 * A PassThrough that receives everything flowing through `parent`.
 *
 * Works with or without `new`. `objectMode` is always copied from the
 * parent's readable state.
 *
 * @param {Object} parent - the Cloneable to read from.
 * @param {Object} [opts] - extra PassThrough options.
 * @returns {Clone}
 */
function Clone (parent, opts) {
  if (!(this instanceof Clone)) {
    return new Clone(parent, opts)
  }

  var parentObjectMode = parent._readableState.objectMode

  opts = opts || {}
  opts.objectMode = parentObjectMode

  this.parent = parent

  PassThrough.call(this, opts)

  forwardDestroy(parent, this)

  parent.pipe(this)

  // The hooks below are installed last on purpose: listeners added during the
  // set-up above must not be counted as a consumer starting the flow.
  this.on('newListener', onDataClone)
  this.on('resume', onResumeClone)
}
|
|
|
/**
 * 'newListener' hook of a Clone (called with the Clone as `this`).
 *
 * When a 'data', 'readable' or 'close' listener is attached, the clone counts
 * as ready: clonePiped is scheduled for the parent and the clone's start-up
 * hooks are removed so the same clone is never counted a second time.
 * Other event names are ignored.
 *
 * @param {string} event - name of the event a listener is being added for.
 * @param {Function} listener - the listener being added (unused).
 */
function onDataClone (event, listener) {
  // We start the flow once all clones are piped or destroyed
  if (event === 'data' || event === 'readable' || event === 'close') {
    p.nextTick(clonePiped, this.parent)
    this.removeListener('newListener', onDataClone)
    // Attaching a 'data' listener also resumes a readable stream, so the
    // 'resume' hook would otherwise fire for this very same consumer and
    // decrement the parent's pending count twice.
    this.removeListener('resume', onResumeClone)
  }
}
|
|
|
/**
 * 'resume' hook of a Clone (called with the Clone as `this`).
 *
 * The first 'resume' marks the clone as ready: both start-up hooks are
 * removed and clonePiped is scheduled for the parent on the next tick.
 */
function onResumeClone () {
  this.removeListener('newListener', onDataClone)
  // This hook is registered as a persistent listener; remove it here so later
  // 'resume' events of the same clone do not decrement the parent's pending
  // count again.
  this.removeListener('resume', onResumeClone)
  p.nextTick(clonePiped, this.parent)
}
|
|
|
// Make Clone a subclass of PassThrough (wired up through the `inherits` helper required above).
inherits(Clone, PassThrough) |
|
|
|
/**
 * Cloning a clone delegates to the parent Cloneable, so the result is another
 * clone of the same parent.
 *
 * @returns {Clone} whatever `parent.clone()` returns.
 */
Clone.prototype.clone = function () {
  var owner = this.parent
  return owner.clone()
}
|
|
|
/**
 * Tells whether `stream` was produced by this module.
 *
 * @param {*} stream - value to inspect.
 * @returns {boolean} true for Cloneable and Clone instances, false otherwise.
 */
Cloneable.isCloneable = function (stream) {
  if (stream instanceof Cloneable) {
    return true
  }
  return stream instanceof Clone
}
|
|
|
/**
 * Destroy hook of a clone.
 *
 * When there is no error the clone is finished on both sides (`push(null)`,
 * `end()`) and 'close' is emitted; `cb` always receives `err` on the next
 * tick.
 *
 * @param {Error|null|undefined} err - reason for the destruction, if any.
 * @param {Function} cb - completion callback.
 */
Clone.prototype._destroy = function (err, cb) {
  var cleanShutdown = !err

  if (cleanShutdown) {
    var self = this
    self.push(null)
    self.end()
    self.emit('close')
  }

  p.nextTick(cb, err)
}
|
|
|
// Public entry point: the Cloneable constructor/factory, with `isCloneable` attached as a static helper.
module.exports = Cloneable
|
|
|