summaryrefslogtreecommitdiff
path: root/src/mongo/db/storage/external_record_store_test.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/storage/external_record_store_test.cpp')
-rw-r--r--src/mongo/db/storage/external_record_store_test.cpp122
1 files changed, 75 insertions, 47 deletions
diff --git a/src/mongo/db/storage/external_record_store_test.cpp b/src/mongo/db/storage/external_record_store_test.cpp
index f4c42fb03ad..e5474f43133 100644
--- a/src/mongo/db/storage/external_record_store_test.cpp
+++ b/src/mongo/db/storage/external_record_store_test.cpp
@@ -32,11 +32,9 @@
#include <ctime>
#include <fmt/format.h>
-#include "mongo/db/storage/external_record_store.h"
#include "mongo/db/storage/input_stream.h"
#include "mongo/db/storage/multi_bson_stream_cursor.h"
#include "mongo/db/storage/named_pipe.h"
-#include "mongo/logv2/log.h"
#include "mongo/stdx/thread.h"
#include "mongo/unittest/unittest.h"
#include "mongo/util/assert_util.h"
@@ -45,21 +43,33 @@
namespace mongo {
using namespace fmt::literals;
-#ifndef _WIN32
-static const std::string pipePath1 = "/tmp/named_pipe1";
-static const std::string pipePath2 = "/tmp/named_pipe2";
-static const std::string nonExistingPath = "/tmp/non-existing";
-#else
-// "//./pipe" is the required path start of all named pipes on Windows, where "//." is the
-// abbreviation for the local server name and "/pipe" is a literal. (These also work with
-// Windows-native backslashes instead of forward slashes.)
-static const std::string pipePath1 = R"(//./pipe/named_pipe1)";
-static const std::string pipePath2 = R"(//./pipe/named_pipe2)";
-static const std::string nonExistingPath = R"(//./pipe/non-existing)";
-#endif
+static const std::string pipePath1 = "named_pipe1";
+static const std::string pipePath2 = "named_pipe2";
+static const std::string nonExistingPath = "non-existing";
static constexpr int kNumPipes = 2;
static const std::string pipePaths[kNumPipes] = {pipePath1, pipePath2};
+class PipeWaiter {
+public:
+ void notify() {
+ {
+ stdx::unique_lock lk(m);
+ pipeCreated = true;
+ }
+ cv.notify_one();
+ }
+
+ void wait() {
+ stdx::unique_lock lk(m);
+ cv.wait(lk, [&] { return pipeCreated; });
+ }
+
+private:
+ Mutex m;
+ stdx::condition_variable cv;
+ bool pipeCreated = false;
+};
+
class ExternalRecordStoreTest : public unittest::Test {
public:
// Gets a random string of 'count' length consisting of printable ASCII chars (32-126).
@@ -74,11 +84,11 @@ public:
return buf;
}
-
static constexpr int kBufferSize = 1024;
char _buffer[kBufferSize]; // buffer amply big enough to fit any BSONObj used in this test
- static void createNamedPipe(const std::string& pipePath,
+ static void createNamedPipe(PipeWaiter* pw,
+ const std::string& pipePath,
long numToWrite,
const std::vector<BSONObj>& bsonObjs);
@@ -88,13 +98,16 @@ public:
};
// Creates a named pipe of BSON objects.
+// pipeWaiter - synchronization for pipe creation
// pipePath - file path for the named pipe
// numToWrite - number of bsons to write to the pipe
// bsonObjs - vector of bsons to write round-robin to the pipe
-void ExternalRecordStoreTest::createNamedPipe(const std::string& pipePath,
+void ExternalRecordStoreTest::createNamedPipe(PipeWaiter* pw,
+ const std::string& pipePath,
long numToWrite,
const std::vector<BSONObj>& bsonObjs) {
- NamedPipeOutput pipeWriter(pipePath.c_str());
+ NamedPipeOutput pipeWriter(pipePath);
+ pw->notify();
pipeWriter.open();
const int numObjs = bsonObjs.size();
@@ -110,8 +123,10 @@ void ExternalRecordStoreTest::createNamedPipe(const std::string& pipePath,
TEST_F(ExternalRecordStoreTest, NamedPipeBasicRead) {
auto srcBsonObj = BSON("a" << 1);
auto count = srcBsonObj.objsize();
+ PipeWaiter pw;
stdx::thread producer([&] {
- NamedPipeOutput pipeWriter(pipePath1.c_str());
+ NamedPipeOutput pipeWriter(pipePath1);
+ pw.notify();
pipeWriter.open();
for (int i = 0; i < 100; ++i) {
@@ -123,11 +138,11 @@ TEST_F(ExternalRecordStoreTest, NamedPipeBasicRead) {
ON_BLOCK_EXIT([&] { producer.join(); });
// Gives some time to the producer so that it can initialize a named pipe.
- stdx::this_thread::sleep_for(stdx::chrono::seconds(1));
+ pw.wait();
- auto inputStream = std::make_unique<InputStream<NamedPipeInput>>(pipePath1.c_str());
+ auto inputStream = InputStream<NamedPipeInput>(pipePath1);
for (int i = 0; i < 100; ++i) {
- int nRead = inputStream->readBytes(count, _buffer);
+ int nRead = inputStream.readBytes(count, _buffer);
ASSERT_EQ(nRead, count) << "Failed to read data up to {} bytes"_format(count);
ASSERT_EQ(std::memcmp(srcBsonObj.objdata(), _buffer, count), 0)
<< "Read data is not same as the source data";
@@ -137,8 +152,10 @@ TEST_F(ExternalRecordStoreTest, NamedPipeBasicRead) {
TEST_F(ExternalRecordStoreTest, NamedPipeReadPartialData) {
auto srcBsonObj = BSON("a" << 1);
auto count = srcBsonObj.objsize();
+ PipeWaiter pw;
stdx::thread producer([&] {
- NamedPipeOutput pipeWriter(pipePath1.c_str());
+ NamedPipeOutput pipeWriter(pipePath1);
+ pw.notify();
pipeWriter.open();
pipeWriter.write(srcBsonObj.objdata(), count);
pipeWriter.close();
@@ -146,11 +163,11 @@ TEST_F(ExternalRecordStoreTest, NamedPipeReadPartialData) {
ON_BLOCK_EXIT([&] { producer.join(); });
// Gives some time to the producer so that it can initialize a named pipe.
- stdx::this_thread::sleep_for(stdx::chrono::seconds(1));
+ pw.wait();
- auto inputStream = std::make_unique<InputStream<NamedPipeInput>>(pipePath1.c_str());
- // Request more data than the pipe contains. Should only get the bytes it does contain.
- int nRead = inputStream->readBytes(kBufferSize, _buffer);
+ auto inputStream = InputStream<NamedPipeInput>(pipePath1);
+ // Requests more data than the pipe contains. Should only get the bytes it does contain.
+ int nRead = inputStream.readBytes(kBufferSize, _buffer);
ASSERT_EQ(nRead, count) << "Expected nRead == {} but got {}"_format(count, nRead);
ASSERT_EQ(std::memcmp(srcBsonObj.objdata(), _buffer, count), 0)
<< "Read data is not same as the source data";
@@ -160,8 +177,10 @@ TEST_F(ExternalRecordStoreTest, NamedPipeReadUntilProducerDone) {
auto srcBsonObj = BSON("a" << 1);
auto count = srcBsonObj.objsize();
const auto nSent = std::rand() % 100;
+ PipeWaiter pw;
stdx::thread producer([&] {
- NamedPipeOutput pipeWriter(pipePath1.c_str());
+ NamedPipeOutput pipeWriter(pipePath1);
+ pw.notify();
pipeWriter.open();
for (int i = 0; i < nSent; ++i) {
@@ -173,12 +192,12 @@ TEST_F(ExternalRecordStoreTest, NamedPipeReadUntilProducerDone) {
ON_BLOCK_EXIT([&] { producer.join(); });
// Gives some time to the producer so that it can initialize a named pipe.
- stdx::this_thread::sleep_for(stdx::chrono::seconds(1));
+ pw.wait();
- auto inputStream = std::make_unique<InputStream<NamedPipeInput>>(pipePath1.c_str());
+ auto inputStream = InputStream<NamedPipeInput>(pipePath1);
auto nReceived = 0;
while (true) {
- int nRead = inputStream->readBytes(count, _buffer);
+ int nRead = inputStream.readBytes(count, _buffer);
if (nRead != count) {
ASSERT_EQ(nRead, 0) << "Expected nRead == 0 for EOF but got something else {}"_format(
nRead);
@@ -195,7 +214,7 @@ TEST_F(ExternalRecordStoreTest, NamedPipeReadUntilProducerDone) {
TEST_F(ExternalRecordStoreTest, NamedPipeOpenNonExisting) {
ASSERT_THROWS_CODE(
- [] { (void)std::make_unique<InputStream<NamedPipeInput>>(nonExistingPath.c_str()); }(),
+ [] { (void)std::make_unique<InputStream<NamedPipeInput>>(nonExistingPath); }(),
DBException,
ErrorCodes::FileNotOpen);
}
@@ -210,9 +229,10 @@ TEST_F(ExternalRecordStoreTest, NamedPipeMultiplePipes1) {
// Create two pipes. The first has only "a" objects and the second has only "zed" objects.
stdx::thread pipeThreads[kNumPipes];
+ PipeWaiter pw[kNumPipes];
for (int pipeIdx = 0; pipeIdx < kNumPipes; ++pipeIdx) {
- pipeThreads[pipeIdx] =
- stdx::thread(createNamedPipe, pipePaths[pipeIdx], kObjsPerPipe, bsonObjs[pipeIdx]);
+ pipeThreads[pipeIdx] = stdx::thread(
+ createNamedPipe, &pw[pipeIdx], pipePaths[pipeIdx], kObjsPerPipe, bsonObjs[pipeIdx]);
}
ON_BLOCK_EXIT([&] {
for (int pipeIdx = 0; pipeIdx < kNumPipes; ++pipeIdx) {
@@ -221,7 +241,9 @@ TEST_F(ExternalRecordStoreTest, NamedPipeMultiplePipes1) {
});
// Gives some time to the producers so they can initialize the named pipes.
- stdx::this_thread::sleep_for(stdx::chrono::seconds(1));
+ for (int pipeIdx = 0; pipeIdx < kNumPipes; ++pipeIdx) {
+ pw[pipeIdx].wait();
+ }
// Create metadata describing the pipes and a MultiBsonStreamCursor to read them.
VirtualCollectionOptions vopts;
@@ -302,12 +324,13 @@ TEST_F(ExternalRecordStoreTest, NamedPipeMultiplePipes2) {
// so they will cause several wraps. The largish size of the objects makes it highly likely that
// some reads will leave a partial object that must be completed on a later next() call.
stdx::thread pipeThreads[kNumPipes];
+ PipeWaiter pw[kNumPipes];
long numToWrites[] = {(3 * groupsIn32Mb * numObjs), (5 * groupsIn32Mb * numObjs)};
long numToWrite = 0;
for (int pipeIdx = 0; pipeIdx < kNumPipes; ++pipeIdx) {
- pipeThreads[pipeIdx] =
- stdx::thread(createNamedPipe, pipePaths[pipeIdx], numToWrites[pipeIdx], bsonObjs);
+ pipeThreads[pipeIdx] = stdx::thread(
+ createNamedPipe, &pw[pipeIdx], pipePaths[pipeIdx], numToWrites[pipeIdx], bsonObjs);
numToWrite += numToWrites[pipeIdx];
}
ON_BLOCK_EXIT([&] {
@@ -317,7 +340,9 @@ TEST_F(ExternalRecordStoreTest, NamedPipeMultiplePipes2) {
});
// Gives some time to the producers so they can initialize the named pipes.
- stdx::this_thread::sleep_for(stdx::chrono::seconds(1));
+ for (int pipeIdx = 0; pipeIdx < kNumPipes; ++pipeIdx) {
+ pw[pipeIdx].wait();
+ }
// Create metadata describing the pipes and a MultiBsonStreamCursor to read them.
VirtualCollectionOptions vopts;
@@ -383,12 +408,13 @@ TEST_F(ExternalRecordStoreTest, NamedPipeMultiplePipes3) {
// Create pipes with large bsons.
stdx::thread pipeThreads[kNumPipes];
+ PipeWaiter pw[kNumPipes];
long numToWrites[] = {19, 17};
long numToWrite = 0;
for (int pipeIdx = 0; pipeIdx < kNumPipes; ++pipeIdx) {
- pipeThreads[pipeIdx] =
- stdx::thread(createNamedPipe, pipePaths[pipeIdx], numToWrites[pipeIdx], bsonObjs);
+ pipeThreads[pipeIdx] = stdx::thread(
+ createNamedPipe, &pw[pipeIdx], pipePaths[pipeIdx], numToWrites[pipeIdx], bsonObjs);
numToWrite += numToWrites[pipeIdx];
}
ON_BLOCK_EXIT([&] {
@@ -398,7 +424,9 @@ TEST_F(ExternalRecordStoreTest, NamedPipeMultiplePipes3) {
});
// Gives some time to the producers so they can initialize the named pipes.
- stdx::this_thread::sleep_for(stdx::chrono::seconds(1));
+ for (int pipeIdx = 0; pipeIdx < kNumPipes; ++pipeIdx) {
+ pw[pipeIdx].wait();
+ }
// Create metadata describing the pipes and a MultiBsonStreamCursor to read them.
VirtualCollectionOptions vopts;
@@ -450,6 +478,7 @@ TEST_F(ExternalRecordStoreTest, NamedPipeMultiplePipes4) {
constexpr int kNumPipes = 100; // shadows the global
std::string pipePaths[kNumPipes]; // shadows the global
stdx::thread pipeThreads[kNumPipes]; // pipe producer threads
+ PipeWaiter pw[kNumPipes]; // pipe waiters
std::vector<BSONObj> pipeBsonObjs[kNumPipes]; // vector of BSON objects for each pipe
size_t objsWritten = 0; // number of objects written to all pipes
@@ -465,12 +494,9 @@ TEST_F(ExternalRecordStoreTest, NamedPipeMultiplePipes4) {
// Create the pipes.
for (int pipeIdx = 0; pipeIdx < kNumPipes; ++pipeIdx) {
-#ifndef _WIN32
- pipePaths[pipeIdx] = "/tmp/named_pipe{}"_format(pipeIdx);
-#else
- pipePaths[pipeIdx] = "//./pipe/named_pipe{}"_format(pipeIdx);
-#endif
+ pipePaths[pipeIdx] = "named_pipe{}"_format(pipeIdx);
pipeThreads[pipeIdx] = stdx::thread(createNamedPipe,
+ &pw[pipeIdx],
pipePaths[pipeIdx],
pipeBsonObjs[pipeIdx].size(),
pipeBsonObjs[pipeIdx]);
@@ -482,7 +508,9 @@ TEST_F(ExternalRecordStoreTest, NamedPipeMultiplePipes4) {
});
// Gives some time to the producers so they can initialize the named pipes.
- stdx::this_thread::sleep_for(stdx::chrono::seconds(1));
+ for (int pipeIdx = 0; pipeIdx < kNumPipes; ++pipeIdx) {
+ pw[pipeIdx].wait();
+ }
// Create metadata describing the pipes and a MultiBsonStreamCursor to read them.
VirtualCollectionOptions vopts;