summaryrefslogtreecommitdiff
path: root/src/mongo/db/repl/collection_cloner.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/repl/collection_cloner.cpp')
-rw-r--r--src/mongo/db/repl/collection_cloner.cpp438
1 files changed, 217 insertions, 221 deletions
diff --git a/src/mongo/db/repl/collection_cloner.cpp b/src/mongo/db/repl/collection_cloner.cpp
index 5abe3c2ed84..1f71fe762e7 100644
--- a/src/mongo/db/repl/collection_cloner.cpp
+++ b/src/mongo/db/repl/collection_cloner.cpp
@@ -39,273 +39,269 @@
namespace mongo {
namespace repl {
- CollectionCloner::CollectionCloner(ReplicationExecutor* executor,
- const HostAndPort& source,
- const NamespaceString& sourceNss,
- const CollectionOptions& options,
- const CallbackFn& onCompletion,
- StorageInterface* storageInterface)
- : _executor(executor),
- _source(source),
- _sourceNss(sourceNss),
- _destNss(_sourceNss),
- _options(options),
- _onCompletion(onCompletion),
- _storageInterface(storageInterface),
- _active(false),
- _listIndexesFetcher(_executor,
- _source,
- _sourceNss.db().toString(),
- BSON("listIndexes" << _sourceNss.coll()),
- stdx::bind(&CollectionCloner::_listIndexesCallback,
- this,
- stdx::placeholders::_1,
- stdx::placeholders::_2,
- stdx::placeholders::_3)),
- _findFetcher(_executor,
- _source,
- _sourceNss.db().toString(),
- BSON("find" << _sourceNss.coll() <<
- "noCursorTimeout" << true), // SERVER-1387
- stdx::bind(&CollectionCloner::_findCallback,
- this,
- stdx::placeholders::_1,
- stdx::placeholders::_2,
- stdx::placeholders::_3)),
- _indexSpecs(),
- _documents(),
- _dbWorkCallbackHandle(),
- _scheduleDbWorkFn([this](const ReplicationExecutor::CallbackFn& work) {
- return _executor->scheduleDBWork(work);
- }) {
-
- uassert(ErrorCodes::BadValue, "null replication executor", executor);
- uassert(ErrorCodes::BadValue, "invalid collection namespace: " + sourceNss.ns(),
- sourceNss.isValid());
- uassertStatusOK(options.validate());
- uassert(ErrorCodes::BadValue, "callback function cannot be null", onCompletion);
- uassert(ErrorCodes::BadValue, "null storage interface", storageInterface);
+CollectionCloner::CollectionCloner(ReplicationExecutor* executor,
+ const HostAndPort& source,
+ const NamespaceString& sourceNss,
+ const CollectionOptions& options,
+ const CallbackFn& onCompletion,
+ StorageInterface* storageInterface)
+ : _executor(executor),
+ _source(source),
+ _sourceNss(sourceNss),
+ _destNss(_sourceNss),
+ _options(options),
+ _onCompletion(onCompletion),
+ _storageInterface(storageInterface),
+ _active(false),
+ _listIndexesFetcher(_executor,
+ _source,
+ _sourceNss.db().toString(),
+ BSON("listIndexes" << _sourceNss.coll()),
+ stdx::bind(&CollectionCloner::_listIndexesCallback,
+ this,
+ stdx::placeholders::_1,
+ stdx::placeholders::_2,
+ stdx::placeholders::_3)),
+ _findFetcher(_executor,
+ _source,
+ _sourceNss.db().toString(),
+ BSON("find" << _sourceNss.coll() << "noCursorTimeout" << true), // SERVER-1387
+ stdx::bind(&CollectionCloner::_findCallback,
+ this,
+ stdx::placeholders::_1,
+ stdx::placeholders::_2,
+ stdx::placeholders::_3)),
+ _indexSpecs(),
+ _documents(),
+ _dbWorkCallbackHandle(),
+ _scheduleDbWorkFn([this](const ReplicationExecutor::CallbackFn& work) {
+ return _executor->scheduleDBWork(work);
+ }) {
+ uassert(ErrorCodes::BadValue, "null replication executor", executor);
+ uassert(ErrorCodes::BadValue,
+ "invalid collection namespace: " + sourceNss.ns(),
+ sourceNss.isValid());
+ uassertStatusOK(options.validate());
+ uassert(ErrorCodes::BadValue, "callback function cannot be null", onCompletion);
+ uassert(ErrorCodes::BadValue, "null storage interface", storageInterface);
+}
+
+CollectionCloner::~CollectionCloner() {
+ DESTRUCTOR_GUARD(cancel(); wait(););
+}
+
+const NamespaceString& CollectionCloner::getSourceNamespace() const {
+ return _sourceNss;
+}
+
+std::string CollectionCloner::getDiagnosticString() const {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ str::stream output;
+ output << "CollectionCloner";
+ output << " executor: " << _executor->getDiagnosticString();
+ output << " source: " << _source.toString();
+ output << " source namespace: " << _sourceNss.toString();
+ output << " destination namespace: " << _destNss.toString();
+ output << " collection options: " << _options.toBSON();
+ output << " active: " << _active;
+ output << " listIndexes fetcher: " << _listIndexesFetcher.getDiagnosticString();
+ output << " find fetcher: " << _findFetcher.getDiagnosticString();
+ output << " database worked callback handle: " << (_dbWorkCallbackHandle.isValid() ? "valid"
+ : "invalid");
+ return output;
+}
+
+bool CollectionCloner::isActive() const {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ return _active;
+}
+
+Status CollectionCloner::start() {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+
+ if (_active) {
+ return Status(ErrorCodes::IllegalOperation, "collection cloner already started");
}
- CollectionCloner::~CollectionCloner() {
- DESTRUCTOR_GUARD(
- cancel();
- wait();
- );
+ Status scheduleResult = _listIndexesFetcher.schedule();
+ if (!scheduleResult.isOK()) {
+ return scheduleResult;
}
- const NamespaceString& CollectionCloner::getSourceNamespace() const {
- return _sourceNss;
- }
+ _active = true;
- std::string CollectionCloner::getDiagnosticString() const {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
- str::stream output;
- output << "CollectionCloner";
- output << " executor: " << _executor->getDiagnosticString();
- output << " source: " << _source.toString();
- output << " source namespace: " << _sourceNss.toString();
- output << " destination namespace: " << _destNss.toString();
- output << " collection options: " << _options.toBSON();
- output << " active: " << _active;
- output << " listIndexes fetcher: " << _listIndexesFetcher.getDiagnosticString();
- output << " find fetcher: " << _findFetcher.getDiagnosticString();
- output << " database worked callback handle: "
- << (_dbWorkCallbackHandle.isValid() ? "valid" : "invalid");
- return output;
- }
+ return Status::OK();
+}
- bool CollectionCloner::isActive() const {
+void CollectionCloner::cancel() {
+ ReplicationExecutor::CallbackHandle dbWorkCallbackHandle;
+ {
stdx::lock_guard<stdx::mutex> lk(_mutex);
- return _active;
- }
- Status CollectionCloner::start() {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
-
- if (_active) {
- return Status(ErrorCodes::IllegalOperation, "collection cloner already started");
+ if (!_active) {
+ return;
}
- Status scheduleResult = _listIndexesFetcher.schedule();
- if (!scheduleResult.isOK()) {
- return scheduleResult;
- }
+ dbWorkCallbackHandle = _dbWorkCallbackHandle;
+ }
- _active = true;
+ _listIndexesFetcher.cancel();
+ _findFetcher.cancel();
- return Status::OK();
+ if (dbWorkCallbackHandle.isValid()) {
+ _executor->cancel(dbWorkCallbackHandle);
}
+}
- void CollectionCloner::cancel() {
- ReplicationExecutor::CallbackHandle dbWorkCallbackHandle;
- {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
+void CollectionCloner::wait() {
+ stdx::unique_lock<stdx::mutex> lk(_mutex);
+ _condition.wait(lk, [this]() { return !_active; });
+}
- if (!_active) {
- return;
- }
+void CollectionCloner::waitForDbWorker() {
+ ReplicationExecutor::CallbackHandle dbWorkCallbackHandle;
+ {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
- dbWorkCallbackHandle = _dbWorkCallbackHandle;
+ if (!_active) {
+ return;
}
- _listIndexesFetcher.cancel();
- _findFetcher.cancel();
-
- if (dbWorkCallbackHandle.isValid()) {
- _executor->cancel(dbWorkCallbackHandle);
- }
+ dbWorkCallbackHandle = _dbWorkCallbackHandle;
}
- void CollectionCloner::wait() {
- stdx::unique_lock<stdx::mutex> lk(_mutex);
- _condition.wait(lk, [this]() { return !_active; });
+ if (dbWorkCallbackHandle.isValid()) {
+ _executor->wait(dbWorkCallbackHandle);
}
+}
- void CollectionCloner::waitForDbWorker() {
- ReplicationExecutor::CallbackHandle dbWorkCallbackHandle;
- {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
-
- if (!_active) {
- return;
- }
+void CollectionCloner::setScheduleDbWorkFn(const ScheduleDbWorkFn& scheduleDbWorkFn) {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
- dbWorkCallbackHandle = _dbWorkCallbackHandle;
- }
+ _scheduleDbWorkFn = scheduleDbWorkFn;
+}
- if (dbWorkCallbackHandle.isValid()) {
- _executor->wait(dbWorkCallbackHandle);
- }
+void CollectionCloner::_listIndexesCallback(const Fetcher::QueryResponseStatus& fetchResult,
+ Fetcher::NextAction* nextAction,
+ BSONObjBuilder* getMoreBob) {
+ if (!fetchResult.isOK()) {
+ _finishCallback(nullptr, fetchResult.getStatus());
+ return;
}
- void CollectionCloner::setScheduleDbWorkFn(const ScheduleDbWorkFn& scheduleDbWorkFn) {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
+ auto batchData(fetchResult.getValue());
+ auto&& documents = batchData.documents;
- _scheduleDbWorkFn = scheduleDbWorkFn;
+ if (documents.empty()) {
+ warning() << "No indexes found for collection " << _sourceNss.ns() << " while cloning from "
+ << _source;
}
- void CollectionCloner::_listIndexesCallback(const Fetcher::QueryResponseStatus& fetchResult,
- Fetcher::NextAction* nextAction,
- BSONObjBuilder* getMoreBob) {
- if (!fetchResult.isOK()) {
- _finishCallback(nullptr, fetchResult.getStatus());
- return;
- }
-
- auto batchData(fetchResult.getValue());
- auto&& documents = batchData.documents;
-
- if (documents.empty()) {
- warning() << "No indexes found for collection " << _sourceNss.ns()
- << " while cloning from " << _source;
- }
-
- // We may be called with multiple batches leading to a need to grow _indexSpecs.
- _indexSpecs.reserve(_indexSpecs.size() + documents.size());
- _indexSpecs.insert(_indexSpecs.end(), documents.begin(), documents.end());
-
- // The fetcher will continue to call with kGetMore until an error or the last batch.
- if (*nextAction == Fetcher::NextAction::kGetMore) {
- invariant(getMoreBob);
- getMoreBob->append("getMore", batchData.cursorId);
- getMoreBob->append("collection", batchData.nss.coll());
- return;
- }
-
- // We have all of the indexes now, so we can start cloning the collection data.
- auto&& scheduleResult = _scheduleDbWorkFn(
- stdx::bind(&CollectionCloner::_beginCollectionCallback, this, stdx::placeholders::_1));
- if (!scheduleResult.isOK()) {
- _finishCallback(nullptr, scheduleResult.getStatus());
- return;
- }
+ // We may be called with multiple batches leading to a need to grow _indexSpecs.
+ _indexSpecs.reserve(_indexSpecs.size() + documents.size());
+ _indexSpecs.insert(_indexSpecs.end(), documents.begin(), documents.end());
- _dbWorkCallbackHandle = scheduleResult.getValue();
+ // The fetcher will continue to call with kGetMore until an error or the last batch.
+ if (*nextAction == Fetcher::NextAction::kGetMore) {
+ invariant(getMoreBob);
+ getMoreBob->append("getMore", batchData.cursorId);
+ getMoreBob->append("collection", batchData.nss.coll());
+ return;
}
- void CollectionCloner::_findCallback(const StatusWith<Fetcher::QueryResponse>& fetchResult,
- Fetcher::NextAction* nextAction,
- BSONObjBuilder* getMoreBob) {
- if (!fetchResult.isOK()) {
- _finishCallback(nullptr, fetchResult.getStatus());
- return;
- }
+ // We have all of the indexes now, so we can start cloning the collection data.
+ auto&& scheduleResult = _scheduleDbWorkFn(
+ stdx::bind(&CollectionCloner::_beginCollectionCallback, this, stdx::placeholders::_1));
+ if (!scheduleResult.isOK()) {
+ _finishCallback(nullptr, scheduleResult.getStatus());
+ return;
+ }
- auto batchData(fetchResult.getValue());
- _documents = batchData.documents;
+ _dbWorkCallbackHandle = scheduleResult.getValue();
+}
- bool lastBatch = *nextAction == Fetcher::NextAction::kNoAction;
- auto&& scheduleResult = _scheduleDbWorkFn(stdx::bind(
- &CollectionCloner::_insertDocumentsCallback, this, stdx::placeholders::_1, lastBatch));
- if (!scheduleResult.isOK()) {
- _finishCallback(nullptr, scheduleResult.getStatus());
- return;
- }
+void CollectionCloner::_findCallback(const StatusWith<Fetcher::QueryResponse>& fetchResult,
+ Fetcher::NextAction* nextAction,
+ BSONObjBuilder* getMoreBob) {
+ if (!fetchResult.isOK()) {
+ _finishCallback(nullptr, fetchResult.getStatus());
+ return;
+ }
- if (*nextAction == Fetcher::NextAction::kGetMore) {
- invariant(getMoreBob);
- getMoreBob->append("getMore", batchData.cursorId);
- getMoreBob->append("collection", batchData.nss.coll());
- }
+ auto batchData(fetchResult.getValue());
+ _documents = batchData.documents;
- _dbWorkCallbackHandle = scheduleResult.getValue();
+ bool lastBatch = *nextAction == Fetcher::NextAction::kNoAction;
+ auto&& scheduleResult = _scheduleDbWorkFn(stdx::bind(
+ &CollectionCloner::_insertDocumentsCallback, this, stdx::placeholders::_1, lastBatch));
+ if (!scheduleResult.isOK()) {
+ _finishCallback(nullptr, scheduleResult.getStatus());
+ return;
}
- void CollectionCloner::_beginCollectionCallback(const ReplicationExecutor::CallbackArgs& cbd) {
- OperationContext* txn = cbd.txn;
- if (!cbd.status.isOK()) {
- _finishCallback(txn, cbd.status);
- return;
- }
+ if (*nextAction == Fetcher::NextAction::kGetMore) {
+ invariant(getMoreBob);
+ getMoreBob->append("getMore", batchData.cursorId);
+ getMoreBob->append("collection", batchData.nss.coll());
+ }
- Status status = _storageInterface->beginCollection(txn, _destNss, _options, _indexSpecs);
- if (!status.isOK()) {
- _finishCallback(txn, status);
- return;
- }
+ _dbWorkCallbackHandle = scheduleResult.getValue();
+}
- Status scheduleStatus = _findFetcher.schedule();
- if (!scheduleStatus.isOK()) {
- _finishCallback(txn, scheduleStatus);
- return;
- }
+void CollectionCloner::_beginCollectionCallback(const ReplicationExecutor::CallbackArgs& cbd) {
+ OperationContext* txn = cbd.txn;
+ if (!cbd.status.isOK()) {
+ _finishCallback(txn, cbd.status);
+ return;
}
- void CollectionCloner::_insertDocumentsCallback(const ReplicationExecutor::CallbackArgs& cbd,
- bool lastBatch) {
- OperationContext* txn = cbd.txn;
- if (!cbd.status.isOK()) {
- _finishCallback(txn, cbd.status);
- return;
- }
+ Status status = _storageInterface->beginCollection(txn, _destNss, _options, _indexSpecs);
+ if (!status.isOK()) {
+ _finishCallback(txn, status);
+ return;
+ }
- Status status = _storageInterface->insertDocuments(txn, _destNss, _documents);
- if (!status.isOK()) {
- _finishCallback(txn, status);
- return;
- }
+ Status scheduleStatus = _findFetcher.schedule();
+ if (!scheduleStatus.isOK()) {
+ _finishCallback(txn, scheduleStatus);
+ return;
+ }
+}
+
+void CollectionCloner::_insertDocumentsCallback(const ReplicationExecutor::CallbackArgs& cbd,
+ bool lastBatch) {
+ OperationContext* txn = cbd.txn;
+ if (!cbd.status.isOK()) {
+ _finishCallback(txn, cbd.status);
+ return;
+ }
- if (!lastBatch) {
- return;
- }
+ Status status = _storageInterface->insertDocuments(txn, _destNss, _documents);
+ if (!status.isOK()) {
+ _finishCallback(txn, status);
+ return;
+ }
- _finishCallback(txn, Status::OK());
+ if (!lastBatch) {
+ return;
}
- void CollectionCloner::_finishCallback(OperationContext* txn, const Status& status) {
- if (status.isOK()) {
- auto commitStatus = _storageInterface->commitCollection(txn, _destNss);
- if (!commitStatus.isOK()) {
- warning() << "Failed to commit changes to collection " << _destNss.ns()
- << ": " << commitStatus;
- }
+ _finishCallback(txn, Status::OK());
+}
+
+void CollectionCloner::_finishCallback(OperationContext* txn, const Status& status) {
+ if (status.isOK()) {
+ auto commitStatus = _storageInterface->commitCollection(txn, _destNss);
+ if (!commitStatus.isOK()) {
+ warning() << "Failed to commit changes to collection " << _destNss.ns() << ": "
+ << commitStatus;
}
- _onCompletion(status);
- stdx::lock_guard<stdx::mutex> lk(_mutex);
- _active = false;
- _condition.notify_all();
}
-
-} // namespace repl
-} // namespace mongo
+ _onCompletion(status);
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ _active = false;
+ _condition.notify_all();
+}
+
+} // namespace repl
+} // namespace mongo