You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
99 lines
1.9 KiB
99 lines
1.9 KiB
var Readable = require('readable-stream/readable'); |
|
var util = require('util'); |
|
|
|
function isReadable(stream) { |
|
if (typeof stream.pipe !== 'function') { |
|
return false; |
|
} |
|
|
|
if (!stream.readable) { |
|
return false; |
|
} |
|
|
|
if (typeof stream._read !== 'function') { |
|
return false; |
|
} |
|
|
|
if (!stream._readableState) { |
|
return false; |
|
} |
|
|
|
return true; |
|
} |
|
|
|
function addStream (streams, stream) { |
|
if (!isReadable(stream)) { |
|
throw new Error('All input streams must be readable'); |
|
} |
|
|
|
var self = this; |
|
|
|
stream._buffer = []; |
|
|
|
stream.on('readable', function () { |
|
var chunk = stream.read(); |
|
while (chunk) { |
|
if (this === streams[0]) { |
|
self.push(chunk); |
|
} else { |
|
this._buffer.push(chunk); |
|
} |
|
chunk = stream.read(); |
|
} |
|
}); |
|
|
|
stream.on('end', function () { |
|
for (var stream = streams[0]; |
|
stream && stream._readableState.ended; |
|
stream = streams[0]) { |
|
while (stream._buffer.length) { |
|
self.push(stream._buffer.shift()); |
|
} |
|
|
|
streams.shift(); |
|
} |
|
|
|
if (!streams.length) { |
|
self.push(null); |
|
} |
|
}); |
|
|
|
stream.on('error', this.emit.bind(this, 'error')); |
|
|
|
streams.push(stream); |
|
} |
|
|
|
function OrderedStreams (streams, options) { |
|
if (!(this instanceof(OrderedStreams))) { |
|
return new OrderedStreams(streams, options); |
|
} |
|
|
|
streams = streams || []; |
|
options = options || {}; |
|
|
|
options.objectMode = true; |
|
|
|
Readable.call(this, options); |
|
|
|
if (!Array.isArray(streams)) { |
|
streams = [streams]; |
|
} |
|
if (!streams.length) { |
|
return this.push(null); // no streams, close |
|
} |
|
|
|
var addStreamBinded = addStream.bind(this, []); |
|
|
|
streams.forEach(function (item) { |
|
if (Array.isArray(item)) { |
|
item.forEach(addStreamBinded); |
|
} else { |
|
addStreamBinded(item); |
|
} |
|
}); |
|
} |
|
util.inherits(OrderedStreams, Readable); |
|
|
|
OrderedStreams.prototype._read = function () {}; |
|
|
|
module.exports = OrderedStreams;
|
|
|