You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
146 lines
3.5 KiB
146 lines
3.5 KiB
2 years ago
|
//filter will reemit the data if cb(err,pass) pass is truthy
|
||
|
|
||
|
// reduce is more tricky
|
||
|
// maybe we want to group the reductions or emit progress updates occasionally
|
||
|
// the most basic reduce just emits one 'data' event after it has received 'end'
|
||
|
|
||
|
|
||
|
var Stream = require('stream').Stream
|
||
|
|
||
|
|
||
|
//create an event stream and apply function to each .write
|
||
|
//emitting each response as data
|
||
|
//unless it's an empty callback
|
||
|
|
||
|
module.exports = function (mapper, opts) {
|
||
|
|
||
|
var stream = new Stream()
|
||
|
, self = this
|
||
|
, inputs = 0
|
||
|
, outputs = 0
|
||
|
, ended = false
|
||
|
, paused = false
|
||
|
, destroyed = false
|
||
|
, lastWritten = 0
|
||
|
, inNext = false
|
||
|
|
||
|
this.opts = opts || {};
|
||
|
var errorEventName = this.opts.failures ? 'failure' : 'error';
|
||
|
|
||
|
// Items that are not ready to be written yet (because they would come out of
|
||
|
// order) get stuck in a queue for later.
|
||
|
var writeQueue = {}
|
||
|
|
||
|
stream.writable = true
|
||
|
stream.readable = true
|
||
|
|
||
|
function queueData (data, number) {
|
||
|
var nextToWrite = lastWritten + 1
|
||
|
|
||
|
if (number === nextToWrite) {
|
||
|
// If it's next, and its not undefined write it
|
||
|
if (data !== undefined) {
|
||
|
stream.emit.apply(stream, ['data', data])
|
||
|
}
|
||
|
lastWritten ++
|
||
|
nextToWrite ++
|
||
|
} else {
|
||
|
// Otherwise queue it for later.
|
||
|
writeQueue[number] = data
|
||
|
}
|
||
|
|
||
|
// If the next value is in the queue, write it
|
||
|
if (writeQueue.hasOwnProperty(nextToWrite)) {
|
||
|
var dataToWrite = writeQueue[nextToWrite]
|
||
|
delete writeQueue[nextToWrite]
|
||
|
return queueData(dataToWrite, nextToWrite)
|
||
|
}
|
||
|
|
||
|
outputs ++
|
||
|
if(inputs === outputs) {
|
||
|
if(paused) paused = false, stream.emit('drain') //written all the incoming events
|
||
|
if(ended) end()
|
||
|
}
|
||
|
}
|
||
|
|
||
|
function next (err, data, number) {
|
||
|
if(destroyed) return
|
||
|
inNext = true
|
||
|
|
||
|
if (!err || self.opts.failures) {
|
||
|
queueData(data, number)
|
||
|
}
|
||
|
|
||
|
if (err) {
|
||
|
stream.emit.apply(stream, [ errorEventName, err ]);
|
||
|
}
|
||
|
|
||
|
inNext = false;
|
||
|
}
|
||
|
|
||
|
// Wrap the mapper function by calling its callback with the order number of
|
||
|
// the item in the stream.
|
||
|
function wrappedMapper (input, number, callback) {
|
||
|
return mapper.call(null, input, function(err, data){
|
||
|
callback(err, data, number)
|
||
|
})
|
||
|
}
|
||
|
|
||
|
stream.write = function (data) {
|
||
|
if(ended) throw new Error('map stream is not writable')
|
||
|
inNext = false
|
||
|
inputs ++
|
||
|
|
||
|
try {
|
||
|
//catch sync errors and handle them like async errors
|
||
|
var written = wrappedMapper(data, inputs, next)
|
||
|
paused = (written === false)
|
||
|
return !paused
|
||
|
} catch (err) {
|
||
|
//if the callback has been called syncronously, and the error
|
||
|
//has occured in an listener, throw it again.
|
||
|
if(inNext)
|
||
|
throw err
|
||
|
next(err)
|
||
|
return !paused
|
||
|
}
|
||
|
}
|
||
|
|
||
|
function end (data) {
|
||
|
//if end was called with args, write it,
|
||
|
ended = true //write will emit 'end' if ended is true
|
||
|
stream.writable = false
|
||
|
if(data !== undefined) {
|
||
|
return queueData(data, inputs)
|
||
|
} else if (inputs == outputs) { //wait for processing
|
||
|
stream.readable = false, stream.emit('end'), stream.destroy()
|
||
|
}
|
||
|
}
|
||
|
|
||
|
stream.end = function (data) {
|
||
|
if(ended) return
|
||
|
end()
|
||
|
}
|
||
|
|
||
|
stream.destroy = function () {
|
||
|
ended = destroyed = true
|
||
|
stream.writable = stream.readable = paused = false
|
||
|
process.nextTick(function () {
|
||
|
stream.emit('close')
|
||
|
})
|
||
|
}
|
||
|
stream.pause = function () {
|
||
|
paused = true
|
||
|
}
|
||
|
|
||
|
stream.resume = function () {
|
||
|
paused = false
|
||
|
}
|
||
|
|
||
|
return stream
|
||
|
}
|
||
|
|
||
|
|
||
|
|
||
|
|