summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAnna Henningsen <anna@addaleax.net>2017-01-09 19:05:06 +0100
committerAnna Henningsen <anna@addaleax.net>2017-05-27 11:19:39 +0200
commit9f610b5e265549f048ef00cf521a0d36771c9574 (patch)
treedd12b56b2d516ce5d4a61697ae539f2a370a4760
parent0ecdf2934039b6e847aa3d1441e3ec235a70d125 (diff)
downloadnode-new-9f610b5e265549f048ef00cf521a0d36771c9574.tar.gz
stream: support Uint8Array input to methods
PR-URL: https://github.com/nodejs/node/pull/11608 Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
-rw-r--r--doc/api/stream.md69
-rw-r--r--lib/_stream_readable.js8
-rw-r--r--lib/_stream_writable.js6
-rw-r--r--lib/internal/streams/BufferList.js6
-rw-r--r--lib/stream.js37
-rw-r--r--test/parallel/test-stream-uint8array.js102
6 files changed, 205 insertions, 23 deletions
diff --git a/doc/api/stream.md b/doc/api/stream.md
index d8c27ea1c4..934a3bf047 100644
--- a/doc/api/stream.md
+++ b/doc/api/stream.md
@@ -47,10 +47,10 @@ There are four fundamental stream types within Node.js:
### Object Mode
All streams created by Node.js APIs operate exclusively on strings and `Buffer`
-objects. It is possible, however, for stream implementations to work with other
-types of JavaScript values (with the exception of `null`, which serves a special
-purpose within streams). Such streams are considered to operate in "object
-mode".
+(or `Uint8Array`) objects. It is possible, however, for stream implementations
+to work with other types of JavaScript values (with the exception of `null`,
+which serves a special purpose within streams). Such streams are considered to
+operate in "object mode".
Stream instances are switched into object mode using the `objectMode` option
when the stream is created. Attempting to switch an existing stream into
@@ -352,12 +352,17 @@ See also: [`writable.uncork()`][].
##### writable.end([chunk][, encoding][, callback])
<!-- YAML
added: v0.9.4
+changes:
+ - version: REPLACEME
+ pr-url: https://github.com/nodejs/node/pull/11608
+ description: The `chunk` argument can now be a `Uint8Array` instance.
-->
-* `chunk` {string|Buffer|any} Optional data to write. For streams not operating
- in object mode, `chunk` must be a string or a `Buffer`. For object mode
- streams, `chunk` may be any JavaScript value other than `null`.
-* `encoding` {string} The encoding, if `chunk` is a String
+* `chunk` {string|Buffer|Uint8Array|any} Optional data to write. For streams
+ not operating in object mode, `chunk` must be a string, `Buffer` or
+ `Uint8Array`. For object mode streams, `chunk` may be any JavaScript value
+ other than `null`.
+* `encoding` {string} The encoding, if `chunk` is a string
* `callback` {Function} Optional callback for when the stream is finished
Calling the `writable.end()` method signals that no more data will be written
@@ -434,14 +439,20 @@ See also: [`writable.cork()`][].
<!-- YAML
added: v0.9.4
changes:
+ - version: REPLACEME
+ pr-url: https://github.com/nodejs/node/pull/11608
+ description: The `chunk` argument can now be a `Uint8Array` instance.
- version: v6.0.0
pr-url: https://github.com/nodejs/node/pull/6170
description: Passing `null` as the `chunk` parameter will always be
considered invalid now, even in object mode.
-->
-* `chunk` {string|Buffer} The data to write
-* `encoding` {string} The encoding, if `chunk` is a String
+* `chunk` {string|Buffer|Uint8Array|any} Optional data to write. For streams
+ not operating in object mode, `chunk` must be a string, `Buffer` or
+ `Uint8Array`. For object mode streams, `chunk` may be any JavaScript value
+ other than `null`.
+* `encoding` {string} The encoding, if `chunk` is a string
* `callback` {Function} Callback for when this chunk of data is flushed
* Returns: {boolean} `false` if the stream wishes for the calling code to
wait for the `'drain'` event to be emitted before continuing to write
@@ -985,9 +996,16 @@ setTimeout(() => {
##### readable.unshift(chunk)
<!-- YAML
added: v0.9.11
+changes:
+ - version: REPLACEME
+ pr-url: https://github.com/nodejs/node/pull/11608
+ description: The `chunk` argument can now be a `Uint8Array` instance.
-->
-* `chunk` {Buffer|string|any} Chunk of data to unshift onto the read queue
+* `chunk` {Buffer|Uint8Array|string|any} Chunk of data to unshift onto the
+ read queue. For streams not operating in object mode, `chunk` must be a
+ string, `Buffer` or `Uint8Array`. For object mode streams, `chunk` may be
+ any JavaScript value other than `null`.
The `readable.unshift()` method pushes a chunk of data back into the internal
buffer. This is useful in certain situations where a stream is being consumed by
@@ -1274,8 +1292,9 @@ constructor and implement the `writable._write()` method. The
Defaults to `true`
* `objectMode` {boolean} Whether or not the
[`stream.write(anyObj)`][stream-write] is a valid operation. When set,
- it becomes possible to write JavaScript values other than string or
- `Buffer` if supported by the stream implementation. Defaults to `false`
+ it becomes possible to write JavaScript values other than string,
+ `Buffer` or `Uint8Array` if supported by the stream implementation.
+ Defaults to `false`
* `write` {Function} Implementation for the
[`stream._write()`][stream-_write] method.
* `writev` {Function} Implementation for the
@@ -1564,16 +1583,26 @@ internal to the class that defines it, and should never be called directly by
user programs.
#### readable.push(chunk[, encoding])
+<!-- YAML
+changes:
+ - version: REPLACEME
+ pr-url: https://github.com/nodejs/node/pull/11608
+ description: The `chunk` argument can now be a `Uint8Array` instance.
+-->
-* `chunk` {Buffer|null|string|any} Chunk of data to push into the read queue
-* `encoding` {string} Encoding of String chunks. Must be a valid
+* `chunk` {Buffer|Uint8Array|string|null|any} Chunk of data to push into the
+ read queue. For streams not operating in object mode, `chunk` must be a
+ string, `Buffer` or `Uint8Array`. For object mode streams, `chunk` may be
+ any JavaScript value.
+* `encoding` {string} Encoding of string chunks. Must be a valid
Buffer encoding, such as `'utf8'` or `'ascii'`
* Returns {boolean} `true` if additional chunks of data may continued to be
pushed; `false` otherwise.
-When `chunk` is not `null`, the `chunk` of data will be added to the
-internal queue for users of the stream to consume. Passing `chunk` as `null`
-signals the end of the stream (EOF), after which no more data can be written.
+When `chunk` is a `Buffer`, `Uint8Array` or `string`, the `chunk` of data will
+be added to the internal queue for users of the stream to consume.
+Passing `chunk` as `null` signals the end of the stream (EOF), after which no
+more data can be written.
When the Readable is operating in paused mode, the data added with
`readable.push()` can be read out by calling the
@@ -2088,8 +2117,8 @@ Readable stream class internals.
Use of `readable.push('')` is not recommended.
-Pushing a zero-byte string or `Buffer` to a stream that is not in object mode
-has an interesting side effect. Because it *is* a call to
+Pushing a zero-byte string, `Buffer` or `Uint8Array` to a stream that is not in
+object mode has an interesting side effect. Because it *is* a call to
[`readable.push()`][stream-push], the call will end the reading process.
However, because the argument is an empty string, no data is added to the
readable buffer so there is nothing for a user to consume.
diff --git a/lib/_stream_readable.js b/lib/_stream_readable.js
index 9666c7d6fb..4ba224cd5a 100644
--- a/lib/_stream_readable.js
+++ b/lib/_stream_readable.js
@@ -212,6 +212,12 @@ function readableAddChunk(stream, chunk, encoding, addToFront, skipChunkCheck) {
if (er) {
stream.emit('error', er);
} else if (state.objectMode || chunk && chunk.length > 0) {
+ if (typeof chunk !== 'string' &&
+ Object.getPrototypeOf(chunk) !== Buffer.prototype &&
+ !state.objectMode) {
+ chunk = Stream._uint8ArrayToBuffer(chunk);
+ }
+
if (addToFront) {
if (state.endEmitted)
stream.emit('error', new Error('stream.unshift() after end event'));
@@ -259,7 +265,7 @@ function addChunk(stream, state, chunk, addToFront) {
function chunkInvalid(state, chunk) {
var er;
- if (!(chunk instanceof Buffer) &&
+ if (!Stream._isUint8Array(chunk) &&
typeof chunk !== 'string' &&
chunk !== undefined &&
!state.objectMode) {
diff --git a/lib/_stream_writable.js b/lib/_stream_writable.js
index 768d80a96e..615f5a3762 100644
--- a/lib/_stream_writable.js
+++ b/lib/_stream_writable.js
@@ -248,7 +248,11 @@ function validChunk(stream, state, chunk, cb) {
Writable.prototype.write = function(chunk, encoding, cb) {
var state = this._writableState;
var ret = false;
- var isBuf = (chunk instanceof Buffer);
+ var isBuf = Stream._isUint8Array(chunk) && !state.objectMode;
+
+ if (isBuf && Object.getPrototypeOf(chunk) !== Buffer.prototype) {
+ chunk = Stream._uint8ArrayToBuffer(chunk);
+ }
if (typeof encoding === 'function') {
cb = encoding;
diff --git a/lib/internal/streams/BufferList.js b/lib/internal/streams/BufferList.js
index b9bdfa7d0e..6e724e9fb8 100644
--- a/lib/internal/streams/BufferList.js
+++ b/lib/internal/streams/BufferList.js
@@ -2,6 +2,10 @@
const Buffer = require('buffer').Buffer;
+function copyBuffer(src, target, offset) {
+ Buffer.prototype.copy.call(src, target, offset);
+}
+
module.exports = class BufferList {
constructor() {
this.head = null;
@@ -63,7 +67,7 @@ module.exports = class BufferList {
var p = this.head;
var i = 0;
while (p) {
- p.data.copy(ret, i);
+ copyBuffer(p.data, ret, i);
i += p.data.length;
p = p.next;
}
diff --git a/lib/stream.js b/lib/stream.js
index dca4f50fc0..9c1ba986d0 100644
--- a/lib/stream.js
+++ b/lib/stream.js
@@ -21,6 +21,8 @@
'use strict';
+const Buffer = require('buffer').Buffer;
+
// Note: export Stream before Readable/Writable/Duplex/...
// to avoid a cross-reference(require) issues
const Stream = module.exports = require('internal/streams/legacy');
@@ -33,3 +35,38 @@ Stream.PassThrough = require('_stream_passthrough');
// Backwards-compat with node 0.4.x
Stream.Stream = Stream;
+
+// Internal utilities
+try {
+ Stream._isUint8Array = process.binding('util').isUint8Array;
+} catch (e) {
+ // This throws for Node < 4.2.0 because there’s no util binding and
+ // returns undefined for Node < 7.4.0.
+}
+
+if (!Stream._isUint8Array) {
+ Stream._isUint8Array = function _isUint8Array(obj) {
+ return Object.prototype.toString.call(obj) === '[object Uint8Array]';
+ };
+}
+
+const version = process.version.substr(1).split('.');
+if (version[0] === 0 && version[1] < 12) {
+ Stream._uint8ArrayToBuffer = Buffer;
+} else {
+ try {
+ const internalBuffer = require('internal/buffer');
+ Stream._uint8ArrayToBuffer = function _uint8ArrayToBuffer(chunk) {
+ return new internalBuffer.FastBuffer(chunk.buffer,
+ chunk.byteOffset,
+ chunk.byteLength);
+ };
+ } catch (e) {
+ }
+
+ if (!Stream._uint8ArrayToBuffer) {
+ Stream._uint8ArrayToBuffer = function _uint8ArrayToBuffer(chunk) {
+ return Buffer.prototype.slice.call(chunk);
+ };
+ }
+}
diff --git a/test/parallel/test-stream-uint8array.js b/test/parallel/test-stream-uint8array.js
new file mode 100644
index 0000000000..1b02c55d97
--- /dev/null
+++ b/test/parallel/test-stream-uint8array.js
@@ -0,0 +1,102 @@
+'use strict';
+const common = require('../common');
+const assert = require('assert');
+const Buffer = require('buffer').Buffer;
+
+const { Readable, Writable } = require('stream');
+
+const ABC = new Uint8Array([0x41, 0x42, 0x43]);
+const DEF = new Uint8Array([0x44, 0x45, 0x46]);
+const GHI = new Uint8Array([0x47, 0x48, 0x49]);
+
+{
+ // Simple Writable test.
+
+ let n = 0;
+ const writable = new Writable({
+ write: common.mustCall((chunk, encoding, cb) => {
+ assert(chunk instanceof Buffer);
+ if (n++ === 0) {
+ assert.strictEqual(String(chunk), 'ABC');
+ } else {
+ assert.strictEqual(String(chunk), 'DEF');
+ }
+
+ cb();
+ }, 2)
+ });
+
+ writable.write(ABC);
+ writable.end(DEF);
+}
+
+{
+ // Writable test, pass in Uint8Array in object mode.
+
+ const writable = new Writable({
+ objectMode: true,
+ write: common.mustCall((chunk, encoding, cb) => {
+ assert(!(chunk instanceof Buffer));
+ assert(chunk instanceof Uint8Array);
+ assert.strictEqual(chunk, ABC);
+ assert.strictEqual(encoding, 'utf8');
+ cb();
+ })
+ });
+
+ writable.end(ABC);
+}
+
+{
+ // Writable test, multiple writes carried out via writev.
+ let callback;
+
+ const writable = new Writable({
+ write: common.mustCall((chunk, encoding, cb) => {
+ assert(chunk instanceof Buffer);
+ assert.strictEqual(encoding, 'buffer');
+ assert.strictEqual(String(chunk), 'ABC');
+ callback = cb;
+ }),
+ writev: common.mustCall((chunks, cb) => {
+ assert.strictEqual(chunks.length, 2);
+ assert.strictEqual(chunks[0].encoding, 'buffer');
+ assert.strictEqual(chunks[1].encoding, 'buffer');
+ assert.strictEqual(chunks[0].chunk + chunks[1].chunk, 'DEFGHI');
+ })
+ });
+
+ writable.write(ABC);
+ writable.write(DEF);
+ writable.end(GHI);
+ callback();
+}
+
+{
+ // Simple Readable test.
+ const readable = new Readable({
+ read() {}
+ });
+
+ readable.push(DEF);
+ readable.unshift(ABC);
+
+ const buf = readable.read();
+ assert(buf instanceof Buffer);
+ assert.deepStrictEqual([...buf], [...ABC, ...DEF]);
+}
+
+{
+ // Readable test, setEncoding.
+ const readable = new Readable({
+ read() {}
+ });
+
+ readable.setEncoding('utf8');
+
+ readable.push(DEF);
+ readable.unshift(ABC);
+
+ const out = readable.read();
+ assert.strictEqual(out, 'ABCDEF');
+}