summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--lib/_stream_readable.js71
-rw-r--r--test/parallel/test-stream2-readable-wrap-destroy.js27
-rw-r--r--test/parallel/test-stream2-readable-wrap.js10
3 files changed, 56 insertions, 52 deletions
diff --git a/lib/_stream_readable.js b/lib/_stream_readable.js
index 8ddb9562a4..cab1f4d878 100644
--- a/lib/_stream_readable.js
+++ b/lib/_stream_readable.js
@@ -26,6 +26,7 @@ const {
NumberIsInteger,
NumberIsNaN,
ObjectDefineProperties,
+ ObjectKeys,
ObjectSetPrototypeOf,
Set,
SymbolAsyncIterator,
@@ -1007,83 +1008,49 @@ function flow(stream) {
// This is *not* part of the readable stream interface.
// It is an ugly unfortunate mess of history.
Readable.prototype.wrap = function(stream) {
- const state = this._readableState;
let paused = false;
- stream.on('end', () => {
- debug('wrapped end');
- if (state.decoder && !state.ended) {
- const chunk = state.decoder.end();
- if (chunk && chunk.length)
- this.push(chunk);
- }
-
- this.push(null);
- });
+ // TODO (ronag): Should this.destroy(err) emit
+ // 'error' on the wrapped stream? Would require
+ // a static factory method, e.g. Readable.wrap(stream).
stream.on('data', (chunk) => {
- debug('wrapped data');
- if (state.decoder)
- chunk = state.decoder.write(chunk);
-
- // Don't skip over falsy values in objectMode.
- if (state.objectMode && (chunk === null || chunk === undefined))
- return;
- else if (!state.objectMode && (!chunk || !chunk.length))
- return;
-
- const ret = this.push(chunk);
- if (!ret) {
+ if (!this.push(chunk) && stream.pause) {
paused = true;
stream.pause();
}
});
- // Proxy all the other methods. Important when wrapping filters and duplexes.
- for (const i in stream) {
- if (this[i] === undefined && typeof stream[i] === 'function') {
- this[i] = function methodWrap(method) {
- return function methodWrapReturnFunction() {
- return stream[method].apply(stream, arguments);
- };
- }(i);
- }
- }
+ stream.on('end', () => {
+ this.push(null);
+ });
stream.on('error', (err) => {
errorOrDestroy(this, err);
});
stream.on('close', () => {
- // TODO(ronag): Update readable state?
- this.emit('close');
+ this.destroy();
});
stream.on('destroy', () => {
- // TODO(ronag): this.destroy()?
- this.emit('destroy');
+ this.destroy();
});
- stream.on('pause', () => {
- // TODO(ronag): this.pause()?
- this.emit('pause');
- });
-
- stream.on('resume', () => {
- // TODO(ronag): this.resume()?
- this.emit('resume');
- });
-
- // When we try to consume some more bytes, simply unpause the
- // underlying stream.
- this._read = (n) => {
- debug('wrapped _read', n);
- if (paused) {
+ this._read = () => {
+ if (paused && stream.resume) {
paused = false;
stream.resume();
}
};
+ // Proxy all the other methods. Important when wrapping filters and duplexes.
+ for (const i of ObjectKeys(stream)) {
+ if (this[i] === undefined && typeof stream[i] === 'function') {
+ this[i] = stream[i].bind(stream);
+ }
+ }
+
return this;
};
diff --git a/test/parallel/test-stream2-readable-wrap-destroy.js b/test/parallel/test-stream2-readable-wrap-destroy.js
new file mode 100644
index 0000000000..b0f4714c74
--- /dev/null
+++ b/test/parallel/test-stream2-readable-wrap-destroy.js
@@ -0,0 +1,27 @@
+'use strict';
+const common = require('../common');
+
+const Readable = require('_stream_readable');
+const EE = require('events').EventEmitter;
+
+const oldStream = new EE();
+oldStream.pause = () => {};
+oldStream.resume = () => {};
+
+{
+ new Readable({
+ autoDestroy: false,
+ destroy: common.mustCall()
+ })
+ .wrap(oldStream);
+ oldStream.emit('destroy');
+}
+
+{
+ new Readable({
+ autoDestroy: false,
+ destroy: common.mustCall()
+ })
+ .wrap(oldStream);
+ oldStream.emit('close');
+}
diff --git a/test/parallel/test-stream2-readable-wrap.js b/test/parallel/test-stream2-readable-wrap.js
index 0c9cb5861d..69f055fd7e 100644
--- a/test/parallel/test-stream2-readable-wrap.js
+++ b/test/parallel/test-stream2-readable-wrap.js
@@ -44,6 +44,16 @@ function runTest(highWaterMark, objectMode, produce) {
flow();
};
+ // Make sure pause is only emitted once.
+ let pausing = false;
+ r.on('pause', () => {
+ assert.strictEqual(pausing, false);
+ pausing = true;
+ process.nextTick(() => {
+ pausing = false;
+ });
+ });
+
let flowing;
let chunks = 10;
let oldEnded = false;