summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorisaacs <i@izs.me>2013-01-10 13:49:19 -0800
committerisaacs <i@izs.me>2013-01-10 13:49:54 -0800
commitb43e544140ccf68580c02e71c56d19b82e1e1d32 (patch)
tree7786ad95b81e772ebf5809430b3be4e56b80bc84
parent530585b2d10eea72e71007cb5d8ab18149653c99 (diff)
downloadnode-new-b43e544140ccf68580c02e71c56d19b82e1e1d32.tar.gz
stream: Use push() for Transform._output()
This also slightly changes the semantics, in that a 'readable' event may be triggered by the first write() call, even if a user has not yet called read(). This happens because the Transform _write() handler is calling read(0) to start the flow of data. Technically, the new behavior is more 'correct', since it is more in line with the semantics of the 'readable' event in other streams.
-rw-r--r--lib/_stream_transform.js44
-rw-r--r--test/simple/test-stream2-transform.js26
2 files changed, 19 insertions, 51 deletions
diff --git a/lib/_stream_transform.js b/lib/_stream_transform.js
index b0819de29a..020dc8d76a 100644
--- a/lib/_stream_transform.js
+++ b/lib/_stream_transform.js
@@ -75,7 +75,7 @@ function TransformState(stream) {
this.transforming = false;
this.pendingReadCb = null;
this.output = function(chunk) {
- stream._output(chunk);
+ stream.push(chunk);
};
}
@@ -148,9 +148,6 @@ Transform.prototype._read = function(n, readcb) {
var rs = this._readableState;
var ts = this._transformState;
- if (ts.pendingReadCb)
- throw new Error('_read while _read already in progress');
-
ts.pendingReadCb = readcb;
// if there's nothing pending, then we just wait.
@@ -173,31 +170,6 @@ Transform.prototype._read = function(n, readcb) {
});
};
-Transform.prototype._output = function(chunk) {
- if (!chunk || !chunk.length)
- return;
-
- // if we've got a pending readcb, then just call that,
- // and let Readable take care of it. If not, then we fill
- // the readable buffer ourselves, and emit whatever's needed.
- var ts = this._transformState;
- var readcb = ts.pendingReadCb;
- if (readcb) {
- ts.pendingReadCb = null;
- readcb(null, chunk);
- return;
- }
-
- // otherwise, it's up to us to fill the rs buffer.
- var rs = this._readableState;
- var len = rs.length;
- rs.buffer.push(chunk);
- rs.length += chunk.length;
- if (rs.needReadable) {
- rs.needReadable = false;
- this.emit('readable');
- }
-};
function done(stream, er) {
if (er)
@@ -215,17 +187,5 @@ function done(stream, er) {
if (ts.transforming)
throw new Error('calling transform done when still transforming');
- // if we were waiting on a read, let them know that it isn't coming.
- var readcb = ts.pendingReadCb;
- if (readcb)
- return readcb();
-
- rs.ended = true;
- // we may have gotten a 'null' read before, and since there is
- // no more data coming from the writable side, we need to emit
- // now so that the consumer knows to pick up the tail bits.
- if (rs.length && rs.needReadable)
- stream.emit('readable');
- else if (rs.length === 0)
- stream.emit('end');
+ return stream.push(null);
}
diff --git a/test/simple/test-stream2-transform.js b/test/simple/test-stream2-transform.js
index 2bc008517c..9087080751 100644
--- a/test/simple/test-stream2-transform.js
+++ b/test/simple/test-stream2-transform.js
@@ -215,35 +215,40 @@ test('passthrough event emission', function(t) {
var i = 0;
pt.write(new Buffer('foog'));
+
+ console.error('need emit 0');
pt.write(new Buffer('bark'));
+ console.error('should have emitted readable now 1 === %d', emits);
+ t.equal(emits, 1);
+
t.equal(pt.read(5).toString(), 'foogb');
t.equal(pt.read(5) + '', 'null');
- console.error('need emit 0');
+ console.error('need emit 1');
pt.write(new Buffer('bazy'));
console.error('should have emitted, but not again');
pt.write(new Buffer('kuel'));
- console.error('should have emitted readable now 1 === %d', emits);
- t.equal(emits, 1);
+ console.error('should have emitted readable now 2 === %d', emits);
+ t.equal(emits, 2);
t.equal(pt.read(5).toString(), 'arkba');
t.equal(pt.read(5).toString(), 'zykue');
t.equal(pt.read(5), null);
- console.error('need emit 1');
+ console.error('need emit 2');
pt.end();
- t.equal(emits, 2);
+ t.equal(emits, 3);
t.equal(pt.read(5).toString(), 'l');
t.equal(pt.read(5), null);
console.error('should not have emitted again');
- t.equal(emits, 2);
+ t.equal(emits, 3);
t.end();
});
@@ -256,25 +261,28 @@ test('passthrough event emission reordered', function(t) {
});
pt.write(new Buffer('foog'));
+ console.error('need emit 0');
pt.write(new Buffer('bark'));
+ console.error('should have emitted readable now 1 === %d', emits);
+ t.equal(emits, 1);
t.equal(pt.read(5).toString(), 'foogb');
t.equal(pt.read(5), null);
- console.error('need emit 0');
+ console.error('need emit 1');
pt.once('readable', function() {
t.equal(pt.read(5).toString(), 'arkba');
t.equal(pt.read(5), null);
- console.error('need emit 1');
+ console.error('need emit 2');
pt.once('readable', function() {
t.equal(pt.read(5).toString(), 'zykue');
t.equal(pt.read(5), null);
pt.once('readable', function() {
t.equal(pt.read(5).toString(), 'l');
t.equal(pt.read(5), null);
- t.equal(emits, 3);
+ t.equal(emits, 4);
t.end();
});
pt.end();