summaryrefslogtreecommitdiff
path: root/src/mongo/db/repl/collection_cloner_test.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/repl/collection_cloner_test.cpp')
-rw-r--r--src/mongo/db/repl/collection_cloner_test.cpp745
1 files changed, 374 insertions, 371 deletions
diff --git a/src/mongo/db/repl/collection_cloner_test.cpp b/src/mongo/db/repl/collection_cloner_test.cpp
index 4a6c4d2bc03..8e41238ff7c 100644
--- a/src/mongo/db/repl/collection_cloner_test.cpp
+++ b/src/mongo/db/repl/collection_cloner_test.cpp
@@ -39,451 +39,454 @@
namespace {
- using namespace mongo;
- using namespace mongo::repl;
-
- class CollectionClonerTest : public BaseClonerTest {
- public:
-
- BaseCloner* getCloner() const override;
-
- protected:
-
- void setUp() override;
- void tearDown() override;
-
- CollectionOptions options;
- std::unique_ptr<CollectionCloner> collectionCloner;
- };
-
- void CollectionClonerTest::setUp() {
- BaseClonerTest::setUp();
- options.reset();
- options.storageEngine = BSON("storageEngine1" << BSONObj());
- collectionCloner.reset(new CollectionCloner(&getExecutor(), target, nss, options,
- stdx::bind(&CollectionClonerTest::setStatus,
- this,
- stdx::placeholders::_1),
- storageInterface.get()));
+using namespace mongo;
+using namespace mongo::repl;
+
+class CollectionClonerTest : public BaseClonerTest {
+public:
+ BaseCloner* getCloner() const override;
+
+protected:
+ void setUp() override;
+ void tearDown() override;
+
+ CollectionOptions options;
+ std::unique_ptr<CollectionCloner> collectionCloner;
+};
+
+void CollectionClonerTest::setUp() {
+ BaseClonerTest::setUp();
+ options.reset();
+ options.storageEngine = BSON("storageEngine1" << BSONObj());
+ collectionCloner.reset(new CollectionCloner(
+ &getExecutor(),
+ target,
+ nss,
+ options,
+ stdx::bind(&CollectionClonerTest::setStatus, this, stdx::placeholders::_1),
+ storageInterface.get()));
+}
+
+void CollectionClonerTest::tearDown() {
+ BaseClonerTest::tearDown();
+ // Executor may still invoke collection cloner's callback before shutting down.
+ collectionCloner.reset();
+ options.reset();
+}
+
+BaseCloner* CollectionClonerTest::getCloner() const {
+ return collectionCloner.get();
+}
+
+TEST_F(CollectionClonerTest, InvalidConstruction) {
+ ReplicationExecutor& executor = getExecutor();
+
+ const auto& cb = [](const Status&) { FAIL("should not reach here"); };
+
+ // Null executor.
+ {
+ CollectionCloner::StorageInterface* si = storageInterface.get();
+ ASSERT_THROWS(CollectionCloner(nullptr, target, nss, options, cb, si), UserException);
}
- void CollectionClonerTest::tearDown() {
- BaseClonerTest::tearDown();
- // Executor may still invoke collection cloner's callback before shutting down.
- collectionCloner.reset();
- options.reset();
- }
+ // Null storage interface
+ ASSERT_THROWS(CollectionCloner(&executor, target, nss, options, cb, nullptr), UserException);
- BaseCloner* CollectionClonerTest::getCloner() const {
- return collectionCloner.get();
+ // Invalid namespace.
+ {
+ NamespaceString badNss("db.");
+ CollectionCloner::StorageInterface* si = storageInterface.get();
+ ASSERT_THROWS(CollectionCloner(&executor, target, badNss, options, cb, si), UserException);
}
- TEST_F(CollectionClonerTest, InvalidConstruction) {
- ReplicationExecutor& executor = getExecutor();
-
- const auto& cb = [](const Status&) { FAIL("should not reach here"); };
-
- // Null executor.
- {
- CollectionCloner::StorageInterface* si = storageInterface.get();
- ASSERT_THROWS(CollectionCloner(nullptr, target, nss, options, cb, si), UserException);
- }
-
- // Null storage interface
- ASSERT_THROWS(CollectionCloner(&executor, target, nss, options, cb, nullptr),
+ // Invalid collection options.
+ {
+ CollectionOptions invalidOptions;
+ invalidOptions.storageEngine = BSON("storageEngine1"
+ << "not a document");
+ CollectionCloner::StorageInterface* si = storageInterface.get();
+ ASSERT_THROWS(CollectionCloner(&executor, target, nss, invalidOptions, cb, si),
UserException);
-
- // Invalid namespace.
- {
- NamespaceString badNss("db.");
- CollectionCloner::StorageInterface* si = storageInterface.get();
- ASSERT_THROWS(CollectionCloner(&executor, target, badNss, options, cb, si),
- UserException);
- }
-
- // Invalid collection options.
- {
- CollectionOptions invalidOptions;
- invalidOptions.storageEngine = BSON("storageEngine1" << "not a document");
- CollectionCloner::StorageInterface* si = storageInterface.get();
- ASSERT_THROWS(CollectionCloner(&executor, target, nss, invalidOptions, cb, si),
- UserException);
- }
-
- // Callback function cannot be null.
- {
- CollectionCloner::CallbackFn nullCb;
- CollectionCloner::StorageInterface* si = storageInterface.get();
- ASSERT_THROWS(CollectionCloner(&executor, target, nss, options, nullCb, si),
- UserException);
- }
- }
-
- TEST_F(CollectionClonerTest, ClonerLifeCycle) {
- testLifeCycle();
- }
-
- TEST_F(CollectionClonerTest, FirstRemoteCommand) {
- ASSERT_OK(collectionCloner->start());
-
- auto net = getNet();
- ASSERT_TRUE(net->hasReadyRequests());
- NetworkOperationIterator noi = net->getNextReadyRequest();
- auto&& noiRequest = noi->getRequest();
- ASSERT_EQUALS(nss.db().toString(), noiRequest.dbname);
- ASSERT_EQUALS("listIndexes", std::string(noiRequest.cmdObj.firstElementFieldName()));
- ASSERT_EQUALS(nss.coll().toString(), noiRequest.cmdObj.firstElement().valuestrsafe());
- ASSERT_FALSE(net->hasReadyRequests());
- ASSERT_TRUE(collectionCloner->isActive());
}
- TEST_F(CollectionClonerTest, RemoteCollectionMissing) {
- ASSERT_OK(collectionCloner->start());
-
- processNetworkResponse(
- BSON("ok" << 0 << "errmsg" << "" << "code" << ErrorCodes::NamespaceNotFound));
-
- ASSERT_EQUALS(ErrorCodes::NamespaceNotFound, getStatus().code());
- ASSERT_FALSE(collectionCloner->isActive());
+ // Callback function cannot be null.
+ {
+ CollectionCloner::CallbackFn nullCb;
+ CollectionCloner::StorageInterface* si = storageInterface.get();
+ ASSERT_THROWS(CollectionCloner(&executor, target, nss, options, nullCb, si), UserException);
}
+}
+
+TEST_F(CollectionClonerTest, ClonerLifeCycle) {
+ testLifeCycle();
+}
+
+TEST_F(CollectionClonerTest, FirstRemoteCommand) {
+ ASSERT_OK(collectionCloner->start());
+
+ auto net = getNet();
+ ASSERT_TRUE(net->hasReadyRequests());
+ NetworkOperationIterator noi = net->getNextReadyRequest();
+ auto&& noiRequest = noi->getRequest();
+ ASSERT_EQUALS(nss.db().toString(), noiRequest.dbname);
+ ASSERT_EQUALS("listIndexes", std::string(noiRequest.cmdObj.firstElementFieldName()));
+ ASSERT_EQUALS(nss.coll().toString(), noiRequest.cmdObj.firstElement().valuestrsafe());
+ ASSERT_FALSE(net->hasReadyRequests());
+ ASSERT_TRUE(collectionCloner->isActive());
+}
+
+TEST_F(CollectionClonerTest, RemoteCollectionMissing) {
+ ASSERT_OK(collectionCloner->start());
+
+ processNetworkResponse(BSON("ok" << 0 << "errmsg"
+ << ""
+ << "code" << ErrorCodes::NamespaceNotFound));
+
+ ASSERT_EQUALS(ErrorCodes::NamespaceNotFound, getStatus().code());
+ ASSERT_FALSE(collectionCloner->isActive());
+}
+
+// A collection may have no indexes. The cloner will produce a warning but
+// will still proceed with cloning.
+TEST_F(CollectionClonerTest, ListIndexesReturnedNoIndexes) {
+ ASSERT_OK(collectionCloner->start());
+
+ // Using a non-zero cursor to ensure that
+ // the cloner stops the fetcher from retrieving more results.
+ processNetworkResponse(createListIndexesResponse(1, BSONArray()));
+
+ ASSERT_EQUALS(getDetectableErrorStatus(), getStatus());
+ ASSERT_TRUE(collectionCloner->isActive());
+
+ ASSERT_TRUE(getNet()->hasReadyRequests());
+}
+
+TEST_F(CollectionClonerTest, BeginCollectionScheduleDbWorkFailed) {
+ ASSERT_OK(collectionCloner->start());
+
+ // Replace scheduleDbWork function so that cloner will fail to schedule DB work after
+ // getting index specs.
+ collectionCloner->setScheduleDbWorkFn([](const ReplicationExecutor::CallbackFn& workFn) {
+ return StatusWith<ReplicationExecutor::CallbackHandle>(ErrorCodes::UnknownError, "");
+ });
+
+ processNetworkResponse(createListIndexesResponse(0, BSON_ARRAY(idIndexSpec)));
+
+ ASSERT_EQUALS(ErrorCodes::UnknownError, getStatus().code());
+ ASSERT_FALSE(collectionCloner->isActive());
+}
+
+TEST_F(CollectionClonerTest, BeginCollectionCallbackCanceled) {
+ ASSERT_OK(collectionCloner->start());
+
+ // Replace scheduleDbWork function so that the callback for beginCollection is canceled
+ // immediately after scheduling.
+ auto&& executor = getExecutor();
+ collectionCloner->setScheduleDbWorkFn([&](const ReplicationExecutor::CallbackFn& workFn) {
+ // Schedule as non-exclusive task to allow us to cancel it before the executor is able
+ // to invoke the callback.
+ auto scheduleResult = executor.scheduleWork(workFn);
+ ASSERT_OK(scheduleResult.getStatus());
+ executor.cancel(scheduleResult.getValue());
+ return scheduleResult;
+ });
+
+ processNetworkResponse(createListIndexesResponse(0, BSON_ARRAY(idIndexSpec)));
+
+ collectionCloner->waitForDbWorker();
+ ASSERT_EQUALS(ErrorCodes::CallbackCanceled, getStatus().code());
+ ASSERT_FALSE(collectionCloner->isActive());
+}
+
+TEST_F(CollectionClonerTest, BeginCollectionFailed) {
+ ASSERT_OK(collectionCloner->start());
+
+ storageInterface->beginCollectionFn = [&](OperationContext* txn,
+ const NamespaceString& theNss,
+ const CollectionOptions& theOptions,
+ const std::vector<BSONObj>& theIndexSpecs) {
+ return Status(ErrorCodes::OperationFailed, "");
+ };
- // A collection may have no indexes. The cloner will produce a warning but
- // will still proceed with cloning.
- TEST_F(CollectionClonerTest, ListIndexesReturnedNoIndexes) {
- ASSERT_OK(collectionCloner->start());
-
- // Using a non-zero cursor to ensure that
- // the cloner stops the fetcher from retrieving more results.
- processNetworkResponse(createListIndexesResponse(1, BSONArray()));
+ processNetworkResponse(createListIndexesResponse(0, BSON_ARRAY(idIndexSpec)));
+
+ collectionCloner->waitForDbWorker();
+
+ ASSERT_EQUALS(ErrorCodes::OperationFailed, getStatus().code());
+ ASSERT_FALSE(collectionCloner->isActive());
+}
+
+TEST_F(CollectionClonerTest, BeginCollection) {
+ ASSERT_OK(collectionCloner->start());
+
+ NamespaceString collNss;
+ CollectionOptions collOptions;
+ std::vector<BSONObj> collIndexSpecs;
+ storageInterface->beginCollectionFn = [&](OperationContext* txn,
+ const NamespaceString& theNss,
+ const CollectionOptions& theOptions,
+ const std::vector<BSONObj>& theIndexSpecs) {
+ ASSERT(txn);
+ collNss = theNss;
+ collOptions = theOptions;
+ collIndexSpecs = theIndexSpecs;
+ return Status::OK();
+ };
- ASSERT_EQUALS(getDetectableErrorStatus(), getStatus());
- ASSERT_TRUE(collectionCloner->isActive());
+ // Split listIndexes response into 2 batches: first batch contains specs[0] and specs[1];
+ // second batch contains specs[2]
+ const std::vector<BSONObj> specs = {idIndexSpec,
+ BSON("v" << 1 << "key" << BSON("a" << 1) << "name"
+ << "a_1"
+ << "ns" << nss.ns()),
+ BSON("v" << 1 << "key" << BSON("b" << 1) << "name"
+ << "b_1"
+ << "ns" << nss.ns())};
- ASSERT_TRUE(getNet()->hasReadyRequests());
- }
+ processNetworkResponse(createListIndexesResponse(1, BSON_ARRAY(specs[0] << specs[1])));
- TEST_F(CollectionClonerTest, BeginCollectionScheduleDbWorkFailed) {
- ASSERT_OK(collectionCloner->start());
+ // 'status' should not be modified because cloning is not finished.
+ ASSERT_EQUALS(getDetectableErrorStatus(), getStatus());
+ ASSERT_TRUE(collectionCloner->isActive());
- // Replace scheduleDbWork function so that cloner will fail to schedule DB work after
- // getting index specs.
- collectionCloner->setScheduleDbWorkFn([](const ReplicationExecutor::CallbackFn& workFn) {
- return StatusWith<ReplicationExecutor::CallbackHandle>(ErrorCodes::UnknownError, "");
- });
+ processNetworkResponse(createListIndexesResponse(0, BSON_ARRAY(specs[2]), "nextBatch"));
- processNetworkResponse(createListIndexesResponse(0, BSON_ARRAY(idIndexSpec)));
+ collectionCloner->waitForDbWorker();
- ASSERT_EQUALS(ErrorCodes::UnknownError, getStatus().code());
- ASSERT_FALSE(collectionCloner->isActive());
- }
+ // 'status' will be set if listIndexes fails.
+ ASSERT_EQUALS(getDetectableErrorStatus(), getStatus());
- TEST_F(CollectionClonerTest, BeginCollectionCallbackCanceled) {
- ASSERT_OK(collectionCloner->start());
-
- // Replace scheduleDbWork function so that the callback for beginCollection is canceled
- // immediately after scheduling.
- auto&& executor = getExecutor();
- collectionCloner->setScheduleDbWorkFn([&](const ReplicationExecutor::CallbackFn& workFn) {
- // Schedule as non-exclusive task to allow us to cancel it before the executor is able
- // to invoke the callback.
- auto scheduleResult = executor.scheduleWork(workFn);
- ASSERT_OK(scheduleResult.getStatus());
- executor.cancel(scheduleResult.getValue());
- return scheduleResult;
- });
-
- processNetworkResponse(createListIndexesResponse(0, BSON_ARRAY(idIndexSpec)));
-
- collectionCloner->waitForDbWorker();
- ASSERT_EQUALS(ErrorCodes::CallbackCanceled, getStatus().code());
- ASSERT_FALSE(collectionCloner->isActive());
+ ASSERT_EQUALS(nss.ns(), collNss.ns());
+ ASSERT_EQUALS(options.toBSON(), collOptions.toBSON());
+ ASSERT_EQUALS(specs.size(), collIndexSpecs.size());
+ for (std::vector<BSONObj>::size_type i = 0; i < specs.size(); ++i) {
+ ASSERT_EQUALS(specs[i], collIndexSpecs[i]);
}
- TEST_F(CollectionClonerTest, BeginCollectionFailed) {
- ASSERT_OK(collectionCloner->start());
+ // Cloner is still active because it has to read the documents from the source collection.
+ ASSERT_TRUE(collectionCloner->isActive());
+}
+
+TEST_F(CollectionClonerTest, FindFetcherScheduleFailed) {
+ ASSERT_OK(collectionCloner->start());
+
+ // Shut down executor while in beginCollection callback.
+ // This will cause the fetcher to fail to schedule the find command.
+ bool collectionCreated = false;
+ storageInterface->beginCollectionFn = [&](OperationContext* txn,
+ const NamespaceString& theNss,
+ const CollectionOptions& theOptions,
+ const std::vector<BSONObj>& theIndexSpecs) {
+ collectionCreated = true;
+ getExecutor().shutdown();
+ return Status::OK();
+ };
- storageInterface->beginCollectionFn = [&](OperationContext* txn,
- const NamespaceString& theNss,
- const CollectionOptions& theOptions,
- const std::vector<BSONObj>& theIndexSpecs) {
- return Status(ErrorCodes::OperationFailed, "");
- };
+ processNetworkResponse(createListIndexesResponse(0, BSON_ARRAY(idIndexSpec)));
- processNetworkResponse(createListIndexesResponse(0, BSON_ARRAY(idIndexSpec)));
+ collectionCloner->waitForDbWorker();
+ ASSERT_TRUE(collectionCreated);
- collectionCloner->waitForDbWorker();
+ ASSERT_EQUALS(ErrorCodes::ShutdownInProgress, getStatus().code());
+ ASSERT_FALSE(collectionCloner->isActive());
+}
- ASSERT_EQUALS(ErrorCodes::OperationFailed, getStatus().code());
- ASSERT_FALSE(collectionCloner->isActive());
- }
+TEST_F(CollectionClonerTest, FindCommandAfterBeginCollection) {
+ ASSERT_OK(collectionCloner->start());
- TEST_F(CollectionClonerTest, BeginCollection) {
- ASSERT_OK(collectionCloner->start());
-
- NamespaceString collNss;
- CollectionOptions collOptions;
- std::vector<BSONObj> collIndexSpecs;
- storageInterface->beginCollectionFn = [&](OperationContext* txn,
- const NamespaceString& theNss,
- const CollectionOptions& theOptions,
- const std::vector<BSONObj>& theIndexSpecs) {
- ASSERT(txn);
- collNss = theNss;
- collOptions = theOptions;
- collIndexSpecs = theIndexSpecs;
- return Status::OK();
- };
-
- // Split listIndexes response into 2 batches: first batch contains specs[0] and specs[1];
- // second batch contains specs[2]
- const std::vector<BSONObj> specs = {
- idIndexSpec,
- BSON("v" << 1 << "key" << BSON("a" << 1) << "name" << "a_1" << "ns" << nss.ns()),
- BSON("v" << 1 << "key" << BSON("b" << 1) << "name" << "b_1" << "ns" << nss.ns())};
-
- processNetworkResponse(createListIndexesResponse(1, BSON_ARRAY(specs[0] << specs[1])));
-
- // 'status' should not be modified because cloning is not finished.
- ASSERT_EQUALS(getDetectableErrorStatus(), getStatus());
- ASSERT_TRUE(collectionCloner->isActive());
-
- processNetworkResponse(createListIndexesResponse(0, BSON_ARRAY(specs[2]), "nextBatch"));
-
- collectionCloner->waitForDbWorker();
-
- // 'status' will be set if listIndexes fails.
- ASSERT_EQUALS(getDetectableErrorStatus(), getStatus());
-
- ASSERT_EQUALS(nss.ns(), collNss.ns());
- ASSERT_EQUALS(options.toBSON(), collOptions.toBSON());
- ASSERT_EQUALS(specs.size(), collIndexSpecs.size());
- for (std::vector<BSONObj>::size_type i = 0; i < specs.size(); ++i) {
- ASSERT_EQUALS(specs[i], collIndexSpecs[i]);
- }
-
- // Cloner is still active because it has to read the documents from the source collection.
- ASSERT_TRUE(collectionCloner->isActive());
- }
+ bool collectionCreated = false;
+ storageInterface->beginCollectionFn = [&](OperationContext* txn,
+ const NamespaceString& theNss,
+ const CollectionOptions& theOptions,
+ const std::vector<BSONObj>& theIndexSpecs) {
+ collectionCreated = true;
+ return Status::OK();
+ };
- TEST_F(CollectionClonerTest, FindFetcherScheduleFailed) {
- ASSERT_OK(collectionCloner->start());
+ processNetworkResponse(createListIndexesResponse(0, BSON_ARRAY(idIndexSpec)));
- // Shut down executor while in beginCollection callback.
- // This will cause the fetcher to fail to schedule the find command.
- bool collectionCreated = false;
- storageInterface->beginCollectionFn = [&](OperationContext* txn,
- const NamespaceString& theNss,
- const CollectionOptions& theOptions,
- const std::vector<BSONObj>& theIndexSpecs) {
- collectionCreated = true;
- getExecutor().shutdown();
- return Status::OK();
- };
+ collectionCloner->waitForDbWorker();
+ ASSERT_TRUE(collectionCreated);
- processNetworkResponse(createListIndexesResponse(0, BSON_ARRAY(idIndexSpec)));
+ // Fetcher should be scheduled after cloner creates collection.
+ auto net = getNet();
+ ASSERT_TRUE(net->hasReadyRequests());
+ NetworkOperationIterator noi = net->getNextReadyRequest();
+ auto&& noiRequest = noi->getRequest();
+ ASSERT_EQUALS(nss.db().toString(), noiRequest.dbname);
+ ASSERT_EQUALS("find", std::string(noiRequest.cmdObj.firstElementFieldName()));
+ ASSERT_EQUALS(nss.coll().toString(), noiRequest.cmdObj.firstElement().valuestrsafe());
+ ASSERT_TRUE(noiRequest.cmdObj.getField("noCursorTimeout").trueValue());
+ ASSERT_FALSE(net->hasReadyRequests());
+}
- collectionCloner->waitForDbWorker();
- ASSERT_TRUE(collectionCreated);
+TEST_F(CollectionClonerTest, FindCommandFailed) {
+ ASSERT_OK(collectionCloner->start());
- ASSERT_EQUALS(ErrorCodes::ShutdownInProgress, getStatus().code());
- ASSERT_FALSE(collectionCloner->isActive());
- }
+ processNetworkResponse(createListIndexesResponse(0, BSON_ARRAY(idIndexSpec)));
- TEST_F(CollectionClonerTest, FindCommandAfterBeginCollection) {
- ASSERT_OK(collectionCloner->start());
-
- bool collectionCreated = false;
- storageInterface->beginCollectionFn = [&](OperationContext* txn,
- const NamespaceString& theNss,
- const CollectionOptions& theOptions,
- const std::vector<BSONObj>& theIndexSpecs) {
- collectionCreated = true;
- return Status::OK();
- };
-
- processNetworkResponse(createListIndexesResponse(0, BSON_ARRAY(idIndexSpec)));
-
- collectionCloner->waitForDbWorker();
- ASSERT_TRUE(collectionCreated);
-
- // Fetcher should be scheduled after cloner creates collection.
- auto net = getNet();
- ASSERT_TRUE(net->hasReadyRequests());
- NetworkOperationIterator noi = net->getNextReadyRequest();
- auto&& noiRequest = noi->getRequest();
- ASSERT_EQUALS(nss.db().toString(), noiRequest.dbname);
- ASSERT_EQUALS("find", std::string(noiRequest.cmdObj.firstElementFieldName()));
- ASSERT_EQUALS(nss.coll().toString(), noiRequest.cmdObj.firstElement().valuestrsafe());
- ASSERT_TRUE(noiRequest.cmdObj.getField("noCursorTimeout").trueValue());
- ASSERT_FALSE(net->hasReadyRequests());
- }
+ collectionCloner->waitForDbWorker();
- TEST_F(CollectionClonerTest, FindCommandFailed) {
- ASSERT_OK(collectionCloner->start());
+ processNetworkResponse(BSON("ok" << 0 << "errmsg"
+ << ""
+ << "code" << ErrorCodes::CursorNotFound));
- processNetworkResponse(createListIndexesResponse(0, BSON_ARRAY(idIndexSpec)));
+ ASSERT_EQUALS(ErrorCodes::CursorNotFound, getStatus().code());
+ ASSERT_FALSE(collectionCloner->isActive());
+}
- collectionCloner->waitForDbWorker();
+TEST_F(CollectionClonerTest, FindCommandCanceled) {
+ ASSERT_OK(collectionCloner->start());
- processNetworkResponse(
- BSON("ok" << 0 << "errmsg" << "" << "code" << ErrorCodes::CursorNotFound));
+ scheduleNetworkResponse(createListIndexesResponse(0, BSON_ARRAY(idIndexSpec)));
- ASSERT_EQUALS(ErrorCodes::CursorNotFound, getStatus().code());
- ASSERT_FALSE(collectionCloner->isActive());
- }
+ auto net = getNet();
+ net->runReadyNetworkOperations();
- TEST_F(CollectionClonerTest, FindCommandCanceled) {
- ASSERT_OK(collectionCloner->start());
+ collectionCloner->waitForDbWorker();
- scheduleNetworkResponse(createListIndexesResponse(0, BSON_ARRAY(idIndexSpec)));
+ scheduleNetworkResponse(BSON("ok" << 1));
- auto net = getNet();
- net->runReadyNetworkOperations();
+ collectionCloner->cancel();
- collectionCloner->waitForDbWorker();
+ net->runReadyNetworkOperations();
- scheduleNetworkResponse(BSON("ok" << 1));
+ ASSERT_EQUALS(ErrorCodes::CallbackCanceled, getStatus().code());
+ ASSERT_FALSE(collectionCloner->isActive());
+}
- collectionCloner->cancel();
+TEST_F(CollectionClonerTest, InsertDocumentsScheduleDbWorkFailed) {
+ ASSERT_OK(collectionCloner->start());
- net->runReadyNetworkOperations();
+ processNetworkResponse(createListIndexesResponse(0, BSON_ARRAY(idIndexSpec)));
- ASSERT_EQUALS(ErrorCodes::CallbackCanceled, getStatus().code());
- ASSERT_FALSE(collectionCloner->isActive());
- }
+ collectionCloner->waitForDbWorker();
- TEST_F(CollectionClonerTest, InsertDocumentsScheduleDbWorkFailed) {
- ASSERT_OK(collectionCloner->start());
+ // Replace scheduleDbWork function so that cloner will fail to schedule DB work after
+ // getting documents.
+ collectionCloner->setScheduleDbWorkFn([](const ReplicationExecutor::CallbackFn& workFn) {
+ return StatusWith<ReplicationExecutor::CallbackHandle>(ErrorCodes::UnknownError, "");
+ });
- processNetworkResponse(createListIndexesResponse(0, BSON_ARRAY(idIndexSpec)));
+ const BSONObj doc = BSON("_id" << 1);
+ processNetworkResponse(createCursorResponse(0, BSON_ARRAY(doc)));
- collectionCloner->waitForDbWorker();
+ ASSERT_EQUALS(ErrorCodes::UnknownError, getStatus().code());
+ ASSERT_FALSE(collectionCloner->isActive());
+}
- // Replace scheduleDbWork function so that cloner will fail to schedule DB work after
- // getting documents.
- collectionCloner->setScheduleDbWorkFn([](const ReplicationExecutor::CallbackFn& workFn) {
- return StatusWith<ReplicationExecutor::CallbackHandle>(ErrorCodes::UnknownError, "");
- });
+TEST_F(CollectionClonerTest, InsertDocumentsCallbackCanceled) {
+ ASSERT_OK(collectionCloner->start());
- const BSONObj doc = BSON("_id" << 1);
- processNetworkResponse(createCursorResponse(0, BSON_ARRAY(doc)));
+ processNetworkResponse(createListIndexesResponse(0, BSON_ARRAY(idIndexSpec)));
- ASSERT_EQUALS(ErrorCodes::UnknownError, getStatus().code());
- ASSERT_FALSE(collectionCloner->isActive());
- }
+ collectionCloner->waitForDbWorker();
- TEST_F(CollectionClonerTest, InsertDocumentsCallbackCanceled) {
- ASSERT_OK(collectionCloner->start());
+ // Replace scheduleDbWork function so that the callback for insertDocuments is canceled
+ // immediately after scheduling.
+ auto&& executor = getExecutor();
+ collectionCloner->setScheduleDbWorkFn([&](const ReplicationExecutor::CallbackFn& workFn) {
+ // Schedule as non-exclusive task to allow us to cancel it before the executor is able
+ // to invoke the callback.
+ auto scheduleResult = executor.scheduleWork(workFn);
+ ASSERT_OK(scheduleResult.getStatus());
+ executor.cancel(scheduleResult.getValue());
+ return scheduleResult;
+ });
- processNetworkResponse(createListIndexesResponse(0, BSON_ARRAY(idIndexSpec)));
+ const BSONObj doc = BSON("_id" << 1);
+ processNetworkResponse(createCursorResponse(0, BSON_ARRAY(doc)));
- collectionCloner->waitForDbWorker();
+ collectionCloner->waitForDbWorker();
+ ASSERT_EQUALS(ErrorCodes::CallbackCanceled, getStatus().code());
+ ASSERT_FALSE(collectionCloner->isActive());
+}
- // Replace scheduleDbWork function so that the callback for insertDocuments is canceled
- // immediately after scheduling.
- auto&& executor = getExecutor();
- collectionCloner->setScheduleDbWorkFn([&](const ReplicationExecutor::CallbackFn& workFn) {
- // Schedule as non-exclusive task to allow us to cancel it before the executor is able
- // to invoke the callback.
- auto scheduleResult = executor.scheduleWork(workFn);
- ASSERT_OK(scheduleResult.getStatus());
- executor.cancel(scheduleResult.getValue());
- return scheduleResult;
- });
+TEST_F(CollectionClonerTest, InsertDocumentsFailed) {
+ ASSERT_OK(collectionCloner->start());
- const BSONObj doc = BSON("_id" << 1);
- processNetworkResponse(createCursorResponse(0, BSON_ARRAY(doc)));
-
- collectionCloner->waitForDbWorker();
- ASSERT_EQUALS(ErrorCodes::CallbackCanceled, getStatus().code());
- ASSERT_FALSE(collectionCloner->isActive());
- }
-
- TEST_F(CollectionClonerTest, InsertDocumentsFailed) {
- ASSERT_OK(collectionCloner->start());
-
- bool insertDocumentsCalled = false;
- storageInterface->insertDocumentsFn = [&](OperationContext* txn,
- const NamespaceString& theNss,
- const std::vector<BSONObj>& theDocuments) {
- insertDocumentsCalled = true;
- return Status(ErrorCodes::OperationFailed, "");
- };
+ bool insertDocumentsCalled = false;
+ storageInterface->insertDocumentsFn = [&](OperationContext* txn,
+ const NamespaceString& theNss,
+ const std::vector<BSONObj>& theDocuments) {
+ insertDocumentsCalled = true;
+ return Status(ErrorCodes::OperationFailed, "");
+ };
- processNetworkResponse(createListIndexesResponse(0, BSON_ARRAY(idIndexSpec)));
+ processNetworkResponse(createListIndexesResponse(0, BSON_ARRAY(idIndexSpec)));
- collectionCloner->waitForDbWorker();
+ collectionCloner->waitForDbWorker();
- processNetworkResponse(createCursorResponse(0, BSONArray()));
+ processNetworkResponse(createCursorResponse(0, BSONArray()));
- collectionCloner->wait();
+ collectionCloner->wait();
- ASSERT_EQUALS(ErrorCodes::OperationFailed, getStatus().code());
- ASSERT_FALSE(collectionCloner->isActive());
- }
+ ASSERT_EQUALS(ErrorCodes::OperationFailed, getStatus().code());
+ ASSERT_FALSE(collectionCloner->isActive());
+}
- TEST_F(CollectionClonerTest, InsertDocumentsSingleBatch) {
- ASSERT_OK(collectionCloner->start());
+TEST_F(CollectionClonerTest, InsertDocumentsSingleBatch) {
+ ASSERT_OK(collectionCloner->start());
- std::vector<BSONObj> collDocuments;
- storageInterface->insertDocumentsFn = [&](OperationContext* txn,
- const NamespaceString& theNss,
- const std::vector<BSONObj>& theDocuments) {
- ASSERT(txn);
- collDocuments = theDocuments;
- return Status::OK();
- };
+ std::vector<BSONObj> collDocuments;
+ storageInterface->insertDocumentsFn = [&](OperationContext* txn,
+ const NamespaceString& theNss,
+ const std::vector<BSONObj>& theDocuments) {
+ ASSERT(txn);
+ collDocuments = theDocuments;
+ return Status::OK();
+ };
- processNetworkResponse(createListIndexesResponse(0, BSON_ARRAY(idIndexSpec)));
+ processNetworkResponse(createListIndexesResponse(0, BSON_ARRAY(idIndexSpec)));
- collectionCloner->waitForDbWorker();
+ collectionCloner->waitForDbWorker();
- const BSONObj doc = BSON("_id" << 1);
- processNetworkResponse(createCursorResponse(0, BSON_ARRAY(doc)));
+ const BSONObj doc = BSON("_id" << 1);
+ processNetworkResponse(createCursorResponse(0, BSON_ARRAY(doc)));
- collectionCloner->waitForDbWorker();
- ASSERT_EQUALS(1U, collDocuments.size());
- ASSERT_EQUALS(doc, collDocuments[0]);
+ collectionCloner->waitForDbWorker();
+ ASSERT_EQUALS(1U, collDocuments.size());
+ ASSERT_EQUALS(doc, collDocuments[0]);
- ASSERT_OK(getStatus());
- ASSERT_FALSE(collectionCloner->isActive());
- }
+ ASSERT_OK(getStatus());
+ ASSERT_FALSE(collectionCloner->isActive());
+}
- TEST_F(CollectionClonerTest, InsertDocumentsMultipleBatches) {
- ASSERT_OK(collectionCloner->start());
+TEST_F(CollectionClonerTest, InsertDocumentsMultipleBatches) {
+ ASSERT_OK(collectionCloner->start());
- std::vector<BSONObj> collDocuments;
- storageInterface->insertDocumentsFn = [&](OperationContext* txn,
- const NamespaceString& theNss,
- const std::vector<BSONObj>& theDocuments) {
- ASSERT(txn);
- collDocuments = theDocuments;
- return Status::OK();
- };
+ std::vector<BSONObj> collDocuments;
+ storageInterface->insertDocumentsFn = [&](OperationContext* txn,
+ const NamespaceString& theNss,
+ const std::vector<BSONObj>& theDocuments) {
+ ASSERT(txn);
+ collDocuments = theDocuments;
+ return Status::OK();
+ };
- processNetworkResponse(createListIndexesResponse(0, BSON_ARRAY(idIndexSpec)));
+ processNetworkResponse(createListIndexesResponse(0, BSON_ARRAY(idIndexSpec)));
- collectionCloner->waitForDbWorker();
+ collectionCloner->waitForDbWorker();
- const BSONObj doc = BSON("_id" << 1);
- processNetworkResponse(createCursorResponse(1, BSON_ARRAY(doc)));
+ const BSONObj doc = BSON("_id" << 1);
+ processNetworkResponse(createCursorResponse(1, BSON_ARRAY(doc)));
- collectionCloner->waitForDbWorker();
- ASSERT_EQUALS(1U, collDocuments.size());
- ASSERT_EQUALS(doc, collDocuments[0]);
+ collectionCloner->waitForDbWorker();
+ ASSERT_EQUALS(1U, collDocuments.size());
+ ASSERT_EQUALS(doc, collDocuments[0]);
- ASSERT_EQUALS(getDetectableErrorStatus(), getStatus());
- ASSERT_TRUE(collectionCloner->isActive());
+ ASSERT_EQUALS(getDetectableErrorStatus(), getStatus());
+ ASSERT_TRUE(collectionCloner->isActive());
- const BSONObj doc2 = BSON("_id" << 1);
- processNetworkResponse(createCursorResponse(0, BSON_ARRAY(doc2), "nextBatch"));
+ const BSONObj doc2 = BSON("_id" << 1);
+ processNetworkResponse(createCursorResponse(0, BSON_ARRAY(doc2), "nextBatch"));
- collectionCloner->waitForDbWorker();
- ASSERT_EQUALS(1U, collDocuments.size());
- ASSERT_EQUALS(doc2, collDocuments[0]);
+ collectionCloner->waitForDbWorker();
+ ASSERT_EQUALS(1U, collDocuments.size());
+ ASSERT_EQUALS(doc2, collDocuments[0]);
- ASSERT_OK(getStatus());
- ASSERT_FALSE(collectionCloner->isActive());
- }
+ ASSERT_OK(getStatus());
+ ASSERT_FALSE(collectionCloner->isActive());
+}
-} // namespace
+} // namespace