/** * Copyright 2015 MongoDB Inc. * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License, version 3, * as published by the Free Software Foundation. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Affero General Public License for more details. * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . * * As a special exception, the copyright holders give permission to link the * code of portions of this program with the OpenSSL library under certain * conditions as described in each individual source file and distribute * linked combinations including the program with the OpenSSL library. You * must comply with the GNU Affero General Public License in all respects for * all of the code used other than as permitted herein. If you modify file(s) * with this exception, you may extend this exception to your version of the * file(s), but you are not obligated to do so. If you do not wish to do so, * delete this exception statement from your version. If you delete this * exception statement from all source files in the program, then also delete * it in the license file. */ #include "mongo/platform/basic.h" #include #include #include "mongo/db/commands.h" #include "mongo/db/jsobj.h" #include "mongo/db/repl/base_cloner_test_fixture.h" #include "mongo/db/repl/collection_cloner.h" #include "mongo/unittest/unittest.h" namespace { using namespace mongo; using namespace mongo::repl; class CollectionClonerTest : public BaseClonerTest { public: BaseCloner* getCloner() const override; protected: void setUp() override; void tearDown() override; CollectionOptions options; std::unique_ptr collectionCloner; }; void CollectionClonerTest::setUp() { BaseClonerTest::setUp(); options.reset(); options.storageEngine = BSON("storageEngine1" << BSONObj()); collectionCloner.reset(new CollectionCloner( &getReplExecutor(), target, nss, options, stdx::bind(&CollectionClonerTest::setStatus, this, stdx::placeholders::_1), storageInterface.get())); } void CollectionClonerTest::tearDown() { BaseClonerTest::tearDown(); // Executor may still invoke collection cloner's callback before shutting down. collectionCloner.reset(); options.reset(); } BaseCloner* CollectionClonerTest::getCloner() const { return collectionCloner.get(); } TEST_F(CollectionClonerTest, InvalidConstruction) { ReplicationExecutor& executor = getReplExecutor(); const auto& cb = [](const Status&) { FAIL("should not reach here"); }; // Null executor. { CollectionCloner::StorageInterface* si = storageInterface.get(); ASSERT_THROWS(CollectionCloner(nullptr, target, nss, options, cb, si), UserException); } // Null storage interface ASSERT_THROWS(CollectionCloner(&executor, target, nss, options, cb, nullptr), UserException); // Invalid namespace. { NamespaceString badNss("db."); CollectionCloner::StorageInterface* si = storageInterface.get(); ASSERT_THROWS(CollectionCloner(&executor, target, badNss, options, cb, si), UserException); } // Invalid collection options. { CollectionOptions invalidOptions; invalidOptions.storageEngine = BSON("storageEngine1" << "not a document"); CollectionCloner::StorageInterface* si = storageInterface.get(); ASSERT_THROWS(CollectionCloner(&executor, target, nss, invalidOptions, cb, si), UserException); } // Callback function cannot be null. { CollectionCloner::CallbackFn nullCb; CollectionCloner::StorageInterface* si = storageInterface.get(); ASSERT_THROWS(CollectionCloner(&executor, target, nss, options, nullCb, si), UserException); } } TEST_F(CollectionClonerTest, ClonerLifeCycle) { testLifeCycle(); } TEST_F(CollectionClonerTest, FirstRemoteCommand) { ASSERT_OK(collectionCloner->start()); auto net = getNet(); ASSERT_TRUE(net->hasReadyRequests()); NetworkOperationIterator noi = net->getNextReadyRequest(); auto&& noiRequest = noi->getRequest(); ASSERT_EQUALS(nss.db().toString(), noiRequest.dbname); ASSERT_EQUALS("listIndexes", std::string(noiRequest.cmdObj.firstElementFieldName())); ASSERT_EQUALS(nss.coll().toString(), noiRequest.cmdObj.firstElement().valuestrsafe()); ASSERT_FALSE(net->hasReadyRequests()); ASSERT_TRUE(collectionCloner->isActive()); } TEST_F(CollectionClonerTest, RemoteCollectionMissing) { ASSERT_OK(collectionCloner->start()); processNetworkResponse(BSON("ok" << 0 << "errmsg" << "" << "code" << ErrorCodes::NamespaceNotFound)); ASSERT_EQUALS(ErrorCodes::NamespaceNotFound, getStatus().code()); ASSERT_FALSE(collectionCloner->isActive()); } // A collection may have no indexes. The cloner will produce a warning but // will still proceed with cloning. TEST_F(CollectionClonerTest, ListIndexesReturnedNoIndexes) { ASSERT_OK(collectionCloner->start()); // Using a non-zero cursor to ensure that // the cloner stops the fetcher from retrieving more results. processNetworkResponse(createListIndexesResponse(1, BSONArray())); ASSERT_EQUALS(getDetectableErrorStatus(), getStatus()); ASSERT_TRUE(collectionCloner->isActive()); ASSERT_TRUE(getNet()->hasReadyRequests()); } TEST_F(CollectionClonerTest, BeginCollectionScheduleDbWorkFailed) { ASSERT_OK(collectionCloner->start()); // Replace scheduleDbWork function so that cloner will fail to schedule DB work after // getting index specs. collectionCloner->setScheduleDbWorkFn([](const ReplicationExecutor::CallbackFn& workFn) { return StatusWith(ErrorCodes::UnknownError, ""); }); processNetworkResponse(createListIndexesResponse(0, BSON_ARRAY(idIndexSpec))); ASSERT_EQUALS(ErrorCodes::UnknownError, getStatus().code()); ASSERT_FALSE(collectionCloner->isActive()); } TEST_F(CollectionClonerTest, BeginCollectionCallbackCanceled) { ASSERT_OK(collectionCloner->start()); // Replace scheduleDbWork function so that the callback for beginCollection is canceled // immediately after scheduling. auto&& executor = getReplExecutor(); collectionCloner->setScheduleDbWorkFn([&](const ReplicationExecutor::CallbackFn& workFn) { // Schedule as non-exclusive task to allow us to cancel it before the executor is able // to invoke the callback. auto scheduleResult = executor.scheduleWork(workFn); ASSERT_OK(scheduleResult.getStatus()); executor.cancel(scheduleResult.getValue()); return scheduleResult; }); processNetworkResponse(createListIndexesResponse(0, BSON_ARRAY(idIndexSpec))); collectionCloner->waitForDbWorker(); ASSERT_EQUALS(ErrorCodes::CallbackCanceled, getStatus().code()); ASSERT_FALSE(collectionCloner->isActive()); } TEST_F(CollectionClonerTest, BeginCollectionFailed) { ASSERT_OK(collectionCloner->start()); storageInterface->beginCollectionFn = [&](OperationContext* txn, const NamespaceString& theNss, const CollectionOptions& theOptions, const std::vector& theIndexSpecs) { return Status(ErrorCodes::OperationFailed, ""); }; processNetworkResponse(createListIndexesResponse(0, BSON_ARRAY(idIndexSpec))); collectionCloner->waitForDbWorker(); ASSERT_EQUALS(ErrorCodes::OperationFailed, getStatus().code()); ASSERT_FALSE(collectionCloner->isActive()); } TEST_F(CollectionClonerTest, BeginCollection) { ASSERT_OK(collectionCloner->start()); NamespaceString collNss; CollectionOptions collOptions; std::vector collIndexSpecs; storageInterface->beginCollectionFn = [&](OperationContext* txn, const NamespaceString& theNss, const CollectionOptions& theOptions, const std::vector& theIndexSpecs) { ASSERT(txn); collNss = theNss; collOptions = theOptions; collIndexSpecs = theIndexSpecs; return Status::OK(); }; // Split listIndexes response into 2 batches: first batch contains specs[0] and specs[1]; // second batch contains specs[2] const std::vector specs = {idIndexSpec, BSON("v" << 1 << "key" << BSON("a" << 1) << "name" << "a_1" << "ns" << nss.ns()), BSON("v" << 1 << "key" << BSON("b" << 1) << "name" << "b_1" << "ns" << nss.ns())}; processNetworkResponse(createListIndexesResponse(1, BSON_ARRAY(specs[0] << specs[1]))); // 'status' should not be modified because cloning is not finished. ASSERT_EQUALS(getDetectableErrorStatus(), getStatus()); ASSERT_TRUE(collectionCloner->isActive()); processNetworkResponse(createListIndexesResponse(0, BSON_ARRAY(specs[2]), "nextBatch")); collectionCloner->waitForDbWorker(); // 'status' will be set if listIndexes fails. ASSERT_EQUALS(getDetectableErrorStatus(), getStatus()); ASSERT_EQUALS(nss.ns(), collNss.ns()); ASSERT_EQUALS(options.toBSON(), collOptions.toBSON()); ASSERT_EQUALS(specs.size(), collIndexSpecs.size()); for (std::vector::size_type i = 0; i < specs.size(); ++i) { ASSERT_EQUALS(specs[i], collIndexSpecs[i]); } // Cloner is still active because it has to read the documents from the source collection. ASSERT_TRUE(collectionCloner->isActive()); } TEST_F(CollectionClonerTest, FindFetcherScheduleFailed) { ASSERT_OK(collectionCloner->start()); // Shut down executor while in beginCollection callback. // This will cause the fetcher to fail to schedule the find command. bool collectionCreated = false; storageInterface->beginCollectionFn = [&](OperationContext* txn, const NamespaceString& theNss, const CollectionOptions& theOptions, const std::vector& theIndexSpecs) { collectionCreated = true; getExecutor().shutdown(); return Status::OK(); }; processNetworkResponse(createListIndexesResponse(0, BSON_ARRAY(idIndexSpec))); collectionCloner->waitForDbWorker(); ASSERT_TRUE(collectionCreated); ASSERT_EQUALS(ErrorCodes::ShutdownInProgress, getStatus().code()); ASSERT_FALSE(collectionCloner->isActive()); } TEST_F(CollectionClonerTest, FindCommandAfterBeginCollection) { ASSERT_OK(collectionCloner->start()); bool collectionCreated = false; storageInterface->beginCollectionFn = [&](OperationContext* txn, const NamespaceString& theNss, const CollectionOptions& theOptions, const std::vector& theIndexSpecs) { collectionCreated = true; return Status::OK(); }; processNetworkResponse(createListIndexesResponse(0, BSON_ARRAY(idIndexSpec))); collectionCloner->waitForDbWorker(); ASSERT_TRUE(collectionCreated); // Fetcher should be scheduled after cloner creates collection. auto net = getNet(); ASSERT_TRUE(net->hasReadyRequests()); NetworkOperationIterator noi = net->getNextReadyRequest(); auto&& noiRequest = noi->getRequest(); ASSERT_EQUALS(nss.db().toString(), noiRequest.dbname); ASSERT_EQUALS("find", std::string(noiRequest.cmdObj.firstElementFieldName())); ASSERT_EQUALS(nss.coll().toString(), noiRequest.cmdObj.firstElement().valuestrsafe()); ASSERT_TRUE(noiRequest.cmdObj.getField("noCursorTimeout").trueValue()); ASSERT_FALSE(net->hasReadyRequests()); } TEST_F(CollectionClonerTest, FindCommandFailed) { ASSERT_OK(collectionCloner->start()); processNetworkResponse(createListIndexesResponse(0, BSON_ARRAY(idIndexSpec))); collectionCloner->waitForDbWorker(); processNetworkResponse(BSON("ok" << 0 << "errmsg" << "" << "code" << ErrorCodes::CursorNotFound)); ASSERT_EQUALS(ErrorCodes::CursorNotFound, getStatus().code()); ASSERT_FALSE(collectionCloner->isActive()); } TEST_F(CollectionClonerTest, FindCommandCanceled) { ASSERT_OK(collectionCloner->start()); scheduleNetworkResponse(createListIndexesResponse(0, BSON_ARRAY(idIndexSpec))); auto net = getNet(); net->runReadyNetworkOperations(); collectionCloner->waitForDbWorker(); scheduleNetworkResponse(BSON("ok" << 1)); collectionCloner->cancel(); net->runReadyNetworkOperations(); ASSERT_EQUALS(ErrorCodes::CallbackCanceled, getStatus().code()); ASSERT_FALSE(collectionCloner->isActive()); } TEST_F(CollectionClonerTest, InsertDocumentsScheduleDbWorkFailed) { ASSERT_OK(collectionCloner->start()); processNetworkResponse(createListIndexesResponse(0, BSON_ARRAY(idIndexSpec))); collectionCloner->waitForDbWorker(); // Replace scheduleDbWork function so that cloner will fail to schedule DB work after // getting documents. collectionCloner->setScheduleDbWorkFn([](const ReplicationExecutor::CallbackFn& workFn) { return StatusWith(ErrorCodes::UnknownError, ""); }); const BSONObj doc = BSON("_id" << 1); processNetworkResponse(createCursorResponse(0, BSON_ARRAY(doc))); ASSERT_EQUALS(ErrorCodes::UnknownError, getStatus().code()); ASSERT_FALSE(collectionCloner->isActive()); } TEST_F(CollectionClonerTest, InsertDocumentsCallbackCanceled) { ASSERT_OK(collectionCloner->start()); processNetworkResponse(createListIndexesResponse(0, BSON_ARRAY(idIndexSpec))); collectionCloner->waitForDbWorker(); // Replace scheduleDbWork function so that the callback for insertDocuments is canceled // immediately after scheduling. auto&& executor = getReplExecutor(); collectionCloner->setScheduleDbWorkFn([&](const ReplicationExecutor::CallbackFn& workFn) { // Schedule as non-exclusive task to allow us to cancel it before the executor is able // to invoke the callback. auto scheduleResult = executor.scheduleWork(workFn); ASSERT_OK(scheduleResult.getStatus()); executor.cancel(scheduleResult.getValue()); return scheduleResult; }); const BSONObj doc = BSON("_id" << 1); processNetworkResponse(createCursorResponse(0, BSON_ARRAY(doc))); collectionCloner->waitForDbWorker(); ASSERT_EQUALS(ErrorCodes::CallbackCanceled, getStatus().code()); ASSERT_FALSE(collectionCloner->isActive()); } TEST_F(CollectionClonerTest, InsertDocumentsFailed) { ASSERT_OK(collectionCloner->start()); bool insertDocumentsCalled = false; storageInterface->insertDocumentsFn = [&](OperationContext* txn, const NamespaceString& theNss, const std::vector& theDocuments) { insertDocumentsCalled = true; return Status(ErrorCodes::OperationFailed, ""); }; processNetworkResponse(createListIndexesResponse(0, BSON_ARRAY(idIndexSpec))); collectionCloner->waitForDbWorker(); processNetworkResponse(createCursorResponse(0, BSONArray())); collectionCloner->wait(); ASSERT_EQUALS(ErrorCodes::OperationFailed, getStatus().code()); ASSERT_FALSE(collectionCloner->isActive()); } TEST_F(CollectionClonerTest, InsertDocumentsSingleBatch) { ASSERT_OK(collectionCloner->start()); std::vector collDocuments; storageInterface->insertDocumentsFn = [&](OperationContext* txn, const NamespaceString& theNss, const std::vector& theDocuments) { ASSERT(txn); collDocuments = theDocuments; return Status::OK(); }; processNetworkResponse(createListIndexesResponse(0, BSON_ARRAY(idIndexSpec))); collectionCloner->waitForDbWorker(); const BSONObj doc = BSON("_id" << 1); processNetworkResponse(createCursorResponse(0, BSON_ARRAY(doc))); collectionCloner->waitForDbWorker(); ASSERT_EQUALS(1U, collDocuments.size()); ASSERT_EQUALS(doc, collDocuments[0]); ASSERT_OK(getStatus()); ASSERT_FALSE(collectionCloner->isActive()); } TEST_F(CollectionClonerTest, InsertDocumentsMultipleBatches) { ASSERT_OK(collectionCloner->start()); std::vector collDocuments; storageInterface->insertDocumentsFn = [&](OperationContext* txn, const NamespaceString& theNss, const std::vector& theDocuments) { ASSERT(txn); collDocuments = theDocuments; return Status::OK(); }; processNetworkResponse(createListIndexesResponse(0, BSON_ARRAY(idIndexSpec))); collectionCloner->waitForDbWorker(); const BSONObj doc = BSON("_id" << 1); processNetworkResponse(createCursorResponse(1, BSON_ARRAY(doc))); collectionCloner->waitForDbWorker(); ASSERT_EQUALS(1U, collDocuments.size()); ASSERT_EQUALS(doc, collDocuments[0]); ASSERT_EQUALS(getDetectableErrorStatus(), getStatus()); ASSERT_TRUE(collectionCloner->isActive()); const BSONObj doc2 = BSON("_id" << 1); processNetworkResponse(createCursorResponse(0, BSON_ARRAY(doc2), "nextBatch")); collectionCloner->waitForDbWorker(); ASSERT_EQUALS(1U, collDocuments.size()); ASSERT_EQUALS(doc2, collDocuments[0]); ASSERT_OK(getStatus()); ASSERT_FALSE(collectionCloner->isActive()); } } // namespace