diff options
author | Jennifer Peshansky <jennifer.peshansky@mongodb.com> | 2022-11-03 16:13:20 +0000 |
---|---|---|
committer | Jennifer Peshansky <jennifer.peshansky@mongodb.com> | 2022-11-03 16:13:20 +0000 |
commit | e74d2910bbe76790ad131d53fee277829cd95982 (patch) | |
tree | cabe148764529c9623652374fbc36323a550cd44 /src/mongo/db/storage/external_record_store_test.cpp | |
parent | 280145e9940729480bb8a35453d4056afac87641 (diff) | |
parent | ba467f46cc1bc49965e1d72b541eff0cf1d7b22e (diff) | |
download | mongo-e74d2910bbe76790ad131d53fee277829cd95982.tar.gz |
Merge branch 'master' into jenniferpeshansky/SERVER-70854jenniferpeshansky/SERVER-70854
Diffstat (limited to 'src/mongo/db/storage/external_record_store_test.cpp')
-rw-r--r-- | src/mongo/db/storage/external_record_store_test.cpp | 122 |
1 files changed, 75 insertions, 47 deletions
diff --git a/src/mongo/db/storage/external_record_store_test.cpp b/src/mongo/db/storage/external_record_store_test.cpp index f4c42fb03ad..e5474f43133 100644 --- a/src/mongo/db/storage/external_record_store_test.cpp +++ b/src/mongo/db/storage/external_record_store_test.cpp @@ -32,11 +32,9 @@ #include <ctime> #include <fmt/format.h> -#include "mongo/db/storage/external_record_store.h" #include "mongo/db/storage/input_stream.h" #include "mongo/db/storage/multi_bson_stream_cursor.h" #include "mongo/db/storage/named_pipe.h" -#include "mongo/logv2/log.h" #include "mongo/stdx/thread.h" #include "mongo/unittest/unittest.h" #include "mongo/util/assert_util.h" @@ -45,21 +43,33 @@ namespace mongo { using namespace fmt::literals; -#ifndef _WIN32 -static const std::string pipePath1 = "/tmp/named_pipe1"; -static const std::string pipePath2 = "/tmp/named_pipe2"; -static const std::string nonExistingPath = "/tmp/non-existing"; -#else -// "//./pipe" is the required path start of all named pipes on Windows, where "//." is the -// abbreviation for the local server name and "/pipe" is a literal. (These also work with -// Windows-native backslashes instead of forward slashes.) -static const std::string pipePath1 = R"(//./pipe/named_pipe1)"; -static const std::string pipePath2 = R"(//./pipe/named_pipe2)"; -static const std::string nonExistingPath = R"(//./pipe/non-existing)"; -#endif +static const std::string pipePath1 = "named_pipe1"; +static const std::string pipePath2 = "named_pipe2"; +static const std::string nonExistingPath = "non-existing"; static constexpr int kNumPipes = 2; static const std::string pipePaths[kNumPipes] = {pipePath1, pipePath2}; +class PipeWaiter { +public: + void notify() { + { + stdx::unique_lock lk(m); + pipeCreated = true; + } + cv.notify_one(); + } + + void wait() { + stdx::unique_lock lk(m); + cv.wait(lk, [&] { return pipeCreated; }); + } + +private: + Mutex m; + stdx::condition_variable cv; + bool pipeCreated = false; +}; + class ExternalRecordStoreTest : public unittest::Test { public: // Gets a random string of 'count' length consisting of printable ASCII chars (32-126). @@ -74,11 +84,11 @@ public: return buf; } - static constexpr int kBufferSize = 1024; char _buffer[kBufferSize]; // buffer amply big enough to fit any BSONObj used in this test - static void createNamedPipe(const std::string& pipePath, + static void createNamedPipe(PipeWaiter* pw, + const std::string& pipePath, long numToWrite, const std::vector<BSONObj>& bsonObjs); @@ -88,13 +98,16 @@ public: }; // Creates a named pipe of BSON objects. +// pipeWaiter - synchronization for pipe creation // pipePath - file path for the named pipe // numToWrite - number of bsons to write to the pipe // bsonObjs - vector of bsons to write round-robin to the pipe -void ExternalRecordStoreTest::createNamedPipe(const std::string& pipePath, +void ExternalRecordStoreTest::createNamedPipe(PipeWaiter* pw, + const std::string& pipePath, long numToWrite, const std::vector<BSONObj>& bsonObjs) { - NamedPipeOutput pipeWriter(pipePath.c_str()); + NamedPipeOutput pipeWriter(pipePath); + pw->notify(); pipeWriter.open(); const int numObjs = bsonObjs.size(); @@ -110,8 +123,10 @@ void ExternalRecordStoreTest::createNamedPipe(const std::string& pipePath, TEST_F(ExternalRecordStoreTest, NamedPipeBasicRead) { auto srcBsonObj = BSON("a" << 1); auto count = srcBsonObj.objsize(); + PipeWaiter pw; stdx::thread producer([&] { - NamedPipeOutput pipeWriter(pipePath1.c_str()); + NamedPipeOutput pipeWriter(pipePath1); + pw.notify(); pipeWriter.open(); for (int i = 0; i < 100; ++i) { @@ -123,11 +138,11 @@ TEST_F(ExternalRecordStoreTest, NamedPipeBasicRead) { ON_BLOCK_EXIT([&] { producer.join(); }); // Gives some time to the producer so that it can initialize a named pipe. - stdx::this_thread::sleep_for(stdx::chrono::seconds(1)); + pw.wait(); - auto inputStream = std::make_unique<InputStream<NamedPipeInput>>(pipePath1.c_str()); + auto inputStream = InputStream<NamedPipeInput>(pipePath1); for (int i = 0; i < 100; ++i) { - int nRead = inputStream->readBytes(count, _buffer); + int nRead = inputStream.readBytes(count, _buffer); ASSERT_EQ(nRead, count) << "Failed to read data up to {} bytes"_format(count); ASSERT_EQ(std::memcmp(srcBsonObj.objdata(), _buffer, count), 0) << "Read data is not same as the source data"; @@ -137,8 +152,10 @@ TEST_F(ExternalRecordStoreTest, NamedPipeBasicRead) { TEST_F(ExternalRecordStoreTest, NamedPipeReadPartialData) { auto srcBsonObj = BSON("a" << 1); auto count = srcBsonObj.objsize(); + PipeWaiter pw; stdx::thread producer([&] { - NamedPipeOutput pipeWriter(pipePath1.c_str()); + NamedPipeOutput pipeWriter(pipePath1); + pw.notify(); pipeWriter.open(); pipeWriter.write(srcBsonObj.objdata(), count); pipeWriter.close(); @@ -146,11 +163,11 @@ TEST_F(ExternalRecordStoreTest, NamedPipeReadPartialData) { ON_BLOCK_EXIT([&] { producer.join(); }); // Gives some time to the producer so that it can initialize a named pipe. - stdx::this_thread::sleep_for(stdx::chrono::seconds(1)); + pw.wait(); - auto inputStream = std::make_unique<InputStream<NamedPipeInput>>(pipePath1.c_str()); - // Request more data than the pipe contains. Should only get the bytes it does contain. - int nRead = inputStream->readBytes(kBufferSize, _buffer); + auto inputStream = InputStream<NamedPipeInput>(pipePath1); + // Requests more data than the pipe contains. Should only get the bytes it does contain. + int nRead = inputStream.readBytes(kBufferSize, _buffer); ASSERT_EQ(nRead, count) << "Expected nRead == {} but got {}"_format(count, nRead); ASSERT_EQ(std::memcmp(srcBsonObj.objdata(), _buffer, count), 0) << "Read data is not same as the source data"; @@ -160,8 +177,10 @@ TEST_F(ExternalRecordStoreTest, NamedPipeReadUntilProducerDone) { auto srcBsonObj = BSON("a" << 1); auto count = srcBsonObj.objsize(); const auto nSent = std::rand() % 100; + PipeWaiter pw; stdx::thread producer([&] { - NamedPipeOutput pipeWriter(pipePath1.c_str()); + NamedPipeOutput pipeWriter(pipePath1); + pw.notify(); pipeWriter.open(); for (int i = 0; i < nSent; ++i) { @@ -173,12 +192,12 @@ TEST_F(ExternalRecordStoreTest, NamedPipeReadUntilProducerDone) { ON_BLOCK_EXIT([&] { producer.join(); }); // Gives some time to the producer so that it can initialize a named pipe. - stdx::this_thread::sleep_for(stdx::chrono::seconds(1)); + pw.wait(); - auto inputStream = std::make_unique<InputStream<NamedPipeInput>>(pipePath1.c_str()); + auto inputStream = InputStream<NamedPipeInput>(pipePath1); auto nReceived = 0; while (true) { - int nRead = inputStream->readBytes(count, _buffer); + int nRead = inputStream.readBytes(count, _buffer); if (nRead != count) { ASSERT_EQ(nRead, 0) << "Expected nRead == 0 for EOF but got something else {}"_format( nRead); @@ -195,7 +214,7 @@ TEST_F(ExternalRecordStoreTest, NamedPipeReadUntilProducerDone) { TEST_F(ExternalRecordStoreTest, NamedPipeOpenNonExisting) { ASSERT_THROWS_CODE( - [] { (void)std::make_unique<InputStream<NamedPipeInput>>(nonExistingPath.c_str()); }(), + [] { (void)std::make_unique<InputStream<NamedPipeInput>>(nonExistingPath); }(), DBException, ErrorCodes::FileNotOpen); } @@ -210,9 +229,10 @@ TEST_F(ExternalRecordStoreTest, NamedPipeMultiplePipes1) { // Create two pipes. The first has only "a" objects and the second has only "zed" objects. stdx::thread pipeThreads[kNumPipes]; + PipeWaiter pw[kNumPipes]; for (int pipeIdx = 0; pipeIdx < kNumPipes; ++pipeIdx) { - pipeThreads[pipeIdx] = - stdx::thread(createNamedPipe, pipePaths[pipeIdx], kObjsPerPipe, bsonObjs[pipeIdx]); + pipeThreads[pipeIdx] = stdx::thread( + createNamedPipe, &pw[pipeIdx], pipePaths[pipeIdx], kObjsPerPipe, bsonObjs[pipeIdx]); } ON_BLOCK_EXIT([&] { for (int pipeIdx = 0; pipeIdx < kNumPipes; ++pipeIdx) { @@ -221,7 +241,9 @@ TEST_F(ExternalRecordStoreTest, NamedPipeMultiplePipes1) { }); // Gives some time to the producers so they can initialize the named pipes. - stdx::this_thread::sleep_for(stdx::chrono::seconds(1)); + for (int pipeIdx = 0; pipeIdx < kNumPipes; ++pipeIdx) { + pw[pipeIdx].wait(); + } // Create metadata describing the pipes and a MultiBsonStreamCursor to read them. VirtualCollectionOptions vopts; @@ -302,12 +324,13 @@ TEST_F(ExternalRecordStoreTest, NamedPipeMultiplePipes2) { // so they will cause several wraps. The largish size of the objects makes it highly likely that // some reads will leave a partial object that must be completed on a later next() call. stdx::thread pipeThreads[kNumPipes]; + PipeWaiter pw[kNumPipes]; long numToWrites[] = {(3 * groupsIn32Mb * numObjs), (5 * groupsIn32Mb * numObjs)}; long numToWrite = 0; for (int pipeIdx = 0; pipeIdx < kNumPipes; ++pipeIdx) { - pipeThreads[pipeIdx] = - stdx::thread(createNamedPipe, pipePaths[pipeIdx], numToWrites[pipeIdx], bsonObjs); + pipeThreads[pipeIdx] = stdx::thread( + createNamedPipe, &pw[pipeIdx], pipePaths[pipeIdx], numToWrites[pipeIdx], bsonObjs); numToWrite += numToWrites[pipeIdx]; } ON_BLOCK_EXIT([&] { @@ -317,7 +340,9 @@ TEST_F(ExternalRecordStoreTest, NamedPipeMultiplePipes2) { }); // Gives some time to the producers so they can initialize the named pipes. - stdx::this_thread::sleep_for(stdx::chrono::seconds(1)); + for (int pipeIdx = 0; pipeIdx < kNumPipes; ++pipeIdx) { + pw[pipeIdx].wait(); + } // Create metadata describing the pipes and a MultiBsonStreamCursor to read them. VirtualCollectionOptions vopts; @@ -383,12 +408,13 @@ TEST_F(ExternalRecordStoreTest, NamedPipeMultiplePipes3) { // Create pipes with large bsons. stdx::thread pipeThreads[kNumPipes]; + PipeWaiter pw[kNumPipes]; long numToWrites[] = {19, 17}; long numToWrite = 0; for (int pipeIdx = 0; pipeIdx < kNumPipes; ++pipeIdx) { - pipeThreads[pipeIdx] = - stdx::thread(createNamedPipe, pipePaths[pipeIdx], numToWrites[pipeIdx], bsonObjs); + pipeThreads[pipeIdx] = stdx::thread( + createNamedPipe, &pw[pipeIdx], pipePaths[pipeIdx], numToWrites[pipeIdx], bsonObjs); numToWrite += numToWrites[pipeIdx]; } ON_BLOCK_EXIT([&] { @@ -398,7 +424,9 @@ TEST_F(ExternalRecordStoreTest, NamedPipeMultiplePipes3) { }); // Gives some time to the producers so they can initialize the named pipes. - stdx::this_thread::sleep_for(stdx::chrono::seconds(1)); + for (int pipeIdx = 0; pipeIdx < kNumPipes; ++pipeIdx) { + pw[pipeIdx].wait(); + } // Create metadata describing the pipes and a MultiBsonStreamCursor to read them. VirtualCollectionOptions vopts; @@ -450,6 +478,7 @@ TEST_F(ExternalRecordStoreTest, NamedPipeMultiplePipes4) { constexpr int kNumPipes = 100; // shadows the global std::string pipePaths[kNumPipes]; // shadows the global stdx::thread pipeThreads[kNumPipes]; // pipe producer threads + PipeWaiter pw[kNumPipes]; // pipe waiters std::vector<BSONObj> pipeBsonObjs[kNumPipes]; // vector of BSON objects for each pipe size_t objsWritten = 0; // number of objects written to all pipes @@ -465,12 +494,9 @@ TEST_F(ExternalRecordStoreTest, NamedPipeMultiplePipes4) { // Create the pipes. for (int pipeIdx = 0; pipeIdx < kNumPipes; ++pipeIdx) { -#ifndef _WIN32 - pipePaths[pipeIdx] = "/tmp/named_pipe{}"_format(pipeIdx); -#else - pipePaths[pipeIdx] = "//./pipe/named_pipe{}"_format(pipeIdx); -#endif + pipePaths[pipeIdx] = "named_pipe{}"_format(pipeIdx); pipeThreads[pipeIdx] = stdx::thread(createNamedPipe, + &pw[pipeIdx], pipePaths[pipeIdx], pipeBsonObjs[pipeIdx].size(), pipeBsonObjs[pipeIdx]); @@ -482,7 +508,9 @@ TEST_F(ExternalRecordStoreTest, NamedPipeMultiplePipes4) { }); // Gives some time to the producers so they can initialize the named pipes. - stdx::this_thread::sleep_for(stdx::chrono::seconds(1)); + for (int pipeIdx = 0; pipeIdx < kNumPipes; ++pipeIdx) { + pw[pipeIdx].wait(); + } // Create metadata describing the pipes and a MultiBsonStreamCursor to read them. VirtualCollectionOptions vopts; |