summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMathias Buus <mathiasbuus@gmail.com>2018-04-04 17:34:37 +0200
committerMathias Buus <mathiasbuus@gmail.com>2018-04-04 17:34:37 +0200
commite2f705555cbed3d30fb78430d146b3df21b8462b (patch)
tree0770342aabbb8d38982afbae617479840ff206bf
parent0d34d8f40231be88d0da80ef4b1d9126965062fb (diff)
downloadnode-new-add-pump-and-eos.tar.gz
-rw-r--r--lib/internal/streams/pump.js92
1 files changed, 92 insertions, 0 deletions
diff --git a/lib/internal/streams/pump.js b/lib/internal/streams/pump.js
new file mode 100644
index 0000000000..11a8bb8386
--- /dev/null
+++ b/lib/internal/streams/pump.js
@@ -0,0 +1,92 @@
+'use strict';
+
+// Ported from https://github.com/mafintosh/end-of-stream
+// with permission from the author, Mathias Buus (@mafintosh)
+
+const eos = require('./end-of-stream');
+const assert = require('assert');
+
+function once(callback) {
+ let called = false;
+ return function(err) {
+ if (called) return;
+ called = true;
+ callback(err);
+ };
+}
+
+function noop() {}
+
+function isFn(fn) {
+ return typeof fn === 'function';
+}
+
+function isRequest(stream) {
+ return stream.setHeader && isFn(stream.abort);
+}
+
+function destroyer(stream, reading, writing, callback) {
+ callback = once(callback);
+
+ let closed = false;
+ stream.on('close', () => {
+ closed = true;
+ });
+
+ eos(stream, { readable: reading, writable: writing }, (err) => {
+ if (err) return callback(err);
+ closed = true;
+ callback();
+ });
+
+ let destroyed = false;
+ return (err) => {
+ if (closed) return;
+ if (destroyed) return;
+ destroyed = true;
+
+ // request.destroy just do .end - .abort is what we want
+ if (isRequest(stream)) return stream.abort();
+ if (isFn(stream.destroy)) return stream.destroy();
+
+ callback(err || new Error('stream was destroyed'));
+ };
+}
+
+function call(fn) {
+ fn();
+}
+
+function pipe(from, to) {
+ return from.pipe(to);
+}
+
+function popCallback(streams) {
+ if (!streams.length) return noop;
+ if (typeof streams[streams.length - 1] !== 'function') return noop;
+ return streams.pop();
+}
+
+function pump(...streams) {
+ const callback = popCallback(streams);
+
+ if (Array.isArray(streams[0])) streams = streams[0];
+ assert(streams.length < 2, 'pump requires two streams per minimum');
+
+ let error;
+ const destroys = streams.map(function(stream, i) {
+ const reading = i < streams.length - 1;
+ const writing = i > 0;
+ return destroyer(stream, reading, writing, function(err) {
+ if (!error) error = err;
+ if (err) destroys.forEach(call);
+ if (reading) return;
+ destroys.forEach(call);
+ callback(error);
+ });
+ });
+
+ return streams.reduce(pipe);
+}
+
+module.exports = pump;