summaryrefslogtreecommitdiff
path: root/src/mongo/db/repl/collection_cloner.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/repl/collection_cloner.cpp')
-rw-r--r--src/mongo/db/repl/collection_cloner.cpp102
1 files changed, 48 insertions, 54 deletions
diff --git a/src/mongo/db/repl/collection_cloner.cpp b/src/mongo/db/repl/collection_cloner.cpp
index 18d8dab6cbf..7d58ae3f2e8 100644
--- a/src/mongo/db/repl/collection_cloner.cpp
+++ b/src/mongo/db/repl/collection_cloner.cpp
@@ -32,8 +32,6 @@
#include "mongo/db/repl/collection_cloner.h"
-#include <boost/thread/lock_guard.hpp>
-
#include "mongo/util/assert_util.h"
#include "mongo/util/log.h"
#include "mongo/util/mongoutils/str.h"
@@ -45,14 +43,14 @@ namespace repl {
const HostAndPort& source,
const NamespaceString& sourceNss,
const CollectionOptions& options,
- const CallbackFn& work,
+ const CallbackFn& onCompletion,
StorageInterface* storageInterface)
: _executor(executor),
_source(source),
_sourceNss(sourceNss),
_destNss(_sourceNss),
_options(options),
- _work(work),
+ _onCompletion(onCompletion),
_storageInterface(storageInterface),
_active(false),
_listIndexesFetcher(_executor,
@@ -77,25 +75,31 @@ namespace repl {
_indexSpecs(),
_documents(),
_dbWorkCallbackHandle(),
- // TODO: replace with executor database worker when it is available.
- _scheduleDbWorkFn(stdx::bind(&ReplicationExecutor::scheduleWorkWithGlobalExclusiveLock,
- _executor,
- stdx::placeholders::_1)) {
+ _scheduleDbWorkFn([this](const ReplicationExecutor::CallbackFn& work) {
+ return _executor->scheduleDBWork(work);
+ }) {
uassert(ErrorCodes::BadValue, "null replication executor", executor);
uassert(ErrorCodes::BadValue, "invalid collection namespace: " + sourceNss.ns(),
sourceNss.isValid());
uassertStatusOK(options.validate());
- uassert(ErrorCodes::BadValue, "callback function cannot be null", work);
+ uassert(ErrorCodes::BadValue, "callback function cannot be null", onCompletion);
uassert(ErrorCodes::BadValue, "null storage interface", storageInterface);
}
+ CollectionCloner::~CollectionCloner() {
+ DESTRUCTOR_GUARD(
+ cancel();
+ wait();
+ );
+ }
+
const NamespaceString& CollectionCloner::getSourceNamespace() const {
return _sourceNss;
}
std::string CollectionCloner::getDiagnosticString() const {
- boost::lock_guard<boost::mutex> lk(_mutex);
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
str::stream output;
output << "CollectionCloner";
output << " executor: " << _executor->getDiagnosticString();
@@ -112,12 +116,12 @@ namespace repl {
}
bool CollectionCloner::isActive() const {
- boost::lock_guard<boost::mutex> lk(_mutex);
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
return _active;
}
Status CollectionCloner::start() {
- boost::lock_guard<boost::mutex> lk(_mutex);
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
if (_active) {
return Status(ErrorCodes::IllegalOperation, "collection cloner already started");
@@ -136,7 +140,7 @@ namespace repl {
void CollectionCloner::cancel() {
ReplicationExecutor::CallbackHandle dbWorkCallbackHandle;
{
- boost::lock_guard<boost::mutex> lk(_mutex);
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
if (!_active) {
return;
@@ -154,16 +158,14 @@ namespace repl {
}
void CollectionCloner::wait() {
- // If a fetcher is inactive, wait() has no effect.
- _listIndexesFetcher.wait();
- _findFetcher.wait();
- waitForDbWorker();
+ stdx::unique_lock<stdx::mutex> lk(_mutex);
+ _condition.wait(lk, [this]() { return !_active; });
}
void CollectionCloner::waitForDbWorker() {
ReplicationExecutor::CallbackHandle dbWorkCallbackHandle;
{
- boost::lock_guard<boost::mutex> lk(_mutex);
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
if (!_active) {
return;
@@ -178,7 +180,7 @@ namespace repl {
}
void CollectionCloner::setScheduleDbWorkFn(const ScheduleDbWorkFn& scheduleDbWorkFn) {
- boost::lock_guard<boost::mutex> lk(_mutex);
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
_scheduleDbWorkFn = scheduleDbWorkFn;
}
@@ -186,12 +188,8 @@ namespace repl {
void CollectionCloner::_listIndexesCallback(const StatusWith<Fetcher::BatchData>& fetchResult,
Fetcher::NextAction* nextAction,
BSONObjBuilder* getMoreBob) {
- boost::lock_guard<boost::mutex> lk(_mutex);
-
- _active = false;
-
if (!fetchResult.isOK()) {
- _work(fetchResult.getStatus());
+ _finishCallback(nullptr, fetchResult.getStatus());
return;
}
@@ -212,8 +210,6 @@ namespace repl {
invariant(getMoreBob);
getMoreBob->append("getMore", batchData.cursorId);
getMoreBob->append("collection", batchData.nss.coll());
-
- _active = true;
return;
}
@@ -221,23 +217,18 @@ namespace repl {
auto&& scheduleResult = _scheduleDbWorkFn(
stdx::bind(&CollectionCloner::_beginCollectionCallback, this, stdx::placeholders::_1));
if (!scheduleResult.isOK()) {
- _work(scheduleResult.getStatus());
+ _finishCallback(nullptr, scheduleResult.getStatus());
return;
}
- _active = true;
_dbWorkCallbackHandle = scheduleResult.getValue();
}
void CollectionCloner::_findCallback(const StatusWith<Fetcher::BatchData>& fetchResult,
Fetcher::NextAction* nextAction,
BSONObjBuilder* getMoreBob) {
- boost::lock_guard<boost::mutex> lk(_mutex);
-
- _active = false;
-
if (!fetchResult.isOK()) {
- _work(fetchResult.getStatus());
+ _finishCallback(nullptr, fetchResult.getStatus());
return;
}
@@ -248,7 +239,7 @@ namespace repl {
auto&& scheduleResult = _scheduleDbWorkFn(stdx::bind(
&CollectionCloner::_insertDocumentsCallback, this, stdx::placeholders::_1, lastBatch));
if (!scheduleResult.isOK()) {
- _work(scheduleResult.getStatus());
+ _finishCallback(nullptr, scheduleResult.getStatus());
return;
}
@@ -258,59 +249,62 @@ namespace repl {
getMoreBob->append("collection", batchData.nss.coll());
}
- _active = true;
_dbWorkCallbackHandle = scheduleResult.getValue();
}
void CollectionCloner::_beginCollectionCallback(const ReplicationExecutor::CallbackData& cbd) {
- boost::lock_guard<boost::mutex> lk(_mutex);
-
- _active = false;
-
+ OperationContext* txn = cbd.txn;
if (!cbd.status.isOK()) {
- _work(cbd.status);
+ _finishCallback(txn, cbd.status);
return;
}
- OperationContext* txn = cbd.txn;
Status status = _storageInterface->beginCollection(txn, _destNss, _options, _indexSpecs);
if (!status.isOK()) {
- _work(status);
+ _finishCallback(txn, status);
return;
}
Status scheduleStatus = _findFetcher.schedule();
if (!scheduleStatus.isOK()) {
- _work(scheduleStatus);
+ _finishCallback(txn, scheduleStatus);
return;
}
-
- _active = true;
}
void CollectionCloner::_insertDocumentsCallback(const ReplicationExecutor::CallbackData& cbd,
bool lastBatch) {
- boost::lock_guard<boost::mutex> lk(_mutex);
-
- _active = false;
-
+ OperationContext* txn = cbd.txn;
if (!cbd.status.isOK()) {
- _work(cbd.status);
+ _finishCallback(txn, cbd.status);
return;
}
- Status status = _storageInterface->insertDocuments(cbd.txn, _destNss, _documents);
+ Status status = _storageInterface->insertDocuments(txn, _destNss, _documents);
if (!status.isOK()) {
- _work(status);
+ _finishCallback(txn, status);
return;
}
if (!lastBatch) {
- _active = true;
return;
}
- _work(Status::OK());
+ _finishCallback(txn, Status::OK());
+ }
+
+ void CollectionCloner::_finishCallback(OperationContext* txn, const Status& status) {
+ if (status.isOK()) {
+ auto status = _storageInterface->commitCollection(txn, _destNss);
+ if (!status.isOK()) {
+ warning() << "Failed to commit changes to collection " << _destNss.ns()
+ << ": " << status;
+ }
+ }
+ _onCompletion(status);
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ _active = false;
+ _condition.notify_all();
}
} // namespace repl