diff options
-rw-r--r-- | src/mongo/db/repl/base_cloner.h | 10 | ||||
-rw-r--r-- | src/mongo/db/repl/base_cloner_test_fixture.cpp | 13 | ||||
-rw-r--r-- | src/mongo/db/repl/base_cloner_test_fixture.h | 17 | ||||
-rw-r--r-- | src/mongo/db/repl/collection_cloner.cpp | 102 | ||||
-rw-r--r-- | src/mongo/db/repl/collection_cloner.h | 42 | ||||
-rw-r--r-- | src/mongo/db/repl/collection_cloner_test.cpp | 18 | ||||
-rw-r--r-- | src/mongo/db/repl/database_cloner.cpp | 100 | ||||
-rw-r--r-- | src/mongo/db/repl/database_cloner.h | 39 | ||||
-rw-r--r-- | src/mongo/db/repl/database_cloner_test.cpp | 35 | ||||
-rw-r--r-- | src/mongo/db/repl/database_task_test.cpp | 21 |
10 files changed, 202 insertions, 195 deletions
diff --git a/src/mongo/db/repl/base_cloner.h b/src/mongo/db/repl/base_cloner.h index 41272004758..8d6b8be8928 100644 --- a/src/mongo/db/repl/base_cloner.h +++ b/src/mongo/db/repl/base_cloner.h @@ -72,17 +72,9 @@ namespace repl { */ virtual void cancel() = 0; - // - // Testing only functions below. - // - /** * Waits for active remote commands and database worker to complete. - * Returns immediately if collection cloner is not active. - * - * TODO: Internal state not sufficiently protected for production use. - * - * For testing only. + * Returns immediately if cloner is not active. */ virtual void wait() = 0; diff --git a/src/mongo/db/repl/base_cloner_test_fixture.cpp b/src/mongo/db/repl/base_cloner_test_fixture.cpp index 75bc0b7b886..92518498300 100644 --- a/src/mongo/db/repl/base_cloner_test_fixture.cpp +++ b/src/mongo/db/repl/base_cloner_test_fixture.cpp @@ -106,10 +106,12 @@ namespace repl { ReplicationExecutorTest::setUp(); clear(); launchExecutorThread(); + storageInterface.reset(new StorageInterfaceMock()); } void BaseClonerTest::tearDown() { ReplicationExecutorTest::tearDown(); + storageInterface.reset(); } void BaseClonerTest::clear() { @@ -117,18 +119,18 @@ namespace repl { } void BaseClonerTest::setStatus(const Status& status) { - boost::unique_lock<boost::mutex> lk(_mutex); + stdx::unique_lock<stdx::mutex> lk(_mutex); _status = status; _setStatusCondition.notify_all(); } const Status& BaseClonerTest::getStatus() const { - boost::unique_lock<boost::mutex> lk(_mutex); + stdx::unique_lock<stdx::mutex> lk(_mutex); return _status; } void BaseClonerTest::waitForStatus() { - boost::unique_lock<boost::mutex> lk(_mutex); + stdx::unique_lock<stdx::mutex> lk(_mutex); if (_status == getDetectableErrorStatus()) { try { _setStatusCondition.wait_for(lk, Milliseconds(1000)); @@ -256,5 +258,10 @@ namespace repl { return insertDocumentsFn ? insertDocumentsFn(txn, nss, docs) : Status::OK(); } + Status StorageInterfaceMock::commitCollection(OperationContext* txn, + const NamespaceString& nss) { + return Status::OK(); + } + } // namespace repl } // namespace mongo diff --git a/src/mongo/db/repl/base_cloner_test_fixture.h b/src/mongo/db/repl/base_cloner_test_fixture.h index 2bd93ce436b..064003bbb6d 100644 --- a/src/mongo/db/repl/base_cloner_test_fixture.h +++ b/src/mongo/db/repl/base_cloner_test_fixture.h @@ -28,8 +28,7 @@ #pragma once -#include <boost/thread/mutex.hpp> -#include <boost/thread/condition_variable.hpp> +#include <memory> #include <vector> #include "mongo/base/status.h" @@ -39,6 +38,8 @@ #include "mongo/db/repl/collection_cloner.h" #include "mongo/db/repl/network_interface_mock.h" #include "mongo/db/repl/replication_executor_test_fixture.h" +#include "mongo/stdx/mutex.h" +#include "mongo/stdx/condition_variable.h" #include "mongo/util/net/hostandport.h" namespace mongo { @@ -49,6 +50,7 @@ namespace mongo { namespace repl { class BaseCloner; + class StorageInterfaceMock; class BaseClonerTest : public ReplicationExecutorTest { public: @@ -124,12 +126,16 @@ namespace repl { virtual BaseCloner* getCloner() const = 0; void testLifeCycle(); + protected: + + std::unique_ptr<StorageInterfaceMock> storageInterface; + private: // Protects member data of this base cloner fixture. - mutable boost::mutex _mutex; + mutable stdx::mutex _mutex; - boost::condition_variable _setStatusCondition; + stdx::condition_variable _setStatusCondition; Status _status; @@ -146,6 +152,9 @@ namespace repl { const NamespaceString& nss, const std::vector<BSONObj>& docs) override; + Status commitCollection(OperationContext* txn, + const NamespaceString& nss) override; + stdx::function<Status (OperationContext*, const NamespaceString&, const CollectionOptions&, diff --git a/src/mongo/db/repl/collection_cloner.cpp b/src/mongo/db/repl/collection_cloner.cpp index 18d8dab6cbf..7d58ae3f2e8 100644 --- a/src/mongo/db/repl/collection_cloner.cpp +++ b/src/mongo/db/repl/collection_cloner.cpp @@ -32,8 +32,6 @@ #include "mongo/db/repl/collection_cloner.h" -#include <boost/thread/lock_guard.hpp> - #include "mongo/util/assert_util.h" #include "mongo/util/log.h" #include "mongo/util/mongoutils/str.h" @@ -45,14 +43,14 @@ namespace repl { const HostAndPort& source, const NamespaceString& sourceNss, const CollectionOptions& options, - const CallbackFn& work, + const CallbackFn& onCompletion, StorageInterface* storageInterface) : _executor(executor), _source(source), _sourceNss(sourceNss), _destNss(_sourceNss), _options(options), - _work(work), + _onCompletion(onCompletion), _storageInterface(storageInterface), _active(false), _listIndexesFetcher(_executor, @@ -77,25 +75,31 @@ namespace repl { _indexSpecs(), _documents(), _dbWorkCallbackHandle(), - // TODO: replace with executor database worker when it is available. - _scheduleDbWorkFn(stdx::bind(&ReplicationExecutor::scheduleWorkWithGlobalExclusiveLock, - _executor, - stdx::placeholders::_1)) { + _scheduleDbWorkFn([this](const ReplicationExecutor::CallbackFn& work) { + return _executor->scheduleDBWork(work); + }) { uassert(ErrorCodes::BadValue, "null replication executor", executor); uassert(ErrorCodes::BadValue, "invalid collection namespace: " + sourceNss.ns(), sourceNss.isValid()); uassertStatusOK(options.validate()); - uassert(ErrorCodes::BadValue, "callback function cannot be null", work); + uassert(ErrorCodes::BadValue, "callback function cannot be null", onCompletion); uassert(ErrorCodes::BadValue, "null storage interface", storageInterface); } + CollectionCloner::~CollectionCloner() { + DESTRUCTOR_GUARD( + cancel(); + wait(); + ); + } + const NamespaceString& CollectionCloner::getSourceNamespace() const { return _sourceNss; } std::string CollectionCloner::getDiagnosticString() const { - boost::lock_guard<boost::mutex> lk(_mutex); + stdx::lock_guard<stdx::mutex> lk(_mutex); str::stream output; output << "CollectionCloner"; output << " executor: " << _executor->getDiagnosticString(); @@ -112,12 +116,12 @@ namespace repl { } bool CollectionCloner::isActive() const { - boost::lock_guard<boost::mutex> lk(_mutex); + stdx::lock_guard<stdx::mutex> lk(_mutex); return _active; } Status CollectionCloner::start() { - boost::lock_guard<boost::mutex> lk(_mutex); + stdx::lock_guard<stdx::mutex> lk(_mutex); if (_active) { return Status(ErrorCodes::IllegalOperation, "collection cloner already started"); @@ -136,7 +140,7 @@ namespace repl { void CollectionCloner::cancel() { ReplicationExecutor::CallbackHandle dbWorkCallbackHandle; { - boost::lock_guard<boost::mutex> lk(_mutex); + stdx::lock_guard<stdx::mutex> lk(_mutex); if (!_active) { return; @@ -154,16 +158,14 @@ namespace repl { } void CollectionCloner::wait() { - // If a fetcher is inactive, wait() has no effect. - _listIndexesFetcher.wait(); - _findFetcher.wait(); - waitForDbWorker(); + stdx::unique_lock<stdx::mutex> lk(_mutex); + _condition.wait(lk, [this]() { return !_active; }); } void CollectionCloner::waitForDbWorker() { ReplicationExecutor::CallbackHandle dbWorkCallbackHandle; { - boost::lock_guard<boost::mutex> lk(_mutex); + stdx::lock_guard<stdx::mutex> lk(_mutex); if (!_active) { return; @@ -178,7 +180,7 @@ namespace repl { } void CollectionCloner::setScheduleDbWorkFn(const ScheduleDbWorkFn& scheduleDbWorkFn) { - boost::lock_guard<boost::mutex> lk(_mutex); + stdx::lock_guard<stdx::mutex> lk(_mutex); _scheduleDbWorkFn = scheduleDbWorkFn; } @@ -186,12 +188,8 @@ namespace repl { void CollectionCloner::_listIndexesCallback(const StatusWith<Fetcher::BatchData>& fetchResult, Fetcher::NextAction* nextAction, BSONObjBuilder* getMoreBob) { - boost::lock_guard<boost::mutex> lk(_mutex); - - _active = false; - if (!fetchResult.isOK()) { - _work(fetchResult.getStatus()); + _finishCallback(nullptr, fetchResult.getStatus()); return; } @@ -212,8 +210,6 @@ namespace repl { invariant(getMoreBob); getMoreBob->append("getMore", batchData.cursorId); getMoreBob->append("collection", batchData.nss.coll()); - - _active = true; return; } @@ -221,23 +217,18 @@ namespace repl { auto&& scheduleResult = _scheduleDbWorkFn( stdx::bind(&CollectionCloner::_beginCollectionCallback, this, stdx::placeholders::_1)); if (!scheduleResult.isOK()) { - _work(scheduleResult.getStatus()); + _finishCallback(nullptr, scheduleResult.getStatus()); return; } - _active = true; _dbWorkCallbackHandle = scheduleResult.getValue(); } void CollectionCloner::_findCallback(const StatusWith<Fetcher::BatchData>& fetchResult, Fetcher::NextAction* nextAction, BSONObjBuilder* getMoreBob) { - boost::lock_guard<boost::mutex> lk(_mutex); - - _active = false; - if (!fetchResult.isOK()) { - _work(fetchResult.getStatus()); + _finishCallback(nullptr, fetchResult.getStatus()); return; } @@ -248,7 +239,7 @@ namespace repl { auto&& scheduleResult = _scheduleDbWorkFn(stdx::bind( &CollectionCloner::_insertDocumentsCallback, this, stdx::placeholders::_1, lastBatch)); if (!scheduleResult.isOK()) { - _work(scheduleResult.getStatus()); + _finishCallback(nullptr, scheduleResult.getStatus()); return; } @@ -258,59 +249,62 @@ namespace repl { getMoreBob->append("collection", batchData.nss.coll()); } - _active = true; _dbWorkCallbackHandle = scheduleResult.getValue(); } void CollectionCloner::_beginCollectionCallback(const ReplicationExecutor::CallbackData& cbd) { - boost::lock_guard<boost::mutex> lk(_mutex); - - _active = false; - + OperationContext* txn = cbd.txn; if (!cbd.status.isOK()) { - _work(cbd.status); + _finishCallback(txn, cbd.status); return; } - OperationContext* txn = cbd.txn; Status status = _storageInterface->beginCollection(txn, _destNss, _options, _indexSpecs); if (!status.isOK()) { - _work(status); + _finishCallback(txn, status); return; } Status scheduleStatus = _findFetcher.schedule(); if (!scheduleStatus.isOK()) { - _work(scheduleStatus); + _finishCallback(txn, scheduleStatus); return; } - - _active = true; } void CollectionCloner::_insertDocumentsCallback(const ReplicationExecutor::CallbackData& cbd, bool lastBatch) { - boost::lock_guard<boost::mutex> lk(_mutex); - - _active = false; - + OperationContext* txn = cbd.txn; if (!cbd.status.isOK()) { - _work(cbd.status); + _finishCallback(txn, cbd.status); return; } - Status status = _storageInterface->insertDocuments(cbd.txn, _destNss, _documents); + Status status = _storageInterface->insertDocuments(txn, _destNss, _documents); if (!status.isOK()) { - _work(status); + _finishCallback(txn, status); return; } if (!lastBatch) { - _active = true; return; } - _work(Status::OK()); + _finishCallback(txn, Status::OK()); + } + + void CollectionCloner::_finishCallback(OperationContext* txn, const Status& status) { + if (status.isOK()) { + auto status = _storageInterface->commitCollection(txn, _destNss); + if (!status.isOK()) { + warning() << "Failed to commit changes to collection " << _destNss.ns() + << ": " << status; + } + } + _onCompletion(status); + stdx::lock_guard<stdx::mutex> lk(_mutex); + _active = false; + _condition.notify_all(); } } // namespace repl diff --git a/src/mongo/db/repl/collection_cloner.h b/src/mongo/db/repl/collection_cloner.h index e31d8d1f075..f4f9d9867ec 100644 --- a/src/mongo/db/repl/collection_cloner.h +++ b/src/mongo/db/repl/collection_cloner.h @@ -28,7 +28,6 @@ #pragma once -#include <boost/thread/mutex.hpp> #include <memory> #include <string> #include <vector> @@ -42,6 +41,8 @@ #include "mongo/db/repl/fetcher.h" #include "mongo/db/repl/replication_executor.h" #include "mongo/stdx/functional.h" +#include "mongo/stdx/condition_variable.h" +#include "mongo/stdx/mutex.h" #include "mongo/util/net/hostandport.h" namespace mongo { @@ -71,9 +72,9 @@ namespace repl { /** * Creates CollectionCloner task in inactive state. Use start() to activate cloner. * - * The cloner calls 'work' when the collection cloning has completed or failed. + * The cloner calls 'onCompletion' when the collection cloning has completed or failed. * - * 'work' will be called exactly once. + * 'onCompletion' will be called exactly once. * * Takes ownership of the passed StorageInterface object. */ @@ -81,10 +82,10 @@ namespace repl { const HostAndPort& source, const NamespaceString& sourceNss, const CollectionOptions& options, - const CallbackFn& work, + const CallbackFn& onCompletion, StorageInterface* storageInterface); - virtual ~CollectionCloner() = default; + virtual ~CollectionCloner(); const NamespaceString& getSourceNamespace() const; @@ -96,12 +97,12 @@ namespace repl { void cancel() override; + void wait() override; + // // Testing only functions below. // - void wait() override; - /** * Waits for database worker to complete. * Returns immediately if collection cloner is not active. @@ -154,6 +155,13 @@ namespace repl { void _insertDocumentsCallback(const ReplicationExecutor::CallbackData& callbackData, bool lastBatch); + /** + * Reports completion status. + * Commits/aborts collection building. + * Sets cloner to inactive. + */ + void _finishCallback(OperationContext* txn, const Status& status); + // Not owned by us. ReplicationExecutor* _executor; @@ -163,13 +171,15 @@ namespace repl { CollectionOptions _options; // Invoked once when cloning completes or fails. - CallbackFn _work; + CallbackFn _onCompletion; - // Owned by us. - std::unique_ptr<StorageInterface> _storageInterface; + // Not owned by us. + StorageInterface* _storageInterface; // Protects member data of this collection cloner. - mutable boost::mutex _mutex; + mutable stdx::mutex _mutex; + + mutable stdx::condition_variable _condition; // _active is true when Collection Cloner is started. bool _active; @@ -204,9 +214,6 @@ namespace repl { class CollectionCloner::StorageInterface { public: - /** - * When the storage interface is destroyed, it will commit the index builder. - */ virtual ~StorageInterface() = default; /** @@ -230,6 +237,13 @@ namespace repl { const NamespaceString& nss, const std::vector<BSONObj>& documents) = 0; + /** + * Commits changes to collection. No effect if collection building has not begun. + * Operation context could be null. + */ + virtual Status commitCollection(OperationContext* txn, + const NamespaceString& nss) = 0; + }; } // namespace repl diff --git a/src/mongo/db/repl/collection_cloner_test.cpp b/src/mongo/db/repl/collection_cloner_test.cpp index 58b23ab226d..a6841ad2467 100644 --- a/src/mongo/db/repl/collection_cloner_test.cpp +++ b/src/mongo/db/repl/collection_cloner_test.cpp @@ -45,7 +45,6 @@ namespace { class CollectionClonerTest : public BaseClonerTest { public: - CollectionClonerTest(); void setUp() override; void tearDown() override; @@ -53,25 +52,18 @@ namespace { protected: CollectionOptions options; - StorageInterfaceMock* storageInterface; std::unique_ptr<CollectionCloner> collectionCloner; }; - CollectionClonerTest::CollectionClonerTest() - : options(), - storageInterface(nullptr), - collectionCloner() { } - void CollectionClonerTest::setUp() { BaseClonerTest::setUp(); options.reset(); options.storageEngine = BSON("storageEngine1" << BSONObj()); - storageInterface = new StorageInterfaceMock(); collectionCloner.reset(new CollectionCloner(&getExecutor(), target, nss, options, stdx::bind(&CollectionClonerTest::setStatus, this, stdx::placeholders::_1), - storageInterface)); + storageInterface.get())); } void CollectionClonerTest::tearDown() { @@ -92,7 +84,7 @@ namespace { // Null executor. { - CollectionCloner::StorageInterface* si = new StorageInterfaceMock(); + CollectionCloner::StorageInterface* si = storageInterface.get(); ASSERT_THROWS(CollectionCloner(nullptr, target, nss, options, cb, si), UserException); } @@ -103,7 +95,7 @@ namespace { // Invalid namespace. { NamespaceString badNss("db."); - CollectionCloner::StorageInterface* si = new StorageInterfaceMock(); + CollectionCloner::StorageInterface* si = storageInterface.get(); ASSERT_THROWS(CollectionCloner(&executor, target, badNss, options, cb, si), UserException); } @@ -112,7 +104,7 @@ namespace { { CollectionOptions invalidOptions; invalidOptions.storageEngine = BSON("storageEngine1" << "not a document"); - CollectionCloner::StorageInterface* si = new StorageInterfaceMock(); + CollectionCloner::StorageInterface* si = storageInterface.get(); ASSERT_THROWS(CollectionCloner(&executor, target, nss, invalidOptions, cb, si), UserException); } @@ -120,7 +112,7 @@ namespace { // Callback function cannot be null. { CollectionCloner::CallbackFn nullCb; - CollectionCloner::StorageInterface* si = new StorageInterfaceMock(); + CollectionCloner::StorageInterface* si = storageInterface.get(); ASSERT_THROWS(CollectionCloner(&executor, target, nss, options, nullCb, si), UserException); } diff --git a/src/mongo/db/repl/database_cloner.cpp b/src/mongo/db/repl/database_cloner.cpp index 466944c02f9..dc0f6f8990a 100644 --- a/src/mongo/db/repl/database_cloner.cpp +++ b/src/mongo/db/repl/database_cloner.cpp @@ -32,7 +32,6 @@ #include "mongo/db/repl/database_cloner.h" -#include <boost/thread/lock_guard.hpp> #include <algorithm> #include <iterator> #include <set> @@ -77,17 +76,17 @@ namespace { const std::string& dbname, const BSONObj& listCollectionsFilter, const ListCollectionsPredicateFn& listCollectionsPred, - const CreateStorageInterfaceFn& csi, + CollectionCloner::StorageInterface* si, const CollectionCallbackFn& collWork, - const CallbackFn& work) + const CallbackFn& onCompletion) : _executor(executor), _source(source), _dbname(dbname), _listCollectionsFilter(listCollectionsFilter), _listCollectionsPredicate(listCollectionsPred ? listCollectionsPred : acceptAllPred), - _createStorageInterface(csi), + _storageInterface(si), _collectionWork(collWork), - _work(work), + _onCompletion(onCompletion), _active(false), _listCollectionsFetcher(_executor, _source, @@ -98,26 +97,32 @@ namespace { stdx::placeholders::_1, stdx::placeholders::_2, stdx::placeholders::_3)), - // TODO: replace with executor database worker when it is available. - _scheduleDbWorkFn(stdx::bind(&ReplicationExecutor::scheduleWorkWithGlobalExclusiveLock, - _executor, - stdx::placeholders::_1)), + _scheduleDbWorkFn([this](const ReplicationExecutor::CallbackFn& work) { + return _executor->scheduleDBWork(work); + }), _startCollectionCloner([](CollectionCloner& cloner) { return cloner.start(); }) { uassert(ErrorCodes::BadValue, "null replication executor", executor); uassert(ErrorCodes::BadValue, "empty database name", !dbname.empty()); - uassert(ErrorCodes::BadValue, "storage interface creation function cannot be null", csi); + uassert(ErrorCodes::BadValue, "storage interface cannot be null", si); uassert(ErrorCodes::BadValue, "collection callback function cannot be null", collWork); - uassert(ErrorCodes::BadValue, "callback function cannot be null", work); + uassert(ErrorCodes::BadValue, "callback function cannot be null", onCompletion); + } + + DatabaseCloner::~DatabaseCloner() { + DESTRUCTOR_GUARD( + cancel(); + wait(); + ); } const std::vector<BSONObj>& DatabaseCloner::getCollectionInfos() const { - boost::lock_guard<boost::mutex> lk(_mutex); + stdx::lock_guard<stdx::mutex> lk(_mutex); return _collectionInfos; } std::string DatabaseCloner::getDiagnosticString() const { - boost::lock_guard<boost::mutex> lk(_mutex); + stdx::lock_guard<stdx::mutex> lk(_mutex); str::stream output; output << "DatabaseCloner"; output << " executor: " << _executor->getDiagnosticString(); @@ -131,12 +136,12 @@ namespace { } bool DatabaseCloner::isActive() const { - boost::lock_guard<boost::mutex> lk(_mutex); + stdx::lock_guard<stdx::mutex> lk(_mutex); return _active; } Status DatabaseCloner::start() { - boost::lock_guard<boost::mutex> lk(_mutex); + stdx::lock_guard<stdx::mutex> lk(_mutex); if (_active) { return Status(ErrorCodes::IllegalOperation, "database cloner already started"); @@ -154,7 +159,7 @@ namespace { void DatabaseCloner::cancel() { { - boost::lock_guard<boost::mutex> lk(_mutex); + stdx::lock_guard<stdx::mutex> lk(_mutex); if (!_active) { return; @@ -165,10 +170,12 @@ namespace { } void DatabaseCloner::wait() { + stdx::unique_lock<stdx::mutex> lk(_mutex); + _condition.wait(lk, [this]() { return !_active; }); } void DatabaseCloner::setScheduleDbWorkFn(const CollectionCloner::ScheduleDbWorkFn& work) { - boost::lock_guard<boost::mutex> lk(_mutex); + stdx::lock_guard<stdx::mutex> lk(_mutex); _scheduleDbWorkFn = work; } @@ -183,12 +190,8 @@ namespace { Fetcher::NextAction* nextAction, BSONObjBuilder* getMoreBob) { - boost::lock_guard<boost::mutex> lk(_mutex); - - _active = false; - if (!result.isOK()) { - _work(result.getStatus()); + _finishCallback(result.getStatus()); return; } @@ -206,14 +209,12 @@ namespace { invariant(getMoreBob); getMoreBob->append("getMore", batchData.cursorId); getMoreBob->append("collection", batchData.nss.coll()); - - _active = true; return; } // Nothing to do for an empty database. if (_collectionInfos.empty()) { - _work(Status::OK()); + _finishCallback(Status::OK()); return; } @@ -222,41 +223,42 @@ namespace { for (auto&& info : _collectionInfos) { BSONElement nameElement = info.getField(kNameFieldName); if (nameElement.eoo()) { - _work(Status(ErrorCodes::FailedToParse, str::stream() << + _finishCallback(Status(ErrorCodes::FailedToParse, str::stream() << "collection info must contain '" << kNameFieldName << "' " << "field : " << info)); return; } if (nameElement.type() != mongo::String) { - _work(Status(ErrorCodes::TypeMismatch, str::stream() << - "'" << kNameFieldName << "' field must be a string: " << info)); + _finishCallback(Status(ErrorCodes::TypeMismatch, str::stream() << + "'" << kNameFieldName << "' field must be a string: " << info)); return; } const std::string collectionName = nameElement.String(); if (seen.find(collectionName) != seen.end()) { - _work(Status(ErrorCodes::DuplicateKey, str::stream() << - "collection info contains duplicate collection name " << - "'" << collectionName << "': " << info)); + _finishCallback(Status(ErrorCodes::DuplicateKey, str::stream() << + "collection info contains duplicate collection name " << + "'" << collectionName << "': " << info)); return; } BSONElement optionsElement = info.getField(kOptionsFieldName); if (optionsElement.eoo()) { - _work(Status(ErrorCodes::FailedToParse, str::stream() << - "collection info must contain '" << kOptionsFieldName << "' " << - "field : " << info)); + _finishCallback(Status(ErrorCodes::FailedToParse, str::stream() << + "collection info must contain '" << kOptionsFieldName << "' " << + "field : " << info)); return; } if (!optionsElement.isABSONObj()) { - _work(Status(ErrorCodes::TypeMismatch, str::stream() << - "'" << kOptionsFieldName << "' field must be an object: " << info)); + _finishCallback(Status(ErrorCodes::TypeMismatch, str::stream() << + "'" << kOptionsFieldName << "' field must be an object: " << + info)); return; } const BSONObj optionsObj = optionsElement.Obj(); CollectionOptions options; Status parseStatus = options.parse(optionsObj); if (!parseStatus.isOK()) { - _work(parseStatus); + _finishCallback(parseStatus); return; } seen.insert(collectionName); @@ -274,10 +276,10 @@ namespace { this, stdx::placeholders::_1, nss), - _createStorageInterface()); + _storageInterface); } catch (const UserException& ex) { - _work(ex.toStatus()); + _finishCallback(ex.toStatus()); return; } } @@ -296,19 +298,13 @@ namespace { LOG(1) << " failed to start collection cloning on " << _currentCollectionClonerIter->getSourceNamespace() << ": " << startStatus; - _work(startStatus); + _finishCallback(startStatus); return; } - - _active = true; } void DatabaseCloner::_collectionClonerCallback(const Status& status, const NamespaceString& nss) { - boost::lock_guard<boost::mutex> lk(_mutex); - - _active = false; - // Forward collection cloner result to caller. // Failure to clone a collection does not stop the database cloner // from cloning the rest of the collections in the listCollections result. @@ -324,14 +320,20 @@ namespace { LOG(1) << " failed to start collection cloning on " << _currentCollectionClonerIter->getSourceNamespace() << ": " << startStatus; - _work(startStatus); + _finishCallback(startStatus); return; } - _active = true; return; } - _work(Status::OK()); + _finishCallback(Status::OK()); + } + + void DatabaseCloner::_finishCallback(const Status& status) { + _onCompletion(status); + stdx::lock_guard<stdx::mutex> lk(_mutex); + _active = false; + _condition.notify_all(); } } // namespace repl diff --git a/src/mongo/db/repl/database_cloner.h b/src/mongo/db/repl/database_cloner.h index 7660f8a1995..447d7ce07fd 100644 --- a/src/mongo/db/repl/database_cloner.h +++ b/src/mongo/db/repl/database_cloner.h @@ -28,7 +28,6 @@ #pragma once -#include <boost/thread/mutex.hpp> #include <list> #include <string> #include <vector> @@ -41,6 +40,8 @@ #include "mongo/db/repl/base_cloner.h" #include "mongo/db/repl/fetcher.h" #include "mongo/db/repl/replication_executor.h" +#include "mongo/stdx/condition_variable.h" +#include "mongo/stdx/mutex.h" #include "mongo/util/net/hostandport.h" namespace mongo { @@ -64,14 +65,8 @@ namespace repl { using ListCollectionsPredicateFn = stdx::function<bool (const BSONObj&)>; /** - * Type of function to create a storage interface instance for - * the collection cloner. - */ - using CreateStorageInterfaceFn = stdx::function<CollectionCloner::StorageInterface* ()>; - - /** * Callback function to report progress of collection cloning. Arguments are: - * - status from the collection cloner's 'work' callback. + * - status from the collection cloner's 'onCompletion' callback. * - source namespace of the collection cloner that completed (or failed). * * Called exactly once for every collection cloner started by the the database cloner. @@ -86,9 +81,9 @@ namespace repl { /** * Creates DatabaseCloner task in inactive state. Use start() to activate cloner. * - * The cloner calls 'work' when the database cloning has completed or failed. + * The cloner calls 'onCompletion' when the database cloning has completed or failed. * - * 'work' will be called exactly once. + * 'onCompletion' will be called exactly once. * * Takes ownership of the passed StorageInterface object. */ @@ -97,11 +92,11 @@ namespace repl { const std::string& dbname, const BSONObj& listCollectionsFilter, const ListCollectionsPredicateFn& listCollectionsPredicate, - const CreateStorageInterfaceFn& createStorageInterface, + CollectionCloner::StorageInterface* storageInterface, const CollectionCallbackFn& collectionWork, - const CallbackFn& work); + const CallbackFn& onCompletion); - virtual ~DatabaseCloner() = default; + virtual ~DatabaseCloner(); /** * Returns collection info objects read from listCollections result. @@ -118,12 +113,12 @@ namespace repl { void cancel() override; + void wait() override; + // // Testing only functions below. // - void wait() override; - /** * Overrides how executor schedules database work. * @@ -153,6 +148,12 @@ namespace repl { */ void _collectionClonerCallback(const Status& status, const NamespaceString& nss); + /** + * Reports completion status. + * Sets cloner to inactive. + */ + void _finishCallback(const Status& status); + // Not owned by us. ReplicationExecutor* _executor; @@ -160,16 +161,18 @@ namespace repl { std::string _dbname; BSONObj _listCollectionsFilter; ListCollectionsPredicateFn _listCollectionsPredicate; - CreateStorageInterfaceFn _createStorageInterface; + CollectionCloner::StorageInterface* _storageInterface; // Invoked once for every successfully started collection cloner. CollectionCallbackFn _collectionWork; // Invoked once when cloning completes or fails. - CallbackFn _work; + CallbackFn _onCompletion; // Protects member data of this database cloner. - mutable boost::mutex _mutex; + mutable stdx::mutex _mutex; + + mutable stdx::condition_variable _condition; // _active is true when database cloner is started. bool _active; diff --git a/src/mongo/db/repl/database_cloner_test.cpp b/src/mongo/db/repl/database_cloner_test.cpp index ef1165bf6f1..22e0e452f3d 100644 --- a/src/mongo/db/repl/database_cloner_test.cpp +++ b/src/mongo/db/repl/database_cloner_test.cpp @@ -45,10 +45,6 @@ namespace { const std::string dbname("db"); - CollectionCloner::StorageInterface* createStorageInterface() { - return new StorageInterfaceMock(); - } - class DatabaseClonerTest : public BaseClonerTest { public: DatabaseClonerTest(); @@ -79,7 +75,7 @@ namespace { dbname, BSONObj(), DatabaseCloner::ListCollectionsPredicateFn(), - createStorageInterface, + storageInterface.get(), stdx::bind(&DatabaseClonerTest::collectionWork, this, stdx::placeholders::_1, @@ -107,7 +103,7 @@ namespace { const BSONObj filter; DatabaseCloner::ListCollectionsPredicateFn pred; - const DatabaseCloner::CreateStorageInterfaceFn& csi = createStorageInterface; + CollectionCloner::StorageInterface* si = storageInterface.get(); namespace stdxph = stdx::placeholders; const DatabaseCloner::CollectionCallbackFn ccb = stdx::bind(&DatabaseClonerTest::collectionWork, this, stdxph::_1, stdxph::_2); @@ -115,31 +111,31 @@ namespace { const auto& cb = [](const Status&) { FAIL("should not reach here"); }; // Null executor. - ASSERT_THROWS(DatabaseCloner(nullptr, target, dbname, filter, pred, csi, ccb, cb), + ASSERT_THROWS(DatabaseCloner(nullptr, target, dbname, filter, pred, si, ccb, cb), UserException); // Empty database name - ASSERT_THROWS(DatabaseCloner(&executor, target, "", filter, pred, csi, ccb, cb), + ASSERT_THROWS(DatabaseCloner(&executor, target, "", filter, pred, si, ccb, cb), UserException); // Callback function cannot be null. { DatabaseCloner::CallbackFn ncb; - ASSERT_THROWS(DatabaseCloner(&executor, target, dbname, filter, pred, csi, ccb, ncb), + ASSERT_THROWS(DatabaseCloner(&executor, target, dbname, filter, pred, si, ccb, ncb), UserException); } - // CreateStorageInterfaceFn function cannot be null. + // Storage interface cannot be null. { - DatabaseCloner::CreateStorageInterfaceFn ncsi; - ASSERT_THROWS(DatabaseCloner(&executor, target, dbname, filter, pred, ncsi, ccb, cb), + CollectionCloner::StorageInterface* nsi = nullptr; + ASSERT_THROWS(DatabaseCloner(&executor, target, dbname, filter, pred, nsi, ccb, cb), UserException); } // CollectionCallbackFn function cannot be null. { DatabaseCloner::CollectionCallbackFn nccb; - ASSERT_THROWS(DatabaseCloner(&executor, target, dbname, filter, pred, csi, nccb, cb), + ASSERT_THROWS(DatabaseCloner(&executor, target, dbname, filter, pred, si, nccb, cb), UserException); } } @@ -170,7 +166,7 @@ namespace { dbname, listCollectionsFilter, DatabaseCloner::ListCollectionsPredicateFn(), - createStorageInterface, + storageInterface.get(), stdx::bind(&DatabaseClonerTest::collectionWork, this, stdx::placeholders::_1, @@ -230,7 +226,7 @@ namespace { dbname, BSONObj(), pred, - createStorageInterface, + storageInterface.get(), stdx::bind(&DatabaseClonerTest::collectionWork, this, stdx::placeholders::_1, @@ -352,14 +348,13 @@ namespace { ASSERT_FALSE(databaseCloner->isActive()); } - TEST_F(DatabaseClonerTest, InvalidStorageInterface) { - auto invalidCreateStorageInterface = []() { return nullptr; }; + TEST_F(DatabaseClonerTest, ListCollectionsReturnsEmptyCollectionName) { databaseCloner.reset(new DatabaseCloner(&getExecutor(), target, dbname, BSONObj(), DatabaseCloner::ListCollectionsPredicateFn(), - invalidCreateStorageInterface, + storageInterface.get(), stdx::bind(&DatabaseClonerTest::collectionWork, this, stdx::placeholders::_1, @@ -370,10 +365,10 @@ namespace { ASSERT_OK(databaseCloner->start()); processNetworkResponse(createListCollectionsResponse(0, BSON_ARRAY( - BSON("name" << "a" << "options" << BSONObj())))); + BSON("name" << "" << "options" << BSONObj())))); ASSERT_EQUALS(ErrorCodes::BadValue, getStatus().code()); - ASSERT_STRING_CONTAINS(getStatus().reason(), "null storage interface"); + ASSERT_STRING_CONTAINS(getStatus().reason(), "invalid collection namespace: db."); ASSERT_FALSE(databaseCloner->isActive()); } diff --git a/src/mongo/db/repl/database_task_test.cpp b/src/mongo/db/repl/database_task_test.cpp index dd8c17ef3e6..13cf1f79e71 100644 --- a/src/mongo/db/repl/database_task_test.cpp +++ b/src/mongo/db/repl/database_task_test.cpp @@ -28,12 +28,11 @@ #include "mongo/platform/basic.h" -#include <boost/thread/lock_types.hpp> - #include "mongo/db/repl/database_task.h" #include "mongo/db/repl/operation_context_repl_mock.h" #include "mongo/db/repl/task_runner.h" #include "mongo/db/repl/task_runner_test_fixture.h" +#include "mongo/stdx/mutex.h" #include "mongo/util/concurrency/thread_pool.h" namespace { @@ -71,14 +70,14 @@ namespace { } TEST_F(DatabaseTaskTest, RunGlobalExclusiveLockTask) { - boost::mutex mutex; + stdx::mutex mutex; bool called = false; OperationContext* txn = nullptr; bool lockIsW = false; Status status = getDetectableErrorStatus(); // Task returning 'void' implies NextAction::NoAction. auto task = [&](OperationContext* theTxn, const Status& theStatus) { - boost::lock_guard<boost::mutex> lk(mutex); + stdx::lock_guard<stdx::mutex> lk(mutex); called = true; txn = theTxn; lockIsW = txn->lockState()->isW(); @@ -89,7 +88,7 @@ namespace { getThreadPool().join(); ASSERT_FALSE(getTaskRunner().isActive()); - boost::lock_guard<boost::mutex> lk(mutex); + stdx::lock_guard<stdx::mutex> lk(mutex); ASSERT_TRUE(called); ASSERT(txn); ASSERT_TRUE(lockIsW); @@ -97,14 +96,14 @@ namespace { } void _testRunDatabaseLockTask(DatabaseTaskTest& test, LockMode mode) { - boost::mutex mutex; + stdx::mutex mutex; bool called = false; OperationContext* txn = nullptr; bool isDatabaseLockedForMode = false; Status status = test.getDetectableErrorStatus(); // Task returning 'void' implies NextAction::NoAction. auto task = [&](OperationContext* theTxn, const Status& theStatus) { - boost::lock_guard<boost::mutex> lk(mutex); + stdx::lock_guard<stdx::mutex> lk(mutex); called = true; txn = theTxn; isDatabaseLockedForMode = txn->lockState()->isDbLockedForMode(databaseName, mode); @@ -116,7 +115,7 @@ namespace { test.getThreadPool().join(); ASSERT_FALSE(test.getTaskRunner().isActive()); - boost::lock_guard<boost::mutex> lk(mutex); + stdx::lock_guard<stdx::mutex> lk(mutex); ASSERT_TRUE(called); ASSERT(txn); ASSERT_TRUE(isDatabaseLockedForMode); @@ -140,14 +139,14 @@ namespace { } void _testRunCollectionLockTask(DatabaseTaskTest& test, LockMode mode) { - boost::mutex mutex; + stdx::mutex mutex; bool called = false; OperationContext* txn = nullptr; bool isCollectionLockedForMode = false; Status status = test.getDetectableErrorStatus(); // Task returning 'void' implies NextAction::NoAction. auto task = [&](OperationContext* theTxn, const Status& theStatus) { - boost::lock_guard<boost::mutex> lk(mutex); + stdx::lock_guard<stdx::mutex> lk(mutex); called = true; txn = theTxn; isCollectionLockedForMode = @@ -160,7 +159,7 @@ namespace { test.getThreadPool().join(); ASSERT_FALSE(test.getTaskRunner().isActive()); - boost::lock_guard<boost::mutex> lk(mutex); + stdx::lock_guard<stdx::mutex> lk(mutex); ASSERT_TRUE(called); ASSERT(txn); ASSERT_TRUE(isCollectionLockedForMode); |