diff options
author | isaacs <i@izs.me> | 2012-10-04 13:26:16 -0700 |
---|---|---|
committer | isaacs <i@izs.me> | 2012-12-13 17:00:24 -0800 |
commit | 3b59fd70f4ef26742cc66a28105d2be75590e4d2 (patch) | |
tree | 9c583fe82168bcdf8c7ef9bb1bb08ef4dc934535 /lib/_stream_transform.js | |
parent | 9b5abe5bfe31988da1180e5a47f38b8fed03f99e (diff) | |
download | node-new-3b59fd70f4ef26742cc66a28105d2be75590e4d2.tar.gz |
streams2: Make Transform streams pull-style
That is, the transform is triggered by a _read, not by a _write.
This way, backpressure works properly.
Diffstat (limited to 'lib/_stream_transform.js')
-rw-r--r-- | lib/_stream_transform.js | 120 |
1 files changed, 103 insertions, 17 deletions
diff --git a/lib/_stream_transform.js b/lib/_stream_transform.js index 79d40cffab..a3603f42a6 100644 --- a/lib/_stream_transform.js +++ b/lib/_stream_transform.js @@ -19,6 +19,7 @@ // OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE // USE OR OTHER DEALINGS IN THE SOFTWARE. + // a transform stream is a readable/writable stream where you do // something with the data. Sometimes it's called a "filter", // but that's not a great name for it, since that implies a thing where @@ -29,6 +30,39 @@ // necessarily symmetric or synchronous transformation. For example, // a zlib stream might take multiple plain-text writes(), and then // emit a single compressed chunk some time in the future. +// +// Here's how this works: +// +// The Transform stream has all the aspects of the readable and writable +// stream classes. When you write(chunk), that calls _write(chunk,cb) +// internally, and returns false if there's a lot of pending writes +// buffered up. When you call read(), that calls _read(n,cb) until +// there's enough pending readable data buffered up. +// +// In a transform stream, the written data is placed in a buffer. When +// _read(n,cb) is called, it transforms the queued up data, calling the +// buffered _write cb's as it consumes chunks. If consuming a single +// written chunk would result in multiple output chunks, then the first +// outputted bit calls the readcb, and subsequent chunks just go into +// the read buffer, and will cause it to emit 'readable' if necessary. +// +// This way, back-pressure is actually determined by the reading side, +// since _read has to be called to start processing a new chunk. However, +// a pathological inflate type of transform can cause excessive buffering +// here. For example, imagine a stream where every byte of input is +// interpreted as an integer from 0-255, and then results in that many +// bytes of output. Writing the 4 bytes {ff,ff,ff,ff} would result in +// 1kb of data being output. In this case, you could write a very small +// amount of input, and end up with a very large amount of output. In +// such a pathological inflating mechanism, there'd be no way to tell +// the system to stop doing the transform. A single 4MB write could +// cause the system to run out of memory. +// +// However, even in such a pathological case, only a single written chunk +// would be consumed, and then the rest would wait (un-transformed) until +// the results of the previous transformed chunk were consumed. Because +// the transform happens on-demand, it will only transform as much as is +// necessary to fill the readable buffer to the specified lowWaterMark. module.exports = Transform; @@ -36,12 +70,21 @@ var Duplex = require('_stream_duplex'); var util = require('util'); util.inherits(Transform, Duplex); +function TransformState() { + this.buffer = []; + this.transforming = false; + this.pendingReadCb = null; +} + function Transform(options) { Duplex.call(this, options); // bind output so that it can be passed around as a regular function. this._output = this._output.bind(this); + // the queue of _write chunks that are pending being transformed + this._transformState = new TransformState(); + // when the writable side finishes, then flush out anything remaining. this.once('finish', function() { if ('function' === typeof this._flush) @@ -65,33 +108,65 @@ Transform.prototype._transform = function(chunk, output, cb) { throw new Error('not implemented'); }; - Transform.prototype._write = function(chunk, cb) { - this._transform(chunk, this._output, cb); + var ts = this._transformState; + ts.buffer.push([chunk, cb]); + + // now we have something to transform, if we were waiting for it. + if (ts.pendingReadCb && !ts.transforming) { + var readcb = ts.pendingReadCb; + ts.pendingReadCb = null; + this._read(-1, readcb); + } }; -Transform.prototype._read = function(n, cb) { +Transform.prototype._read = function(n, readcb) { var ws = this._writableState; var rs = this._readableState; + var ts = this._transformState; - // basically a no-op, since the _transform will fill the - // _readableState.buffer and emit 'readable' for us, and set ended - // Usually, we want to just not call the cb, and set the reading - // flag to false, so that another _read will happen next time, - // but no state changes. - rs.reading = false; - - // however, if the writable side has ended, and its buffer is clear, - // then that means that the input has all been consumed, and no more - // will ever be provide. treat this as an EOF, and pass back 0 bytes. - if ((ws.ended || ws.ending) && ws.length === 0) - cb(); + if (ts.pendingReadCb) + throw new Error('_read while _read already in progress'); + + ts.pendingReadCb = readcb; + + // if there's nothing pending, then we just wait. + // if we're already transforming, then also just hold on a sec. + // we've already stashed the readcb, so we can come back later + // when we have something to transform + if (ts.buffer.length === 0 || ts.transforming) + return; + + // go ahead and transform that thing, now that someone wants it + var req = ts.buffer.shift(); + var chunk = req[0]; + var writecb = req[1]; + var output = this._output; + ts.transforming = true; + this._transform(chunk, output, function(er, data) { + ts.transforming = false; + if (data) + output(data); + writecb(er); + }.bind(this)); }; Transform.prototype._output = function(chunk) { if (!chunk || !chunk.length) return; + // if we've got a pending readcb, then just call that, + // and let Readable take care of it. If not, then we fill + // the readable buffer ourselves, and emit whatever's needed. + var ts = this._transformState; + var readcb = ts.pendingReadCb; + if (readcb) { + ts.pendingReadCb = null; + readcb(null, chunk); + return; + } + + // otherwise, it's up to us to fill the rs buffer. var state = this._readableState; var len = state.length; state.buffer.push(chunk); @@ -110,6 +185,18 @@ function done(er) { // that nothing more will ever be provided var ws = this._writableState; var rs = this._readableState; + var ts = this._transformState; + + if (ws.length) + throw new Error('calling transform done when ws.length != 0'); + + if (ts.transforming) + throw new Error('calling transform done when still transforming'); + + // if we were waiting on a read, let them know that it isn't coming. + var readcb = ts.pendingReadCb; + if (readcb) + return readcb(); rs.ended = true; // we may have gotten a 'null' read before, and since there is @@ -117,7 +204,6 @@ function done(er) { // now so that the consumer knows to pick up the tail bits. if (rs.length && rs.needReadable) this.emit('readable'); - else if (rs.length === 0) { + else if (rs.length === 0) this.emit('end'); - } } |