diff options
author | Benety Goh <benety@mongodb.com> | 2015-04-13 16:57:54 -0400 |
---|---|---|
committer | Benety Goh <benety@mongodb.com> | 2015-04-21 11:14:01 -0400 |
commit | 61dd7ef93ab33b2a379be1caaf07b244d67343fb (patch) | |
tree | e63d5c2dc6a712bc7be7acfdf8175b634dd2c858 | |
parent | 6b4074051eeafc3ebeb9d85828eb24d098e3192d (diff) | |
download | mongo-61dd7ef93ab33b2a379be1caaf07b244d67343fb.tar.gz |
SERVER-18015 SERVER-17894 data replication collection cloner cleanup
-rw-r--r-- | src/mongo/db/repl/SConscript | 2 | ||||
-rw-r--r-- | src/mongo/db/repl/collection_cloner.cpp | 11 | ||||
-rw-r--r-- | src/mongo/db/repl/collection_cloner.h | 57 | ||||
-rw-r--r-- | src/mongo/db/repl/collection_cloner_test.cpp | 252 |
4 files changed, 57 insertions, 265 deletions
diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript index 280b31a2a4e..af38224af98 100644 --- a/src/mongo/db/repl/SConscript +++ b/src/mongo/db/repl/SConscript @@ -326,6 +326,6 @@ env.CppUnitTest( source='collection_cloner_test.cpp', LIBDEPS=[ 'collection_cloner', - 'replication_executor_test_fixture', + 'base_cloner_test_fixture', ], ) diff --git a/src/mongo/db/repl/collection_cloner.cpp b/src/mongo/db/repl/collection_cloner.cpp index 56066be614b..3a89ece4d4b 100644 --- a/src/mongo/db/repl/collection_cloner.cpp +++ b/src/mongo/db/repl/collection_cloner.cpp @@ -83,11 +83,14 @@ namespace repl { uassert(ErrorCodes::BadValue, "null replication executor", executor); uassert(ErrorCodes::BadValue, "invalid collection namespace: " + sourceNss.ns(), sourceNss.isValid()); - uassert(ErrorCodes::BadValue, "null storage interface", storageInterface); uassertStatusOK(options.validate()); + uassert(ErrorCodes::BadValue, "callback function cannot be null", work); + uassert(ErrorCodes::BadValue, "null storage interface", storageInterface); } - CollectionCloner::~CollectionCloner() { } + const NamespaceString& CollectionCloner::getSourceNamespace() const { + return _sourceNss; + } std::string CollectionCloner::getDiagnosticString() const { boost::lock_guard<boost::mutex> lk(_mutex); @@ -172,7 +175,7 @@ namespace repl { } } - void CollectionCloner::setScheduleDbWorkFn(ScheduleDbWorkFn scheduleDbWorkFn) { + void CollectionCloner::setScheduleDbWorkFn(const ScheduleDbWorkFn& scheduleDbWorkFn) { boost::lock_guard<boost::mutex> lk(_mutex); _scheduleDbWorkFn = scheduleDbWorkFn; @@ -293,7 +296,5 @@ namespace repl { _work(Status::OK()); } - CollectionCloner::StorageInterface::~StorageInterface() { } - } // namespace repl } // namespace mongo diff --git a/src/mongo/db/repl/collection_cloner.h b/src/mongo/db/repl/collection_cloner.h index 5823e0bf96d..98d62ff527f 100644 --- a/src/mongo/db/repl/collection_cloner.h +++ b/src/mongo/db/repl/collection_cloner.h @@ -38,6 +38,7 @@ #include "mongo/bson/bsonobj.h" #include "mongo/db/catalog/collection_options.h" #include "mongo/db/namespace_string.h" +#include "mongo/db/repl/base_cloner.h" #include "mongo/db/repl/fetcher.h" #include "mongo/db/repl/replication_executor.h" #include "mongo/stdx/functional.h" @@ -46,7 +47,7 @@ namespace mongo { namespace repl { - class CollectionCloner { + class CollectionCloner : public BaseCloner { MONGO_DISALLOW_COPYING(CollectionCloner); public: @@ -58,20 +59,14 @@ namespace repl { class StorageInterface; /** - * Fetcher callback function to report final status of collection cloning. - */ - typedef stdx::function<void (const Status&)> CallbackFn; - - /** * Type of function to schedule database work with the executor. * * Must be consistent with ReplicationExecutor::scheduleWorkWithGlobalExclusiveLock(). * * Used for testing only. */ - typedef stdx::function< - StatusWith<ReplicationExecutor::CallbackHandle> ( - const ReplicationExecutor::CallbackFn&)> ScheduleDbWorkFn; + using ScheduleDbWorkFn = stdx::function<StatusWith<ReplicationExecutor::CallbackHandle> ( + const ReplicationExecutor::CallbackFn&)>; /** * Creates CollectionCloner task in inactive state. Use start() to activate cloner. @@ -89,45 +84,23 @@ namespace repl { const CallbackFn& work, StorageInterface* storageInterface); - virtual ~CollectionCloner(); + virtual ~CollectionCloner() = default; - /** - * Returns diagnostic information. - */ - std::string getDiagnosticString() const; + const NamespaceString& getSourceNamespace() const; - /** - * Returns true if the cloner has been started (but has not completed). - */ - bool isActive() const; + std::string getDiagnosticString() const override; - /** - * Starts collection cloning by scheduling initial command to be run by the executor. - */ - Status start(); + bool isActive() const override; - /** - * Cancels current remote command request. - * Returns immediately if collection cloner is not active. - * - * If the cloner is canceled after start() has been called, '_work' will be invoked - * with a ErrorCodes::CallbackCanceled status. - */ - void cancel(); + Status start() override; + + void cancel() override; // // Testing only functions below. // - /** - * Waits for active remote commands and database worker to complete. - * Returns immediately if collection cloner is not active. - * - * TODO: Internal state not sufficiently protected for production use. - * - * For testing only. - */ - void wait(); + void wait() override; /** * Waits for database worker to complete. @@ -142,7 +115,7 @@ namespace repl { * * For testing only. */ - void setScheduleDbWorkFn(ScheduleDbWorkFn scheduleDbWorkFn); + void setScheduleDbWorkFn(const ScheduleDbWorkFn& scheduleDbWorkFn); private: @@ -193,7 +166,7 @@ namespace repl { // Owned by us. std::unique_ptr<StorageInterface> _storageInterface; - // Protects member data of this Fetcher. + // Protects member data of this collection cloner. mutable boost::mutex _mutex; // _active is true when Collection Cloner is started. @@ -232,7 +205,7 @@ namespace repl { /** * When the storage interface is destroyed, it will commit the index builder. */ - virtual ~StorageInterface(); + virtual ~StorageInterface() = default; /** * Creates a collection with the provided indexes. diff --git a/src/mongo/db/repl/collection_cloner_test.cpp b/src/mongo/db/repl/collection_cloner_test.cpp index b00aeaa2b6d..8beac905dc5 100644 --- a/src/mongo/db/repl/collection_cloner_test.cpp +++ b/src/mongo/db/repl/collection_cloner_test.cpp @@ -33,9 +33,9 @@ #include "mongo/db/commands.h" #include "mongo/db/jsobj.h" +#include "mongo/db/repl/base_cloner_test_fixture.h" #include "mongo/db/repl/collection_cloner.h" #include "mongo/db/repl/network_interface_mock.h" -#include "mongo/db/repl/replication_executor_test_fixture.h" #include "mongo/unittest/unittest.h" namespace { @@ -43,187 +43,46 @@ namespace { using namespace mongo; using namespace mongo::repl; - typedef NetworkInterfaceMock::NetworkOperationIterator NetworkOperationIterator; - - const HostAndPort target("localhost", -1); - const NamespaceString nss("db.coll"); - const BSONObj idIndexSpec = BSON("v" << 1 << - "key" << BSON("_id" << 1) << - "name" << "_id_" << - "ns" << nss.ns()); - - /** - * Creates a cursor response with given array of documents. - */ - BSONObj createCursorResponse(CursorId cursorId, - const std::string& ns, - const BSONArray& docs, - const char* batchFieldName) { - return BSON("cursor" << BSON("id" << cursorId << - "ns" << ns << - batchFieldName << docs) << - "ok" << 1); - } - - BSONObj createCursorResponse(CursorId cursorId, - const BSONArray& docs, - const char* batchFieldName) { - return createCursorResponse(cursorId, nss.toString(), docs, batchFieldName); - } - - BSONObj createCursorResponse(CursorId cursorId, - const BSONArray& docs) { - return createCursorResponse(cursorId, docs, "firstBatch"); - } - - /** - * Creates a listIndexes response with given array of index specs. - */ - BSONObj createListIndexesResponse(CursorId cursorId, - const BSONArray& specs, - const char* batchFieldName) { - return createCursorResponse(cursorId, "test.$cmd.listIndexes.coll", specs, batchFieldName); - } - - BSONObj createListIndexesResponse(CursorId cursorId, const BSONArray& specs) { - return createListIndexesResponse(cursorId, specs, "firstBatch"); - } - - class StorageInterfaceMock : public CollectionCloner::StorageInterface { + class CollectionClonerTest : public BaseClonerTest { public: - Status beginCollection(OperationContext* txn, - const NamespaceString& nss, - const CollectionOptions& options, - const std::vector<BSONObj>& specs) override { - return beginCollectionFn ? beginCollectionFn(txn, nss, options, specs) : Status::OK(); - } - - Status insertDocuments(OperationContext* txn, - const NamespaceString& ns, - const std::vector<BSONObj>& docs) override { - return insertDocumentsFn ? insertDocumentsFn(txn, ns, docs) : Status::OK(); - } - - stdx::function<Status (OperationContext*, - const NamespaceString&, - const CollectionOptions&, - const std::vector<BSONObj>&)> beginCollectionFn; - - stdx::function<Status (OperationContext*, - const NamespaceString&, - const std::vector<BSONObj>&)> insertDocumentsFn; - }; - - class CollectionClonerTest : public ReplicationExecutorTest { - public: - static Status getDefaultStatus(); CollectionClonerTest(); + void setUp() override; void tearDown() override; - void clear(); - void scheduleNetworkResponse(NetworkOperationIterator noi, - const BSONObj& obj); - void scheduleNetworkResponse(NetworkOperationIterator noi, - ErrorCodes::Error code, const std::string& reason); - void scheduleNetworkResponse(const BSONObj& obj); - void scheduleNetworkResponse(ErrorCodes::Error code, const std::string& reason); - void processNetworkResponse(const BSONObj& obj); - void processNetworkResponse(ErrorCodes::Error code, const std::string& reason); - void finishProcessingNetworkResponse(); + BaseCloner* getCloner() const override; protected: CollectionOptions options; - Status status; StorageInterfaceMock* storageInterface; std::unique_ptr<CollectionCloner> collectionCloner; - - private: - void _callback(const Status& status); }; - Status CollectionClonerTest::getDefaultStatus() { - return Status(ErrorCodes::InternalError, "Not mutated"); - } - CollectionClonerTest::CollectionClonerTest() : options(), - status(getDefaultStatus()), + storageInterface(nullptr), collectionCloner() { } void CollectionClonerTest::setUp() { - ReplicationExecutorTest::setUp(); + BaseClonerTest::setUp(); options.reset(); options.storageEngine = BSON("storageEngine1" << BSONObj()); - clear(); storageInterface = new StorageInterfaceMock(); collectionCloner.reset(new CollectionCloner(&getExecutor(), target, nss, options, - stdx::bind(&CollectionClonerTest::_callback, + stdx::bind(&CollectionClonerTest::setStatus, this, stdx::placeholders::_1), storageInterface)); - launchExecutorThread(); } void CollectionClonerTest::tearDown() { - ReplicationExecutorTest::tearDown(); + BaseClonerTest::tearDown(); // Executor may still invoke collection cloner's callback before shutting down. collectionCloner.reset(); options.reset(); } - void CollectionClonerTest::clear() { - status = getDefaultStatus(); - } - - void CollectionClonerTest::scheduleNetworkResponse(NetworkOperationIterator noi, - const BSONObj& obj) { - - auto net = getNet(); - ReplicationExecutor::Milliseconds millis(0); - ReplicationExecutor::RemoteCommandResponse response(obj, millis); - ReplicationExecutor::ResponseStatus responseStatus(response); - net->scheduleResponse(noi, net->now(), responseStatus); - } - - void CollectionClonerTest::scheduleNetworkResponse(NetworkOperationIterator noi, - ErrorCodes::Error code, - const std::string& reason) { - - auto net = getNet(); - ReplicationExecutor::ResponseStatus responseStatus(code, reason); - net->scheduleResponse(noi, net->now(), responseStatus); - } - - void CollectionClonerTest::scheduleNetworkResponse(const BSONObj& obj) { - ASSERT_TRUE(getNet()->hasReadyRequests()); - scheduleNetworkResponse(getNet()->getNextReadyRequest(), obj); - } - - void CollectionClonerTest::scheduleNetworkResponse(ErrorCodes::Error code, - const std::string& reason) { - ASSERT_TRUE(getNet()->hasReadyRequests()); - scheduleNetworkResponse(getNet()->getNextReadyRequest(), code, reason); - } - - void CollectionClonerTest::processNetworkResponse(const BSONObj& obj) { - scheduleNetworkResponse(obj); - finishProcessingNetworkResponse(); - } - - void CollectionClonerTest::processNetworkResponse(ErrorCodes::Error code, - const std::string& reason) { - scheduleNetworkResponse(code, reason); - finishProcessingNetworkResponse(); - } - - void CollectionClonerTest::finishProcessingNetworkResponse() { - clear(); - ASSERT_TRUE(collectionCloner->isActive()); - getNet()->runReadyNetworkOperations(); - } - - void CollectionClonerTest::_callback(const Status& theStatus) { - status = theStatus; + BaseCloner* CollectionClonerTest::getCloner() const { + return collectionCloner.get(); } TEST_F(CollectionClonerTest, InvalidConstruction) { @@ -257,59 +116,18 @@ namespace { ASSERT_THROWS(CollectionCloner(&executor, target, nss, invalidOptions, cb, si), UserException); } - } - - TEST_F(CollectionClonerTest, GetDiagnosticString) { - ASSERT_FALSE(collectionCloner->getDiagnosticString().empty()); - } - - TEST_F(CollectionClonerTest, IsActiveAfterStart) { - ASSERT_FALSE(collectionCloner->isActive()); - ASSERT_OK(collectionCloner->start()); - ASSERT_TRUE(collectionCloner->isActive()); - } - - TEST_F(CollectionClonerTest, StartWhenActive) { - ASSERT_OK(collectionCloner->start()); - ASSERT_TRUE(collectionCloner->isActive()); - ASSERT_NOT_OK(collectionCloner->start()); - } - - TEST_F(CollectionClonerTest, CancelWithoutStart) { - ASSERT_FALSE(collectionCloner->isActive()); - collectionCloner->cancel(); - } - TEST_F(CollectionClonerTest, WaitWithoutStart) { - ASSERT_FALSE(collectionCloner->isActive()); - collectionCloner->wait(); - } - - TEST_F(CollectionClonerTest, ShutdownBeforeStart) { - getExecutor().shutdown(); - ASSERT_NOT_OK(collectionCloner->start()); - ASSERT_FALSE(collectionCloner->isActive()); - } - - TEST_F(CollectionClonerTest, StartAndCancel) { - ASSERT_OK(collectionCloner->start()); - scheduleNetworkResponse(BSON("ok" << 1)); - - collectionCloner->cancel(); - finishProcessingNetworkResponse(); - - ASSERT_EQUALS(ErrorCodes::CallbackCanceled, status.code()); + // Callback function cannot be null. + { + CollectionCloner::CallbackFn nullCb; + CollectionCloner::StorageInterface* si = new StorageInterfaceMock(); + ASSERT_THROWS(CollectionCloner(&executor, target, nss, options, nullCb, si), + UserException); + } } - TEST_F(CollectionClonerTest, StartButShutdown) { - ASSERT_OK(collectionCloner->start()); - scheduleNetworkResponse(BSON("ok" << 1)); - - getExecutor().shutdown(); - // Network interface should not deliver mock response to callback. - finishProcessingNetworkResponse(); - - ASSERT_EQUALS(ErrorCodes::CallbackCanceled, status.code()); + TEST_F(CollectionClonerTest, ClonerLifeCycle) { + testLifeCycle(); } TEST_F(CollectionClonerTest, FirstRemoteCommand) { @@ -332,7 +150,7 @@ namespace { processNetworkResponse( BSON("ok" << 0 << "errmsg" << "" << "code" << ErrorCodes::NamespaceNotFound)); - ASSERT_EQUALS(ErrorCodes::NamespaceNotFound, status.code()); + ASSERT_EQUALS(ErrorCodes::NamespaceNotFound, getStatus().code()); ASSERT_FALSE(collectionCloner->isActive()); } @@ -345,7 +163,7 @@ namespace { // the cloner stops the fetcher from retrieving more results. processNetworkResponse(createListIndexesResponse(1, BSONArray())); - ASSERT_EQUALS(getDefaultStatus(), status); + ASSERT_EQUALS(getDefaultStatus(), getStatus()); ASSERT_TRUE(collectionCloner->isActive()); ASSERT_TRUE(getNet()->hasReadyRequests()); @@ -362,7 +180,7 @@ namespace { processNetworkResponse(createListIndexesResponse(0, BSON_ARRAY(idIndexSpec))); - ASSERT_EQUALS(ErrorCodes::UnknownError, status.code()); + ASSERT_EQUALS(ErrorCodes::UnknownError, getStatus().code()); ASSERT_FALSE(collectionCloner->isActive()); } @@ -384,7 +202,7 @@ namespace { processNetworkResponse(createListIndexesResponse(0, BSON_ARRAY(idIndexSpec))); collectionCloner->waitForDbWorker(); - ASSERT_EQUALS(ErrorCodes::CallbackCanceled, status.code()); + ASSERT_EQUALS(ErrorCodes::CallbackCanceled, getStatus().code()); ASSERT_FALSE(collectionCloner->isActive()); } @@ -402,7 +220,7 @@ namespace { collectionCloner->waitForDbWorker(); - ASSERT_EQUALS(ErrorCodes::OperationFailed, status.code()); + ASSERT_EQUALS(ErrorCodes::OperationFailed, getStatus().code()); ASSERT_FALSE(collectionCloner->isActive()); } @@ -433,7 +251,7 @@ namespace { processNetworkResponse(createListIndexesResponse(1, BSON_ARRAY(specs[0] << specs[1]))); // 'status' should not be modified because cloning is not finished. - ASSERT_EQUALS(getDefaultStatus(), status); + ASSERT_EQUALS(getDefaultStatus(), getStatus()); ASSERT_TRUE(collectionCloner->isActive()); processNetworkResponse(createListIndexesResponse(0, BSON_ARRAY(specs[2]), "nextBatch")); @@ -441,7 +259,7 @@ namespace { collectionCloner->waitForDbWorker(); // 'status' will be set if listIndexes fails. - ASSERT_EQUALS(getDefaultStatus(), status); + ASSERT_EQUALS(getDefaultStatus(), getStatus()); ASSERT_EQUALS(nss.ns(), collNss.ns()); ASSERT_EQUALS(options.toBSON(), collOptions.toBSON()); @@ -474,7 +292,7 @@ namespace { collectionCloner->waitForDbWorker(); ASSERT_TRUE(collectionCreated); - ASSERT_EQUALS(ErrorCodes::ShutdownInProgress, status.code()); + ASSERT_EQUALS(ErrorCodes::ShutdownInProgress, getStatus().code()); ASSERT_FALSE(collectionCloner->isActive()); } @@ -517,7 +335,7 @@ namespace { processNetworkResponse( BSON("ok" << 0 << "errmsg" << "" << "code" << ErrorCodes::CursorNotFound)); - ASSERT_EQUALS(ErrorCodes::CursorNotFound, status.code()); + ASSERT_EQUALS(ErrorCodes::CursorNotFound, getStatus().code()); ASSERT_FALSE(collectionCloner->isActive()); } @@ -537,7 +355,7 @@ namespace { net->runReadyNetworkOperations(); - ASSERT_EQUALS(ErrorCodes::CallbackCanceled, status.code()); + ASSERT_EQUALS(ErrorCodes::CallbackCanceled, getStatus().code()); ASSERT_FALSE(collectionCloner->isActive()); } @@ -557,7 +375,7 @@ namespace { const BSONObj doc = BSON("_id" << 1); processNetworkResponse(createCursorResponse(0, BSON_ARRAY(doc))); - ASSERT_EQUALS(ErrorCodes::UnknownError, status.code()); + ASSERT_EQUALS(ErrorCodes::UnknownError, getStatus().code()); ASSERT_FALSE(collectionCloner->isActive()); } @@ -584,7 +402,7 @@ namespace { processNetworkResponse(createCursorResponse(0, BSON_ARRAY(doc))); collectionCloner->waitForDbWorker(); - ASSERT_EQUALS(ErrorCodes::CallbackCanceled, status.code()); + ASSERT_EQUALS(ErrorCodes::CallbackCanceled, getStatus().code()); ASSERT_FALSE(collectionCloner->isActive()); } @@ -607,7 +425,7 @@ namespace { collectionCloner->wait(); - ASSERT_EQUALS(ErrorCodes::OperationFailed, status.code()); + ASSERT_EQUALS(ErrorCodes::OperationFailed, getStatus().code()); ASSERT_FALSE(collectionCloner->isActive()); } @@ -634,7 +452,7 @@ namespace { ASSERT_EQUALS(1U, collDocuments.size()); ASSERT_EQUALS(doc, collDocuments[0]); - ASSERT_OK(status); + ASSERT_OK(getStatus()); ASSERT_FALSE(collectionCloner->isActive()); } @@ -661,7 +479,7 @@ namespace { ASSERT_EQUALS(1U, collDocuments.size()); ASSERT_EQUALS(doc, collDocuments[0]); - ASSERT_EQUALS(getDefaultStatus(), status); + ASSERT_EQUALS(getDefaultStatus(), getStatus()); ASSERT_TRUE(collectionCloner->isActive()); const BSONObj doc2 = BSON("_id" << 1); @@ -671,7 +489,7 @@ namespace { ASSERT_EQUALS(1U, collDocuments.size()); ASSERT_EQUALS(doc2, collDocuments[0]); - ASSERT_OK(status); + ASSERT_OK(getStatus()); ASSERT_FALSE(collectionCloner->isActive()); } |