diff options
author | Mark Benvenuto <mark.benvenuto@mongodb.com> | 2015-06-20 00:22:50 -0400 |
---|---|---|
committer | Mark Benvenuto <mark.benvenuto@mongodb.com> | 2015-06-20 10:56:02 -0400 |
commit | 9c2ed42daa8fbbef4a919c21ec564e2db55e8d60 (patch) | |
tree | 3814f79c10d7b490948d8cb7b112ac1dd41ceff1 /src/mongo/db/repl/database_cloner.cpp | |
parent | 01965cf52bce6976637ecb8f4a622aeb05ab256a (diff) | |
download | mongo-9c2ed42daa8fbbef4a919c21ec564e2db55e8d60.tar.gz |
SERVER-18579: Clang-Format - reformat code, no comment reflow
Diffstat (limited to 'src/mongo/db/repl/database_cloner.cpp')
-rw-r--r-- | src/mongo/db/repl/database_cloner.cpp | 483 |
1 files changed, 238 insertions, 245 deletions
diff --git a/src/mongo/db/repl/database_cloner.cpp b/src/mongo/db/repl/database_cloner.cpp index b96ab403169..b0c0bddcd01 100644 --- a/src/mongo/db/repl/database_cloner.cpp +++ b/src/mongo/db/repl/database_cloner.cpp @@ -47,294 +47,287 @@ namespace repl { namespace { - const char* kNameFieldName = "name"; - const char* kOptionsFieldName = "options"; - - /** - * Default listCollections predicate. - */ - bool acceptAllPred(const BSONObj&) { - return true; - } +const char* kNameFieldName = "name"; +const char* kOptionsFieldName = "options"; - /** - * Creates a listCollections command obj with an optional filter. - */ - BSONObj createListCollectionsCommandObject(const BSONObj& filter) { - BSONObjBuilder output; - output.append("listCollections", 1); - if (!filter.isEmpty()) { - output.append("filter", filter); - } - return output.obj(); - } +/** + * Default listCollections predicate. + */ +bool acceptAllPred(const BSONObj&) { + return true; +} -} // namespace - - DatabaseCloner::DatabaseCloner(ReplicationExecutor* executor, - const HostAndPort& source, - const std::string& dbname, - const BSONObj& listCollectionsFilter, - const ListCollectionsPredicateFn& listCollectionsPred, - CollectionCloner::StorageInterface* si, - const CollectionCallbackFn& collWork, - const CallbackFn& onCompletion) - : _executor(executor), - _source(source), - _dbname(dbname), - _listCollectionsFilter(listCollectionsFilter), - _listCollectionsPredicate(listCollectionsPred ? listCollectionsPred : acceptAllPred), - _storageInterface(si), - _collectionWork(collWork), - _onCompletion(onCompletion), - _active(false), - _listCollectionsFetcher(_executor, - _source, - _dbname, - createListCollectionsCommandObject(_listCollectionsFilter), - stdx::bind(&DatabaseCloner::_listCollectionsCallback, - this, - stdx::placeholders::_1, - stdx::placeholders::_2, - stdx::placeholders::_3)), - _scheduleDbWorkFn([this](const ReplicationExecutor::CallbackFn& work) { - return _executor->scheduleDBWork(work); - }), - _startCollectionCloner([](CollectionCloner& cloner) { return cloner.start(); }) { - - uassert(ErrorCodes::BadValue, "null replication executor", executor); - uassert(ErrorCodes::BadValue, "empty database name", !dbname.empty()); - uassert(ErrorCodes::BadValue, "storage interface cannot be null", si); - uassert(ErrorCodes::BadValue, "collection callback function cannot be null", collWork); - uassert(ErrorCodes::BadValue, "callback function cannot be null", onCompletion); +/** + * Creates a listCollections command obj with an optional filter. + */ +BSONObj createListCollectionsCommandObject(const BSONObj& filter) { + BSONObjBuilder output; + output.append("listCollections", 1); + if (!filter.isEmpty()) { + output.append("filter", filter); } - - DatabaseCloner::~DatabaseCloner() { - DESTRUCTOR_GUARD( - cancel(); - wait(); - ); + return output.obj(); +} + +} // namespace + +DatabaseCloner::DatabaseCloner(ReplicationExecutor* executor, + const HostAndPort& source, + const std::string& dbname, + const BSONObj& listCollectionsFilter, + const ListCollectionsPredicateFn& listCollectionsPred, + CollectionCloner::StorageInterface* si, + const CollectionCallbackFn& collWork, + const CallbackFn& onCompletion) + : _executor(executor), + _source(source), + _dbname(dbname), + _listCollectionsFilter(listCollectionsFilter), + _listCollectionsPredicate(listCollectionsPred ? listCollectionsPred : acceptAllPred), + _storageInterface(si), + _collectionWork(collWork), + _onCompletion(onCompletion), + _active(false), + _listCollectionsFetcher(_executor, + _source, + _dbname, + createListCollectionsCommandObject(_listCollectionsFilter), + stdx::bind(&DatabaseCloner::_listCollectionsCallback, + this, + stdx::placeholders::_1, + stdx::placeholders::_2, + stdx::placeholders::_3)), + _scheduleDbWorkFn([this](const ReplicationExecutor::CallbackFn& work) { + return _executor->scheduleDBWork(work); + }), + _startCollectionCloner([](CollectionCloner& cloner) { return cloner.start(); }) { + uassert(ErrorCodes::BadValue, "null replication executor", executor); + uassert(ErrorCodes::BadValue, "empty database name", !dbname.empty()); + uassert(ErrorCodes::BadValue, "storage interface cannot be null", si); + uassert(ErrorCodes::BadValue, "collection callback function cannot be null", collWork); + uassert(ErrorCodes::BadValue, "callback function cannot be null", onCompletion); +} + +DatabaseCloner::~DatabaseCloner() { + DESTRUCTOR_GUARD(cancel(); wait();); +} + +const std::vector<BSONObj>& DatabaseCloner::getCollectionInfos() const { + stdx::lock_guard<stdx::mutex> lk(_mutex); + return _collectionInfos; +} + +std::string DatabaseCloner::getDiagnosticString() const { + stdx::lock_guard<stdx::mutex> lk(_mutex); + str::stream output; + output << "DatabaseCloner"; + output << " executor: " << _executor->getDiagnosticString(); + output << " source: " << _source.toString(); + output << " database: " << _dbname; + output << " listCollections filter" << _listCollectionsFilter; + output << " active: " << _active; + output << " collection info objects (empty if listCollections is in progress): " + << _collectionInfos.size(); + return output; +} + +bool DatabaseCloner::isActive() const { + stdx::lock_guard<stdx::mutex> lk(_mutex); + return _active; +} + +Status DatabaseCloner::start() { + stdx::lock_guard<stdx::mutex> lk(_mutex); + + if (_active) { + return Status(ErrorCodes::IllegalOperation, "database cloner already started"); } - const std::vector<BSONObj>& DatabaseCloner::getCollectionInfos() const { - stdx::lock_guard<stdx::mutex> lk(_mutex); - return _collectionInfos; + Status scheduleResult = _listCollectionsFetcher.schedule(); + if (!scheduleResult.isOK()) { + return scheduleResult; } - std::string DatabaseCloner::getDiagnosticString() const { - stdx::lock_guard<stdx::mutex> lk(_mutex); - str::stream output; - output << "DatabaseCloner"; - output << " executor: " << _executor->getDiagnosticString(); - output << " source: " << _source.toString(); - output << " database: " << _dbname; - output << " listCollections filter" << _listCollectionsFilter; - output << " active: " << _active; - output << " collection info objects (empty if listCollections is in progress): " - << _collectionInfos.size(); - return output; - } + _active = true; - bool DatabaseCloner::isActive() const { - stdx::lock_guard<stdx::mutex> lk(_mutex); - return _active; - } + return Status::OK(); +} - Status DatabaseCloner::start() { +void DatabaseCloner::cancel() { + { stdx::lock_guard<stdx::mutex> lk(_mutex); - if (_active) { - return Status(ErrorCodes::IllegalOperation, "database cloner already started"); - } - - Status scheduleResult = _listCollectionsFetcher.schedule(); - if (!scheduleResult.isOK()) { - return scheduleResult; + if (!_active) { + return; } - - _active = true; - - return Status::OK(); } - void DatabaseCloner::cancel() { - { - stdx::lock_guard<stdx::mutex> lk(_mutex); + _listCollectionsFetcher.cancel(); +} - if (!_active) { - return; - } - } +void DatabaseCloner::wait() { + stdx::unique_lock<stdx::mutex> lk(_mutex); + _condition.wait(lk, [this]() { return !_active; }); +} - _listCollectionsFetcher.cancel(); - } +void DatabaseCloner::setScheduleDbWorkFn(const CollectionCloner::ScheduleDbWorkFn& work) { + stdx::lock_guard<stdx::mutex> lk(_mutex); - void DatabaseCloner::wait() { - stdx::unique_lock<stdx::mutex> lk(_mutex); - _condition.wait(lk, [this]() { return !_active; }); - } + _scheduleDbWorkFn = work; +} - void DatabaseCloner::setScheduleDbWorkFn(const CollectionCloner::ScheduleDbWorkFn& work) { - stdx::lock_guard<stdx::mutex> lk(_mutex); +void DatabaseCloner::setStartCollectionClonerFn( + const StartCollectionClonerFn& startCollectionCloner) { + _startCollectionCloner = startCollectionCloner; +} - _scheduleDbWorkFn = work; +void DatabaseCloner::_listCollectionsCallback(const StatusWith<Fetcher::QueryResponse>& result, + Fetcher::NextAction* nextAction, + BSONObjBuilder* getMoreBob) { + if (!result.isOK()) { + _finishCallback(result.getStatus()); + return; } - void DatabaseCloner::setStartCollectionClonerFn( - const StartCollectionClonerFn& startCollectionCloner) { - - _startCollectionCloner = startCollectionCloner; + auto batchData(result.getValue()); + auto&& documents = batchData.documents; + + // We may be called with multiple batches leading to a need to grow _collectionInfos. + _collectionInfos.reserve(_collectionInfos.size() + documents.size()); + std::copy_if(documents.begin(), + documents.end(), + std::back_inserter(_collectionInfos), + _listCollectionsPredicate); + + // The fetcher will continue to call with kGetMore until an error or the last batch. + if (*nextAction == Fetcher::NextAction::kGetMore) { + invariant(getMoreBob); + getMoreBob->append("getMore", batchData.cursorId); + getMoreBob->append("collection", batchData.nss.coll()); + return; } - void DatabaseCloner::_listCollectionsCallback(const StatusWith<Fetcher::QueryResponse>& result, - Fetcher::NextAction* nextAction, - BSONObjBuilder* getMoreBob) { + // Nothing to do for an empty database. + if (_collectionInfos.empty()) { + _finishCallback(Status::OK()); + return; + } - if (!result.isOK()) { - _finishCallback(result.getStatus()); + _collectionNamespaces.reserve(_collectionInfos.size()); + std::set<std::string> seen; + for (auto&& info : _collectionInfos) { + BSONElement nameElement = info.getField(kNameFieldName); + if (nameElement.eoo()) { + _finishCallback(Status(ErrorCodes::FailedToParse, + str::stream() << "collection info must contain '" + << kNameFieldName << "' " + << "field : " << info)); return; } - - auto batchData(result.getValue()); - auto&& documents = batchData.documents; - - // We may be called with multiple batches leading to a need to grow _collectionInfos. - _collectionInfos.reserve(_collectionInfos.size() + documents.size()); - std::copy_if(documents.begin(), documents.end(), - std::back_inserter(_collectionInfos), - _listCollectionsPredicate); - - // The fetcher will continue to call with kGetMore until an error or the last batch. - if (*nextAction == Fetcher::NextAction::kGetMore) { - invariant(getMoreBob); - getMoreBob->append("getMore", batchData.cursorId); - getMoreBob->append("collection", batchData.nss.coll()); + if (nameElement.type() != mongo::String) { + _finishCallback(Status(ErrorCodes::TypeMismatch, + str::stream() << "'" << kNameFieldName + << "' field must be a string: " << info)); return; } - - // Nothing to do for an empty database. - if (_collectionInfos.empty()) { - _finishCallback(Status::OK()); + const std::string collectionName = nameElement.String(); + if (seen.find(collectionName) != seen.end()) { + _finishCallback(Status(ErrorCodes::DuplicateKey, + str::stream() + << "collection info contains duplicate collection name " + << "'" << collectionName << "': " << info)); return; } - _collectionNamespaces.reserve(_collectionInfos.size()); - std::set<std::string> seen; - for (auto&& info : _collectionInfos) { - BSONElement nameElement = info.getField(kNameFieldName); - if (nameElement.eoo()) { - _finishCallback(Status(ErrorCodes::FailedToParse, str::stream() << - "collection info must contain '" << kNameFieldName << "' " << - "field : " << info)); - return; - } - if (nameElement.type() != mongo::String) { - _finishCallback(Status(ErrorCodes::TypeMismatch, str::stream() << - "'" << kNameFieldName << "' field must be a string: " << info)); - return; - } - const std::string collectionName = nameElement.String(); - if (seen.find(collectionName) != seen.end()) { - _finishCallback(Status(ErrorCodes::DuplicateKey, str::stream() << - "collection info contains duplicate collection name " << - "'" << collectionName << "': " << info)); - return; - } - - BSONElement optionsElement = info.getField(kOptionsFieldName); - if (optionsElement.eoo()) { - _finishCallback(Status(ErrorCodes::FailedToParse, str::stream() << - "collection info must contain '" << kOptionsFieldName << "' " << - "field : " << info)); - return; - } - if (!optionsElement.isABSONObj()) { - _finishCallback(Status(ErrorCodes::TypeMismatch, str::stream() << - "'" << kOptionsFieldName << "' field must be an object: " << - info)); - return; - } - const BSONObj optionsObj = optionsElement.Obj(); - CollectionOptions options; - Status parseStatus = options.parse(optionsObj); - if (!parseStatus.isOK()) { - _finishCallback(parseStatus); - return; - } - seen.insert(collectionName); - - _collectionNamespaces.emplace_back(_dbname, collectionName); - auto&& nss = *_collectionNamespaces.crbegin(); - - try { - _collectionCloners.emplace_back( - _executor, - _source, - nss, - options, - stdx::bind(&DatabaseCloner::_collectionClonerCallback, - this, - stdx::placeholders::_1, - nss), - _storageInterface); - } - catch (const UserException& ex) { - _finishCallback(ex.toStatus()); - return; - } + BSONElement optionsElement = info.getField(kOptionsFieldName); + if (optionsElement.eoo()) { + _finishCallback(Status(ErrorCodes::FailedToParse, + str::stream() << "collection info must contain '" + << kOptionsFieldName << "' " + << "field : " << info)); + return; } - - for (auto&& collectionCloner : _collectionCloners) { - collectionCloner.setScheduleDbWorkFn(_scheduleDbWorkFn); + if (!optionsElement.isABSONObj()) { + _finishCallback(Status(ErrorCodes::TypeMismatch, + str::stream() << "'" << kOptionsFieldName + << "' field must be an object: " << info)); + return; } + const BSONObj optionsObj = optionsElement.Obj(); + CollectionOptions options; + Status parseStatus = options.parse(optionsObj); + if (!parseStatus.isOK()) { + _finishCallback(parseStatus); + return; + } + seen.insert(collectionName); + + _collectionNamespaces.emplace_back(_dbname, collectionName); + auto&& nss = *_collectionNamespaces.crbegin(); + + try { + _collectionCloners.emplace_back( + _executor, + _source, + nss, + options, + stdx::bind( + &DatabaseCloner::_collectionClonerCallback, this, stdx::placeholders::_1, nss), + _storageInterface); + } catch (const UserException& ex) { + _finishCallback(ex.toStatus()); + return; + } + } + + for (auto&& collectionCloner : _collectionCloners) { + collectionCloner.setScheduleDbWorkFn(_scheduleDbWorkFn); + } - // Start first collection cloner. - _currentCollectionClonerIter = _collectionCloners.begin(); + // Start first collection cloner. + _currentCollectionClonerIter = _collectionCloners.begin(); - LOG(1) << " cloning collection " << _currentCollectionClonerIter->getSourceNamespace(); + LOG(1) << " cloning collection " << _currentCollectionClonerIter->getSourceNamespace(); + Status startStatus = _startCollectionCloner(*_currentCollectionClonerIter); + if (!startStatus.isOK()) { + LOG(1) << " failed to start collection cloning on " + << _currentCollectionClonerIter->getSourceNamespace() << ": " << startStatus; + _finishCallback(startStatus); + return; + } +} + +void DatabaseCloner::_collectionClonerCallback(const Status& status, const NamespaceString& nss) { + // Forward collection cloner result to caller. + // Failure to clone a collection does not stop the database cloner + // from cloning the rest of the collections in the listCollections result. + _collectionWork(status, nss); + + _currentCollectionClonerIter++; + + LOG(1) << " cloning collection " << _currentCollectionClonerIter->getSourceNamespace(); + + if (_currentCollectionClonerIter != _collectionCloners.end()) { Status startStatus = _startCollectionCloner(*_currentCollectionClonerIter); if (!startStatus.isOK()) { LOG(1) << " failed to start collection cloning on " - << _currentCollectionClonerIter->getSourceNamespace() - << ": " << startStatus; + << _currentCollectionClonerIter->getSourceNamespace() << ": " << startStatus; _finishCallback(startStatus); return; } + return; } - void DatabaseCloner::_collectionClonerCallback(const Status& status, - const NamespaceString& nss) { - // Forward collection cloner result to caller. - // Failure to clone a collection does not stop the database cloner - // from cloning the rest of the collections in the listCollections result. - _collectionWork(status, nss); - - _currentCollectionClonerIter++; - - LOG(1) << " cloning collection " << _currentCollectionClonerIter->getSourceNamespace(); - - if (_currentCollectionClonerIter != _collectionCloners.end()) { - Status startStatus = _startCollectionCloner(*_currentCollectionClonerIter); - if (!startStatus.isOK()) { - LOG(1) << " failed to start collection cloning on " - << _currentCollectionClonerIter->getSourceNamespace() - << ": " << startStatus; - _finishCallback(startStatus); - return; - } - return; - } - - _finishCallback(Status::OK()); - } + _finishCallback(Status::OK()); +} - void DatabaseCloner::_finishCallback(const Status& status) { - _onCompletion(status); - stdx::lock_guard<stdx::mutex> lk(_mutex); - _active = false; - _condition.notify_all(); - } +void DatabaseCloner::_finishCallback(const Status& status) { + _onCompletion(status); + stdx::lock_guard<stdx::mutex> lk(_mutex); + _active = false; + _condition.notify_all(); +} -} // namespace repl -} // namespace mongo +} // namespace repl +} // namespace mongo |