diff options
author | Benety Goh <benety@mongodb.com> | 2015-05-27 12:32:18 -0400 |
---|---|---|
committer | Benety Goh <benety@mongodb.com> | 2015-05-29 19:59:31 -0400 |
commit | 36df0e5da71c5d256d8638e522866059f73b616b (patch) | |
tree | 0e24b79ba08bce591cf1249902d771ba480b657f /src/mongo/db/repl/collection_cloner.cpp | |
parent | 993fc5e4ed9264965f16a948d3732d3fc55d1255 (diff) | |
download | mongo-36df0e5da71c5d256d8638e522866059f73b616b.tar.gz |
SERVER-18035 clean up collection and database cloners to not hold mutex when reporting completion status
Diffstat (limited to 'src/mongo/db/repl/collection_cloner.cpp')
-rw-r--r-- | src/mongo/db/repl/collection_cloner.cpp | 102 |
1 files changed, 48 insertions, 54 deletions
diff --git a/src/mongo/db/repl/collection_cloner.cpp b/src/mongo/db/repl/collection_cloner.cpp index 18d8dab6cbf..7d58ae3f2e8 100644 --- a/src/mongo/db/repl/collection_cloner.cpp +++ b/src/mongo/db/repl/collection_cloner.cpp @@ -32,8 +32,6 @@ #include "mongo/db/repl/collection_cloner.h" -#include <boost/thread/lock_guard.hpp> - #include "mongo/util/assert_util.h" #include "mongo/util/log.h" #include "mongo/util/mongoutils/str.h" @@ -45,14 +43,14 @@ namespace repl { const HostAndPort& source, const NamespaceString& sourceNss, const CollectionOptions& options, - const CallbackFn& work, + const CallbackFn& onCompletion, StorageInterface* storageInterface) : _executor(executor), _source(source), _sourceNss(sourceNss), _destNss(_sourceNss), _options(options), - _work(work), + _onCompletion(onCompletion), _storageInterface(storageInterface), _active(false), _listIndexesFetcher(_executor, @@ -77,25 +75,31 @@ namespace repl { _indexSpecs(), _documents(), _dbWorkCallbackHandle(), - // TODO: replace with executor database worker when it is available. - _scheduleDbWorkFn(stdx::bind(&ReplicationExecutor::scheduleWorkWithGlobalExclusiveLock, - _executor, - stdx::placeholders::_1)) { + _scheduleDbWorkFn([this](const ReplicationExecutor::CallbackFn& work) { + return _executor->scheduleDBWork(work); + }) { uassert(ErrorCodes::BadValue, "null replication executor", executor); uassert(ErrorCodes::BadValue, "invalid collection namespace: " + sourceNss.ns(), sourceNss.isValid()); uassertStatusOK(options.validate()); - uassert(ErrorCodes::BadValue, "callback function cannot be null", work); + uassert(ErrorCodes::BadValue, "callback function cannot be null", onCompletion); uassert(ErrorCodes::BadValue, "null storage interface", storageInterface); } + CollectionCloner::~CollectionCloner() { + DESTRUCTOR_GUARD( + cancel(); + wait(); + ); + } + const NamespaceString& CollectionCloner::getSourceNamespace() const { return _sourceNss; } std::string CollectionCloner::getDiagnosticString() const { - boost::lock_guard<boost::mutex> lk(_mutex); + stdx::lock_guard<stdx::mutex> lk(_mutex); str::stream output; output << "CollectionCloner"; output << " executor: " << _executor->getDiagnosticString(); @@ -112,12 +116,12 @@ namespace repl { } bool CollectionCloner::isActive() const { - boost::lock_guard<boost::mutex> lk(_mutex); + stdx::lock_guard<stdx::mutex> lk(_mutex); return _active; } Status CollectionCloner::start() { - boost::lock_guard<boost::mutex> lk(_mutex); + stdx::lock_guard<stdx::mutex> lk(_mutex); if (_active) { return Status(ErrorCodes::IllegalOperation, "collection cloner already started"); @@ -136,7 +140,7 @@ namespace repl { void CollectionCloner::cancel() { ReplicationExecutor::CallbackHandle dbWorkCallbackHandle; { - boost::lock_guard<boost::mutex> lk(_mutex); + stdx::lock_guard<stdx::mutex> lk(_mutex); if (!_active) { return; @@ -154,16 +158,14 @@ namespace repl { } void CollectionCloner::wait() { - // If a fetcher is inactive, wait() has no effect. - _listIndexesFetcher.wait(); - _findFetcher.wait(); - waitForDbWorker(); + stdx::unique_lock<stdx::mutex> lk(_mutex); + _condition.wait(lk, [this]() { return !_active; }); } void CollectionCloner::waitForDbWorker() { ReplicationExecutor::CallbackHandle dbWorkCallbackHandle; { - boost::lock_guard<boost::mutex> lk(_mutex); + stdx::lock_guard<stdx::mutex> lk(_mutex); if (!_active) { return; @@ -178,7 +180,7 @@ namespace repl { } void CollectionCloner::setScheduleDbWorkFn(const ScheduleDbWorkFn& scheduleDbWorkFn) { - boost::lock_guard<boost::mutex> lk(_mutex); + stdx::lock_guard<stdx::mutex> lk(_mutex); _scheduleDbWorkFn = scheduleDbWorkFn; } @@ -186,12 +188,8 @@ namespace repl { void CollectionCloner::_listIndexesCallback(const StatusWith<Fetcher::BatchData>& fetchResult, Fetcher::NextAction* nextAction, BSONObjBuilder* getMoreBob) { - boost::lock_guard<boost::mutex> lk(_mutex); - - _active = false; - if (!fetchResult.isOK()) { - _work(fetchResult.getStatus()); + _finishCallback(nullptr, fetchResult.getStatus()); return; } @@ -212,8 +210,6 @@ namespace repl { invariant(getMoreBob); getMoreBob->append("getMore", batchData.cursorId); getMoreBob->append("collection", batchData.nss.coll()); - - _active = true; return; } @@ -221,23 +217,18 @@ namespace repl { auto&& scheduleResult = _scheduleDbWorkFn( stdx::bind(&CollectionCloner::_beginCollectionCallback, this, stdx::placeholders::_1)); if (!scheduleResult.isOK()) { - _work(scheduleResult.getStatus()); + _finishCallback(nullptr, scheduleResult.getStatus()); return; } - _active = true; _dbWorkCallbackHandle = scheduleResult.getValue(); } void CollectionCloner::_findCallback(const StatusWith<Fetcher::BatchData>& fetchResult, Fetcher::NextAction* nextAction, BSONObjBuilder* getMoreBob) { - boost::lock_guard<boost::mutex> lk(_mutex); - - _active = false; - if (!fetchResult.isOK()) { - _work(fetchResult.getStatus()); + _finishCallback(nullptr, fetchResult.getStatus()); return; } @@ -248,7 +239,7 @@ namespace repl { auto&& scheduleResult = _scheduleDbWorkFn(stdx::bind( &CollectionCloner::_insertDocumentsCallback, this, stdx::placeholders::_1, lastBatch)); if (!scheduleResult.isOK()) { - _work(scheduleResult.getStatus()); + _finishCallback(nullptr, scheduleResult.getStatus()); return; } @@ -258,59 +249,62 @@ namespace repl { getMoreBob->append("collection", batchData.nss.coll()); } - _active = true; _dbWorkCallbackHandle = scheduleResult.getValue(); } void CollectionCloner::_beginCollectionCallback(const ReplicationExecutor::CallbackData& cbd) { - boost::lock_guard<boost::mutex> lk(_mutex); - - _active = false; - + OperationContext* txn = cbd.txn; if (!cbd.status.isOK()) { - _work(cbd.status); + _finishCallback(txn, cbd.status); return; } - OperationContext* txn = cbd.txn; Status status = _storageInterface->beginCollection(txn, _destNss, _options, _indexSpecs); if (!status.isOK()) { - _work(status); + _finishCallback(txn, status); return; } Status scheduleStatus = _findFetcher.schedule(); if (!scheduleStatus.isOK()) { - _work(scheduleStatus); + _finishCallback(txn, scheduleStatus); return; } - - _active = true; } void CollectionCloner::_insertDocumentsCallback(const ReplicationExecutor::CallbackData& cbd, bool lastBatch) { - boost::lock_guard<boost::mutex> lk(_mutex); - - _active = false; - + OperationContext* txn = cbd.txn; if (!cbd.status.isOK()) { - _work(cbd.status); + _finishCallback(txn, cbd.status); return; } - Status status = _storageInterface->insertDocuments(cbd.txn, _destNss, _documents); + Status status = _storageInterface->insertDocuments(txn, _destNss, _documents); if (!status.isOK()) { - _work(status); + _finishCallback(txn, status); return; } if (!lastBatch) { - _active = true; return; } - _work(Status::OK()); + _finishCallback(txn, Status::OK()); + } + + void CollectionCloner::_finishCallback(OperationContext* txn, const Status& status) { + if (status.isOK()) { + auto status = _storageInterface->commitCollection(txn, _destNss); + if (!status.isOK()) { + warning() << "Failed to commit changes to collection " << _destNss.ns() + << ": " << status; + } + } + _onCompletion(status); + stdx::lock_guard<stdx::mutex> lk(_mutex); + _active = false; + _condition.notify_all(); } } // namespace repl |