summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Nagy <ronagy@icloud.com>2021-01-06 17:59:39 +0100
committerRobert Nagy <ronagy@icloud.com>2021-03-10 15:04:00 +0100
commite2f5bb7574e7e8189a3e38fc4d00c1213fd7160f (patch)
tree9d04d4a2a90ce85559c493e8d37a39552edbba07
parent38f32386c138073c6a020ce79085daea15e7b800 (diff)
downloadnode-new-e2f5bb7574e7e8189a3e38fc4d00c1213fd7160f.tar.gz
http: align with stream.Writable
Futher aligns OutgoingMessage with stream.Writable. In particular re-uses the construct/destroy logic from streams. Due to a lot of subtle assumptions this PR unfortunately touches a lot of different parts. PR-URL: https://github.com/nodejs/node/pull/36816 Reviewed-By: Matteo Collina <matteo.collina@gmail.com> Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com>
-rw-r--r--lib/_http_client.js52
-rw-r--r--lib/_http_outgoing.js331
-rw-r--r--lib/_http_server.js20
-rw-r--r--lib/internal/http.js1
-rw-r--r--test/parallel/test-http-agent-keepalive.js17
-rw-r--r--test/parallel/test-http-outgoing-end-multiple.js20
-rw-r--r--test/parallel/test-http-outgoing-message-capture-rejection.js2
-rw-r--r--test/parallel/test-stream-pipeline.js34
8 files changed, 245 insertions, 232 deletions
diff --git a/lib/_http_client.js b/lib/_http_client.js
index 842a33a41f..2ef04ef8de 100644
--- a/lib/_http_client.js
+++ b/lib/_http_client.js
@@ -58,7 +58,7 @@ const Agent = require('_http_agent');
const { Buffer } = require('buffer');
const { defaultTriggerAsyncIdScope } = require('internal/async_hooks');
const { URL, urlToHttpOptions, searchParamsSymbol } = require('internal/url');
-const { kOutHeaders, kNeedDrain } = require('internal/http');
+const { kOutHeaders } = require('internal/http');
const { connResetException, codes } = require('internal/errors');
const {
ERR_HTTP_HEADERS_SENT,
@@ -98,7 +98,7 @@ class HTTPClientAsyncResource {
}
function ClientRequest(input, options, cb) {
- FunctionPrototypeCall(OutgoingMessage, this);
+ FunctionPrototypeCall(OutgoingMessage, this, { autoDestroy: false });
if (typeof input === 'string') {
const urlStr = input;
@@ -298,7 +298,7 @@ function ClientRequest(input, options, cb) {
if (typeof options.createConnection === 'function') {
const oncreate = once((err, socket) => {
if (err) {
- process.nextTick(() => this.emit('error', err));
+ process.nextTick(() => emitError(this, err));
} else {
this.onSocket(socket);
}
@@ -366,8 +366,8 @@ function emitAbortNT(req) {
function ondrain() {
const msg = this._httpMessage;
- if (msg && !msg.finished && msg[kNeedDrain]) {
- msg[kNeedDrain] = false;
+ if (msg && !msg.finished && msg._writableState.needDrain) {
+ msg._writableState.needDrain = false;
msg.emit('drain');
}
}
@@ -393,8 +393,7 @@ function socketCloseListener() {
if (!res.complete) {
res.destroy(connResetException('aborted'));
}
- req._closed = true;
- req.emit('close');
+ emitClose(req);
if (!res.aborted && res.readable) {
res.push(null);
}
@@ -404,10 +403,9 @@ function socketCloseListener() {
// receive a response. The error needs to
// fire on the request.
req.socket._hadError = true;
- req.emit('error', connResetException('socket hang up'));
+ emitError(req, connResetException('socket hang up'));
}
- req._closed = true;
- req.emit('close');
+ emitClose(req);
}
// Too bad. That output wasn't getting written.
@@ -431,7 +429,7 @@ function socketErrorListener(err) {
// For Safety. Some additional errors might fire later on
// and we need to make sure we don't double-fire the error event.
req.socket._hadError = true;
- req.emit('error', err);
+ emitError(req, err);
}
const parser = socket.parser;
@@ -455,7 +453,7 @@ function socketOnEnd() {
// If we don't have a response then we know that the socket
// ended prematurely and we need to emit an error on the request.
req.socket._hadError = true;
- req.emit('error', connResetException('socket hang up'));
+ emitError(req, connResetException('socket hang up'));
}
if (parser) {
parser.finish();
@@ -478,7 +476,7 @@ function socketOnData(d) {
freeParser(parser, req, socket);
socket.destroy();
req.socket._hadError = true;
- req.emit('error', ret);
+ emitError(req, ret);
} else if (parser.incoming && parser.incoming.upgrade) {
// Upgrade (if status code 101) or CONNECT
const bytesParsed = ret;
@@ -510,9 +508,7 @@ function socketOnData(d) {
socket.readableFlowing = null;
req.emit(eventName, res, socket, bodyHead);
- req.destroyed = true;
- req._closed = true;
- req.emit('close');
+ emitClose(req);
} else {
// Requested Upgrade or used CONNECT method, but have no handler.
socket.destroy();
@@ -697,8 +693,7 @@ function requestOnPrefinish() {
}
function emitFreeNT(req) {
- req._closed = true;
- req.emit('close');
+ emitClose(req);
if (req.socket) {
req.socket.emit('free');
}
@@ -779,10 +774,10 @@ function onSocketNT(req, socket, err) {
err = connResetException('socket hang up');
}
if (err) {
- req.emit('error', err);
+ emitError(req, err);
}
req._closed = true;
- req.emit('close');
+ emitClose(req);
}
if (socket) {
@@ -862,6 +857,23 @@ function setSocketTimeout(sock, msecs) {
}
}
+function emitError(req, err) {
+ req.destroyed = true;
+ req._writableState.errored = err;
+ // Avoid V8 leak, https://github.com/nodejs/node/pull/34103#issuecomment-652002364
+ err.stack; // eslint-disable-line no-unused-expressions
+ req._writableState.errorEmitted = true;
+ req.emit('error', err);
+}
+
+function emitClose(req) {
+ req.destroyed = true;
+ req._closed = true;
+ req._writableState.closed = true;
+ req._writableState.closeEmitted = true;
+ req.emit('close');
+}
+
ClientRequest.prototype.setNoDelay = function setNoDelay(noDelay) {
this._deferToConnect('setNoDelay', [noDelay]);
};
diff --git a/lib/_http_outgoing.js b/lib/_http_outgoing.js
index 37ef44f2f2..59ef07e33e 100644
--- a/lib/_http_outgoing.js
+++ b/lib/_http_outgoing.js
@@ -27,6 +27,7 @@ const {
ArrayPrototypeJoin,
ArrayPrototypePush,
ArrayPrototypeUnshift,
+ Error,
FunctionPrototype,
FunctionPrototypeBind,
FunctionPrototypeCall,
@@ -45,9 +46,9 @@ const {
const { getDefaultHighWaterMark } = require('internal/streams/state');
const assert = require('internal/assert');
const EE = require('events');
-const Stream = require('stream');
+const { Stream, Writable } = require('stream');
const internalUtil = require('internal/util');
-const { kOutHeaders, utcDate, kNeedDrain } = require('internal/http');
+const { kOutHeaders, utcDate } = require('internal/http');
const { Buffer } = require('buffer');
const common = require('_http_common');
const checkIsHttpToken = common._checkIsHttpToken;
@@ -76,11 +77,12 @@ const {
} = require('internal/errors');
const { validateString } = require('internal/validators');
const { isUint8Array } = require('internal/util/types');
+const { construct, destroy } = require('internal/streams/destroy');
const HIGH_WATER_MARK = getDefaultHighWaterMark();
const { CRLF, debug } = common;
-const kCorked = Symbol('corked');
+const kCorked = Symbol('kCorked');
const nop = FunctionPrototype;
@@ -94,7 +96,7 @@ function isCookieField(s) {
return s.length === 6 && StringPrototypeToLowerCase(s) === 'cookie';
}
-function OutgoingMessage() {
+function OutgoingMessage(opts) {
FunctionPrototypeCall(Stream, this);
// Queue that holds all currently pending data, until the response will be
@@ -107,9 +109,6 @@ function OutgoingMessage() {
// TCP socket and HTTP Parser and thus handle the backpressure.
this.outputSize = 0;
- this.writable = true;
- this.destroyed = false;
-
this._last = false;
this.chunkedEncoding = false;
this.shouldKeepAlive = true;
@@ -123,12 +122,9 @@ function OutgoingMessage() {
this._contentLength = null;
this._hasBody = true;
this._trailer = '';
- this[kNeedDrain] = false;
- this.finished = false;
this._headerSent = false;
this[kCorked] = 0;
- this._closed = false;
this.socket = null;
this._header = null;
@@ -137,42 +133,70 @@ function OutgoingMessage() {
this._keepAliveTimeout = 0;
this._onPendingData = nop;
+
+ this._writableState = {
+ objectMode: false,
+ writable: true,
+ constructed: false,
+ corked: 0,
+ prefinished: false,
+ destroyed: false,
+ closed: false,
+ closeEmitted: false,
+ errored: null,
+ errorEmitted: false,
+ needDrain: false,
+ autoDestroy: opts?.autoDestroy == null ? true : false,
+ emitClose: true,
+ ended: false,
+ ending: false,
+ finished: false
+ };
+
+ construct(this, () => {
+ this._flush();
+ });
}
-ObjectSetPrototypeOf(OutgoingMessage.prototype, Stream.prototype);
-ObjectSetPrototypeOf(OutgoingMessage, Stream);
+ObjectSetPrototypeOf(OutgoingMessage.prototype, Writable.prototype);
+ObjectSetPrototypeOf(OutgoingMessage, Writable);
-ObjectDefineProperty(OutgoingMessage.prototype, 'writableFinished', {
+ObjectDefineProperty(OutgoingMessage.prototype, 'finished', {
get() {
- return (
- this.finished &&
- this.outputSize === 0 &&
- (!this.socket || this.socket.writableLength === 0)
- );
+ return this._writableState.ended;
+ },
+ set(value) {
+ this._writableState.ended = value;
}
});
-ObjectDefineProperty(OutgoingMessage.prototype, 'writableObjectMode', {
+ObjectDefineProperty(OutgoingMessage.prototype, '_closed', {
get() {
- return false;
+ return this._writableState.closed;
+ },
+ set(value) {
+ this._writableState.closed = value;
}
});
-ObjectDefineProperty(OutgoingMessage.prototype, 'writableLength', {
+ObjectDefineProperty(OutgoingMessage.prototype, 'writable', {
get() {
- return this.outputSize + (this.socket ? this.socket.writableLength : 0);
+ // Compat.
+ return this._writableState.writable;
+ },
+ set(value) {
+ this._writableState.writable = value;
}
});
-ObjectDefineProperty(OutgoingMessage.prototype, 'writableHighWaterMark', {
+ObjectDefineProperty(OutgoingMessage.prototype, 'writableLength', {
get() {
- return this.socket ? this.socket.writableHighWaterMark : HIGH_WATER_MARK;
+ return this.outputSize + (this.socket ? this.socket.writableLength : 0);
}
});
-ObjectDefineProperty(OutgoingMessage.prototype, 'writableCorked', {
+ObjectDefineProperty(OutgoingMessage.prototype, 'writableHighWaterMark', {
get() {
- const corked = this.socket ? this.socket.writableCorked : 0;
- return corked + this[kCorked];
+ return this.socket ? this.socket.writableHighWaterMark : HIGH_WATER_MARK;
}
});
@@ -239,7 +263,6 @@ ObjectDefineProperty(OutgoingMessage.prototype, '_headerNames', {
}, 'OutgoingMessage.prototype._headerNames is deprecated', 'DEP0066')
});
-
OutgoingMessage.prototype._renderHeaders = function _renderHeaders() {
if (this._header) {
throw new ERR_HTTP_HEADERS_SENT('render');
@@ -261,6 +284,8 @@ OutgoingMessage.prototype._renderHeaders = function _renderHeaders() {
};
OutgoingMessage.prototype.cork = function() {
+ this._writableState.corked++;
+
if (this.socket) {
this.socket.cork();
} else {
@@ -269,6 +294,10 @@ OutgoingMessage.prototype.cork = function() {
};
OutgoingMessage.prototype.uncork = function() {
+ if (this._writableState.corked) {
+ this._writableState.corked--;
+ }
+
if (this.socket) {
this.socket.uncork();
} else if (this[kCorked]) {
@@ -277,7 +306,6 @@ OutgoingMessage.prototype.uncork = function() {
};
OutgoingMessage.prototype.setTimeout = function setTimeout(msecs, callback) {
-
if (callback) {
this.on('timeout', callback);
}
@@ -292,22 +320,48 @@ OutgoingMessage.prototype.setTimeout = function setTimeout(msecs, callback) {
return this;
};
+OutgoingMessage.prototype._construct = function(callback) {
+ if (this.socket) {
+ for (let n = 0; n < this[kCorked]; ++n) {
+ this.socket.cork();
+ }
+ callback();
+ } else {
+ // TODO(ronag): What if never assigned socket?
+ this.once('socket', function(socket) {
+ for (let n = 0; n < this[kCorked]; ++n) {
+ socket.cork();
+ }
+ callback();
+ });
+ }
+};
+
+OutgoingMessage.prototype.destroy = destroy;
-// It's possible that the socket will be destroyed, and removed from
-// any messages, before ever calling this. In that case, just skip
-// it, since something else is destroying this connection anyway.
-OutgoingMessage.prototype.destroy = function destroy(error) {
- if (this.destroyed) {
- return this;
+OutgoingMessage.prototype._destroy = function(err, callback) {
+ if (!this.writableEnded) {
+ this.aborted = true;
+ this.emit('aborted');
}
- this.destroyed = true;
+
+ // TODO(ronag): Why is this needed?
+ const cb = (err) => {
+ const triggerAsyncId = this.socket ?
+ this.socket[async_id_symbol] : undefined;
+ defaultTriggerAsyncIdScope(triggerAsyncId, callback, err);
+ };
if (this.socket) {
- this.socket.destroy(error);
- } else {
- this.once('socket', function socketDestroyOnConnect(socket) {
- socket.destroy(error);
+ Stream.finished(this.socket.destroy(err), (er) => {
+ if (er && er.code !== 'ERR_STREAM_PREMATURE_CLOSE') {
+ err = er;
+ }
+
+ cb(err);
});
+ } else {
+ cb(err);
}
return this;
@@ -667,16 +721,6 @@ ObjectDefineProperty(OutgoingMessage.prototype, 'headersSent', {
get: function() { return !!this._header; }
});
-ObjectDefineProperty(OutgoingMessage.prototype, 'writableEnded', {
- get: function() { return this.finished; }
-});
-
-ObjectDefineProperty(OutgoingMessage.prototype, 'writableNeedDrain', {
- get: function() {
- return !this.destroyed && !this.finished && this[kNeedDrain];
- }
-});
-
const crlf_buf = Buffer.from('\r\n');
OutgoingMessage.prototype.write = function write(chunk, encoding, callback) {
if (typeof encoding === 'function') {
@@ -684,30 +728,15 @@ OutgoingMessage.prototype.write = function write(chunk, encoding, callback) {
encoding = null;
}
- const ret = write_(this, chunk, encoding, callback, false);
+ const ret = write_(this, chunk, encoding, callback, false) === true;
if (!ret)
- this[kNeedDrain] = true;
+ this._writableState.needDrain = true;
return ret;
};
-function onError(msg, err, callback) {
- const triggerAsyncId = msg.socket ? msg.socket[async_id_symbol] : undefined;
- defaultTriggerAsyncIdScope(triggerAsyncId,
- process.nextTick,
- emitErrorNt,
- msg,
- err,
- callback);
-}
-
-function emitErrorNt(msg, err, callback) {
- callback(err);
- if (typeof msg.emit === 'function' && !msg._closed) {
- msg.emit('error', err);
- }
-}
-
function write_(msg, chunk, encoding, callback, fromEnd) {
+ const state = msg._writableState;
+
if (typeof callback !== 'function')
callback = nop;
@@ -724,19 +753,16 @@ function write_(msg, chunk, encoding, callback, fromEnd) {
}
let err;
- if (msg.finished) {
+ if (state.ending) {
err = new ERR_STREAM_WRITE_AFTER_END();
- } else if (msg.destroyed) {
+ } else if (state.destroyed) {
err = new ERR_STREAM_DESTROYED('write');
}
if (err) {
- if (!msg.destroyed) {
- onError(msg, err, callback);
- } else {
- process.nextTick(callback, err);
- }
- return false;
+ process.nextTick(callback, err);
+ msg.destroy(err);
+ return err;
}
if (!msg._header) {
@@ -753,9 +779,9 @@ function write_(msg, chunk, encoding, callback, fromEnd) {
return true;
}
- if (!fromEnd && msg.socket && !msg.socket.writableCorked) {
- msg.socket.cork();
- process.nextTick(connectionCorkNT, msg.socket);
+ if (!fromEnd && !state.corked) {
+ msg.cork();
+ process.nextTick(uncorkNT, msg);
}
let ret;
@@ -773,8 +799,8 @@ function write_(msg, chunk, encoding, callback, fromEnd) {
}
-function connectionCorkNT(conn) {
- conn.uncork();
+function uncorkNT(msg) {
+ msg.uncork();
}
@@ -805,12 +831,24 @@ OutgoingMessage.prototype.addTrailers = function addTrailers(headers) {
}
};
-function onFinish(outmsg) {
- if (outmsg && outmsg.socket && outmsg.socket._hadError) return;
- outmsg.emit('finish');
+function onFinish(err) {
+ const state = this._writableState;
+
+ if (err || state.errored || state.finished) {
+ return;
+ }
+
+ state.finished = true;
+ this.emit('finish');
+
+ if (state.autoDestroy) {
+ this.destroy();
+ }
}
OutgoingMessage.prototype.end = function end(chunk, encoding, callback) {
+ const state = this._writableState;
+
if (typeof chunk === 'function') {
callback = chunk;
chunk = null;
@@ -820,74 +858,90 @@ OutgoingMessage.prototype.end = function end(chunk, encoding, callback) {
encoding = null;
}
+ this.cork();
+
+ let err;
+
if (chunk) {
- if (this.finished) {
- onError(this,
- new ERR_STREAM_WRITE_AFTER_END(),
- typeof callback !== 'function' ? nop : callback);
- return this;
+ const ret = write_(this, chunk, encoding, null, true);
+ if (ret instanceof Error) {
+ err = ret;
}
+ }
- if (this.socket) {
- this.socket.cork();
- }
+ if (err) {
+ // Do nothing...
+ } else if (!state.errored && !state.ending) {
+ state.ending = true;
- write_(this, chunk, encoding, null, true);
- } else if (this.finished) {
- if (typeof callback === 'function') {
- if (!this.writableFinished) {
- this.on('finish', callback);
- } else {
- callback(new ERR_STREAM_ALREADY_FINISHED('end'));
- }
- }
- return this;
- } else if (!this._header) {
- if (this.socket) {
- this.socket.cork();
+ if (!this._header) {
+ this._contentLength = 0;
+ this._implicitHeader();
}
- this._contentLength = 0;
- this._implicitHeader();
- }
+ const finish = FunctionPrototypeBind(onFinish, this);
- if (typeof callback === 'function')
- this.once('finish', callback);
+ state.finalCalled = true;
- const finish = FunctionPrototypeBind(onFinish, undefined, this);
+ if (this._hasBody && this.chunkedEncoding) {
+ this._send('0\r\n' + this._trailer + '\r\n', 'latin1', finish);
+ } else {
+ // Force a flush, HACK.
+ this._send('', 'latin1', finish);
+ }
- if (this._hasBody && this.chunkedEncoding) {
- this._send('0\r\n' + this._trailer + '\r\n', 'latin1', finish);
- } else {
- // Force a flush, HACK.
- this._send('', 'latin1', finish);
- }
+ while (state.corked) {
+ // Fully uncork connection on end().
+ this.uncork();
+ }
- if (this.socket) {
- // Fully uncork connection on end().
- this.socket._writableState.corked = 1;
- this.socket.uncork();
- }
- this[kCorked] = 0;
+ state.ended = true;
- this.finished = true;
+ // There is the first message on the outgoing queue, and we've sent
+ // everything to the socket.
+ debug('outgoing message end.');
+ if (this.outputData.length === 0 && this.socket?._httpMessage === this) {
+ this._finish();
+ }
+ } else if (state.finished) {
+ err = new ERR_STREAM_ALREADY_FINISHED('end');
+ } else if (state.destroyed) {
+ err = new ERR_STREAM_DESTROYED('end');
+ }
- // There is the first message on the outgoing queue, and we've sent
- // everything to the socket.
- debug('outgoing message end.');
- if (this.outputData.length === 0 &&
- this.socket &&
- this.socket._httpMessage === this) {
- this._finish();
+ if (typeof callback === 'function') {
+ if (err || state.finished) {
+ process.nextTick(callback, err);
+ } else {
+ onFinished(this, callback);
+ }
}
+ this.uncork();
+
return this;
};
+function onFinished(msg, callback) {
+ const onDone = (err) => {
+ msg
+ .off('finish', onDone)
+ .off(EE.errorMonitor, onDone);
+ callback(err);
+ };
+ msg
+ .on('finish', onDone)
+ .on(EE.errorMonitor, onDone);
+}
OutgoingMessage.prototype._finish = function _finish() {
- assert(this.socket);
- this.emit('prefinish');
+ const state = this._writableState;
+
+ if (!state.prefinished) {
+ assert(this.socket);
+ state.prefinished = true;
+ this.emit('prefinish');
+ }
};
@@ -920,19 +974,14 @@ OutgoingMessage.prototype._flush = function _flush() {
if (this.finished) {
// This is a queue to the server or client to bring in the next this.
this._finish();
- } else if (ret && this[kNeedDrain]) {
- this[kNeedDrain] = false;
+ } else if (ret && this._writableState.needDrain) {
+ this._writableState.needDrain = false;
this.emit('drain');
}
}
};
OutgoingMessage.prototype._flushOutput = function _flushOutput(socket) {
- while (this[kCorked]) {
- this[kCorked]--;
- socket.cork();
- }
-
const outputLength = this.outputData.length;
if (outputLength <= 0)
return undefined;
diff --git a/lib/_http_server.js b/lib/_http_server.js
index dac5fedf43..ff6ad0b189 100644
--- a/lib/_http_server.js
+++ b/lib/_http_server.js
@@ -59,7 +59,6 @@ const {
const { OutgoingMessage } = require('_http_outgoing');
const {
kOutHeaders,
- kNeedDrain,
emitStatistics
} = require('internal/http');
const {
@@ -227,11 +226,7 @@ function onServerResponseClose() {
// Ergo, we need to deal with stale 'close' events and handle the case
// where the ServerResponse object has already been deconstructed.
// Fortunately, that requires only a single if check. :-)
- if (this._httpMessage) {
- this._httpMessage.destroyed = true;
- this._httpMessage._closed = true;
- this._httpMessage.emit('close');
- }
+ this._httpMessage?.destroy();
}
ServerResponse.prototype.assignSocket = function assignSocket(socket) {
@@ -240,7 +235,6 @@ ServerResponse.prototype.assignSocket = function assignSocket(socket) {
socket.on('close', onServerResponseClose);
this.socket = socket;
this.emit('socket', socket);
- this._flush();
};
ServerResponse.prototype.detachSocket = function detachSocket(socket) {
@@ -556,8 +550,8 @@ function socketOnDrain(socket, state) {
}
const msg = socket._httpMessage;
- if (msg && !msg.finished && msg[kNeedDrain]) {
- msg[kNeedDrain] = false;
+ if (msg && !msg.finished && msg._writableState.needDrain) {
+ msg._writableState.needDrain = false;
msg.emit('drain');
}
}
@@ -584,7 +578,6 @@ function abortIncoming(incoming) {
const req = ArrayPrototypeShift(incoming);
req.destroy(connResetException('aborted'));
}
- // Abort socket._httpMessage ?
}
function socketOnEnd(server, socket, parser, state) {
@@ -804,7 +797,6 @@ function resOnFinish(req, res, socket, state, server) {
res.detachSocket(socket);
clearIncoming(req);
- process.nextTick(emitCloseNT, res);
if (res._last) {
if (typeof socket.destroySoon === 'function') {
@@ -826,12 +818,6 @@ function resOnFinish(req, res, socket, state, server) {
}
}
-function emitCloseNT(self) {
- self.destroyed = true;
- self._closed = true;
- self.emit('close');
-}
-
// The following callback is issued after the headers have been read on a
// new message. In this callback we setup the response object and pass it
// to the user.
diff --git a/lib/internal/http.js b/lib/internal/http.js
index f87dc8aa6c..28c4b245ab 100644
--- a/lib/internal/http.js
+++ b/lib/internal/http.js
@@ -44,7 +44,6 @@ function emitStatistics(statistics) {
module.exports = {
kOutHeaders: Symbol('kOutHeaders'),
- kNeedDrain: Symbol('kNeedDrain'),
utcDate,
emitStatistics
};
diff --git a/test/parallel/test-http-agent-keepalive.js b/test/parallel/test-http-agent-keepalive.js
index a1f902fab0..10c9e2372d 100644
--- a/test/parallel/test-http-agent-keepalive.js
+++ b/test/parallel/test-http-agent-keepalive.js
@@ -105,17 +105,20 @@ function remoteClose() {
function remoteError() {
// Remote server will destroy the socket
const req = get('/error', common.mustNotCall());
+ req.on('socket', common.mustCall((socket) => {
+ socket.on('end', common.mustCall(() => {
+ assert.strictEqual(agent.sockets[name].length, 1);
+ assert.strictEqual(agent.freeSockets[name], undefined);
+ }));
+ }));
req.on('error', common.mustCall((err) => {
assert(err);
assert.strictEqual(err.message, 'socket hang up');
- assert.strictEqual(agent.sockets[name].length, 1);
+ }));
+ req.on('close', common.mustCall((err) => {
+ assert.strictEqual(agent.sockets[name], undefined);
assert.strictEqual(agent.freeSockets[name], undefined);
- // Wait socket 'close' event emit
- setTimeout(common.mustCall(() => {
- assert.strictEqual(agent.sockets[name], undefined);
- assert.strictEqual(agent.freeSockets[name], undefined);
- server.close();
- }), common.platformTimeout(1));
+ server.close();
}));
}
diff --git a/test/parallel/test-http-outgoing-end-multiple.js b/test/parallel/test-http-outgoing-end-multiple.js
index 696443f939..8a3a42aa6c 100644
--- a/test/parallel/test-http-outgoing-end-multiple.js
+++ b/test/parallel/test-http-outgoing-end-multiple.js
@@ -3,23 +3,21 @@ const common = require('../common');
const assert = require('assert');
const http = require('http');
-const onWriteAfterEndError = common.mustCall((err) => {
- assert.strictEqual(err.code, 'ERR_STREAM_WRITE_AFTER_END');
-}, 2);
-
const server = http.createServer(common.mustCall(function(req, res) {
res.end('testing ended state', common.mustCall());
assert.strictEqual(res.writableCorked, 0);
res.end(common.mustCall((err) => {
- assert.strictEqual(err.code, 'ERR_STREAM_ALREADY_FINISHED');
+ assert.strictEqual(err.code, 'ERR_STREAM_WRITE_AFTER_END');
}));
- assert.strictEqual(res.writableCorked, 0);
- res.end('end', onWriteAfterEndError);
- assert.strictEqual(res.writableCorked, 0);
- res.on('error', onWriteAfterEndError);
- res.on('finish', common.mustCall(() => {
+ res.end('end', common.mustCall((err) => {
+ assert.strictEqual(err.code, 'ERR_STREAM_WRITE_AFTER_END');
+ }));
+ res.on('error', common.mustCall((err) => {
+ assert.strictEqual(err.code, 'ERR_STREAM_WRITE_AFTER_END');
+ }));
+ res.on('close', common.mustCall(() => {
res.end(common.mustCall((err) => {
- assert.strictEqual(err.code, 'ERR_STREAM_ALREADY_FINISHED');
+ assert.strictEqual(err.code, 'ERR_STREAM_DESTROYED');
server.close();
}));
}));
diff --git a/test/parallel/test-http-outgoing-message-capture-rejection.js b/test/parallel/test-http-outgoing-message-capture-rejection.js
index 9fe9bdb213..763f125181 100644
--- a/test/parallel/test-http-outgoing-message-capture-rejection.js
+++ b/test/parallel/test-http-outgoing-message-capture-rejection.js
@@ -14,7 +14,7 @@ events.captureRejections = true;
throw _err;
}));
- res.socket.on('error', common.mustCall((err) => {
+ res.on('error', common.mustCall((err) => {
assert.strictEqual(err, _err);
}));
diff --git a/test/parallel/test-stream-pipeline.js b/test/parallel/test-stream-pipeline.js
index 86f9006d83..3ad8d82e78 100644
--- a/test/parallel/test-stream-pipeline.js
+++ b/test/parallel/test-stream-pipeline.js
@@ -254,40 +254,6 @@ const net = require('net');
}
{
- const server = http.createServer((req, res) => {
- pipeline(req, res, common.mustSucceed());
- });
-
- server.listen(0, () => {
- const req = http.request({
- port: server.address().port
- });
-
- let sent = 0;
- const rs = new Readable({
- read() {
- if (sent++ > 10) {
- return;
- }
- rs.push('hello');
- }
- });
-
- pipeline(rs, req, common.mustCall(() => {
- server.close();
- }));
-
- req.on('response', (res) => {
- let cnt = 10;
- res.on('data', () => {
- cnt--;
- if (cnt === 0) rs.destroy();
- });
- });
- });
-}
-
-{
const makeTransform = () => {
const tr = new Transform({
transform(data, enc, cb) {