diff options
Diffstat (limited to 'src/mongo/db/repl/collection_cloner.cpp')
-rw-r--r-- | src/mongo/db/repl/collection_cloner.cpp | 438 |
1 files changed, 217 insertions, 221 deletions
diff --git a/src/mongo/db/repl/collection_cloner.cpp b/src/mongo/db/repl/collection_cloner.cpp index 5abe3c2ed84..1f71fe762e7 100644 --- a/src/mongo/db/repl/collection_cloner.cpp +++ b/src/mongo/db/repl/collection_cloner.cpp @@ -39,273 +39,269 @@ namespace mongo { namespace repl { - CollectionCloner::CollectionCloner(ReplicationExecutor* executor, - const HostAndPort& source, - const NamespaceString& sourceNss, - const CollectionOptions& options, - const CallbackFn& onCompletion, - StorageInterface* storageInterface) - : _executor(executor), - _source(source), - _sourceNss(sourceNss), - _destNss(_sourceNss), - _options(options), - _onCompletion(onCompletion), - _storageInterface(storageInterface), - _active(false), - _listIndexesFetcher(_executor, - _source, - _sourceNss.db().toString(), - BSON("listIndexes" << _sourceNss.coll()), - stdx::bind(&CollectionCloner::_listIndexesCallback, - this, - stdx::placeholders::_1, - stdx::placeholders::_2, - stdx::placeholders::_3)), - _findFetcher(_executor, - _source, - _sourceNss.db().toString(), - BSON("find" << _sourceNss.coll() << - "noCursorTimeout" << true), // SERVER-1387 - stdx::bind(&CollectionCloner::_findCallback, - this, - stdx::placeholders::_1, - stdx::placeholders::_2, - stdx::placeholders::_3)), - _indexSpecs(), - _documents(), - _dbWorkCallbackHandle(), - _scheduleDbWorkFn([this](const ReplicationExecutor::CallbackFn& work) { - return _executor->scheduleDBWork(work); - }) { - - uassert(ErrorCodes::BadValue, "null replication executor", executor); - uassert(ErrorCodes::BadValue, "invalid collection namespace: " + sourceNss.ns(), - sourceNss.isValid()); - uassertStatusOK(options.validate()); - uassert(ErrorCodes::BadValue, "callback function cannot be null", onCompletion); - uassert(ErrorCodes::BadValue, "null storage interface", storageInterface); +CollectionCloner::CollectionCloner(ReplicationExecutor* executor, + const HostAndPort& source, + const NamespaceString& sourceNss, + const CollectionOptions& options, + const CallbackFn& onCompletion, + StorageInterface* storageInterface) + : _executor(executor), + _source(source), + _sourceNss(sourceNss), + _destNss(_sourceNss), + _options(options), + _onCompletion(onCompletion), + _storageInterface(storageInterface), + _active(false), + _listIndexesFetcher(_executor, + _source, + _sourceNss.db().toString(), + BSON("listIndexes" << _sourceNss.coll()), + stdx::bind(&CollectionCloner::_listIndexesCallback, + this, + stdx::placeholders::_1, + stdx::placeholders::_2, + stdx::placeholders::_3)), + _findFetcher(_executor, + _source, + _sourceNss.db().toString(), + BSON("find" << _sourceNss.coll() << "noCursorTimeout" << true), // SERVER-1387 + stdx::bind(&CollectionCloner::_findCallback, + this, + stdx::placeholders::_1, + stdx::placeholders::_2, + stdx::placeholders::_3)), + _indexSpecs(), + _documents(), + _dbWorkCallbackHandle(), + _scheduleDbWorkFn([this](const ReplicationExecutor::CallbackFn& work) { + return _executor->scheduleDBWork(work); + }) { + uassert(ErrorCodes::BadValue, "null replication executor", executor); + uassert(ErrorCodes::BadValue, + "invalid collection namespace: " + sourceNss.ns(), + sourceNss.isValid()); + uassertStatusOK(options.validate()); + uassert(ErrorCodes::BadValue, "callback function cannot be null", onCompletion); + uassert(ErrorCodes::BadValue, "null storage interface", storageInterface); +} + +CollectionCloner::~CollectionCloner() { + DESTRUCTOR_GUARD(cancel(); wait();); +} + +const NamespaceString& CollectionCloner::getSourceNamespace() const { + return _sourceNss; +} + +std::string CollectionCloner::getDiagnosticString() const { + stdx::lock_guard<stdx::mutex> lk(_mutex); + str::stream output; + output << "CollectionCloner"; + output << " executor: " << _executor->getDiagnosticString(); + output << " source: " << _source.toString(); + output << " source namespace: " << _sourceNss.toString(); + output << " destination namespace: " << _destNss.toString(); + output << " collection options: " << _options.toBSON(); + output << " active: " << _active; + output << " listIndexes fetcher: " << _listIndexesFetcher.getDiagnosticString(); + output << " find fetcher: " << _findFetcher.getDiagnosticString(); + output << " database worked callback handle: " << (_dbWorkCallbackHandle.isValid() ? "valid" + : "invalid"); + return output; +} + +bool CollectionCloner::isActive() const { + stdx::lock_guard<stdx::mutex> lk(_mutex); + return _active; +} + +Status CollectionCloner::start() { + stdx::lock_guard<stdx::mutex> lk(_mutex); + + if (_active) { + return Status(ErrorCodes::IllegalOperation, "collection cloner already started"); } - CollectionCloner::~CollectionCloner() { - DESTRUCTOR_GUARD( - cancel(); - wait(); - ); + Status scheduleResult = _listIndexesFetcher.schedule(); + if (!scheduleResult.isOK()) { + return scheduleResult; } - const NamespaceString& CollectionCloner::getSourceNamespace() const { - return _sourceNss; - } + _active = true; - std::string CollectionCloner::getDiagnosticString() const { - stdx::lock_guard<stdx::mutex> lk(_mutex); - str::stream output; - output << "CollectionCloner"; - output << " executor: " << _executor->getDiagnosticString(); - output << " source: " << _source.toString(); - output << " source namespace: " << _sourceNss.toString(); - output << " destination namespace: " << _destNss.toString(); - output << " collection options: " << _options.toBSON(); - output << " active: " << _active; - output << " listIndexes fetcher: " << _listIndexesFetcher.getDiagnosticString(); - output << " find fetcher: " << _findFetcher.getDiagnosticString(); - output << " database worked callback handle: " - << (_dbWorkCallbackHandle.isValid() ? "valid" : "invalid"); - return output; - } + return Status::OK(); +} - bool CollectionCloner::isActive() const { +void CollectionCloner::cancel() { + ReplicationExecutor::CallbackHandle dbWorkCallbackHandle; + { stdx::lock_guard<stdx::mutex> lk(_mutex); - return _active; - } - Status CollectionCloner::start() { - stdx::lock_guard<stdx::mutex> lk(_mutex); - - if (_active) { - return Status(ErrorCodes::IllegalOperation, "collection cloner already started"); + if (!_active) { + return; } - Status scheduleResult = _listIndexesFetcher.schedule(); - if (!scheduleResult.isOK()) { - return scheduleResult; - } + dbWorkCallbackHandle = _dbWorkCallbackHandle; + } - _active = true; + _listIndexesFetcher.cancel(); + _findFetcher.cancel(); - return Status::OK(); + if (dbWorkCallbackHandle.isValid()) { + _executor->cancel(dbWorkCallbackHandle); } +} - void CollectionCloner::cancel() { - ReplicationExecutor::CallbackHandle dbWorkCallbackHandle; - { - stdx::lock_guard<stdx::mutex> lk(_mutex); +void CollectionCloner::wait() { + stdx::unique_lock<stdx::mutex> lk(_mutex); + _condition.wait(lk, [this]() { return !_active; }); +} - if (!_active) { - return; - } +void CollectionCloner::waitForDbWorker() { + ReplicationExecutor::CallbackHandle dbWorkCallbackHandle; + { + stdx::lock_guard<stdx::mutex> lk(_mutex); - dbWorkCallbackHandle = _dbWorkCallbackHandle; + if (!_active) { + return; } - _listIndexesFetcher.cancel(); - _findFetcher.cancel(); - - if (dbWorkCallbackHandle.isValid()) { - _executor->cancel(dbWorkCallbackHandle); - } + dbWorkCallbackHandle = _dbWorkCallbackHandle; } - void CollectionCloner::wait() { - stdx::unique_lock<stdx::mutex> lk(_mutex); - _condition.wait(lk, [this]() { return !_active; }); + if (dbWorkCallbackHandle.isValid()) { + _executor->wait(dbWorkCallbackHandle); } +} - void CollectionCloner::waitForDbWorker() { - ReplicationExecutor::CallbackHandle dbWorkCallbackHandle; - { - stdx::lock_guard<stdx::mutex> lk(_mutex); - - if (!_active) { - return; - } +void CollectionCloner::setScheduleDbWorkFn(const ScheduleDbWorkFn& scheduleDbWorkFn) { + stdx::lock_guard<stdx::mutex> lk(_mutex); - dbWorkCallbackHandle = _dbWorkCallbackHandle; - } + _scheduleDbWorkFn = scheduleDbWorkFn; +} - if (dbWorkCallbackHandle.isValid()) { - _executor->wait(dbWorkCallbackHandle); - } +void CollectionCloner::_listIndexesCallback(const Fetcher::QueryResponseStatus& fetchResult, + Fetcher::NextAction* nextAction, + BSONObjBuilder* getMoreBob) { + if (!fetchResult.isOK()) { + _finishCallback(nullptr, fetchResult.getStatus()); + return; } - void CollectionCloner::setScheduleDbWorkFn(const ScheduleDbWorkFn& scheduleDbWorkFn) { - stdx::lock_guard<stdx::mutex> lk(_mutex); + auto batchData(fetchResult.getValue()); + auto&& documents = batchData.documents; - _scheduleDbWorkFn = scheduleDbWorkFn; + if (documents.empty()) { + warning() << "No indexes found for collection " << _sourceNss.ns() << " while cloning from " + << _source; } - void CollectionCloner::_listIndexesCallback(const Fetcher::QueryResponseStatus& fetchResult, - Fetcher::NextAction* nextAction, - BSONObjBuilder* getMoreBob) { - if (!fetchResult.isOK()) { - _finishCallback(nullptr, fetchResult.getStatus()); - return; - } - - auto batchData(fetchResult.getValue()); - auto&& documents = batchData.documents; - - if (documents.empty()) { - warning() << "No indexes found for collection " << _sourceNss.ns() - << " while cloning from " << _source; - } - - // We may be called with multiple batches leading to a need to grow _indexSpecs. - _indexSpecs.reserve(_indexSpecs.size() + documents.size()); - _indexSpecs.insert(_indexSpecs.end(), documents.begin(), documents.end()); - - // The fetcher will continue to call with kGetMore until an error or the last batch. - if (*nextAction == Fetcher::NextAction::kGetMore) { - invariant(getMoreBob); - getMoreBob->append("getMore", batchData.cursorId); - getMoreBob->append("collection", batchData.nss.coll()); - return; - } - - // We have all of the indexes now, so we can start cloning the collection data. - auto&& scheduleResult = _scheduleDbWorkFn( - stdx::bind(&CollectionCloner::_beginCollectionCallback, this, stdx::placeholders::_1)); - if (!scheduleResult.isOK()) { - _finishCallback(nullptr, scheduleResult.getStatus()); - return; - } + // We may be called with multiple batches leading to a need to grow _indexSpecs. + _indexSpecs.reserve(_indexSpecs.size() + documents.size()); + _indexSpecs.insert(_indexSpecs.end(), documents.begin(), documents.end()); - _dbWorkCallbackHandle = scheduleResult.getValue(); + // The fetcher will continue to call with kGetMore until an error or the last batch. + if (*nextAction == Fetcher::NextAction::kGetMore) { + invariant(getMoreBob); + getMoreBob->append("getMore", batchData.cursorId); + getMoreBob->append("collection", batchData.nss.coll()); + return; } - void CollectionCloner::_findCallback(const StatusWith<Fetcher::QueryResponse>& fetchResult, - Fetcher::NextAction* nextAction, - BSONObjBuilder* getMoreBob) { - if (!fetchResult.isOK()) { - _finishCallback(nullptr, fetchResult.getStatus()); - return; - } + // We have all of the indexes now, so we can start cloning the collection data. + auto&& scheduleResult = _scheduleDbWorkFn( + stdx::bind(&CollectionCloner::_beginCollectionCallback, this, stdx::placeholders::_1)); + if (!scheduleResult.isOK()) { + _finishCallback(nullptr, scheduleResult.getStatus()); + return; + } - auto batchData(fetchResult.getValue()); - _documents = batchData.documents; + _dbWorkCallbackHandle = scheduleResult.getValue(); +} - bool lastBatch = *nextAction == Fetcher::NextAction::kNoAction; - auto&& scheduleResult = _scheduleDbWorkFn(stdx::bind( - &CollectionCloner::_insertDocumentsCallback, this, stdx::placeholders::_1, lastBatch)); - if (!scheduleResult.isOK()) { - _finishCallback(nullptr, scheduleResult.getStatus()); - return; - } +void CollectionCloner::_findCallback(const StatusWith<Fetcher::QueryResponse>& fetchResult, + Fetcher::NextAction* nextAction, + BSONObjBuilder* getMoreBob) { + if (!fetchResult.isOK()) { + _finishCallback(nullptr, fetchResult.getStatus()); + return; + } - if (*nextAction == Fetcher::NextAction::kGetMore) { - invariant(getMoreBob); - getMoreBob->append("getMore", batchData.cursorId); - getMoreBob->append("collection", batchData.nss.coll()); - } + auto batchData(fetchResult.getValue()); + _documents = batchData.documents; - _dbWorkCallbackHandle = scheduleResult.getValue(); + bool lastBatch = *nextAction == Fetcher::NextAction::kNoAction; + auto&& scheduleResult = _scheduleDbWorkFn(stdx::bind( + &CollectionCloner::_insertDocumentsCallback, this, stdx::placeholders::_1, lastBatch)); + if (!scheduleResult.isOK()) { + _finishCallback(nullptr, scheduleResult.getStatus()); + return; } - void CollectionCloner::_beginCollectionCallback(const ReplicationExecutor::CallbackArgs& cbd) { - OperationContext* txn = cbd.txn; - if (!cbd.status.isOK()) { - _finishCallback(txn, cbd.status); - return; - } + if (*nextAction == Fetcher::NextAction::kGetMore) { + invariant(getMoreBob); + getMoreBob->append("getMore", batchData.cursorId); + getMoreBob->append("collection", batchData.nss.coll()); + } - Status status = _storageInterface->beginCollection(txn, _destNss, _options, _indexSpecs); - if (!status.isOK()) { - _finishCallback(txn, status); - return; - } + _dbWorkCallbackHandle = scheduleResult.getValue(); +} - Status scheduleStatus = _findFetcher.schedule(); - if (!scheduleStatus.isOK()) { - _finishCallback(txn, scheduleStatus); - return; - } +void CollectionCloner::_beginCollectionCallback(const ReplicationExecutor::CallbackArgs& cbd) { + OperationContext* txn = cbd.txn; + if (!cbd.status.isOK()) { + _finishCallback(txn, cbd.status); + return; } - void CollectionCloner::_insertDocumentsCallback(const ReplicationExecutor::CallbackArgs& cbd, - bool lastBatch) { - OperationContext* txn = cbd.txn; - if (!cbd.status.isOK()) { - _finishCallback(txn, cbd.status); - return; - } + Status status = _storageInterface->beginCollection(txn, _destNss, _options, _indexSpecs); + if (!status.isOK()) { + _finishCallback(txn, status); + return; + } - Status status = _storageInterface->insertDocuments(txn, _destNss, _documents); - if (!status.isOK()) { - _finishCallback(txn, status); - return; - } + Status scheduleStatus = _findFetcher.schedule(); + if (!scheduleStatus.isOK()) { + _finishCallback(txn, scheduleStatus); + return; + } +} + +void CollectionCloner::_insertDocumentsCallback(const ReplicationExecutor::CallbackArgs& cbd, + bool lastBatch) { + OperationContext* txn = cbd.txn; + if (!cbd.status.isOK()) { + _finishCallback(txn, cbd.status); + return; + } - if (!lastBatch) { - return; - } + Status status = _storageInterface->insertDocuments(txn, _destNss, _documents); + if (!status.isOK()) { + _finishCallback(txn, status); + return; + } - _finishCallback(txn, Status::OK()); + if (!lastBatch) { + return; } - void CollectionCloner::_finishCallback(OperationContext* txn, const Status& status) { - if (status.isOK()) { - auto commitStatus = _storageInterface->commitCollection(txn, _destNss); - if (!commitStatus.isOK()) { - warning() << "Failed to commit changes to collection " << _destNss.ns() - << ": " << commitStatus; - } + _finishCallback(txn, Status::OK()); +} + +void CollectionCloner::_finishCallback(OperationContext* txn, const Status& status) { + if (status.isOK()) { + auto commitStatus = _storageInterface->commitCollection(txn, _destNss); + if (!commitStatus.isOK()) { + warning() << "Failed to commit changes to collection " << _destNss.ns() << ": " + << commitStatus; } - _onCompletion(status); - stdx::lock_guard<stdx::mutex> lk(_mutex); - _active = false; - _condition.notify_all(); } - -} // namespace repl -} // namespace mongo + _onCompletion(status); + stdx::lock_guard<stdx::mutex> lk(_mutex); + _active = false; + _condition.notify_all(); +} + +} // namespace repl +} // namespace mongo |