diff options
author | Brian White <mscdex@mscdex.net> | 2019-01-10 15:52:27 -0500 |
---|---|---|
committer | Brian White <mscdex@mscdex.net> | 2019-08-23 17:05:52 -0400 |
commit | 8292b280ec9e6b8c2444cbe49350facc77f5fefa (patch) | |
tree | 2f40ac41f20b53b748a7039c4f00e419efb8784b | |
parent | 9d21b0395cc248a0e5537a11cc84f61919eccca6 (diff) | |
download | node-new-8292b280ec9e6b8c2444cbe49350facc77f5fefa.tar.gz |
net: allow reading data into a static buffer
Co-Authored-By: Anna Henningsen <anna@addaleax.net>
PR-URL: https://github.com/nodejs/node/pull/25436
Reviewed-By: James M Snell <jasnell@gmail.com>
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
Reviewed-By: Ben Noordhuis <info@bnoordhuis.nl>
-rw-r--r-- | benchmark/net/net-s2c.js | 57 | ||||
-rw-r--r-- | doc/api/net.md | 36 | ||||
-rw-r--r-- | lib/internal/stream_base_commons.js | 31 | ||||
-rw-r--r-- | lib/net.js | 144 | ||||
-rw-r--r-- | src/stream_base.cc | 59 | ||||
-rw-r--r-- | src/stream_base.h | 26 | ||||
-rw-r--r-- | test/parallel/test-net-onread-static-buffer.js | 186 |
7 files changed, 474 insertions, 65 deletions
diff --git a/benchmark/net/net-s2c.js b/benchmark/net/net-s2c.js index e3c5c7e5eb..d8c26db9bd 100644 --- a/benchmark/net/net-s2c.js +++ b/benchmark/net/net-s2c.js @@ -5,33 +5,68 @@ const common = require('../common.js'); const PORT = common.PORT; const bench = common.createBenchmark(main, { - len: [64, 102400, 1024 * 1024 * 16], + sendchunklen: [256, 32 * 1024, 128 * 1024, 16 * 1024 * 1024], type: ['utf', 'asc', 'buf'], + recvbuflen: [0, 64 * 1024, 1024 * 1024], + recvbufgenfn: ['true', 'false'], dur: [5] }); var chunk; var encoding; +var recvbuf; +var received = 0; + +function main({ dur, sendchunklen, type, recvbuflen, recvbufgenfn }) { + if (isFinite(recvbuflen) && recvbuflen > 0) + recvbuf = Buffer.alloc(recvbuflen); -function main({ dur, len, type }) { switch (type) { case 'buf': - chunk = Buffer.alloc(len, 'x'); + chunk = Buffer.alloc(sendchunklen, 'x'); break; case 'utf': encoding = 'utf8'; - chunk = 'ü'.repeat(len / 2); + chunk = 'ü'.repeat(sendchunklen / 2); break; case 'asc': encoding = 'ascii'; - chunk = 'x'.repeat(len); + chunk = 'x'.repeat(sendchunklen); break; default: throw new Error(`invalid type: ${type}`); } const reader = new Reader(); - const writer = new Writer(); + var writer; + var socketOpts; + if (recvbuf === undefined) { + writer = new Writer(); + socketOpts = { port: PORT }; + } else { + let buffer = recvbuf; + if (recvbufgenfn === 'true') { + let bufidx = -1; + const bufpool = [ + recvbuf, + Buffer.from(recvbuf), + Buffer.from(recvbuf), + ]; + buffer = () => { + bufidx = (bufidx + 1) % bufpool.length; + return bufpool[bufidx]; + }; + } + socketOpts = { + port: PORT, + onread: { + buffer, + callback: function(nread, buf) { + received += nread; + } + } + }; + } // The actual benchmark. const server = net.createServer((socket) => { @@ -39,14 +74,15 @@ function main({ dur, len, type }) { }); server.listen(PORT, () => { - const socket = net.connect(PORT); + const socket = net.connect(socketOpts); socket.on('connect', () => { bench.start(); - socket.pipe(writer); + if (recvbuf === undefined) + socket.pipe(writer); setTimeout(() => { - const bytes = writer.received; + const bytes = received; const gbits = (bytes * 8) / (1024 * 1024 * 1024); bench.end(gbits); process.exit(0); @@ -58,12 +94,11 @@ function main({ dur, len, type }) { const net = require('net'); function Writer() { - this.received = 0; this.writable = true; } Writer.prototype.write = function(chunk, encoding, cb) { - this.received += chunk.length; + received += chunk.length; if (typeof encoding === 'function') encoding(); diff --git a/doc/api/net.md b/doc/api/net.md index 396cad36ab..a396edf31a 100644 --- a/doc/api/net.md +++ b/doc/api/net.md @@ -593,6 +593,9 @@ for the [`'connect'`][] event **once**. <!-- YAML added: v0.1.90 changes: + - version: REPLACEME + pr-url: https://github.com/nodejs/node/pull/25436 + description: Added `onread` option. - version: v6.0.0 pr-url: https://github.com/nodejs/node/pull/6021 description: The `hints` option defaults to `0` in all cases now. @@ -629,6 +632,39 @@ For [IPC][] connections, available `options` are: See [Identifying paths for IPC connections][]. If provided, the TCP-specific options above are ignored. +For both types, available `options` include: + +* `onread` {Object} - If specified, incoming data is stored in a single `buffer` + and passed to the supplied `callback` when data arrives on the socket. + Note: this will cause the streaming functionality to not provide any data, + however events like `'error'`, `'end'`, and `'close'` will still be emitted + as normal and methods like `pause()` and `resume()` will also behave as + expected. + * `buffer` {Buffer|Uint8Array|Function} - Either a reusable chunk of memory to + use for storing incoming data or a function that returns such. + * `callback` {Function} This function is called for every chunk of incoming + data. Two arguments are passed to it: the number of bytes written to + `buffer` and a reference to `buffer`. Return `false` from this function to + implicitly `pause()` the socket. This function will be executed in the + global context. + +Following is an example of a client using the `onread` option: + +```js +const net = require('net'); +net.connect({ + port: 80, + onread: { + // Reuses a 4KiB Buffer for every read from the socket + buffer: Buffer.alloc(4 * 1024), + callback: function(nread, buf) { + // Received data is available in `buf` from 0 to `nread` + console.log(buf.toString('utf8', 0, nread)); + } + } +}); +``` + #### socket.connect(path[, connectListener]) * `path` {string} Path the client should connect to. See diff --git a/lib/internal/stream_base_commons.js b/lib/internal/stream_base_commons.js index 88896083f1..eb2e53963d 100644 --- a/lib/internal/stream_base_commons.js +++ b/lib/internal/stream_base_commons.js @@ -23,6 +23,7 @@ const { setUnrefTimeout, getTimerDuration } = require('internal/timers'); +const { isUint8Array } = require('internal/util/types'); const { clearTimeout } = require('timers'); const kMaybeDestroy = Symbol('kMaybeDestroy'); @@ -32,6 +33,9 @@ const kHandle = Symbol('kHandle'); const kSession = Symbol('kSession'); const debug = require('internal/util/debuglog').debuglog('stream'); +const kBuffer = Symbol('kBuffer'); +const kBufferGen = Symbol('kBufferGen'); +const kBufferCb = Symbol('kBufferCb'); function handleWriteReq(req, data, encoding) { const { handle } = req; @@ -161,9 +165,23 @@ function onStreamRead(arrayBuffer) { stream[kUpdateTimer](); if (nread > 0 && !stream.destroyed) { - const offset = streamBaseState[kArrayBufferOffset]; - const buf = new FastBuffer(arrayBuffer, offset, nread); - if (!stream.push(buf)) { + let ret; + let result; + const userBuf = stream[kBuffer]; + if (userBuf) { + result = (stream[kBufferCb](nread, userBuf) !== false); + const bufGen = stream[kBufferGen]; + if (bufGen !== null) { + const nextBuf = bufGen(); + if (isUint8Array(nextBuf)) + stream[kBuffer] = ret = nextBuf; + } + } else { + const offset = streamBaseState[kArrayBufferOffset]; + const buf = new FastBuffer(arrayBuffer, offset, nread); + result = stream.push(buf); + } + if (!result) { handle.reading = false; if (!stream.destroyed) { const err = handle.readStop(); @@ -172,7 +190,7 @@ function onStreamRead(arrayBuffer) { } } - return; + return ret; } if (nread === 0) { @@ -241,5 +259,8 @@ module.exports = { kUpdateTimer, kHandle, kSession, - setStreamTimeout + setStreamTimeout, + kBuffer, + kBufferCb, + kBufferGen }; diff --git a/lib/net.js b/lib/net.js index 2b099d75ef..1eb1d212cd 100644 --- a/lib/net.js +++ b/lib/net.js @@ -67,7 +67,10 @@ const { kAfterAsyncWrite, kHandle, kUpdateTimer, - setStreamTimeout + setStreamTimeout, + kBuffer, + kBufferCb, + kBufferGen } = require('internal/stream_base_commons'); const { codes: { @@ -86,6 +89,7 @@ const { exceptionWithHostPort, uvExceptionWithHostPort } = require('internal/errors'); +const { isUint8Array } = require('internal/util/types'); const { validateInt32, validateString } = require('internal/validators'); const kLastWriteQueueSize = Symbol('lastWriteQueueSize'); const { @@ -225,6 +229,18 @@ function initSocketHandle(self) { self._handle[owner_symbol] = self; self._handle.onread = onStreamRead; self[async_id_symbol] = getNewAsyncId(self._handle); + + let userBuf = self[kBuffer]; + if (userBuf) { + const bufGen = self[kBufferGen]; + if (bufGen !== null) { + userBuf = bufGen(); + if (!isUint8Array(userBuf)) + return; + self[kBuffer] = userBuf; + } + self._handle.useUserBuffer(userBuf); + } } } @@ -247,6 +263,9 @@ function Socket(options) { this._host = null; this[kLastWriteQueueSize] = 0; this[kTimeout] = null; + this[kBuffer] = null; + this[kBufferCb] = null; + this[kBufferGen] = null; if (typeof options === 'number') options = { fd: options }; // Legacy interface. @@ -271,40 +290,55 @@ function Socket(options) { if (options.handle) { this._handle = options.handle; // private this[async_id_symbol] = getNewAsyncId(this._handle); - } else if (options.fd !== undefined) { - const { fd } = options; - let err; - - // createHandle will throw ERR_INVALID_FD_TYPE if `fd` is not - // a valid `PIPE` or `TCP` descriptor - this._handle = createHandle(fd, false); - - err = this._handle.open(fd); + } else { + const onread = options.onread; + if (onread !== null && typeof onread === 'object' && + (isUint8Array(onread.buffer) || typeof onread.buffer === 'function') && + typeof onread.callback === 'function') { + if (typeof onread.buffer === 'function') { + this[kBuffer] = true; + this[kBufferGen] = onread.buffer; + } else { + this[kBuffer] = onread.buffer; + } + this[kBufferCb] = onread.callback; + } + if (options.fd !== undefined) { + const { fd } = options; + let err; - // While difficult to fabricate, in some architectures - // `open` may return an error code for valid file descriptors - // which cannot be opened. This is difficult to test as most - // un-openable fds will throw on `createHandle` - if (err) - throw errnoException(err, 'open'); + // createHandle will throw ERR_INVALID_FD_TYPE if `fd` is not + // a valid `PIPE` or `TCP` descriptor + this._handle = createHandle(fd, false); - this[async_id_symbol] = this._handle.getAsyncId(); + err = this._handle.open(fd); - if ((fd === 1 || fd === 2) && - (this._handle instanceof Pipe) && - process.platform === 'win32') { - // Make stdout and stderr blocking on Windows - err = this._handle.setBlocking(true); + // While difficult to fabricate, in some architectures + // `open` may return an error code for valid file descriptors + // which cannot be opened. This is difficult to test as most + // un-openable fds will throw on `createHandle` if (err) - throw errnoException(err, 'setBlocking'); - - this._writev = null; - this._write = makeSyncWrite(fd); - // makeSyncWrite adjusts this value like the original handle would, so - // we need to let it do that by turning it into a writable, own property. - Object.defineProperty(this._handle, 'bytesWritten', { - value: 0, writable: true - }); + throw errnoException(err, 'open'); + + this[async_id_symbol] = this._handle.getAsyncId(); + + if ((fd === 1 || fd === 2) && + (this._handle instanceof Pipe) && + process.platform === 'win32') { + // Make stdout and stderr blocking on Windows + err = this._handle.setBlocking(true); + if (err) + throw errnoException(err, 'setBlocking'); + + this._writev = null; + this._write = makeSyncWrite(fd); + // makeSyncWrite adjusts this value like the original handle would, so + // we need to let it do that by turning it into a writable, own + // property. + Object.defineProperty(this._handle, 'bytesWritten', { + value: 0, writable: true + }); + } } } @@ -514,6 +548,15 @@ Object.defineProperty(Socket.prototype, kUpdateTimer, { }); +function tryReadStart(socket) { + // Not already reading, start the flow + debug('Socket._handle.readStart'); + socket._handle.reading = true; + var err = socket._handle.readStart(); + if (err) + socket.destroy(errnoException(err, 'read')); +} + // Just call handle.readStart until we have enough in the buffer Socket.prototype._read = function(n) { debug('_read'); @@ -522,12 +565,7 @@ Socket.prototype._read = function(n) { debug('_read wait for connection'); this.once('connect', () => this._read(n)); } else if (!this._handle.reading) { - // Not already reading, start the flow - debug('Socket._read readStart'); - this._handle.reading = true; - var err = this._handle.readStart(); - if (err) - this.destroy(errnoException(err, 'read')); + tryReadStart(this); } }; @@ -539,6 +577,38 @@ Socket.prototype.end = function(data, encoding, callback) { }; +Socket.prototype.pause = function() { + if (this[kBuffer] && !this.connecting && this._handle && + this._handle.reading) { + this._handle.reading = false; + if (!this.destroyed) { + const err = this._handle.readStop(); + if (err) + this.destroy(errnoException(err, 'read')); + } + } + return stream.Duplex.prototype.pause.call(this); +}; + + +Socket.prototype.resume = function() { + if (this[kBuffer] && !this.connecting && this._handle && + !this._handle.reading) { + tryReadStart(this); + } + return stream.Duplex.prototype.resume.call(this); +}; + + +Socket.prototype.read = function(n) { + if (this[kBuffer] && !this.connecting && this._handle && + !this._handle.reading) { + tryReadStart(this); + } + return stream.Duplex.prototype.read.call(this, n); +}; + + // Called when the 'end' event is emitted. function onReadableStreamEnd() { if (!this.allowHalfOpen) { diff --git a/src/stream_base.cc b/src/stream_base.cc index 695d19c123..52163e2e43 100644 --- a/src/stream_base.cc +++ b/src/stream_base.cc @@ -26,6 +26,7 @@ using v8::FunctionCallbackInfo; using v8::HandleScope; using v8::Integer; using v8::Local; +using v8::MaybeLocal; using v8::Object; using v8::ReadOnly; using v8::String; @@ -50,6 +51,13 @@ int StreamBase::ReadStopJS(const FunctionCallbackInfo<Value>& args) { return ReadStop(); } +int StreamBase::UseUserBuffer(const FunctionCallbackInfo<Value>& args) { + CHECK(Buffer::HasInstance(args[0])); + + uv_buf_t buf = uv_buf_init(Buffer::Data(args[0]), Buffer::Length(args[0])); + PushStreamListener(new CustomBufferJSListener(buf)); + return 0; +} int StreamBase::Shutdown(const FunctionCallbackInfo<Value>& args) { CHECK(args[0]->IsObject()); @@ -291,19 +299,22 @@ int StreamBase::WriteString(const FunctionCallbackInfo<Value>& args) { } -void StreamBase::CallJSOnreadMethod(ssize_t nread, - Local<ArrayBuffer> ab, - size_t offset) { +MaybeLocal<Value> StreamBase::CallJSOnreadMethod(ssize_t nread, + Local<ArrayBuffer> ab, + size_t offset, + StreamBaseJSChecks checks) { Environment* env = env_; DCHECK_EQ(static_cast<int32_t>(nread), nread); DCHECK_LE(offset, INT32_MAX); - if (ab.IsEmpty()) { - DCHECK_EQ(offset, 0); - DCHECK_LE(nread, 0); - } else { - DCHECK_GE(nread, 0); + if (checks == DONT_SKIP_NREAD_CHECKS) { + if (ab.IsEmpty()) { + DCHECK_EQ(offset, 0); + DCHECK_LE(nread, 0); + } else { + DCHECK_GE(nread, 0); + } } env->stream_base_state()[kReadBytesOrError] = nread; @@ -317,7 +328,7 @@ void StreamBase::CallJSOnreadMethod(ssize_t nread, CHECK_NOT_NULL(wrap); Local<Value> onread = wrap->object()->GetInternalField(kOnReadFunctionField); CHECK(onread->IsFunction()); - wrap->MakeCallback(onread.As<Function>(), arraysize(argv), argv); + return wrap->MakeCallback(onread.As<Function>(), arraysize(argv), argv); } @@ -366,6 +377,9 @@ void StreamBase::AddMethods(Environment* env, Local<FunctionTemplate> t) { env->SetProtoMethod(t, "readStart", JSMethod<&StreamBase::ReadStartJS>); env->SetProtoMethod(t, "readStop", JSMethod<&StreamBase::ReadStopJS>); env->SetProtoMethod(t, "shutdown", JSMethod<&StreamBase::Shutdown>); + env->SetProtoMethod(t, + "useUserBuffer", + JSMethod<&StreamBase::UseUserBuffer>); env->SetProtoMethod(t, "writev", JSMethod<&StreamBase::Writev>); env->SetProtoMethod(t, "writeBuffer", JSMethod<&StreamBase::WriteBuffer>); env->SetProtoMethod( @@ -445,6 +459,7 @@ void StreamResource::ClearError() { // No-op } + uv_buf_t EmitToJSStreamListener::OnStreamAlloc(size_t suggested_size) { CHECK_NOT_NULL(stream_); Environment* env = static_cast<StreamBase*>(stream_)->stream_env(); @@ -472,6 +487,32 @@ void EmitToJSStreamListener::OnStreamRead(ssize_t nread, const uv_buf_t& buf_) { } +uv_buf_t CustomBufferJSListener::OnStreamAlloc(size_t suggested_size) { + return buffer_; +} + + +void CustomBufferJSListener::OnStreamRead(ssize_t nread, const uv_buf_t& buf) { + CHECK_NOT_NULL(stream_); + CHECK_EQ(buf.base, buffer_.base); + + StreamBase* stream = static_cast<StreamBase*>(stream_); + Environment* env = stream->stream_env(); + HandleScope handle_scope(env->isolate()); + Context::Scope context_scope(env->context()); + + MaybeLocal<Value> ret = stream->CallJSOnreadMethod(nread, + Local<ArrayBuffer>(), + 0, + StreamBase::SKIP_NREAD_CHECKS); + Local<Value> next_buf_v; + if (ret.ToLocal(&next_buf_v) && !next_buf_v->IsUndefined()) { + buffer_.base = Buffer::Data(next_buf_v); + buffer_.len = Buffer::Length(next_buf_v); + } +} + + void ReportWritesToJSStreamListener::OnStreamAfterReqFinished( StreamReq* req_wrap, int status) { StreamBase* stream = static_cast<StreamBase*>(stream_); diff --git a/src/stream_base.h b/src/stream_base.h index 3550233290..3bfdaedb79 100644 --- a/src/stream_base.h +++ b/src/stream_base.h @@ -180,6 +180,21 @@ class EmitToJSStreamListener : public ReportWritesToJSStreamListener { }; +// An alternative listener that uses a custom, user-provided buffer +// for reading data. +class CustomBufferJSListener : public ReportWritesToJSStreamListener { + public: + uv_buf_t OnStreamAlloc(size_t suggested_size) override; + void OnStreamRead(ssize_t nread, const uv_buf_t& buf) override; + void OnStreamDestroy() override { delete this; } + + explicit CustomBufferJSListener(uv_buf_t buffer) : buffer_(buffer) {} + + private: + uv_buf_t buffer_; +}; + + // A generic stream, comparable to JS land’s `Duplex` streams. // A stream is always controlled through one `StreamListener` instance. class StreamResource { @@ -273,9 +288,13 @@ class StreamBase : public StreamResource { virtual bool IsIPCPipe(); virtual int GetFD(); - void CallJSOnreadMethod(ssize_t nread, - v8::Local<v8::ArrayBuffer> ab, - size_t offset = 0); + enum StreamBaseJSChecks { DONT_SKIP_NREAD_CHECKS, SKIP_NREAD_CHECKS }; + + v8::MaybeLocal<v8::Value> CallJSOnreadMethod( + ssize_t nread, + v8::Local<v8::ArrayBuffer> ab, + size_t offset = 0, + StreamBaseJSChecks checks = DONT_SKIP_NREAD_CHECKS); // This is named `stream_env` to avoid name clashes, because a lot of // subclasses are also `BaseObject`s. @@ -323,6 +342,7 @@ class StreamBase : public StreamResource { int WriteBuffer(const v8::FunctionCallbackInfo<v8::Value>& args); template <enum encoding enc> int WriteString(const v8::FunctionCallbackInfo<v8::Value>& args); + int UseUserBuffer(const v8::FunctionCallbackInfo<v8::Value>& args); static void GetFD(const v8::FunctionCallbackInfo<v8::Value>& args); static void GetExternal(const v8::FunctionCallbackInfo<v8::Value>& args); diff --git a/test/parallel/test-net-onread-static-buffer.js b/test/parallel/test-net-onread-static-buffer.js new file mode 100644 index 0000000000..ce722f69cd --- /dev/null +++ b/test/parallel/test-net-onread-static-buffer.js @@ -0,0 +1,186 @@ +'use strict'; +const common = require('../common'); +const assert = require('assert'); +const net = require('net'); + +const message = Buffer.from('hello world'); + +// Test typical usage +net.createServer(common.mustCall(function(socket) { + this.close(); + socket.end(message); +})).listen(0, function() { + let received = 0; + const buffers = []; + const sockBuf = Buffer.alloc(8); + net.connect({ + port: this.address().port, + onread: { + buffer: sockBuf, + callback: function(nread, buf) { + assert.strictEqual(buf, sockBuf); + received += nread; + buffers.push(Buffer.from(buf.slice(0, nread))); + } + } + }).on('data', common.mustNotCall()).on('end', common.mustCall(() => { + assert.strictEqual(received, message.length); + assert.deepStrictEqual(Buffer.concat(buffers), message); + })); +}); + +// Test Uint8Array support +net.createServer(common.mustCall(function(socket) { + this.close(); + socket.end(message); +})).listen(0, function() { + let received = 0; + let incoming = new Uint8Array(0); + const sockBuf = new Uint8Array(8); + net.connect({ + port: this.address().port, + onread: { + buffer: sockBuf, + callback: function(nread, buf) { + assert.strictEqual(buf, sockBuf); + received += nread; + const newIncoming = new Uint8Array(incoming.length + nread); + newIncoming.set(incoming); + newIncoming.set(buf.slice(0, nread), incoming.length); + incoming = newIncoming; + } + } + }).on('data', common.mustNotCall()).on('end', common.mustCall(() => { + assert.strictEqual(received, message.length); + assert.deepStrictEqual(incoming, new Uint8Array(message)); + })); +}); + +// Test Buffer callback usage +net.createServer(common.mustCall(function(socket) { + this.close(); + socket.end(message); +})).listen(0, function() { + let received = 0; + const incoming = []; + const bufPool = [ Buffer.alloc(2), Buffer.alloc(2), Buffer.alloc(2) ]; + let bufPoolIdx = -1; + let bufPoolUsage = 0; + net.connect({ + port: this.address().port, + onread: { + buffer: () => { + ++bufPoolUsage; + bufPoolIdx = (bufPoolIdx + 1) % bufPool.length; + return bufPool[bufPoolIdx]; + }, + callback: function(nread, buf) { + assert.strictEqual(buf, bufPool[bufPoolIdx]); + received += nread; + incoming.push(Buffer.from(buf.slice(0, nread))); + } + } + }).on('data', common.mustNotCall()).on('end', common.mustCall(() => { + assert.strictEqual(received, message.length); + assert.deepStrictEqual(Buffer.concat(incoming), message); + assert.strictEqual(bufPoolUsage, 7); + })); +}); + +// Test Uint8Array callback support +net.createServer(common.mustCall(function(socket) { + this.close(); + socket.end(message); +})).listen(0, function() { + let received = 0; + let incoming = new Uint8Array(0); + const bufPool = [ new Uint8Array(2), new Uint8Array(2), new Uint8Array(2) ]; + let bufPoolIdx = -1; + let bufPoolUsage = 0; + net.connect({ + port: this.address().port, + onread: { + buffer: () => { + ++bufPoolUsage; + bufPoolIdx = (bufPoolIdx + 1) % bufPool.length; + return bufPool[bufPoolIdx]; + }, + callback: function(nread, buf) { + assert.strictEqual(buf, bufPool[bufPoolIdx]); + received += nread; + const newIncoming = new Uint8Array(incoming.length + nread); + newIncoming.set(incoming); + newIncoming.set(buf.slice(0, nread), incoming.length); + incoming = newIncoming; + } + } + }).on('data', common.mustNotCall()).on('end', common.mustCall(() => { + assert.strictEqual(received, message.length); + assert.deepStrictEqual(incoming, new Uint8Array(message)); + assert.strictEqual(bufPoolUsage, 7); + })); +}); + +// Test explicit socket pause +net.createServer(common.mustCall(function(socket) { + this.close(); + socket.end(message); +})).listen(0, function() { + let received = 0; + const buffers = []; + const sockBuf = Buffer.alloc(8); + let paused = false; + net.connect({ + port: this.address().port, + onread: { + buffer: sockBuf, + callback: function(nread, buf) { + assert.strictEqual(paused, false); + assert.strictEqual(buf, sockBuf); + received += nread; + buffers.push(Buffer.from(buf.slice(0, nread))); + paused = true; + this.pause(); + setTimeout(() => { + paused = false; + this.resume(); + }, 100); + } + } + }).on('data', common.mustNotCall()).on('end', common.mustCall(() => { + assert.strictEqual(received, message.length); + assert.deepStrictEqual(Buffer.concat(buffers), message); + })); +}); + +// Test implicit socket pause +net.createServer(common.mustCall(function(socket) { + this.close(); + socket.end(message); +})).listen(0, function() { + let received = 0; + const buffers = []; + const sockBuf = Buffer.alloc(8); + let paused = false; + net.connect({ + port: this.address().port, + onread: { + buffer: sockBuf, + callback: function(nread, buf) { + assert.strictEqual(paused, false); + assert.strictEqual(buf, sockBuf); + received += nread; + buffers.push(Buffer.from(buf.slice(0, nread))); + paused = true; + setTimeout(() => { + paused = false; + this.resume(); + }, 100); + return false; + } + } + }).on('data', common.mustNotCall()).on('end', common.mustCall(() => { + assert.strictEqual(received, message.length); + assert.deepStrictEqual(Buffer.concat(buffers), message); + })); +}); |