summary refs log tree commit diff
path: root/src/mongo/db/repl/collection_cloner_test.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/repl/collection_cloner_test.cpp')
-rw-r--r-- src/mongo/db/repl/collection_cloner_test.cpp 779
1 files changed, 317 insertions, 462 deletions
diff --git a/src/mongo/db/repl/collection_cloner_test.cpp b/src/mongo/db/repl/collection_cloner_test.cpp
index 726a2b6a2cf..d302ed0492d 100644
--- a/src/mongo/db/repl/collection_cloner_test.cpp
+++ b/src/mongo/db/repl/collection_cloner_test.cpp
@@ -37,6 +37,7 @@
#include "mongo/db/repl/collection_cloner.h"
#include "mongo/db/repl/storage_interface.h"
#include "mongo/db/repl/storage_interface_mock.h"
+#include "mongo/dbtests/mock/mock_dbclient_connection.h"
#include "mongo/stdx/memory.h"
#include "mongo/unittest/task_executor_proxy.h"
#include "mongo/unittest/unittest.h"
@@ -58,6 +59,134 @@ public:
}
};
+class FailableMockDBClientConnection : public MockDBClientConnection {  // Mock connection with injectable connect()/query() failures and a pausable query().
+public:
+ FailableMockDBClientConnection(MockRemoteDBServer* remote, executor::NetworkInterfaceMock* net)
+ : MockDBClientConnection(remote), _net(net) {}
+ // Clears the pause flag, wakes waiters, then blocks until no resume() call is in progress.
+ virtual ~FailableMockDBClientConnection() {
+ stdx::unique_lock<stdx::mutex> lk(_mutex);
+ _paused = false;
+ _cond.notify_all();
+ _cond.wait(lk, [this] { return !_resuming; });
+ }
+ // Returns the injected connect failure if one is set; otherwise defers to the mock base class.
+ Status connect(const HostAndPort& host, StringData applicationName) override {
+ if (!_failureForConnect.isOK())
+ return _failureForConnect;
+ return MockDBClientConnection::connect(host, applicationName);
+ }
+ // Callback-style query: waits while paused, runs the base-class query, then throws the injected failure if one is set.
+ using MockDBClientConnection::query; // This avoids warnings from -Woverloaded-virtual
+ unsigned long long query(stdx::function<void(mongo::DBClientCursorBatchIterator&)> f,
+ const NamespaceStringOrUUID& nsOrUuid,
+ mongo::Query query,
+ const mongo::BSONObj* fieldsToReturn,
+ int queryOptions,
+ int batchSize) override {
+ ON_BLOCK_EXIT([this]() {  // Scope guard (presumably runs on every exit, including a throw): count the query and wake waiters.
+ {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ _queryCount++;
+ }
+ _cond.notify_all();
+ });
+ {
+ stdx::unique_lock<stdx::mutex> lk(_mutex);
+ _waiting = _paused;  // Publish whether this query is about to block; waitForPausedQuery() watches this flag.
+ _cond.notify_all();
+ while (_paused) {
+ lk.unlock();  // Release the mutex while blocked in the network mock so resume() can acquire it.
+ _net->waitForWork();
+ lk.lock();
+ }
+ _waiting = false;
+ }
+ auto result = MockDBClientConnection::query(
+ f, nsOrUuid, query, fieldsToReturn, queryOptions, batchSize);
+ uassertStatusOK(_failureForQuery);  // The injected failure is only checked after the base query has run.
+ return result;
+ }
+ // Sets the status connect() returns when it is not OK. Written without taking _mutex.
+ void setFailureForConnect(Status failure) {
+ _failureForConnect = failure;
+ }
+ // Sets the status query() passes to uassertStatusOK() after the base query. Written without taking _mutex.
+ void setFailureForQuery(Status failure) {
+ _failureForQuery = failure;
+ }
+ // Sets the pause flag; query() loops for as long as the flag is set.
+ void pause() {
+ {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ _paused = true;
+ }
+ _cond.notify_all();
+ }
+ void resume() {  // Clears the pause flag, then signals the network mock every 10ms until no query is still waiting.
+ {
+ stdx::unique_lock<stdx::mutex> lk(_mutex);
+ _resuming = true;
+ _paused = false;
+ _resumedQueryCount = _queryCount;  // Baseline that waitForResumedQuery() compares against.
+ while (_waiting) {
+ lk.unlock();
+ _net->signalWorkAvailable();
+ mongo::sleepmillis(10);
+ lk.lock();
+ }
+ _resuming = false;  // The destructor waits for this flag to clear.
+ _cond.notify_all();
+ }
+ }
+
+ // Blocks until a query() call has seen the pause flag set and marked itself as waiting.
+ void waitForPausedQuery() {
+ stdx::unique_lock<stdx::mutex> lk(_mutex);
+ _cond.wait(lk, [this] { return _waiting; });
+ }
+
+ // Blocks until the completed-query count differs from the count recorded by the last resume().
+ void waitForResumedQuery() {
+ stdx::unique_lock<stdx::mutex> lk(_mutex);
+ _cond.wait(lk, [this] { return _resumedQueryCount != _queryCount; });
+ }
+ // The flags and counters below are written only while holding _mutex; the two failure statuses are not.
+private:
+ executor::NetworkInterfaceMock* _net;
+ stdx::mutex _mutex;
+ stdx::condition_variable _cond;
+ bool _paused = false;
+ bool _waiting = false;
+ bool _resuming = false;
+ int _queryCount = 0;
+ int _resumedQueryCount = 0;
+ Status _failureForConnect = Status::OK();
+ Status _failureForQuery = Status::OK();
+};
+
+// RAII class to pause the client; since tests are very exception-heavy this prevents them
+// from hanging on failure.
+class MockClientPauser {
+    MONGO_DISALLOW_COPYING(MockClientPauser);
+
+public:
+    // Pauses 'client' immediately. The pointer is not owned; it must stay valid until this
+    // object has resumed it (explicitly via resume() or implicitly on destruction).
+    explicit MockClientPauser(FailableMockDBClientConnection* client) : _client(client) {
+        _client->pause();
+    }
+    // Resumes the client unless resume() was already called.
+    ~MockClientPauser() {
+        resume();
+    }
+    // Resumes the client at most once; every later call, including the destructor's, is a no-op.
+    void resume() {
+        if (_client)
+            _client->resume();
+        _client = nullptr;
+    }
+
+private:
+    FailableMockDBClientConnection* _client;  // Non-owning; null once the client was resumed.
+};
+
class CollectionClonerTest : public BaseClonerTest {
public:
BaseCloner* getCloner() const override;
@@ -70,6 +199,17 @@ protected:
void setUp() override;
void tearDown() override;
+    // Builds the CollectionOptions that setUp() assigns to 'options': defaults plus a freshly
+    // generated UUID. Virtual so that derived fixtures can adjust the options.
+    virtual CollectionOptions getCollectionOptions() const {
+        CollectionOptions clonerOptions;
+        clonerOptions.uuid = UUID::gen();
+        return clonerOptions;
+    }
+
+    // Namespace that setUp() passes to the CollectionCloner constructor. Virtual so that derived
+    // fixtures can start the cloner under a different name.
+    virtual const NamespaceString& getStartupNss() const {
+        return nss;
+    }
+
+
std::vector<BSONObj> makeSecondaryIndexSpecs(const NamespaceString& nss);
// A simple arbitrary value to use as the default batch size.
@@ -79,16 +219,19 @@ protected:
std::unique_ptr<CollectionCloner> collectionCloner;
CollectionMockStats collectionStats; // Used by the _loader.
CollectionBulkLoaderMock* _loader; // Owned by CollectionCloner.
+    // Set by the create-client hook installed in setUp(); tearDown() deletes '_client' itself
+    // only while this is still false.
+    bool _clientCreated = false;
+    // Owned by the CollectionCloner once created. Initialized to null so that tearDown() never
+    // deletes an indeterminate pointer if setUp() did not get as far as allocating the client.
+    FailableMockDBClientConnection* _client = nullptr;
+    std::unique_ptr<MockRemoteDBServer> _server;
};
void CollectionClonerTest::setUp() {
BaseClonerTest::setUp();
- options = {};
+ options = getCollectionOptions();
collectionCloner.reset(nullptr);
collectionCloner = stdx::make_unique<CollectionCloner>(&getExecutor(),
dbWorkThreadPool.get(),
target,
- nss,
+ getStartupNss(),
options,
setStatusCallback(),
storageInterface.get(),
@@ -106,6 +249,13 @@ void CollectionClonerTest::setUp() {
return StatusWith<std::unique_ptr<CollectionBulkLoader>>(
std::unique_ptr<CollectionBulkLoader>(_loader));
};
+ _server = stdx::make_unique<MockRemoteDBServer>(target.toString());
+ _server->assignCollectionUuid(nss.ns(), *options.uuid);
+ _client = new FailableMockDBClientConnection(_server.get(), getNet());
+ collectionCloner->setCreateClientFn_forTest([this]() {
+ _clientCreated = true;
+ return std::unique_ptr<DBClientConnection>(_client);
+ });
}
// Return index specs to use for secondary indexes.
@@ -123,7 +273,11 @@ std::vector<BSONObj> CollectionClonerTest::makeSecondaryIndexSpecs(const Namespa
void CollectionClonerTest::tearDown() {
BaseClonerTest::tearDown();
// Executor may still invoke collection cloner's callback before shutting down.
- collectionCloner.reset(nullptr);
+ collectionCloner.reset();
+ if (!_clientCreated)
+ delete _client;
+ _clientCreated = false;
+ _server.reset();
options = {};
}
@@ -180,6 +334,19 @@ TEST_F(CollectionClonerTest, InvalidConstruction) {
"'storageEngine.storageEngine1' has to be an embedded document.");
}
+ // UUID must be present.
+ {
+ CollectionOptions invalidOptions = options;
+ invalidOptions.uuid = boost::none;
+ StorageInterface* si = storageInterface.get();
+ ASSERT_THROWS_CODE_AND_WHAT(
+ CollectionCloner(
+ &executor, pool, target, nss, invalidOptions, cb, si, defaultBatchSize),
+ AssertionException,
+ 50953,
+ "Missing collection UUID in CollectionCloner, collection name: db.coll");
+ }
+
// Callback function cannot be null.
{
CollectionCloner::CallbackFn nullCb;
@@ -190,6 +357,17 @@ TEST_F(CollectionClonerTest, InvalidConstruction) {
ErrorCodes::BadValue,
"callback function cannot be null");
}
+
+ // Batch size must be non-negative.
+ {
+ StorageInterface* si = storageInterface.get();
+ constexpr int kInvalidBatchSize = -1;
+ ASSERT_THROWS_CODE_AND_WHAT(
+ CollectionCloner(&executor, pool, target, nss, options, cb, si, kInvalidBatchSize),
+ AssertionException,
+ 50954,
+ "collectionClonerBatchSize must be non-negative.");
+ }
}
TEST_F(CollectionClonerTest, ClonerLifeCycle) {
@@ -206,7 +384,9 @@ TEST_F(CollectionClonerTest, FirstRemoteCommand) {
auto&& noiRequest = noi->getRequest();
ASSERT_EQUALS(nss.db().toString(), noiRequest.dbname);
ASSERT_EQUALS("count", std::string(noiRequest.cmdObj.firstElementFieldName()));
- ASSERT_EQUALS(nss.coll().toString(), noiRequest.cmdObj.firstElement().valuestrsafe());
+ auto requestUUID = uassertStatusOK(UUID::parse(noiRequest.cmdObj.firstElement()));
+ ASSERT_EQUALS(options.uuid.get(), requestUUID);
+
ASSERT_FALSE(net->hasReadyRequests());
ASSERT_TRUE(collectionCloner->isActive());
}
@@ -347,18 +527,16 @@ TEST_F(CollectionClonerTest,
ASSERT_EQUALS(ErrorCodes::OperationFailed, getStatus());
}
-TEST_F(CollectionClonerTest, DoNotCreateIDIndexIfAutoIndexIdUsed) {
- options = {};
- options.autoIndexId = CollectionOptions::NO;
- collectionCloner.reset(new CollectionCloner(&getExecutor(),
- dbWorkThreadPool.get(),
- target,
- nss,
- options,
- setStatusCallback(),
- storageInterface.get(),
- defaultBatchSize));
+// Fixture variant that takes the base fixture's collection options and turns autoIndexId off.
+class CollectionClonerNoAutoIndexTest : public CollectionClonerTest {
+protected:
+    // Same as the base fixture's options, except that autoIndexId is set to NO.
+    CollectionOptions getCollectionOptions() const override {
+        auto noAutoIndexOptions = CollectionClonerTest::getCollectionOptions();
+        noAutoIndexOptions.autoIndexId = CollectionOptions::NO;
+        return noAutoIndexOptions;
+    }
+};
+TEST_F(CollectionClonerNoAutoIndexTest, DoNotCreateIDIndexIfAutoIndexIdUsed) {
NamespaceString collNss;
CollectionOptions collOptions;
std::vector<BSONObj> collIndexSpecs{BSON("fakeindexkeys" << 1)}; // init with one doc.
@@ -376,32 +554,24 @@ TEST_F(CollectionClonerTest, DoNotCreateIDIndexIfAutoIndexIdUsed) {
return std::unique_ptr<CollectionBulkLoader>(loader);
};
+ const BSONObj doc = BSON("_id" << 1);
+ _server->insert(nss.ns(), doc);
+ // Pause the CollectionCloner before executing the query so we can verify events which are
+ // supposed to happen before the query.
+ MockClientPauser pauser(_client);
ASSERT_OK(collectionCloner->startup());
{
executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
- processNetworkResponse(createCountResponse(0));
+ processNetworkResponse(createCountResponse(1));
processNetworkResponse(createListIndexesResponse(0, BSONArray()));
}
ASSERT_TRUE(collectionCloner->isActive());
- collectionCloner->waitForDbWorker();
+ _client->waitForPausedQuery();
ASSERT_TRUE(collectionCloner->isActive());
ASSERT_TRUE(collectionStats.initCalled);
- BSONArray emptyArray;
- {
- executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
- processNetworkResponse(createCursorResponse(1, emptyArray));
- }
-
- collectionCloner->waitForDbWorker();
- ASSERT_TRUE(collectionCloner->isActive());
-
- const BSONObj doc = BSON("_id" << 1);
- {
- executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
- processNetworkResponse(createFinalCursorResponse(BSON_ARRAY(doc)));
- }
+ pauser.resume();
collectionCloner->join();
ASSERT_EQUALS(1, collectionStats.insertCount);
ASSERT_TRUE(collectionStats.commitCalled);
@@ -595,6 +765,9 @@ TEST_F(CollectionClonerTest, BeginCollectionFailed) {
}
TEST_F(CollectionClonerTest, BeginCollection) {
+ // Pause the CollectionCloner before executing the query so we can verify state after
+ // the listIndexes call.
+ MockClientPauser pauser(_client);
ASSERT_OK(collectionCloner->startup());
CollectionMockStats stats;
@@ -682,7 +855,10 @@ TEST_F(CollectionClonerTest, FindFetcherScheduleFailed) {
ASSERT_FALSE(collectionCloner->isActive());
}
-TEST_F(CollectionClonerTest, FindCommandAfterBeginCollection) {
+TEST_F(CollectionClonerTest, QueryAfterCreateCollection) {
+ // Pause the CollectionCloner before executing the query so we can verify the collection is
+ // created before the query.
+ MockClientPauser pauser(_client);
ASSERT_OK(collectionCloner->startup());
CollectionMockStats stats;
@@ -705,21 +881,15 @@ TEST_F(CollectionClonerTest, FindCommandAfterBeginCollection) {
collectionCloner->waitForDbWorker();
ASSERT_TRUE(collectionCreated);
-
- // Fetcher should be scheduled after cloner creates collection.
- auto net = getNet();
- executor::NetworkInterfaceMock::InNetworkGuard guard(net);
- ASSERT_TRUE(net->hasReadyRequests());
- NetworkOperationIterator noi = net->getNextReadyRequest();
- auto&& noiRequest = noi->getRequest();
- ASSERT_EQUALS(nss.db().toString(), noiRequest.dbname);
- ASSERT_EQUALS("find", std::string(noiRequest.cmdObj.firstElementFieldName()));
- ASSERT_EQUALS(nss.coll().toString(), noiRequest.cmdObj.firstElement().valuestrsafe());
- ASSERT_TRUE(noiRequest.cmdObj.getField("noCursorTimeout").trueValue());
- ASSERT_FALSE(net->hasReadyRequests());
+ // Make sure the query starts.
+ _client->waitForPausedQuery();
}
-TEST_F(CollectionClonerTest, EstablishCursorCommandFailed) {
+TEST_F(CollectionClonerTest, QueryFailed) {
+ // For this test to work properly, the error cannot be one of the special codes
+ // (OperationFailed or CursorNotFound) which trigger an attempt to see if the collection
+ // was deleted.
+ _client->setFailureForQuery({ErrorCodes::UnknownError, "QueryFailedTest UnknownError"});
ASSERT_OK(collectionCloner->startup());
{
@@ -727,98 +897,22 @@ TEST_F(CollectionClonerTest, EstablishCursorCommandFailed) {
processNetworkResponse(createCountResponse(0));
processNetworkResponse(createListIndexesResponse(0, BSON_ARRAY(idIndexSpec)));
}
- ASSERT_TRUE(collectionCloner->isActive());
- collectionCloner->waitForDbWorker();
- ASSERT_TRUE(collectionCloner->isActive());
-
- {
- executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
- processNetworkResponse(BSON("ok" << 0 << "errmsg"
- << ""
- << "code"
- << ErrorCodes::CursorNotFound));
- }
-
- ASSERT_EQUALS(ErrorCodes::CursorNotFound, getStatus().code());
- ASSERT_FALSE(collectionCloner->isActive());
-}
-
-TEST_F(CollectionClonerTest, CollectionClonerResendsFindCommandOnRetriableError) {
- ASSERT_OK(collectionCloner->startup());
-
- auto net = getNet();
- executor::NetworkInterfaceMock::InNetworkGuard guard(net);
-
- // CollectionCollection sends listIndexes request irrespective of collection size in a
- // successful count response.
- assertRemoteCommandNameEquals("count", net->scheduleSuccessfulResponse(createCountResponse(0)));
- net->runReadyNetworkOperations();
-
- // CollectionCloner requires a successful listIndexes response in order to send the find request
- // for the documents in the collection.
- assertRemoteCommandNameEquals(
- "listIndexes",
- net->scheduleSuccessfulResponse(createListIndexesResponse(0, BSON_ARRAY(idIndexSpec))));
- net->runReadyNetworkOperations();
-
- // Respond to the find request with a retriable error.
- assertRemoteCommandNameEquals("find",
- net->scheduleErrorResponse(Status(ErrorCodes::HostNotFound, "")));
- net->runReadyNetworkOperations();
- ASSERT_TRUE(collectionCloner->isActive());
-
- // This check exists to ensure that the command used to establish the cursors is retried,
- // regardless of the command format.
- auto noi = net->getNextReadyRequest();
- assertRemoteCommandNameEquals("find", noi->getRequest());
- net->blackHole(noi);
-}
-
-TEST_F(CollectionClonerTest, EstablishCursorCommandCanceled) {
- ASSERT_OK(collectionCloner->startup());
-
- ASSERT_TRUE(collectionCloner->isActive());
- {
- executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
- processNetworkResponse(createCountResponse(0));
- scheduleNetworkResponse(createListIndexesResponse(0, BSON_ARRAY(idIndexSpec)));
- }
- ASSERT_TRUE(collectionCloner->isActive());
-
- auto net = getNet();
- {
- executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
-
- net->runReadyNetworkOperations();
- }
-
- collectionCloner->waitForDbWorker();
-
- ASSERT_TRUE(collectionCloner->isActive());
- {
- executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
- scheduleNetworkResponse(BSON("ok" << 1));
- }
- ASSERT_TRUE(collectionCloner->isActive());
-
- collectionCloner->shutdown();
-
- {
- executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
- getNet()->logQueues();
- net->runReadyNetworkOperations();
- }
-
- ASSERT_EQUALS(ErrorCodes::CallbackCanceled, getStatus().code());
+ collectionCloner->join();
+ ASSERT_EQUALS(ErrorCodes::UnknownError, getStatus().code());
ASSERT_FALSE(collectionCloner->isActive());
}
TEST_F(CollectionClonerTest, InsertDocumentsScheduleDbWorkFailed) {
+ // Set up documents to be returned from upstream node.
+ _server->insert(nss.ns(), BSON("_id" << 1));
+
+ // Pause the client so we can set up the failure.
+ MockClientPauser pauser(_client);
ASSERT_OK(collectionCloner->startup());
{
executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
- processNetworkResponse(createCountResponse(0));
+ processNetworkResponse(createCountResponse(1));
processNetworkResponse(createListIndexesResponse(0, BSON_ARRAY(idIndexSpec)));
}
@@ -831,31 +925,24 @@ TEST_F(CollectionClonerTest, InsertDocumentsScheduleDbWorkFailed) {
return StatusWith<executor::TaskExecutor::CallbackHandle>(ErrorCodes::UnknownError, "");
});
- BSONArray emptyArray;
- {
- executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
- processNetworkResponse(createCursorResponse(1, emptyArray));
- }
-
- collectionCloner->waitForDbWorker();
- ASSERT_TRUE(collectionCloner->isActive());
-
- const BSONObj doc = BSON("_id" << 1);
- {
- executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
- processNetworkResponse(createFinalCursorResponse(BSON_ARRAY(doc)));
- }
+ pauser.resume();
+ collectionCloner->join();
ASSERT_EQUALS(ErrorCodes::UnknownError, getStatus().code());
ASSERT_FALSE(collectionCloner->isActive());
}
TEST_F(CollectionClonerTest, InsertDocumentsCallbackCanceled) {
+ // Set up documents to be returned from upstream node.
+ _server->insert(nss.ns(), BSON("_id" << 1));
+
+ // Pause the client so we can set up the failure.
+ MockClientPauser pauser(_client);
ASSERT_OK(collectionCloner->startup());
{
executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
- processNetworkResponse(createCountResponse(0));
+ processNetworkResponse(createCountResponse(1));
processNetworkResponse(createListIndexesResponse(0, BSON_ARRAY(idIndexSpec)));
}
@@ -874,37 +961,30 @@ TEST_F(CollectionClonerTest, InsertDocumentsCallbackCanceled) {
return StatusWith<executor::TaskExecutor::CallbackHandle>(handle);
});
- BSONArray emptyArray;
- {
- executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
- processNetworkResponse(createCursorResponse(1, emptyArray));
- }
-
- collectionCloner->waitForDbWorker();
- ASSERT_TRUE(collectionCloner->isActive());
-
- {
- executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
- processNetworkResponse(createFinalCursorResponse(BSON_ARRAY(BSON("_id" << 1))));
- }
+ pauser.resume();
collectionCloner->join();
ASSERT_EQUALS(ErrorCodes::CallbackCanceled, getStatus().code());
ASSERT_FALSE(collectionCloner->isActive());
}
TEST_F(CollectionClonerTest, InsertDocumentsFailed) {
+ // Set up documents to be returned from upstream node.
+ _server->insert(nss.ns(), BSON("_id" << 1));
+
+ // Pause the client so we can set up the failure.
+ MockClientPauser pauser(_client);
ASSERT_OK(collectionCloner->startup());
ASSERT_TRUE(collectionCloner->isActive());
{
executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
- processNetworkResponse(createCountResponse(0));
+ processNetworkResponse(createCountResponse(1));
processNetworkResponse(createListIndexesResponse(0, BSON_ARRAY(idIndexSpec)));
}
ASSERT_TRUE(collectionCloner->isActive());
getNet()->logQueues();
- collectionCloner->waitForDbWorker();
+ _client->waitForPausedQuery();
ASSERT_TRUE(collectionCloner->isActive());
ASSERT_TRUE(collectionStats.initCalled);
@@ -913,20 +993,8 @@ TEST_F(CollectionClonerTest, InsertDocumentsFailed) {
const std::vector<BSONObj>::const_iterator end) {
return Status(ErrorCodes::OperationFailed, "");
};
-
- BSONArray emptyArray;
- {
- executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
- processNetworkResponse(createCursorResponse(1, emptyArray));
- }
-
- collectionCloner->waitForDbWorker();
ASSERT_TRUE(collectionCloner->isActive());
-
- {
- executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
- processNetworkResponse(createFinalCursorResponse(BSON_ARRAY(BSON("_id" << 1))));
- }
+ pauser.resume();
collectionCloner->join();
ASSERT_FALSE(collectionCloner->isActive());
@@ -936,39 +1004,24 @@ TEST_F(CollectionClonerTest, InsertDocumentsFailed) {
}
TEST_F(CollectionClonerTest, InsertDocumentsSingleBatch) {
+ // Set up documents to be returned from upstream node.
+ _server->insert(nss.ns(), BSON("_id" << 1));
+ _server->insert(nss.ns(), BSON("_id" << 2));
+
ASSERT_OK(collectionCloner->startup());
ASSERT_TRUE(collectionCloner->isActive());
{
executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
- processNetworkResponse(createCountResponse(0));
+ processNetworkResponse(createCountResponse(2));
processNetworkResponse(createListIndexesResponse(0, BSON_ARRAY(idIndexSpec)));
}
- ASSERT_TRUE(collectionCloner->isActive());
-
- collectionCloner->waitForDbWorker();
- ASSERT_TRUE(collectionCloner->isActive());
- ASSERT_TRUE(collectionStats.initCalled);
-
- BSONArray emptyArray;
- {
- executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
- processNetworkResponse(createCursorResponse(1, emptyArray));
- }
-
- collectionCloner->waitForDbWorker();
- ASSERT_TRUE(collectionCloner->isActive());
-
- const BSONObj doc = BSON("_id" << 1);
- {
- executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
- processNetworkResponse(createFinalCursorResponse(BSON_ARRAY(doc)));
- }
-
collectionCloner->join();
// TODO: record the documents during insert and compare them
// -- maybe better done using a real storage engine, like ephemeral for test.
- ASSERT_EQUALS(1, collectionStats.insertCount);
+ ASSERT_EQUALS(2, collectionStats.insertCount);
+ auto stats = collectionCloner->getStats();
+ ASSERT_EQUALS(1u, stats.receivedBatches);
ASSERT_TRUE(collectionStats.commitCalled);
ASSERT_OK(getStatus());
@@ -976,115 +1029,27 @@ TEST_F(CollectionClonerTest, InsertDocumentsSingleBatch) {
}
TEST_F(CollectionClonerTest, InsertDocumentsMultipleBatches) {
+ // Set up documents to be returned from upstream node.
+ _server->insert(nss.ns(), BSON("_id" << 1));
+ _server->insert(nss.ns(), BSON("_id" << 2));
+ _server->insert(nss.ns(), BSON("_id" << 3));
+
+ collectionCloner->setBatchSize_forTest(2);
ASSERT_OK(collectionCloner->startup());
ASSERT_TRUE(collectionCloner->isActive());
{
executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
- processNetworkResponse(createCountResponse(0));
+ processNetworkResponse(createCountResponse(3));
processNetworkResponse(createListIndexesResponse(0, BSON_ARRAY(idIndexSpec)));
}
- ASSERT_TRUE(collectionCloner->isActive());
-
- collectionCloner->waitForDbWorker();
- ASSERT_TRUE(collectionCloner->isActive());
- ASSERT_TRUE(collectionStats.initCalled);
-
- BSONArray emptyArray;
- {
- executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
- processNetworkResponse(createCursorResponse(1, emptyArray));
- }
-
- collectionCloner->waitForDbWorker();
- ASSERT_TRUE(collectionCloner->isActive());
-
- const BSONObj doc = BSON("_id" << 1);
- {
- executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
- processNetworkResponse(createCursorResponse(1, BSON_ARRAY(doc)));
- }
-
- collectionCloner->waitForDbWorker();
- // TODO: record the documents during insert and compare them
- // -- maybe better done using a real storage engine, like ephemeral for test.
- ASSERT_EQUALS(1, collectionStats.insertCount);
-
- ASSERT_EQUALS(getDetectableErrorStatus(), getStatus());
- ASSERT_TRUE(collectionCloner->isActive());
-
- const BSONObj doc2 = BSON("_id" << 1);
- {
- executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
- processNetworkResponse(createFinalCursorResponse(BSON_ARRAY(doc2)));
- }
-
collectionCloner->join();
// TODO: record the documents during insert and compare them
// -- maybe better done using a real storage engine, like ephemeral for test.
- ASSERT_EQUALS(2, collectionStats.insertCount);
- ASSERT_TRUE(collectionStats.commitCalled);
-
- ASSERT_OK(getStatus());
- ASSERT_FALSE(collectionCloner->isActive());
-}
-
-TEST_F(CollectionClonerTest, LastBatchContainsNoDocuments) {
- ASSERT_OK(collectionCloner->startup());
- ASSERT_TRUE(collectionCloner->isActive());
-
- {
- executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
- processNetworkResponse(createCountResponse(0));
- processNetworkResponse(createListIndexesResponse(0, BSON_ARRAY(idIndexSpec)));
- }
- ASSERT_TRUE(collectionCloner->isActive());
-
- collectionCloner->waitForDbWorker();
- ASSERT_TRUE(collectionCloner->isActive());
- ASSERT_TRUE(collectionStats.initCalled);
-
- BSONArray emptyArray;
- {
- executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
- processNetworkResponse(createCursorResponse(1, emptyArray));
- }
-
- collectionCloner->waitForDbWorker();
- ASSERT_TRUE(collectionCloner->isActive());
-
- const BSONObj doc = BSON("_id" << 1);
- {
- executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
- processNetworkResponse(createCursorResponse(1, BSON_ARRAY(doc)));
- }
-
- collectionCloner->waitForDbWorker();
- ASSERT_EQUALS(1, collectionStats.insertCount);
-
- ASSERT_EQUALS(getDetectableErrorStatus(), getStatus());
- ASSERT_TRUE(collectionCloner->isActive());
-
- const BSONObj doc2 = BSON("_id" << 2);
- {
- executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
- processNetworkResponse(createCursorResponse(1, BSON_ARRAY(doc2), "nextBatch"));
- }
-
- collectionCloner->waitForDbWorker();
- ASSERT_EQUALS(2, collectionStats.insertCount);
-
- ASSERT_EQUALS(getDetectableErrorStatus(), getStatus());
- ASSERT_TRUE(collectionCloner->isActive());
-
- {
- executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
- processNetworkResponse(createFinalCursorResponse(emptyArray));
- }
-
- collectionCloner->join();
- ASSERT_EQUALS(2, collectionStats.insertCount);
+ ASSERT_EQUALS(3, collectionStats.insertCount);
ASSERT_TRUE(collectionStats.commitCalled);
+ auto stats = collectionCloner->getStats();
+ ASSERT_EQUALS(2u, stats.receivedBatches);
ASSERT_OK(getStatus());
ASSERT_FALSE(collectionCloner->isActive());
@@ -1101,53 +1066,24 @@ TEST_F(CollectionClonerTest, CollectionClonerTransitionsToCompleteIfShutdownBefo
* Restarting cloning should fail with ErrorCodes::ShutdownInProgress error.
*/
TEST_F(CollectionClonerTest, CollectionClonerCannotBeRestartedAfterPreviousFailure) {
+ // Set up document to return from upstream.
+ _server->insert(nss.ns(), BSON("_id" << 1));
+
// First cloning attempt - fails while reading documents from source collection.
unittest::log() << "Starting first collection cloning attempt";
+ _client->setFailureForQuery(
+ {ErrorCodes::UnknownError, "failed to read remaining documents from source collection"});
ASSERT_OK(collectionCloner->startup());
ASSERT_TRUE(collectionCloner->isActive());
{
executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
- processNetworkResponse(createCountResponse(0));
+ processNetworkResponse(createCountResponse(1));
processNetworkResponse(createListIndexesResponse(0, BSON_ARRAY(idIndexSpec)));
}
- ASSERT_TRUE(collectionCloner->isActive());
-
- collectionCloner->waitForDbWorker();
- ASSERT_TRUE(collectionCloner->isActive());
- ASSERT_TRUE(collectionStats.initCalled);
-
- BSONArray emptyArray;
- {
- executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
- processNetworkResponse(createCursorResponse(1, emptyArray));
- }
-
- collectionCloner->waitForDbWorker();
- ASSERT_TRUE(collectionCloner->isActive());
-
- {
- executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
- processNetworkResponse(createCursorResponse(1, BSON_ARRAY(BSON("_id" << 1))));
- }
-
- collectionCloner->waitForDbWorker();
- ASSERT_EQUALS(1, collectionStats.insertCount);
-
- // Check that the status hasn't changed from the initial value.
- ASSERT_EQUALS(getDetectableErrorStatus(), getStatus());
- ASSERT_TRUE(collectionCloner->isActive());
-
- {
- executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
- processNetworkResponse(ErrorCodes::OperationFailed,
- "failed to read remaining documents from source collection");
- }
-
collectionCloner->join();
- ASSERT_EQUALS(1, collectionStats.insertCount);
- ASSERT_EQUALS(ErrorCodes::OperationFailed, getStatus());
+ ASSERT_EQUALS(ErrorCodes::UnknownError, getStatus());
ASSERT_FALSE(collectionCloner->isActive());
// Second cloning attempt - run to completion.
@@ -1209,6 +1145,7 @@ TEST_F(CollectionClonerTest, CollectionClonerResetsOnCompletionCallbackFunctionA
TEST_F(CollectionClonerTest,
CollectionClonerWaitsForPendingTasksToCompleteBeforeInvokingOnCompletionCallback) {
+ MockClientPauser pauser(_client);
ASSERT_OK(collectionCloner->startup());
ASSERT_TRUE(collectionCloner->isActive());
@@ -1227,20 +1164,11 @@ TEST_F(CollectionClonerTest,
}
ASSERT_TRUE(collectionCloner->isActive());
- collectionCloner->waitForDbWorker();
+ _client->waitForPausedQuery();
ASSERT_TRUE(collectionCloner->isActive());
ASSERT_TRUE(collectionStats.initCalled);
- BSONArray emptyArray;
- {
- executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
- processNetworkResponse(createCursorResponse(1, emptyArray));
- }
-
- collectionCloner->waitForDbWorker();
- ASSERT_TRUE(collectionCloner->isActive());
-
- // At this point, the CollectionCloner has sent the find request to establish the cursor.
+ // At this point, the CollectionCloner is waiting for the query to complete.
// We want to return the first batch of documents for the collection from the network so that
// the CollectionCloner schedules the first _insertDocuments DB task and the getMore request for
// the next batch of documents.
@@ -1257,26 +1185,20 @@ TEST_F(CollectionClonerTest,
// Return first batch of collection documents from remote server for the getMore request.
const BSONObj doc = BSON("_id" << 1);
- {
- executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
-
- assertRemoteCommandNameEquals(
- "getMore", net->scheduleSuccessfulResponse(createCursorResponse(1, BSON_ARRAY(doc))));
- net->runReadyNetworkOperations();
- }
-
- // Confirm that CollectionCloner attempted to schedule _insertDocuments task.
- ASSERT_TRUE(insertDocumentsFn);
-
- // Return an error for the getMore request for the next batch of collection documents.
- {
- executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
-
- assertRemoteCommandNameEquals(
- "getMore",
- net->scheduleErrorResponse(Status(ErrorCodes::OperationFailed, "getMore failed")));
- net->runReadyNetworkOperations();
- }
+ _server->insert(nss.ns(), doc);
+ _client->setFailureForQuery({ErrorCodes::UnknownError, "getMore failed"});
+    // Wait for the _runQuery method to exit. We can't get at it directly but we can wait
+    // for a task scheduled after it to complete.
+    auto& executor = getExecutor();
+    // Take the event handle by value: getValue() yields a reference into the temporary returned
+    // by makeEvent(), and that temporary is destroyed at the end of this statement. Binding
+    // 'auto&' here would leave a dangling reference for the capture and waitForEvent() below.
+    auto event = executor.makeEvent().getValue();
+    auto nextTask =
+        executor
+            .scheduleWork([&executor, event](const executor::TaskExecutor::CallbackArgs&) {
+                executor.signalEvent(event);
+            })
+            .getValue();
+    pauser.resume();
+    executor.waitForEvent(event);
// CollectionCloner should still be active because we have not finished processing the
// insertDocuments task.
@@ -1287,90 +1209,39 @@ TEST_F(CollectionClonerTest,
// error passed to the completion guard (ie. from the failed getMore request).
executor::TaskExecutor::CallbackArgs callbackArgs(
&getExecutor(), {}, Status(ErrorCodes::CallbackCanceled, ""));
+ ASSERT_TRUE(insertDocumentsFn);
insertDocumentsFn(callbackArgs);
// Reset 'insertDocumentsFn' to release last reference count on completion guard.
insertDocumentsFn = {};
- // No need to call CollectionCloner::join() because we invoked the _insertDocuments callback
- // synchronously.
+ collectionCloner->join();
ASSERT_FALSE(collectionCloner->isActive());
- ASSERT_EQUALS(ErrorCodes::OperationFailed, getStatus());
+ ASSERT_EQUALS(ErrorCodes::UnknownError, getStatus());
}
-class CollectionClonerUUIDTest : public CollectionClonerTest {
+class CollectionClonerRenamedBeforeStartTest : public CollectionClonerTest {
protected:
- // The UUID tests should deal gracefully with renamed collections, so start the cloner with
- // an alternate name.
+ // The CollectionCloner should deal gracefully with collections renamed before the cloner
+ // was started, so start it with an alternate name.
const NamespaceString alternateNss{"db", "alternateCollName"};
- void startupWithUUID(int maxNumCloningCursors = 1) {
- collectionCloner.reset();
- options.uuid = UUID::gen();
- collectionCloner = stdx::make_unique<CollectionCloner>(&getExecutor(),
- dbWorkThreadPool.get(),
- target,
- alternateNss,
- options,
- setStatusCallback(),
- storageInterface.get(),
- defaultBatchSize);
-
- ASSERT_OK(collectionCloner->startup());
- }
-
- void testWithMaxNumCloningCursors(int maxNumCloningCursors, StringData cmdName) {
- startupWithUUID(maxNumCloningCursors);
-
- CollectionOptions actualOptions;
- CollectionMockStats stats;
- CollectionBulkLoaderMock* loader = new CollectionBulkLoaderMock(&stats);
- bool collectionCreated = false;
- storageInterface->createCollectionForBulkFn = [&](const NamespaceString& theNss,
- const CollectionOptions& theOptions,
- const BSONObj idIndexSpec,
- const std::vector<BSONObj>& theIndexSpecs)
- -> StatusWith<std::unique_ptr<CollectionBulkLoader>> {
- collectionCreated = true;
- actualOptions = theOptions;
- return std::unique_ptr<CollectionBulkLoader>(loader);
- };
-
- {
- executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
- processNetworkResponse(createCountResponse(0));
- processNetworkResponse(createListIndexesResponse(0, BSON_ARRAY(idIndexSpec)));
- }
-
- collectionCloner->waitForDbWorker();
- ASSERT_TRUE(collectionCreated);
-
- // Fetcher should be scheduled after cloner creates collection.
- auto net = getNet();
- executor::NetworkInterfaceMock::InNetworkGuard guard(net);
- ASSERT_TRUE(net->hasReadyRequests());
- NetworkOperationIterator noi = net->getNextReadyRequest();
- ASSERT_FALSE(net->hasReadyRequests());
- auto&& noiRequest = noi->getRequest();
- ASSERT_EQUALS(nss.db().toString(), noiRequest.dbname);
- ASSERT_BSONOBJ_EQ(actualOptions.toBSON(), options.toBSON());
-
- ASSERT_EQUALS(cmdName, std::string(noiRequest.cmdObj.firstElementFieldName()));
- ASSERT_EQUALS(cmdName == "find", noiRequest.cmdObj.getField("noCursorTimeout").trueValue());
- auto requestUUID = uassertStatusOK(UUID::parse(noiRequest.cmdObj.firstElement()));
- ASSERT_EQUALS(options.uuid.get(), requestUUID);
- }
+ const NamespaceString& getStartupNss() const override {
+ return alternateNss;  // Start the cloner under the alternate (renamed) collection name.
+ }
/**
* Sets up a test for the CollectionCloner that simulates the collection being dropped while
* copying the documents.
- * The ARM returns CursorNotFound error to indicate a collection drop. Subsequently, the
- * CollectionCloner should run a find command on the collection by UUID. This should be the next
- * ready request on in the network interface.
+ * The DBClientConnection returns a CursorNotFound error to indicate a collection drop.
*/
void setUpVerifyCollectionWasDroppedTest() {
- startupWithUUID();
-
+ // Pause the query so we can reliably wait for it to complete.
+ MockClientPauser pauser(_client);
+ // Return error response from the query.
+ _client->setFailureForQuery(
+ {ErrorCodes::CursorNotFound, "collection dropped while copying documents"});
+ ASSERT_OK(collectionCloner->startup());
{
executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
processNetworkResponse(createCountResponse(0));
@@ -1378,24 +1249,10 @@ protected:
}
ASSERT_TRUE(collectionCloner->isActive());
- collectionCloner->waitForDbWorker();
- ASSERT_TRUE(collectionCloner->isActive());
+ _client->waitForPausedQuery();
ASSERT_TRUE(collectionStats.initCalled);
-
- {
- executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
- processNetworkResponse(createCursorResponse(1, BSONArray()));
- }
-
- collectionCloner->waitForDbWorker();
- ASSERT_TRUE(collectionCloner->isActive());
-
- // Return error response to getMore command.
- {
- executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
- processNetworkResponse(ErrorCodes::CursorNotFound,
- "collection dropped while copying documents");
- }
+ pauser.resume();
+ _client->waitForResumedQuery();
}
/**
@@ -1416,8 +1273,8 @@ protected:
}
};
-TEST_F(CollectionClonerUUIDTest, FirstRemoteCommandWithUUID) {
- startupWithUUID();
+TEST_F(CollectionClonerRenamedBeforeStartTest, FirstRemoteCommandWithRenamedCollection) {
+ ASSERT_OK(collectionCloner->startup());
auto net = getNet();
executor::NetworkInterfaceMock::InNetworkGuard guard(getNet());
@@ -1433,9 +1290,7 @@ TEST_F(CollectionClonerUUIDTest, FirstRemoteCommandWithUUID) {
ASSERT_TRUE(collectionCloner->isActive());
}
-TEST_F(CollectionClonerUUIDTest, BeginCollectionWithUUID) {
- startupWithUUID();
-
+TEST_F(CollectionClonerRenamedBeforeStartTest, BeginCollectionWithUUID) {
CollectionMockStats stats;
CollectionBulkLoaderMock* loader = new CollectionBulkLoaderMock(&stats);
NamespaceString collNss;
@@ -1454,6 +1309,11 @@ TEST_F(CollectionClonerUUIDTest, BeginCollectionWithUUID) {
return std::unique_ptr<CollectionBulkLoader>(loader);
};
+ // Pause the client so the cloner stops in the fetcher.
+ MockClientPauser pauser(_client);
+
+ ASSERT_OK(collectionCloner->startup());
+
// Split listIndexes response into 2 batches: first batch contains idIndexSpec and
// second batch contains specs. We expect the collection cloner to fix up the collection names
// (here from 'nss' to 'alternateNss') in the index specs, as the collection with the given UUID
@@ -1506,20 +1366,16 @@ TEST_F(CollectionClonerUUIDTest, BeginCollectionWithUUID) {
ASSERT_TRUE(collectionCloner->isActive());
}
-TEST_F(CollectionClonerUUIDTest, SingleCloningCursorWithUUIDUsesFindCommand) {
- // With a single cloning cursor, expect a find command.
- testWithMaxNumCloningCursors(1, "find");
-}
-
/**
* Start cloning.
- * While copying collection, simulate a collection drop by having the ARM return a CursorNotFound
- * error.
+ * While copying collection, simulate a collection drop by having the DBClientConnection return a
+ * CursorNotFound error.
* The CollectionCloner should run a find command on the collection by UUID.
* Simulate successful find command with a drop-pending namespace in the response.
* The CollectionCloner should complete with a successful final status.
*/
-TEST_F(CollectionClonerUUIDTest, CloningIsSuccessfulIfCollectionWasDroppedWhileCopyingDocuments) {
+TEST_F(CollectionClonerRenamedBeforeStartTest,
+ CloningIsSuccessfulIfCollectionWasDroppedWhileCopyingDocuments) {
setUpVerifyCollectionWasDroppedTest();
// CollectionCloner should send a find command with the collection's UUID.
@@ -1544,14 +1400,13 @@ TEST_F(CollectionClonerUUIDTest, CloningIsSuccessfulIfCollectionWasDroppedWhileC
/**
* Start cloning.
- * While copying collection, simulate a collection drop by having the ARM return a CursorNotFound
- * error.
+ * While copying collection, simulate a collection drop by having the DBClientConnection return a
+ * CursorNotFound error.
* The CollectionCloner should run a find command on the collection by UUID.
* Shut the CollectionCloner down.
- * The CollectionCloner should return a CursorNotFound final status which is the last error from the
- * ARM.
+ * The CollectionCloner should return a CursorNotFound final status.
*/
-TEST_F(CollectionClonerUUIDTest,
+TEST_F(CollectionClonerRenamedBeforeStartTest,
ShuttingDownCollectionClonerDuringCollectionDropVerificationReturnsCallbackCanceled) {
setUpVerifyCollectionWasDroppedTest();