diff options
Diffstat (limited to 'src/mongo/db/repl/collection_cloner_test.cpp')
-rw-r--r-- | src/mongo/db/repl/collection_cloner_test.cpp | 779 |
1 files changed, 317 insertions, 462 deletions
diff --git a/src/mongo/db/repl/collection_cloner_test.cpp b/src/mongo/db/repl/collection_cloner_test.cpp index 726a2b6a2cf..d302ed0492d 100644 --- a/src/mongo/db/repl/collection_cloner_test.cpp +++ b/src/mongo/db/repl/collection_cloner_test.cpp @@ -37,6 +37,7 @@ #include "mongo/db/repl/collection_cloner.h" #include "mongo/db/repl/storage_interface.h" #include "mongo/db/repl/storage_interface_mock.h" +#include "mongo/dbtests/mock/mock_dbclient_connection.h" #include "mongo/stdx/memory.h" #include "mongo/unittest/task_executor_proxy.h" #include "mongo/unittest/unittest.h" @@ -58,6 +59,134 @@ public: } }; +class FailableMockDBClientConnection : public MockDBClientConnection { +public: + FailableMockDBClientConnection(MockRemoteDBServer* remote, executor::NetworkInterfaceMock* net) + : MockDBClientConnection(remote), _net(net) {} + + virtual ~FailableMockDBClientConnection() { + stdx::unique_lock<stdx::mutex> lk(_mutex); + _paused = false; + _cond.notify_all(); + _cond.wait(lk, [this] { return !_resuming; }); + } + + Status connect(const HostAndPort& host, StringData applicationName) override { + if (!_failureForConnect.isOK()) + return _failureForConnect; + return MockDBClientConnection::connect(host, applicationName); + } + + using MockDBClientConnection::query; // This avoids warnings from -Woverloaded-virtual + unsigned long long query(stdx::function<void(mongo::DBClientCursorBatchIterator&)> f, + const NamespaceStringOrUUID& nsOrUuid, + mongo::Query query, + const mongo::BSONObj* fieldsToReturn, + int queryOptions, + int batchSize) override { + ON_BLOCK_EXIT([this]() { + { + stdx::lock_guard<stdx::mutex> lk(_mutex); + _queryCount++; + } + _cond.notify_all(); + }); + { + stdx::unique_lock<stdx::mutex> lk(_mutex); + _waiting = _paused; + _cond.notify_all(); + while (_paused) { + lk.unlock(); + _net->waitForWork(); + lk.lock(); + } + _waiting = false; + } + auto result = MockDBClientConnection::query( + f, nsOrUuid, query, fieldsToReturn, queryOptions, batchSize); + uassertStatusOK(_failureForQuery); + return result; + } + + void setFailureForConnect(Status failure) { + _failureForConnect = failure; + } + + void setFailureForQuery(Status failure) { + _failureForQuery = failure; + } + + void pause() { + { + stdx::lock_guard<stdx::mutex> lk(_mutex); + _paused = true; + } + _cond.notify_all(); + } + void resume() { + { + stdx::unique_lock<stdx::mutex> lk(_mutex); + _resuming = true; + _paused = false; + _resumedQueryCount = _queryCount; + while (_waiting) { + lk.unlock(); + _net->signalWorkAvailable(); + mongo::sleepmillis(10); + lk.lock(); + } + _resuming = false; + _cond.notify_all(); + } + } + + // Waits for the next query after pause() is called to start. + void waitForPausedQuery() { + stdx::unique_lock<stdx::mutex> lk(_mutex); + _cond.wait(lk, [this] { return _waiting; }); + } + + // Waits for the next query to run after resume() is called to complete. + void waitForResumedQuery() { + stdx::unique_lock<stdx::mutex> lk(_mutex); + _cond.wait(lk, [this] { return _resumedQueryCount != _queryCount; }); + } + +private: + executor::NetworkInterfaceMock* _net; + stdx::mutex _mutex; + stdx::condition_variable _cond; + bool _paused = false; + bool _waiting = false; + bool _resuming = false; + int _queryCount = 0; + int _resumedQueryCount = 0; + Status _failureForConnect = Status::OK(); + Status _failureForQuery = Status::OK(); +}; + +// RAII class to pause the client; since tests are very exception-heavy this prevents them +// from hanging on failure. +class MockClientPauser { + MONGO_DISALLOW_COPYING(MockClientPauser); + +public: + MockClientPauser(FailableMockDBClientConnection* client) : _client(client) { + _client->pause(); + }; + ~MockClientPauser() { + resume(); + } + void resume() { + if (_client) + _client->resume(); + _client = nullptr; + } + +private: + FailableMockDBClientConnection* _client; +}; + class CollectionClonerTest : public BaseClonerTest { public: BaseCloner* getCloner() const override; @@ -70,6 +199,17 @@ protected: void setUp() override; void tearDown() override; + virtual CollectionOptions getCollectionOptions() const { + CollectionOptions options; + options.uuid = UUID::gen(); + return options; + } + + virtual const NamespaceString& getStartupNss() const { + return nss; + }; + + std::vector<BSONObj> makeSecondaryIndexSpecs(const NamespaceString& nss); // A simple arbitrary value to use as the default batch size. @@ -79,16 +219,19 @@ protected: std::unique_ptr<CollectionCloner> collectionCloner; CollectionMockStats collectionStats; // Used by the _loader. CollectionBulkLoaderMock* _loader; // Owned by CollectionCloner. + bool _clientCreated = false; + FailableMockDBClientConnection* _client; // owned by the CollectionCloner once created. + std::unique_ptr<MockRemoteDBServer> _server; }; void CollectionClonerTest::setUp() { BaseClonerTest::setUp(); - options = {}; + options = getCollectionOptions(); collectionCloner.reset(nullptr); collectionCloner = stdx::make_unique<CollectionCloner>(&getExecutor(), dbWorkThreadPool.get(), target, - nss, + getStartupNss(), options, setStatusCallback(), storageInterface.get(), @@ -106,6 +249,13 @@ void CollectionClonerTest::setUp() { return StatusWith<std::unique_ptr<CollectionBulkLoader>>( std::unique_ptr<CollectionBulkLoader>(_loader)); }; + _server = stdx::make_unique<MockRemoteDBServer>(target.toString()); + _server->assignCollectionUuid(nss.ns(), *options.uuid); + _client = new FailableMockDBClientConnection(_server.get(), getNet()); + collectionCloner->setCreateClientFn_forTest([this]() { + _clientCreated = true; + return std::unique_ptr<DBClientConnection>(_client); + }); } // Return index specs to use for secondary indexes. @@ -123,7 +273,11 @@ std::vector<BSONObj> CollectionClonerTest::makeSecondaryIndexSpecs(const Namespa void CollectionClonerTest::tearDown() { BaseClonerTest::tearDown(); // Executor may still invoke collection cloner's callback before shutting down. - collectionCloner.reset(nullptr); + collectionCloner.reset(); + if (!_clientCreated) + delete _client; + _clientCreated = false; + _server.reset(); options = {}; } @@ -180,6 +334,19 @@ TEST_F(CollectionClonerTest, InvalidConstruction) { "'storageEngine.storageEngine1' has to be an embedded document."); } + // UUID must be present. + { + CollectionOptions invalidOptions = options; + invalidOptions.uuid = boost::none; + StorageInterface* si = storageInterface.get(); + ASSERT_THROWS_CODE_AND_WHAT( + CollectionCloner( + &executor, pool, target, nss, invalidOptions, cb, si, defaultBatchSize), + AssertionException, + 50953, + "Missing collection UUID in CollectionCloner, collection name: db.coll"); + } + // Callback function cannot be null. { CollectionCloner::CallbackFn nullCb; @@ -190,6 +357,17 @@ TEST_F(CollectionClonerTest, InvalidConstruction) { ErrorCodes::BadValue, "callback function cannot be null"); } + + // Batch size must be non-negative. + { + StorageInterface* si = storageInterface.get(); + constexpr int kInvalidBatchSize = -1; + ASSERT_THROWS_CODE_AND_WHAT( + CollectionCloner(&executor, pool, target, nss, options, cb, si, kInvalidBatchSize), + AssertionException, + 50954, + "collectionClonerBatchSize must be non-negative."); + } } TEST_F(CollectionClonerTest, ClonerLifeCycle) { @@ -206,7 +384,9 @@ TEST_F(CollectionClonerTest, FirstRemoteCommand) { auto&& noiRequest = noi->getRequest(); ASSERT_EQUALS(nss.db().toString(), noiRequest.dbname); ASSERT_EQUALS("count", std::string(noiRequest.cmdObj.firstElementFieldName())); - ASSERT_EQUALS(nss.coll().toString(), noiRequest.cmdObj.firstElement().valuestrsafe()); + auto requestUUID = uassertStatusOK(UUID::parse(noiRequest.cmdObj.firstElement())); + ASSERT_EQUALS(options.uuid.get(), requestUUID); + ASSERT_FALSE(net->hasReadyRequests()); ASSERT_TRUE(collectionCloner->isActive()); } @@ -347,18 +527,16 @@ TEST_F(CollectionClonerTest, ASSERT_EQUALS(ErrorCodes::OperationFailed, getStatus()); } -TEST_F(CollectionClonerTest, DoNotCreateIDIndexIfAutoIndexIdUsed) { - options = {}; - options.autoIndexId = CollectionOptions::NO; - collectionCloner.reset(new CollectionCloner(&getExecutor(), - dbWorkThreadPool.get(), - target, - nss, - options, - setStatusCallback(), - storageInterface.get(), - defaultBatchSize)); +class CollectionClonerNoAutoIndexTest : public CollectionClonerTest { +protected: + CollectionOptions getCollectionOptions() const override { + CollectionOptions options = CollectionClonerTest::getCollectionOptions(); + options.autoIndexId = CollectionOptions::NO; + return options; + } +}; +TEST_F(CollectionClonerNoAutoIndexTest, DoNotCreateIDIndexIfAutoIndexIdUsed) { NamespaceString collNss; CollectionOptions collOptions; std::vector<BSONObj> collIndexSpecs{BSON("fakeindexkeys" << 1)}; // init with one doc. @@ -376,32 +554,24 @@ TEST_F(CollectionClonerTest, DoNotCreateIDIndexIfAutoIndexIdUsed) { return std::unique_ptr<CollectionBulkLoader>(loader); }; + const BSONObj doc = BSON("_id" << 1); + _server->insert(nss.ns(), doc); + // Pause the CollectionCloner before executing the query so we can verify events which are + // supposed to happen before the query. + MockClientPauser pauser(_client); ASSERT_OK(collectionCloner->startup()); { executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); - processNetworkResponse(createCountResponse(0)); + processNetworkResponse(createCountResponse(1)); processNetworkResponse(createListIndexesResponse(0, BSONArray())); } ASSERT_TRUE(collectionCloner->isActive()); - collectionCloner->waitForDbWorker(); + _client->waitForPausedQuery(); ASSERT_TRUE(collectionCloner->isActive()); ASSERT_TRUE(collectionStats.initCalled); - BSONArray emptyArray; - { - executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); - processNetworkResponse(createCursorResponse(1, emptyArray)); - } - - collectionCloner->waitForDbWorker(); - ASSERT_TRUE(collectionCloner->isActive()); - - const BSONObj doc = BSON("_id" << 1); - { - executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); - processNetworkResponse(createFinalCursorResponse(BSON_ARRAY(doc))); - } + pauser.resume(); collectionCloner->join(); ASSERT_EQUALS(1, collectionStats.insertCount); ASSERT_TRUE(collectionStats.commitCalled); @@ -595,6 +765,9 @@ TEST_F(CollectionClonerTest, BeginCollectionFailed) { } TEST_F(CollectionClonerTest, BeginCollection) { + // Pause the CollectionCloner before executing the query so we can verify state after + // the listIndexes call. + MockClientPauser pauser(_client); ASSERT_OK(collectionCloner->startup()); CollectionMockStats stats; @@ -682,7 +855,10 @@ TEST_F(CollectionClonerTest, FindFetcherScheduleFailed) { ASSERT_FALSE(collectionCloner->isActive()); } -TEST_F(CollectionClonerTest, FindCommandAfterBeginCollection) { +TEST_F(CollectionClonerTest, QueryAfterCreateCollection) { + // Pause the CollectionCloner before executing the query so we can verify the collection is + // created before the query. + MockClientPauser pauser(_client); ASSERT_OK(collectionCloner->startup()); CollectionMockStats stats; @@ -705,21 +881,15 @@ TEST_F(CollectionClonerTest, FindCommandAfterBeginCollection) { collectionCloner->waitForDbWorker(); ASSERT_TRUE(collectionCreated); - - // Fetcher should be scheduled after cloner creates collection. - auto net = getNet(); - executor::NetworkInterfaceMock::InNetworkGuard guard(net); - ASSERT_TRUE(net->hasReadyRequests()); - NetworkOperationIterator noi = net->getNextReadyRequest(); - auto&& noiRequest = noi->getRequest(); - ASSERT_EQUALS(nss.db().toString(), noiRequest.dbname); - ASSERT_EQUALS("find", std::string(noiRequest.cmdObj.firstElementFieldName())); - ASSERT_EQUALS(nss.coll().toString(), noiRequest.cmdObj.firstElement().valuestrsafe()); - ASSERT_TRUE(noiRequest.cmdObj.getField("noCursorTimeout").trueValue()); - ASSERT_FALSE(net->hasReadyRequests()); + // Make sure the query starts. + _client->waitForPausedQuery(); } -TEST_F(CollectionClonerTest, EstablishCursorCommandFailed) { +TEST_F(CollectionClonerTest, QueryFailed) { + // For this test to work properly, the error cannot be one of the special codes + // (OperationFailed or CursorNotFound) which trigger an attempt to see if the collection + // was deleted. + _client->setFailureForQuery({ErrorCodes::UnknownError, "QueryFailedTest UnknownError"}); ASSERT_OK(collectionCloner->startup()); { @@ -727,98 +897,22 @@ TEST_F(CollectionClonerTest, EstablishCursorCommandFailed) { processNetworkResponse(createCountResponse(0)); processNetworkResponse(createListIndexesResponse(0, BSON_ARRAY(idIndexSpec))); } - ASSERT_TRUE(collectionCloner->isActive()); - collectionCloner->waitForDbWorker(); - ASSERT_TRUE(collectionCloner->isActive()); - - { - executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); - processNetworkResponse(BSON("ok" << 0 << "errmsg" - << "" - << "code" - << ErrorCodes::CursorNotFound)); - } - - ASSERT_EQUALS(ErrorCodes::CursorNotFound, getStatus().code()); - ASSERT_FALSE(collectionCloner->isActive()); -} - -TEST_F(CollectionClonerTest, CollectionClonerResendsFindCommandOnRetriableError) { - ASSERT_OK(collectionCloner->startup()); - - auto net = getNet(); - executor::NetworkInterfaceMock::InNetworkGuard guard(net); - - // CollectionCollection sends listIndexes request irrespective of collection size in a - // successful count response. - assertRemoteCommandNameEquals("count", net->scheduleSuccessfulResponse(createCountResponse(0))); - net->runReadyNetworkOperations(); - - // CollectionCloner requires a successful listIndexes response in order to send the find request - // for the documents in the collection. - assertRemoteCommandNameEquals( - "listIndexes", - net->scheduleSuccessfulResponse(createListIndexesResponse(0, BSON_ARRAY(idIndexSpec)))); - net->runReadyNetworkOperations(); - - // Respond to the find request with a retriable error. - assertRemoteCommandNameEquals("find", - net->scheduleErrorResponse(Status(ErrorCodes::HostNotFound, ""))); - net->runReadyNetworkOperations(); - ASSERT_TRUE(collectionCloner->isActive()); - - // This check exists to ensure that the command used to establish the cursors is retried, - // regardless of the command format. - auto noi = net->getNextReadyRequest(); - assertRemoteCommandNameEquals("find", noi->getRequest()); - net->blackHole(noi); -} - -TEST_F(CollectionClonerTest, EstablishCursorCommandCanceled) { - ASSERT_OK(collectionCloner->startup()); - - ASSERT_TRUE(collectionCloner->isActive()); - { - executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); - processNetworkResponse(createCountResponse(0)); - scheduleNetworkResponse(createListIndexesResponse(0, BSON_ARRAY(idIndexSpec))); - } - ASSERT_TRUE(collectionCloner->isActive()); - - auto net = getNet(); - { - executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); - - net->runReadyNetworkOperations(); - } - - collectionCloner->waitForDbWorker(); - - ASSERT_TRUE(collectionCloner->isActive()); - { - executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); - scheduleNetworkResponse(BSON("ok" << 1)); - } - ASSERT_TRUE(collectionCloner->isActive()); - - collectionCloner->shutdown(); - - { - executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); - getNet()->logQueues(); - net->runReadyNetworkOperations(); - } - - ASSERT_EQUALS(ErrorCodes::CallbackCanceled, getStatus().code()); + collectionCloner->join(); + ASSERT_EQUALS(ErrorCodes::UnknownError, getStatus().code()); ASSERT_FALSE(collectionCloner->isActive()); } TEST_F(CollectionClonerTest, InsertDocumentsScheduleDbWorkFailed) { + // Set up documents to be returned from upstream node. + _server->insert(nss.ns(), BSON("_id" << 1)); + + // Pause the client so we can set up the failure. + MockClientPauser pauser(_client); ASSERT_OK(collectionCloner->startup()); { executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); - processNetworkResponse(createCountResponse(0)); + processNetworkResponse(createCountResponse(1)); processNetworkResponse(createListIndexesResponse(0, BSON_ARRAY(idIndexSpec))); } @@ -831,31 +925,24 @@ TEST_F(CollectionClonerTest, InsertDocumentsScheduleDbWorkFailed) { return StatusWith<executor::TaskExecutor::CallbackHandle>(ErrorCodes::UnknownError, ""); }); - BSONArray emptyArray; - { - executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); - processNetworkResponse(createCursorResponse(1, emptyArray)); - } - - collectionCloner->waitForDbWorker(); - ASSERT_TRUE(collectionCloner->isActive()); - - const BSONObj doc = BSON("_id" << 1); - { - executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); - processNetworkResponse(createFinalCursorResponse(BSON_ARRAY(doc))); - } + pauser.resume(); + collectionCloner->join(); ASSERT_EQUALS(ErrorCodes::UnknownError, getStatus().code()); ASSERT_FALSE(collectionCloner->isActive()); } TEST_F(CollectionClonerTest, InsertDocumentsCallbackCanceled) { + // Set up documents to be returned from upstream node. + _server->insert(nss.ns(), BSON("_id" << 1)); + + // Pause the client so we can set up the failure. + MockClientPauser pauser(_client); ASSERT_OK(collectionCloner->startup()); { executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); - processNetworkResponse(createCountResponse(0)); + processNetworkResponse(createCountResponse(1)); processNetworkResponse(createListIndexesResponse(0, BSON_ARRAY(idIndexSpec))); } @@ -874,37 +961,30 @@ TEST_F(CollectionClonerTest, InsertDocumentsCallbackCanceled) { return StatusWith<executor::TaskExecutor::CallbackHandle>(handle); }); - BSONArray emptyArray; - { - executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); - processNetworkResponse(createCursorResponse(1, emptyArray)); - } - - collectionCloner->waitForDbWorker(); - ASSERT_TRUE(collectionCloner->isActive()); - - { - executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); - processNetworkResponse(createFinalCursorResponse(BSON_ARRAY(BSON("_id" << 1)))); - } + pauser.resume(); collectionCloner->join(); ASSERT_EQUALS(ErrorCodes::CallbackCanceled, getStatus().code()); ASSERT_FALSE(collectionCloner->isActive()); } TEST_F(CollectionClonerTest, InsertDocumentsFailed) { + // Set up documents to be returned from upstream node. + _server->insert(nss.ns(), BSON("_id" << 1)); + + // Pause the client so we can set up the failure. + MockClientPauser pauser(_client); ASSERT_OK(collectionCloner->startup()); ASSERT_TRUE(collectionCloner->isActive()); { executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); - processNetworkResponse(createCountResponse(0)); + processNetworkResponse(createCountResponse(1)); processNetworkResponse(createListIndexesResponse(0, BSON_ARRAY(idIndexSpec))); } ASSERT_TRUE(collectionCloner->isActive()); getNet()->logQueues(); - collectionCloner->waitForDbWorker(); + _client->waitForPausedQuery(); ASSERT_TRUE(collectionCloner->isActive()); ASSERT_TRUE(collectionStats.initCalled); @@ -913,20 +993,8 @@ TEST_F(CollectionClonerTest, InsertDocumentsFailed) { const std::vector<BSONObj>::const_iterator end) { return Status(ErrorCodes::OperationFailed, ""); }; - - BSONArray emptyArray; - { - executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); - processNetworkResponse(createCursorResponse(1, emptyArray)); - } - - collectionCloner->waitForDbWorker(); ASSERT_TRUE(collectionCloner->isActive()); - - { - executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); - processNetworkResponse(createFinalCursorResponse(BSON_ARRAY(BSON("_id" << 1)))); - } + pauser.resume(); collectionCloner->join(); ASSERT_FALSE(collectionCloner->isActive()); @@ -936,39 +1004,24 @@ TEST_F(CollectionClonerTest, InsertDocumentsFailed) { } TEST_F(CollectionClonerTest, InsertDocumentsSingleBatch) { + // Set up documents to be returned from upstream node. + _server->insert(nss.ns(), BSON("_id" << 1)); + _server->insert(nss.ns(), BSON("_id" << 2)); + ASSERT_OK(collectionCloner->startup()); ASSERT_TRUE(collectionCloner->isActive()); { executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); - processNetworkResponse(createCountResponse(0)); + processNetworkResponse(createCountResponse(2)); processNetworkResponse(createListIndexesResponse(0, BSON_ARRAY(idIndexSpec))); } - ASSERT_TRUE(collectionCloner->isActive()); - - collectionCloner->waitForDbWorker(); - ASSERT_TRUE(collectionCloner->isActive()); - ASSERT_TRUE(collectionStats.initCalled); - - BSONArray emptyArray; - { - executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); - processNetworkResponse(createCursorResponse(1, emptyArray)); - } - - collectionCloner->waitForDbWorker(); - ASSERT_TRUE(collectionCloner->isActive()); - - const BSONObj doc = BSON("_id" << 1); - { - executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); - processNetworkResponse(createFinalCursorResponse(BSON_ARRAY(doc))); - } - collectionCloner->join(); // TODO: record the documents during insert and compare them // -- maybe better done using a real storage engine, like ephemeral for test. - ASSERT_EQUALS(1, collectionStats.insertCount); + ASSERT_EQUALS(2, collectionStats.insertCount); + auto stats = collectionCloner->getStats(); + ASSERT_EQUALS(1u, stats.receivedBatches); ASSERT_TRUE(collectionStats.commitCalled); ASSERT_OK(getStatus()); @@ -976,115 +1029,27 @@ TEST_F(CollectionClonerTest, InsertDocumentsSingleBatch) { } TEST_F(CollectionClonerTest, InsertDocumentsMultipleBatches) { + // Set up documents to be returned from upstream node. + _server->insert(nss.ns(), BSON("_id" << 1)); + _server->insert(nss.ns(), BSON("_id" << 2)); + _server->insert(nss.ns(), BSON("_id" << 3)); + + collectionCloner->setBatchSize_forTest(2); ASSERT_OK(collectionCloner->startup()); ASSERT_TRUE(collectionCloner->isActive()); { executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); - processNetworkResponse(createCountResponse(0)); + processNetworkResponse(createCountResponse(3)); processNetworkResponse(createListIndexesResponse(0, BSON_ARRAY(idIndexSpec))); } - ASSERT_TRUE(collectionCloner->isActive()); - - collectionCloner->waitForDbWorker(); - ASSERT_TRUE(collectionCloner->isActive()); - ASSERT_TRUE(collectionStats.initCalled); - - BSONArray emptyArray; - { - executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); - processNetworkResponse(createCursorResponse(1, emptyArray)); - } - - collectionCloner->waitForDbWorker(); - ASSERT_TRUE(collectionCloner->isActive()); - - const BSONObj doc = BSON("_id" << 1); - { - executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); - processNetworkResponse(createCursorResponse(1, BSON_ARRAY(doc))); - } - - collectionCloner->waitForDbWorker(); - // TODO: record the documents during insert and compare them - // -- maybe better done using a real storage engine, like ephemeral for test. - ASSERT_EQUALS(1, collectionStats.insertCount); - - ASSERT_EQUALS(getDetectableErrorStatus(), getStatus()); - ASSERT_TRUE(collectionCloner->isActive()); - - const BSONObj doc2 = BSON("_id" << 1); - { - executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); - processNetworkResponse(createFinalCursorResponse(BSON_ARRAY(doc2))); - } - collectionCloner->join(); // TODO: record the documents during insert and compare them // -- maybe better done using a real storage engine, like ephemeral for test. - ASSERT_EQUALS(2, collectionStats.insertCount); - ASSERT_TRUE(collectionStats.commitCalled); - - ASSERT_OK(getStatus()); - ASSERT_FALSE(collectionCloner->isActive()); -} - -TEST_F(CollectionClonerTest, LastBatchContainsNoDocuments) { - ASSERT_OK(collectionCloner->startup()); - ASSERT_TRUE(collectionCloner->isActive()); - - { - executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); - processNetworkResponse(createCountResponse(0)); - processNetworkResponse(createListIndexesResponse(0, BSON_ARRAY(idIndexSpec))); - } - ASSERT_TRUE(collectionCloner->isActive()); - - collectionCloner->waitForDbWorker(); - ASSERT_TRUE(collectionCloner->isActive()); - ASSERT_TRUE(collectionStats.initCalled); - - BSONArray emptyArray; - { - executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); - processNetworkResponse(createCursorResponse(1, emptyArray)); - } - - collectionCloner->waitForDbWorker(); - ASSERT_TRUE(collectionCloner->isActive()); - - const BSONObj doc = BSON("_id" << 1); - { - executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); - processNetworkResponse(createCursorResponse(1, BSON_ARRAY(doc))); - } - - collectionCloner->waitForDbWorker(); - ASSERT_EQUALS(1, collectionStats.insertCount); - - ASSERT_EQUALS(getDetectableErrorStatus(), getStatus()); - ASSERT_TRUE(collectionCloner->isActive()); - - const BSONObj doc2 = BSON("_id" << 2); - { - executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); - processNetworkResponse(createCursorResponse(1, BSON_ARRAY(doc2), "nextBatch")); - } - - collectionCloner->waitForDbWorker(); - ASSERT_EQUALS(2, collectionStats.insertCount); - - ASSERT_EQUALS(getDetectableErrorStatus(), getStatus()); - ASSERT_TRUE(collectionCloner->isActive()); - - { - executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); - processNetworkResponse(createFinalCursorResponse(emptyArray)); - } - - collectionCloner->join(); - ASSERT_EQUALS(2, collectionStats.insertCount); + ASSERT_EQUALS(3, collectionStats.insertCount); ASSERT_TRUE(collectionStats.commitCalled); + auto stats = collectionCloner->getStats(); + ASSERT_EQUALS(2u, stats.receivedBatches); ASSERT_OK(getStatus()); ASSERT_FALSE(collectionCloner->isActive()); @@ -1101,53 +1066,24 @@ TEST_F(CollectionClonerTest, CollectionClonerTransitionsToCompleteIfShutdownBefo * Restarting cloning should fail with ErrorCodes::ShutdownInProgress error. */ TEST_F(CollectionClonerTest, CollectionClonerCannotBeRestartedAfterPreviousFailure) { + // Set up document to return from upstream. + _server->insert(nss.ns(), BSON("_id" << 1)); + // First cloning attempt - fails while reading documents from source collection. unittest::log() << "Starting first collection cloning attempt"; + _client->setFailureForQuery( + {ErrorCodes::UnknownError, "failed to read remaining documents from source collection"}); ASSERT_OK(collectionCloner->startup()); ASSERT_TRUE(collectionCloner->isActive()); { executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); - processNetworkResponse(createCountResponse(0)); + processNetworkResponse(createCountResponse(1)); processNetworkResponse(createListIndexesResponse(0, BSON_ARRAY(idIndexSpec))); } - ASSERT_TRUE(collectionCloner->isActive()); - - collectionCloner->waitForDbWorker(); - ASSERT_TRUE(collectionCloner->isActive()); - ASSERT_TRUE(collectionStats.initCalled); - - BSONArray emptyArray; - { - executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); - processNetworkResponse(createCursorResponse(1, emptyArray)); - } - - collectionCloner->waitForDbWorker(); - ASSERT_TRUE(collectionCloner->isActive()); - - { - executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); - processNetworkResponse(createCursorResponse(1, BSON_ARRAY(BSON("_id" << 1)))); - } - - collectionCloner->waitForDbWorker(); - ASSERT_EQUALS(1, collectionStats.insertCount); - - // Check that the status hasn't changed from the initial value. - ASSERT_EQUALS(getDetectableErrorStatus(), getStatus()); - ASSERT_TRUE(collectionCloner->isActive()); - - { - executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); - processNetworkResponse(ErrorCodes::OperationFailed, - "failed to read remaining documents from source collection"); - } - collectionCloner->join(); - ASSERT_EQUALS(1, collectionStats.insertCount); - ASSERT_EQUALS(ErrorCodes::OperationFailed, getStatus()); + ASSERT_EQUALS(ErrorCodes::UnknownError, getStatus()); ASSERT_FALSE(collectionCloner->isActive()); // Second cloning attempt - run to completion. @@ -1209,6 +1145,7 @@ TEST_F(CollectionClonerTest, CollectionClonerResetsOnCompletionCallbackFunctionA TEST_F(CollectionClonerTest, CollectionClonerWaitsForPendingTasksToCompleteBeforeInvokingOnCompletionCallback) { + MockClientPauser pauser(_client); ASSERT_OK(collectionCloner->startup()); ASSERT_TRUE(collectionCloner->isActive()); @@ -1227,20 +1164,11 @@ TEST_F(CollectionClonerTest, } ASSERT_TRUE(collectionCloner->isActive()); - collectionCloner->waitForDbWorker(); + _client->waitForPausedQuery(); ASSERT_TRUE(collectionCloner->isActive()); ASSERT_TRUE(collectionStats.initCalled); - BSONArray emptyArray; - { - executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); - processNetworkResponse(createCursorResponse(1, emptyArray)); - } - - collectionCloner->waitForDbWorker(); - ASSERT_TRUE(collectionCloner->isActive()); - - // At this point, the CollectionCloner has sent the find request to establish the cursor. + // At this point, the CollectionCloner is waiting for the query to complete. // We want to return the first batch of documents for the collection from the network so that // the CollectionCloner schedules the first _insertDocuments DB task and the getMore request for // the next batch of documents. @@ -1257,26 +1185,20 @@ TEST_F(CollectionClonerTest, // Return first batch of collection documents from remote server for the getMore request. const BSONObj doc = BSON("_id" << 1); - { - executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); - - assertRemoteCommandNameEquals( - "getMore", net->scheduleSuccessfulResponse(createCursorResponse(1, BSON_ARRAY(doc)))); - net->runReadyNetworkOperations(); - } - - // Confirm that CollectionCloner attempted to schedule _insertDocuments task. - ASSERT_TRUE(insertDocumentsFn); - - // Return an error for the getMore request for the next batch of collection documents. - { - executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); - - assertRemoteCommandNameEquals( - "getMore", - net->scheduleErrorResponse(Status(ErrorCodes::OperationFailed, "getMore failed"))); - net->runReadyNetworkOperations(); - } + _server->insert(nss.ns(), doc); + _client->setFailureForQuery({ErrorCodes::UnknownError, "getMore failed"}); + // Wait for the _runQuery method to exit. We can't get at it directly but we can wait + // for a task scheduled after it to complete. + auto& executor = getExecutor(); + auto& event = executor.makeEvent().getValue(); + auto nextTask = + executor + .scheduleWork([&executor, event](const executor::TaskExecutor::CallbackArgs&) { + executor.signalEvent(event); + }) + .getValue(); + pauser.resume(); + executor.waitForEvent(event); // CollectionCloner should still be active because we have not finished processing the // insertDocuments task. @@ -1287,90 +1209,39 @@ TEST_F(CollectionClonerTest, // error passed to the completion guard (ie. from the failed getMore request). executor::TaskExecutor::CallbackArgs callbackArgs( &getExecutor(), {}, Status(ErrorCodes::CallbackCanceled, "")); + ASSERT_TRUE(insertDocumentsFn); insertDocumentsFn(callbackArgs); // Reset 'insertDocumentsFn' to release last reference count on completion guard. insertDocumentsFn = {}; - // No need to call CollectionCloner::join() because we invoked the _insertDocuments callback - // synchronously. + collectionCloner->join(); ASSERT_FALSE(collectionCloner->isActive()); - ASSERT_EQUALS(ErrorCodes::OperationFailed, getStatus()); + ASSERT_EQUALS(ErrorCodes::UnknownError, getStatus()); } -class CollectionClonerUUIDTest : public CollectionClonerTest { +class CollectionClonerRenamedBeforeStartTest : public CollectionClonerTest { protected: - // The UUID tests should deal gracefully with renamed collections, so start the cloner with - // an alternate name. + // The CollectionCloner should deal gracefully with collections renamed before the cloner + // was started, so start it with an alternate name. const NamespaceString alternateNss{"db", "alternateCollName"}; - void startupWithUUID(int maxNumCloningCursors = 1) { - collectionCloner.reset(); - options.uuid = UUID::gen(); - collectionCloner = stdx::make_unique<CollectionCloner>(&getExecutor(), - dbWorkThreadPool.get(), - target, - alternateNss, - options, - setStatusCallback(), - storageInterface.get(), - defaultBatchSize); - - ASSERT_OK(collectionCloner->startup()); - } - - void testWithMaxNumCloningCursors(int maxNumCloningCursors, StringData cmdName) { - startupWithUUID(maxNumCloningCursors); - - CollectionOptions actualOptions; - CollectionMockStats stats; - CollectionBulkLoaderMock* loader = new CollectionBulkLoaderMock(&stats); - bool collectionCreated = false; - storageInterface->createCollectionForBulkFn = [&](const NamespaceString& theNss, - const CollectionOptions& theOptions, - const BSONObj idIndexSpec, - const std::vector<BSONObj>& theIndexSpecs) - -> StatusWith<std::unique_ptr<CollectionBulkLoader>> { - collectionCreated = true; - actualOptions = theOptions; - return std::unique_ptr<CollectionBulkLoader>(loader); - }; - - { - executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); - processNetworkResponse(createCountResponse(0)); - processNetworkResponse(createListIndexesResponse(0, BSON_ARRAY(idIndexSpec))); - } - - collectionCloner->waitForDbWorker(); - ASSERT_TRUE(collectionCreated); - - // Fetcher should be scheduled after cloner creates collection. - auto net = getNet(); - executor::NetworkInterfaceMock::InNetworkGuard guard(net); - ASSERT_TRUE(net->hasReadyRequests()); - NetworkOperationIterator noi = net->getNextReadyRequest(); - ASSERT_FALSE(net->hasReadyRequests()); - auto&& noiRequest = noi->getRequest(); - ASSERT_EQUALS(nss.db().toString(), noiRequest.dbname); - ASSERT_BSONOBJ_EQ(actualOptions.toBSON(), options.toBSON()); - - ASSERT_EQUALS(cmdName, std::string(noiRequest.cmdObj.firstElementFieldName())); - ASSERT_EQUALS(cmdName == "find", noiRequest.cmdObj.getField("noCursorTimeout").trueValue()); - auto requestUUID = uassertStatusOK(UUID::parse(noiRequest.cmdObj.firstElement())); - ASSERT_EQUALS(options.uuid.get(), requestUUID); - } + const NamespaceString& getStartupNss() const override { + return alternateNss; + }; /** * Sets up a test for the CollectionCloner that simulates the collection being dropped while * copying the documents. - * The ARM returns CursorNotFound error to indicate a collection drop. Subsequently, the - * CollectionCloner should run a find command on the collection by UUID. This should be the next - * ready request on in the network interface. + * The DBClientConnection returns a CursorNotFound error to indicate a collection drop. */ void setUpVerifyCollectionWasDroppedTest() { - startupWithUUID(); - + // Pause the query so we can reliably wait for it to complete. + MockClientPauser pauser(_client); + // Return error response from the query. + _client->setFailureForQuery( + {ErrorCodes::CursorNotFound, "collection dropped while copying documents"}); + ASSERT_OK(collectionCloner->startup()); { executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); processNetworkResponse(createCountResponse(0)); @@ -1378,24 +1249,10 @@ protected: } ASSERT_TRUE(collectionCloner->isActive()); - collectionCloner->waitForDbWorker(); - ASSERT_TRUE(collectionCloner->isActive()); + _client->waitForPausedQuery(); ASSERT_TRUE(collectionStats.initCalled); - - { - executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); - processNetworkResponse(createCursorResponse(1, BSONArray())); - } - - collectionCloner->waitForDbWorker(); - ASSERT_TRUE(collectionCloner->isActive()); - - // Return error response to getMore command. - { - executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); - processNetworkResponse(ErrorCodes::CursorNotFound, - "collection dropped while copying documents"); - } + pauser.resume(); + _client->waitForResumedQuery(); } /** @@ -1416,8 +1273,8 @@ protected: } }; -TEST_F(CollectionClonerUUIDTest, FirstRemoteCommandWithUUID) { - startupWithUUID(); +TEST_F(CollectionClonerRenamedBeforeStartTest, FirstRemoteCommandWithRenamedCollection) { + ASSERT_OK(collectionCloner->startup()); auto net = getNet(); executor::NetworkInterfaceMock::InNetworkGuard guard(getNet()); @@ -1433,9 +1290,7 @@ TEST_F(CollectionClonerUUIDTest, FirstRemoteCommandWithUUID) { ASSERT_TRUE(collectionCloner->isActive()); } -TEST_F(CollectionClonerUUIDTest, BeginCollectionWithUUID) { - startupWithUUID(); - +TEST_F(CollectionClonerRenamedBeforeStartTest, BeginCollectionWithUUID) { CollectionMockStats stats; CollectionBulkLoaderMock* loader = new CollectionBulkLoaderMock(&stats); NamespaceString collNss; @@ -1454,6 +1309,11 @@ TEST_F(CollectionClonerUUIDTest, BeginCollectionWithUUID) { return std::unique_ptr<CollectionBulkLoader>(loader); }; + // Pause the client so the cloner stops in the fetcher. + MockClientPauser pauser(_client); + + ASSERT_OK(collectionCloner->startup()); + // Split listIndexes response into 2 batches: first batch contains idIndexSpec and // second batch contains specs. We expect the collection cloner to fix up the collection names // (here from 'nss' to 'alternateNss') in the index specs, as the collection with the given UUID @@ -1506,20 +1366,16 @@ TEST_F(CollectionClonerUUIDTest, BeginCollectionWithUUID) { ASSERT_TRUE(collectionCloner->isActive()); } -TEST_F(CollectionClonerUUIDTest, SingleCloningCursorWithUUIDUsesFindCommand) { - // With a single cloning cursor, expect a find command. - testWithMaxNumCloningCursors(1, "find"); -} - /** * Start cloning. - * While copying collection, simulate a collection drop by having the ARM return a CursorNotFound - * error. + * While copying collection, simulate a collection drop by having the DBClientConnection return a + * CursorNotFound error. * The CollectionCloner should run a find command on the collection by UUID. * Simulate successful find command with a drop-pending namespace in the response. * The CollectionCloner should complete with a successful final status. */ -TEST_F(CollectionClonerUUIDTest, CloningIsSuccessfulIfCollectionWasDroppedWhileCopyingDocuments) { +TEST_F(CollectionClonerRenamedBeforeStartTest, + CloningIsSuccessfulIfCollectionWasDroppedWhileCopyingDocuments) { setUpVerifyCollectionWasDroppedTest(); // CollectionCloner should send a find command with the collection's UUID. @@ -1544,14 +1400,13 @@ TEST_F(CollectionClonerUUIDTest, CloningIsSuccessfulIfCollectionWasDroppedWhileC /** * Start cloning. - * While copying collection, simulate a collection drop by having the ARM return a CursorNotFound - * error. + * While copying collection, simulate a collection drop by having the DBClientConnection return a + * CursorNotFound error. * The CollectionCloner should run a find command on the collection by UUID. * Shut the CollectionCloner down. - * The CollectionCloner should return a CursorNotFound final status which is the last error from the - * ARM. + * The CollectionCloner should return a CursorNotFound final status. */ -TEST_F(CollectionClonerUUIDTest, +TEST_F(CollectionClonerRenamedBeforeStartTest, ShuttingDownCollectionClonerDuringCollectionDropVerificationReturnsCallbackCanceled) { setUpVerifyCollectionWasDroppedTest(); |