summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorKevin Cherkauer <kevin.cherkauer@mongodb.com>2022-11-17 00:43:22 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2022-11-17 01:15:34 +0000
commit3fdd9c6c3e71e9be3478f840afab8eea93c2061c (patch)
tree8c02b1d2ff9d1ebf3ca958160f099790a5d40007 /src
parent6b46d34b665f34e7c0d4794d0f8964aa9e36b289 (diff)
downloadmongo-3fdd9c6c3e71e9be3478f840afab8eea93c2061c.tar.gz
SERVER-70392 Named Pipes _writeTestPipe shell function
Diffstat (limited to 'src')
-rw-r--r--src/mongo/db/storage/external_record_store_test.cpp22
-rw-r--r--src/mongo/db/storage/input_stream.h1
-rw-r--r--src/mongo/db/storage/named_pipe_posix.cpp11
-rw-r--r--src/mongo/db/storage/named_pipe_windows.cpp22
-rw-r--r--src/mongo/shell/SConscript2
-rw-r--r--src/mongo/shell/named_pipe_test_helper.cpp194
-rw-r--r--src/mongo/shell/named_pipe_test_helper.h64
-rw-r--r--src/mongo/shell/shell_utils_launcher.cpp147
8 files changed, 440 insertions, 23 deletions
diff --git a/src/mongo/db/storage/external_record_store_test.cpp b/src/mongo/db/storage/external_record_store_test.cpp
index 3ff696f02ab..135fbe7d43d 100644
--- a/src/mongo/db/storage/external_record_store_test.cpp
+++ b/src/mongo/db/storage/external_record_store_test.cpp
@@ -256,7 +256,7 @@ TEST_F(ExternalRecordStoreTest, NamedPipeMultiplePipes1) {
(ExternalDataSourceMetadata::kUrlProtocolFile + pipePaths[pipeIdx]),
StorageTypeEnum::pipe,
FileTypeEnum::bson);
- vopts.dataSources.push_back(meta);
+ vopts.dataSources.emplace_back(meta);
}
MultiBsonStreamCursor msbc = MultiBsonStreamCursor(vopts);
@@ -273,7 +273,7 @@ TEST_F(ExternalRecordStoreTest, NamedPipeMultiplePipes1) {
ASSERT_EQ(recIdExpected, recId)
<< "Expected record->id {} but got {}"_format(recIdExpected, recId);
ASSERT_EQ(record->data.size(), bsonObjs[pipeIdx][0].objsize())
- << "record.data.size() {} != original size {}"_format(
+ << "record->data.size() {} != original size {}"_format(
record->data.size(), bsonObjs[pipeIdx][0].objsize());
ASSERT_EQ(std::memcmp(record->data.data(),
bsonObjs[pipeIdx][0].objdata(),
@@ -357,7 +357,7 @@ TEST_F(ExternalRecordStoreTest, NamedPipeMultiplePipes2) {
(ExternalDataSourceMetadata::kUrlProtocolFile + pipePaths[pipeIdx]),
StorageTypeEnum::pipe,
FileTypeEnum::bson);
- vopts.dataSources.push_back(meta);
+ vopts.dataSources.emplace_back(meta);
}
MultiBsonStreamCursor msbc(vopts);
@@ -374,8 +374,8 @@ TEST_F(ExternalRecordStoreTest, NamedPipeMultiplePipes2) {
ASSERT_EQ(recIdExpected, recId)
<< "Expected record->id {} but got {}"_format(recIdExpected, recId);
ASSERT_EQ(record->data.size(), bsonObjs[objIdx].objsize())
- << "record.data.size() {} != original size {}"_format(record->data.size(),
- bsonObjs[objIdx].objsize());
+ << "record->data.size() {} != original size {}"_format(record->data.size(),
+ bsonObjs[objIdx].objsize());
ASSERT_EQ(std::memcmp(record->data.data(),
bsonObjs[objIdx].objdata(),
bsonObjs[objIdx].objsize()),
@@ -443,7 +443,7 @@ TEST_F(ExternalRecordStoreTest, NamedPipeMultiplePipes3) {
(ExternalDataSourceMetadata::kUrlProtocolFile + pipePaths[pipeIdx]),
StorageTypeEnum::pipe,
FileTypeEnum::bson);
- vopts.dataSources.push_back(meta);
+ vopts.dataSources.emplace_back(meta);
}
MultiBsonStreamCursor msbc(vopts);
@@ -459,8 +459,8 @@ TEST_F(ExternalRecordStoreTest, NamedPipeMultiplePipes3) {
ASSERT_EQ(recIdExpected, recId)
<< "Expected record->id {} but got {}"_format(recIdExpected, recId);
ASSERT_EQ(record->data.size(), bsonObjs[0].objsize())
- << "record.data.size() {} != original size {}"_format(record->data.size(),
- bsonObjs[0].objsize());
+ << "record->data.size() {} != original size {}"_format(record->data.size(),
+ bsonObjs[0].objsize());
ASSERT_EQ(
std::memcmp(record->data.data(), bsonObjs[0].objdata(), bsonObjs[0].objsize()), 0)
<< "Read data is not same as the source data";
@@ -496,7 +496,7 @@ TEST_F(ExternalRecordStoreTest, NamedPipeMultiplePipes4) {
objsWritten += numObjs;
std::string fieldName = "field_{}"_format(pipeIdx);
for (int objIdx = 0; objIdx < numObjs; ++objIdx) {
- pipeBsonObjs[pipeIdx].push_back(BSON(fieldName << getRandomString(rand() % 2048)));
+ pipeBsonObjs[pipeIdx].emplace_back(BSON(fieldName << getRandomString(rand() % 2048)));
}
}
@@ -527,7 +527,7 @@ TEST_F(ExternalRecordStoreTest, NamedPipeMultiplePipes4) {
(ExternalDataSourceMetadata::kUrlProtocolFile + pipePaths[pipeIdx]),
StorageTypeEnum::pipe,
FileTypeEnum::bson);
- vopts.dataSources.push_back(meta);
+ vopts.dataSources.emplace_back(meta);
}
MultiBsonStreamCursor msbc(vopts);
@@ -551,7 +551,7 @@ TEST_F(ExternalRecordStoreTest, NamedPipeMultiplePipes4) {
ASSERT_EQ(recIdExpected, recId)
<< "Expected record->id {} but got {}"_format(recIdExpected, recId);
ASSERT_EQ(record->data.size(), pipeBsonObjs[pipeIdx][pipeObjsRead - 1].objsize())
- << "record.data.size() {} != original size {}"_format(
+ << "record->data.size() {} != original size {}"_format(
record->data.size(), pipeBsonObjs[pipeIdx][pipeObjsRead - 1].objsize());
ASSERT_EQ(std::memcmp(record->data.data(),
pipeBsonObjs[pipeIdx][pipeObjsRead - 1].objdata(),
diff --git a/src/mongo/db/storage/input_stream.h b/src/mongo/db/storage/input_stream.h
index 8e5dcdf65cc..a1f9dbe502c 100644
--- a/src/mongo/db/storage/input_stream.h
+++ b/src/mongo/db/storage/input_stream.h
@@ -98,7 +98,6 @@ public:
// If we reach this point, we accumulated fewer than 'count' bytes.
if (MONGO_likely(InputT::isEof())) {
- LOGV2_INFO(7005001, "Named pipe is closed", "path"_attr = InputT::getAbsolutePath());
return nReadTotal;
}
diff --git a/src/mongo/db/storage/named_pipe_posix.cpp b/src/mongo/db/storage/named_pipe_posix.cpp
index 624c72b1db8..cf940dfe5e7 100644
--- a/src/mongo/db/storage/named_pipe_posix.cpp
+++ b/src/mongo/db/storage/named_pipe_posix.cpp
@@ -28,7 +28,8 @@
*/
#ifndef _WIN32
-#include "named_pipe.h"
+
+#include "mongo/db/storage/named_pipe.h"
#include <fmt/format.h>
#include <string>
@@ -91,6 +92,14 @@ NamedPipeInput::~NamedPipeInput() {
}
void NamedPipeInput::doOpen() {
+ // MultiBsonStreamCursor's (MBSC) assembly buffer is designed to perform well without a lower-
+ // layer IO buffer. Removing std::ifstream's default 8k "associated buffer" improves throughput
+ // by 1.9% by eliminating the hidden copies from that buffer to MBSC's buffer. MBSC itself will
+ // never copy data except when it (rarely) needs to expand its buffer, so by removing
+ // std::ifstream's buffer we get an essentially zero-copy cursor that still avoids lots of tiny
+ // IOs due to MBSC's assembly buffer algorithm.
+ _ifs.rdbuf()->pubsetbuf(0, 0);
+
// Retry open every 1 ms for up to 1 sec in case writer has not created the pipe yet.
int retries = 0;
do {
diff --git a/src/mongo/db/storage/named_pipe_windows.cpp b/src/mongo/db/storage/named_pipe_windows.cpp
index aa4d530cab9..4dd20cbc6bf 100644
--- a/src/mongo/db/storage/named_pipe_windows.cpp
+++ b/src/mongo/db/storage/named_pipe_windows.cpp
@@ -36,6 +36,7 @@
#include "mongo/db/storage/io_error_message.h"
#include "mongo/logv2/log.h"
+#include "mongo/stdx/thread.h"
#include "mongo/util/errno_util.h"
#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kStorage
@@ -123,14 +124,21 @@ NamedPipeInput::~NamedPipeInput() {
}
void NamedPipeInput::doOpen() {
- _pipe =
- CreateFileA(_pipeAbsolutePath.c_str(), GENERIC_READ, 0, nullptr, OPEN_EXISTING, 0, nullptr);
- if (_pipe == INVALID_HANDLE_VALUE) {
- return;
- }
+ // Retry open every 1 ms for up to 1 sec in case writer has not created the pipe yet.
+ int retries = 0;
+ do {
+ _pipe = CreateFileA(
+ _pipeAbsolutePath.c_str(), GENERIC_READ, 0, nullptr, OPEN_EXISTING, 0, nullptr);
+ if (_pipe == INVALID_HANDLE_VALUE) {
+ stdx::this_thread::sleep_for(stdx::chrono::milliseconds(1));
+ ++retries;
+ }
+ } while (_pipe == INVALID_HANDLE_VALUE && retries <= 1000);
- _isOpen = true;
- _isGood = true;
+ if (_pipe != INVALID_HANDLE_VALUE) {
+ _isOpen = true;
+ _isGood = true;
+ }
}
int NamedPipeInput::doRead(char* data, int size) {
diff --git a/src/mongo/shell/SConscript b/src/mongo/shell/SConscript
index d99de6a0ed6..1179246d35c 100644
--- a/src/mongo/shell/SConscript
+++ b/src/mongo/shell/SConscript
@@ -123,6 +123,7 @@ env.Library(
target='shell_utils',
source=[
'mongo-server.cpp',
+ 'named_pipe_test_helper.cpp',
'shell_options.cpp',
'shell_utils.cpp',
'shell_utils_extended.cpp',
@@ -143,6 +144,7 @@ env.Library(
LIBDEPS_PRIVATE=[
'$BUILD_DIR/mongo/bson/util/bson_column',
'$BUILD_DIR/mongo/db/auth/security_token',
+ '$BUILD_DIR/mongo/db/storage/record_store_base',
],
)
diff --git a/src/mongo/shell/named_pipe_test_helper.cpp b/src/mongo/shell/named_pipe_test_helper.cpp
new file mode 100644
index 00000000000..0603af11f8b
--- /dev/null
+++ b/src/mongo/shell/named_pipe_test_helper.cpp
@@ -0,0 +1,194 @@
+/**
+ * Copyright (C) 2022-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/shell/named_pipe_test_helper.h"
+
+#include "mongo/bson/bsonmisc.h"
+#include "mongo/bson/bsonobj.h"
+#include "mongo/bson/bsonobjbuilder.h"
+#include "mongo/db/catalog/virtual_collection_options.h"
+#include "mongo/db/storage/multi_bson_stream_cursor.h"
+#include "mongo/db/storage/named_pipe.h"
+#include "mongo/stdx/chrono.h"
+
+namespace mongo {
+/**
+ * Gets a string of 'length' 'a' chars as efficiently as possible.
+ */
+std::string NamedPipeHelper::getString(int length) {
+ return std::string(length, 'a');
+}
+
+/**
+ * Reads all BSON objects from all named pipes in 'pipeRelativePaths' and returns the following
+ * stats in a BSON object:
+ * {
+ * "objects": number of objects read,
+ * "time": { total time consumed in...
+ * "sec": seconds,
+ * "msec": milliseconds,
+ * "usec": microseconds,
+ * "nsec": nanoseconds,
+ * },
+ * "rate": { data processing rate in...
+ * "mbps": megabytes / second,
+ * "gbps": gigabytes / second,
+ * },
+ * "totalSize": { total size of all objects in
+ * "bytes": bytes,
+ * "kb": kilobytes,
+ * "mb": megabytes,
+ * "gb": gigabytes,
+ * }
+ * }
+ */
+BSONObj NamedPipeHelper::readFromPipes(const std::vector<std::string>& pipeRelativePaths) {
+ stdx::chrono::system_clock::time_point startTime = stdx::chrono::system_clock::now();
+ double objects = 0.0; // return stat
+ double totalSizeBytes = 0.0; // return stat
+
+ // Create metadata describing the pipes and a MultiBsonStreamCursor to read them.
+ VirtualCollectionOptions vopts;
+ for (const std::string& pipeRelativePath : pipeRelativePaths) {
+ ExternalDataSourceMetadata meta(
+ (ExternalDataSourceMetadata::kUrlProtocolFile + pipeRelativePath),
+ StorageTypeEnum::pipe,
+ FileTypeEnum::bson);
+ vopts.dataSources.emplace_back(meta);
+ }
+ MultiBsonStreamCursor msbc(vopts);
+
+ // Use MultiBsonStreamCursor to read the pipes.
+ boost::optional<Record> record = boost::none;
+ do {
+ record = msbc.next();
+ if (record) {
+ ++objects;
+ totalSizeBytes += record->data.size();
+ }
+ } while (record);
+ stdx::chrono::system_clock::time_point finishTime = stdx::chrono::system_clock::now();
+ auto duration = finishTime - startTime;
+
+ double sec = stdx::chrono::duration_cast<stdx::chrono::seconds>(duration).count();
+ double msec = stdx::chrono::duration_cast<stdx::chrono::milliseconds>(duration).count();
+ double usec = stdx::chrono::duration_cast<stdx::chrono::microseconds>(duration).count();
+ double nsec = stdx::chrono::duration_cast<stdx::chrono::nanoseconds>(duration).count();
+ double mbps = (totalSizeBytes / (1024.0 * 1024.0)) / (nsec / (1000.0 * 1000.0 * 1000.0));
+ double gbps = mbps / 1024.0;
+ return BSON("" << BSON("objects"
+ << objects << "time"
+ << BSON("sec" << sec << "msec" << msec << "usec" << usec << "nsec"
+ << nsec)
+ << "rate" << BSON("mbps" << mbps << "gbps" << gbps) << "totalSize"
+ << BSON("bytes" << totalSizeBytes << "kb" << (totalSizeBytes / 1024.0)
+ << "mb" << (totalSizeBytes / (1024.0 * 1024.0)) << "gb"
+ << (totalSizeBytes / (1024.0 * 1024.0 * 1024.0)))));
+}
+
+/**
+ * Synchronously writes 'objects' random BSON objects to named pipe 'pipeRelativePath'. The "string"
+ * field of these objects will have stringMinSize <= string.length() <= stringMaxSize. Note that
+ * the open() call itself will block until a pipe reader attaches to the same pipe. Absorbs
+ * exceptions because this is called by an async detached thread, so escaping exceptions will cause
+ * fuzzer tests to fail as its try blocks are only around the main thread.
+ */
+void NamedPipeHelper::writeToPipe(const std::string& pipeRelativePath,
+ long objects,
+ long stringMinSize,
+ long stringMaxSize) noexcept {
+ try {
+ NamedPipeOutput pipeWriter(pipeRelativePath); // producer
+
+ pipeWriter.open();
+ for (long obj = 0; obj < objects; ++obj) {
+ int length = std::rand() % (1 + stringMaxSize - stringMinSize) + stringMinSize;
+ BSONObj bsonObj{BSON("length" << length << "string" << getString(length))};
+ pipeWriter.write(bsonObj.objdata(), bsonObj.objsize());
+ }
+ pipeWriter.close();
+ } catch (const DBException& exc) {
+ std::cout << "NamedPipeHelper::writeToPipe caught exception: " << exc.toString()
+ << std::endl;
+ } catch (...) {
+ // absorb
+ }
+}
+
+/**
+ * Asynchronously writes 'objects' random BSON objects to named pipe 'pipeRelativePath'. The
+ * "string" field of these objects will have stringMinSize <= string.length() <= stringMaxSize.
+ */
+void NamedPipeHelper::writeToPipeAsync(const std::string& pipeRelativePath,
+ long objects,
+ long stringMinSize,
+ long stringMaxSize) {
+ stdx::thread thread(writeToPipe, pipeRelativePath, objects, stringMinSize, stringMaxSize);
+ thread.detach();
+}
+
+/**
+ * Synchronously writes 'objects' BSON objects round-robinned from 'bsonObjs' to named pipe
+ * 'pipeRelativePath'. Note that the open() call itself will block until a pipe reader attaches to
+ * the same pipe. Absorbs exceptions because this is called by an async detached thread, so escaping
+ * exceptions will cause fuzzer tests to fail as its try blocks are only around the main thread.
+ */
+void NamedPipeHelper::writeToPipeObjects(const std::string& pipeRelativePath,
+ long objects,
+ const std::vector<BSONObj>& bsonObjs) noexcept {
+ try {
+ const int kNumBsonObjs = bsonObjs.size();
+ NamedPipeOutput pipeWriter(pipeRelativePath); // producer
+
+ pipeWriter.open();
+ for (long obj = 0; obj < objects; ++obj) {
+ BSONObj bsonObj{bsonObjs[obj % kNumBsonObjs]};
+ pipeWriter.write(bsonObj.objdata(), bsonObj.objsize());
+ }
+ pipeWriter.close();
+ } catch (const DBException& exc) {
+ std::cout << "NamedPipeHelper::writeToPipeObjects caught exception: " << exc.toString()
+ << std::endl;
+ } catch (...) {
+ // absorb
+ }
+}
+
+/**
+ * Asynchronously writes 'objects' BSON objects round-robinned from 'bsonObjs' to named pipe
+ * 'pipeRelativePath'.
+ */
+void NamedPipeHelper::writeToPipeObjectsAsync(const std::string& pipeRelativePath,
+ long objects,
+ const std::vector<BSONObj>& bsonObjs) {
+ stdx::thread thread(
+ writeToPipeObjects, std::move(pipeRelativePath), objects, std::move(bsonObjs));
+ thread.detach();
+}
+} // namespace mongo
diff --git a/src/mongo/shell/named_pipe_test_helper.h b/src/mongo/shell/named_pipe_test_helper.h
new file mode 100644
index 00000000000..5da8990db3e
--- /dev/null
+++ b/src/mongo/shell/named_pipe_test_helper.h
@@ -0,0 +1,64 @@
+/**
+ * Copyright (C) 2022-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/db/storage/named_pipe.h"
+
+#include <string>
+
+#include "mongo/bson/bsonelement.h"
+#include "mongo/stdx/thread.h"
+
+namespace mongo {
+/**
+ * This class supports writing and reading named pipes in the mongo test shell.
+ */
+class NamedPipeHelper {
+public:
+ static BSONObj readFromPipes(const std::vector<std::string>& pipeRelativePaths);
+ static void writeToPipeAsync(const std::string& pipeRelativePath,
+ long objects,
+ long stringMinSize,
+ long stringMaxSize);
+ static void writeToPipeObjectsAsync(const std::string& pipeRelativePath,
+ long objects,
+ const std::vector<BSONObj>& bsonObjs);
+
+private:
+ static std::string getString(int length);
+ static void writeToPipe(const std::string& pipeRelativePath,
+ long objects,
+ long stringMinSize,
+ long stringMaxSize) noexcept;
+ static void writeToPipeObjects(const std::string& pipeRelativePath,
+ long objects,
+ const std::vector<BSONObj>& bsonObjs) noexcept;
+};
+} // namespace mongo
diff --git a/src/mongo/shell/shell_utils_launcher.cpp b/src/mongo/shell/shell_utils_launcher.cpp
index c781e9dae05..173bb7a9b24 100644
--- a/src/mongo/shell/shell_utils_launcher.cpp
+++ b/src/mongo/shell/shell_utils_launcher.cpp
@@ -27,9 +27,6 @@
* it in the license file.
*/
-
-#include "mongo/platform/basic.h"
-
#include "mongo/shell/shell_utils_launcher.h"
#include <algorithm>
@@ -61,11 +58,14 @@
#include "mongo/base/environment_buffer.h"
#include "mongo/base/error_codes.h"
+#include "mongo/bson/bsonelement.h"
#include "mongo/bson/util/builder.h"
#include "mongo/client/dbclient_connection.h"
#include "mongo/db/traffic_reader.h"
#include "mongo/logv2/log.h"
+#include "mongo/platform/basic.h"
#include "mongo/scripting/engine.h"
+#include "mongo/shell/named_pipe_test_helper.h"
#include "mongo/shell/shell_options.h"
#include "mongo/shell/shell_utils.h"
#include "mongo/util/assert_util.h"
@@ -1262,6 +1262,124 @@ int KillMongoProgramInstances() {
return returnCode;
}
+/**
+ * Reads a set of test named pipes. 'args' BSONObj should contain one or more fields like:
+ * "0": string; relative path of the first pipe
+ * "1": string; relative path of the second pipe
+ * ...
+ * Any field names not sequentially numbered from 0 will be ignored.
+ */
+BSONObj ReadTestPipes(const BSONObj& args, void* unused) {
+ int fieldNum = 0; // next field name in numeric form
+ BSONElement pipePathElem; // next pipe relative path
+ std::vector<std::string> pipeRelativePaths; // all pipe relative paths
+
+ do {
+ pipePathElem = BSONElement(args.getField(std::to_string(fieldNum)));
+ if (pipePathElem.type() == BSONType::String) {
+ pipeRelativePaths.emplace_back(pipePathElem.str());
+ } else if (pipePathElem.type() != BSONType::EOO) {
+ uasserted(ErrorCodes::FailedToParse,
+ "Argument {} (pipe path) must be a string"_format(fieldNum));
+ }
+ ++fieldNum;
+ } while (pipePathElem.type() != BSONType::EOO);
+
+ if (pipeRelativePaths.size() > 0) {
+ return NamedPipeHelper::readFromPipes(pipeRelativePaths);
+ }
+ return {};
+}
+
+/**
+ * Writes a test named pipe of generated BSONobj's. 'args' BSONObj should contain fields:
+ * "0": string; relative path of the pipe
+ * "1": number; number of BSON objects to write to the pipe
+ * "2": OPTIONAL number; lower bound on size of "string" field in generated object (default 0)
+ * "3": OPTIONAL number; upper bound on size of "string" field in generated object (default 2048)
+ * capped at 16,750,000 (slightly less than BSON object maximum of 16 MB)
+ */
+BSONObj WriteTestPipe(const BSONObj& args, void* unused) {
+ const long kStringMaxSize = 16750000; // max allowed size for generated object's "string" field
+ BSONElement pipePathElem(args.getField("0"));
+ BSONElement objectsElem(args.getField("1"));
+ BSONElement stringMinSizeStr(args.getField("2"));
+ BSONElement stringMaxSizeStr(args.getField("3"));
+ long stringMinSize = 0; // default "string" field minimum size
+ long stringMaxSize = 2048; // default "string" field maximum size
+
+ uassert(ErrorCodes::FailedToParse,
+ "First argument (pipe path) must be a string",
+ pipePathElem.type() == BSONType::String);
+ uassert(ErrorCodes::FailedToParse,
+ "Second argument (number of objects) must be a number",
+ objectsElem.isNumber());
+ if (stringMinSizeStr.isNumber()) { // optional
+ stringMinSize = stringMinSizeStr.numberLong();
+ if (stringMinSize < 0) {
+ stringMinSize = 0;
+ }
+ if (stringMinSize > kStringMaxSize) {
+ stringMinSize = kStringMaxSize;
+ }
+ }
+ if (stringMaxSizeStr.isNumber()) { // optional
+ stringMaxSize = stringMaxSizeStr.numberLong();
+ if (stringMaxSize < 0) {
+ stringMaxSize = 0;
+ }
+ if (stringMaxSize > kStringMaxSize) {
+ stringMaxSize = kStringMaxSize;
+ }
+ }
+ uassert(ErrorCodes::FailedToParse,
+ "Third argument (string min size) must be <= fourth argument (string max size)",
+ stringMinSize <= stringMaxSize);
+
+ NamedPipeHelper::writeToPipeAsync(
+ pipePathElem.str(), objectsElem.numberLong(), stringMinSize, stringMaxSize);
+
+ return {};
+}
+
+
+/**
+ * Writes a test named pipe by round-robinning caller-provided objects to the pipe. 'args' BSONObj
+ * should contain fields:
+ * "0": string; relative path of the pipe
+ * "1": number; number of BSON objects to write to the pipe
+ * "2": BSONArray; array of objects to round-robin write to the pipe
+ */
+BSONObj WriteTestPipeObjects(const BSONObj& args, void* unused) {
+ BSONElement pipePathElem(args.getField("0"));
+ BSONElement objectsElem(args.getField("1"));
+ BSONElement bsonElems(args.getField("2"));
+
+ uassert(ErrorCodes::FailedToParse,
+ "First argument (pipe path) must be a string",
+ pipePathElem.type() == BSONType::String);
+ uassert(ErrorCodes::FailedToParse,
+ "Second argument (number of objects) must be a number",
+ objectsElem.isNumber());
+ uassert(ErrorCodes::FailedToParse,
+ "Third argument must be an array of objects to round-robin over",
+ bsonElems.type() == mongo::Array);
+
+ // Convert bsonElems into bsonObjs as the former are pointers into local stack memory that will
+ // become invalid when this method returns, but they are needed by the async writer thread.
+ std::vector<BSONElement> bsonElemsVector = bsonElems.Array();
+ std::vector<BSONObj> bsonObjs;
+ for (BSONElement bsonElem : bsonElemsVector) {
+ bsonObjs.emplace_back(bsonElem.Obj().getOwned());
+ }
+
+ // Write the pipe asynchronously.
+ NamedPipeHelper::writeToPipeObjectsAsync(
+ pipePathElem.str(), objectsElem.numberLong(), bsonObjs);
+
+ return {};
+}
+
std::vector<ProcessId> getRunningMongoChildProcessIds() {
std::vector<ProcessId> registeredPids, outPids;
registry.getRegisteredPids(registeredPids);
@@ -1308,6 +1426,26 @@ MongoProgramScope::~MongoProgramScope() {
DESTRUCTOR_GUARD(KillMongoProgramInstances(); ClearRawMongoProgramOutput(BSONObj(), nullptr))
}
+/**
+ * Defines (funcName, CallbackFunction) pairs where funcName becomes the name of a function in the
+ * mongo test shell and CallbackFunction is its C++ callback (handler). The callbacks must all have
+ * signatures like
+ * BSONObj CallbackFunction(const BSONObj& args, void* data)
+ * (contract from injectNative()), though nobody is using the data parameter at time of writing.
+ *
+ * The BSONObj they return must put the result into field "" such as
+ * return BSON("" << true);
+ * or
+ * return BSON("" << BSON("resultInfo1" << resultValue1 << "resultInfo2" << resultValue2));
+ *
+ * In the shell these are called like
+ * funcName(arg1, arg2, ...)
+ * for example
+ * _writeTestPipe("my_pipe_file", 1234)
+ * The args will come in as the BSONObj first parameter of the callback with fields named
+ * sequentially from "0", e.g. for the above:
+ * {"0": "my_pipe_file", "1": 1234}
+ */
void installShellUtilsLauncher(Scope& scope) {
scope.injectNative("_startMongoProgram", StartMongoProgram);
scope.injectNative("_runningMongoChildProcessIds", RunningMongoChildProcessIds);
@@ -1327,6 +1465,9 @@ void installShellUtilsLauncher(Scope& scope) {
scope.injectNative("copyDbpath", CopyDbpath);
scope.injectNative("convertTrafficRecordingToBSON", ConvertTrafficRecordingToBSON);
scope.injectNative("getFCVConstants", GetFCVConstants);
+ scope.injectNative("_readTestPipes", ReadTestPipes);
+ scope.injectNative("_writeTestPipe", WriteTestPipe);
+ scope.injectNative("_writeTestPipeObjects", WriteTestPipeObjects);
}
} // namespace shell_utils
} // namespace mongo