summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBenety Goh <benety@mongodb.com>2015-05-27 12:32:18 -0400
committerBenety Goh <benety@mongodb.com>2015-05-29 19:59:31 -0400
commit36df0e5da71c5d256d8638e522866059f73b616b (patch)
tree0e24b79ba08bce591cf1249902d771ba480b657f
parent993fc5e4ed9264965f16a948d3732d3fc55d1255 (diff)
downloadmongo-36df0e5da71c5d256d8638e522866059f73b616b.tar.gz
SERVER-18035 clean up collection and database cloners to not hold mutex when reporting completion status
-rw-r--r--src/mongo/db/repl/base_cloner.h10
-rw-r--r--src/mongo/db/repl/base_cloner_test_fixture.cpp13
-rw-r--r--src/mongo/db/repl/base_cloner_test_fixture.h17
-rw-r--r--src/mongo/db/repl/collection_cloner.cpp102
-rw-r--r--src/mongo/db/repl/collection_cloner.h42
-rw-r--r--src/mongo/db/repl/collection_cloner_test.cpp18
-rw-r--r--src/mongo/db/repl/database_cloner.cpp100
-rw-r--r--src/mongo/db/repl/database_cloner.h39
-rw-r--r--src/mongo/db/repl/database_cloner_test.cpp35
-rw-r--r--src/mongo/db/repl/database_task_test.cpp21
10 files changed, 202 insertions, 195 deletions
diff --git a/src/mongo/db/repl/base_cloner.h b/src/mongo/db/repl/base_cloner.h
index 41272004758..8d6b8be8928 100644
--- a/src/mongo/db/repl/base_cloner.h
+++ b/src/mongo/db/repl/base_cloner.h
@@ -72,17 +72,9 @@ namespace repl {
*/
virtual void cancel() = 0;
- //
- // Testing only functions below.
- //
-
/**
* Waits for active remote commands and database worker to complete.
- * Returns immediately if collection cloner is not active.
- *
- * TODO: Internal state not sufficiently protected for production use.
- *
- * For testing only.
+ * Returns immediately if cloner is not active.
*/
virtual void wait() = 0;
diff --git a/src/mongo/db/repl/base_cloner_test_fixture.cpp b/src/mongo/db/repl/base_cloner_test_fixture.cpp
index 75bc0b7b886..92518498300 100644
--- a/src/mongo/db/repl/base_cloner_test_fixture.cpp
+++ b/src/mongo/db/repl/base_cloner_test_fixture.cpp
@@ -106,10 +106,12 @@ namespace repl {
ReplicationExecutorTest::setUp();
clear();
launchExecutorThread();
+ storageInterface.reset(new StorageInterfaceMock());
}
void BaseClonerTest::tearDown() {
ReplicationExecutorTest::tearDown();
+ storageInterface.reset();
}
void BaseClonerTest::clear() {
@@ -117,18 +119,18 @@ namespace repl {
}
void BaseClonerTest::setStatus(const Status& status) {
- boost::unique_lock<boost::mutex> lk(_mutex);
+ stdx::unique_lock<stdx::mutex> lk(_mutex);
_status = status;
_setStatusCondition.notify_all();
}
const Status& BaseClonerTest::getStatus() const {
- boost::unique_lock<boost::mutex> lk(_mutex);
+ stdx::unique_lock<stdx::mutex> lk(_mutex);
return _status;
}
void BaseClonerTest::waitForStatus() {
- boost::unique_lock<boost::mutex> lk(_mutex);
+ stdx::unique_lock<stdx::mutex> lk(_mutex);
if (_status == getDetectableErrorStatus()) {
try {
_setStatusCondition.wait_for(lk, Milliseconds(1000));
@@ -256,5 +258,10 @@ namespace repl {
return insertDocumentsFn ? insertDocumentsFn(txn, nss, docs) : Status::OK();
}
+ Status StorageInterfaceMock::commitCollection(OperationContext* txn,
+ const NamespaceString& nss) {
+ return Status::OK();
+ }
+
} // namespace repl
} // namespace mongo
diff --git a/src/mongo/db/repl/base_cloner_test_fixture.h b/src/mongo/db/repl/base_cloner_test_fixture.h
index 2bd93ce436b..064003bbb6d 100644
--- a/src/mongo/db/repl/base_cloner_test_fixture.h
+++ b/src/mongo/db/repl/base_cloner_test_fixture.h
@@ -28,8 +28,7 @@
#pragma once
-#include <boost/thread/mutex.hpp>
-#include <boost/thread/condition_variable.hpp>
+#include <memory>
#include <vector>
#include "mongo/base/status.h"
@@ -39,6 +38,8 @@
#include "mongo/db/repl/collection_cloner.h"
#include "mongo/db/repl/network_interface_mock.h"
#include "mongo/db/repl/replication_executor_test_fixture.h"
+#include "mongo/stdx/mutex.h"
+#include "mongo/stdx/condition_variable.h"
#include "mongo/util/net/hostandport.h"
namespace mongo {
@@ -49,6 +50,7 @@ namespace mongo {
namespace repl {
class BaseCloner;
+ class StorageInterfaceMock;
class BaseClonerTest : public ReplicationExecutorTest {
public:
@@ -124,12 +126,16 @@ namespace repl {
virtual BaseCloner* getCloner() const = 0;
void testLifeCycle();
+ protected:
+
+ std::unique_ptr<StorageInterfaceMock> storageInterface;
+
private:
// Protects member data of this base cloner fixture.
- mutable boost::mutex _mutex;
+ mutable stdx::mutex _mutex;
- boost::condition_variable _setStatusCondition;
+ stdx::condition_variable _setStatusCondition;
Status _status;
@@ -146,6 +152,9 @@ namespace repl {
const NamespaceString& nss,
const std::vector<BSONObj>& docs) override;
+ Status commitCollection(OperationContext* txn,
+ const NamespaceString& nss) override;
+
stdx::function<Status (OperationContext*,
const NamespaceString&,
const CollectionOptions&,
diff --git a/src/mongo/db/repl/collection_cloner.cpp b/src/mongo/db/repl/collection_cloner.cpp
index 18d8dab6cbf..7d58ae3f2e8 100644
--- a/src/mongo/db/repl/collection_cloner.cpp
+++ b/src/mongo/db/repl/collection_cloner.cpp
@@ -32,8 +32,6 @@
#include "mongo/db/repl/collection_cloner.h"
-#include <boost/thread/lock_guard.hpp>
-
#include "mongo/util/assert_util.h"
#include "mongo/util/log.h"
#include "mongo/util/mongoutils/str.h"
@@ -45,14 +43,14 @@ namespace repl {
const HostAndPort& source,
const NamespaceString& sourceNss,
const CollectionOptions& options,
- const CallbackFn& work,
+ const CallbackFn& onCompletion,
StorageInterface* storageInterface)
: _executor(executor),
_source(source),
_sourceNss(sourceNss),
_destNss(_sourceNss),
_options(options),
- _work(work),
+ _onCompletion(onCompletion),
_storageInterface(storageInterface),
_active(false),
_listIndexesFetcher(_executor,
@@ -77,25 +75,31 @@ namespace repl {
_indexSpecs(),
_documents(),
_dbWorkCallbackHandle(),
- // TODO: replace with executor database worker when it is available.
- _scheduleDbWorkFn(stdx::bind(&ReplicationExecutor::scheduleWorkWithGlobalExclusiveLock,
- _executor,
- stdx::placeholders::_1)) {
+ _scheduleDbWorkFn([this](const ReplicationExecutor::CallbackFn& work) {
+ return _executor->scheduleDBWork(work);
+ }) {
uassert(ErrorCodes::BadValue, "null replication executor", executor);
uassert(ErrorCodes::BadValue, "invalid collection namespace: " + sourceNss.ns(),
sourceNss.isValid());
uassertStatusOK(options.validate());
- uassert(ErrorCodes::BadValue, "callback function cannot be null", work);
+ uassert(ErrorCodes::BadValue, "callback function cannot be null", onCompletion);
uassert(ErrorCodes::BadValue, "null storage interface", storageInterface);
}
+ CollectionCloner::~CollectionCloner() {
+ DESTRUCTOR_GUARD(
+ cancel();
+ wait();
+ );
+ }
+
const NamespaceString& CollectionCloner::getSourceNamespace() const {
return _sourceNss;
}
std::string CollectionCloner::getDiagnosticString() const {
- boost::lock_guard<boost::mutex> lk(_mutex);
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
str::stream output;
output << "CollectionCloner";
output << " executor: " << _executor->getDiagnosticString();
@@ -112,12 +116,12 @@ namespace repl {
}
bool CollectionCloner::isActive() const {
- boost::lock_guard<boost::mutex> lk(_mutex);
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
return _active;
}
Status CollectionCloner::start() {
- boost::lock_guard<boost::mutex> lk(_mutex);
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
if (_active) {
return Status(ErrorCodes::IllegalOperation, "collection cloner already started");
@@ -136,7 +140,7 @@ namespace repl {
void CollectionCloner::cancel() {
ReplicationExecutor::CallbackHandle dbWorkCallbackHandle;
{
- boost::lock_guard<boost::mutex> lk(_mutex);
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
if (!_active) {
return;
@@ -154,16 +158,14 @@ namespace repl {
}
void CollectionCloner::wait() {
- // If a fetcher is inactive, wait() has no effect.
- _listIndexesFetcher.wait();
- _findFetcher.wait();
- waitForDbWorker();
+ stdx::unique_lock<stdx::mutex> lk(_mutex);
+ _condition.wait(lk, [this]() { return !_active; });
}
void CollectionCloner::waitForDbWorker() {
ReplicationExecutor::CallbackHandle dbWorkCallbackHandle;
{
- boost::lock_guard<boost::mutex> lk(_mutex);
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
if (!_active) {
return;
@@ -178,7 +180,7 @@ namespace repl {
}
void CollectionCloner::setScheduleDbWorkFn(const ScheduleDbWorkFn& scheduleDbWorkFn) {
- boost::lock_guard<boost::mutex> lk(_mutex);
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
_scheduleDbWorkFn = scheduleDbWorkFn;
}
@@ -186,12 +188,8 @@ namespace repl {
void CollectionCloner::_listIndexesCallback(const StatusWith<Fetcher::BatchData>& fetchResult,
Fetcher::NextAction* nextAction,
BSONObjBuilder* getMoreBob) {
- boost::lock_guard<boost::mutex> lk(_mutex);
-
- _active = false;
-
if (!fetchResult.isOK()) {
- _work(fetchResult.getStatus());
+ _finishCallback(nullptr, fetchResult.getStatus());
return;
}
@@ -212,8 +210,6 @@ namespace repl {
invariant(getMoreBob);
getMoreBob->append("getMore", batchData.cursorId);
getMoreBob->append("collection", batchData.nss.coll());
-
- _active = true;
return;
}
@@ -221,23 +217,18 @@ namespace repl {
auto&& scheduleResult = _scheduleDbWorkFn(
stdx::bind(&CollectionCloner::_beginCollectionCallback, this, stdx::placeholders::_1));
if (!scheduleResult.isOK()) {
- _work(scheduleResult.getStatus());
+ _finishCallback(nullptr, scheduleResult.getStatus());
return;
}
- _active = true;
_dbWorkCallbackHandle = scheduleResult.getValue();
}
void CollectionCloner::_findCallback(const StatusWith<Fetcher::BatchData>& fetchResult,
Fetcher::NextAction* nextAction,
BSONObjBuilder* getMoreBob) {
- boost::lock_guard<boost::mutex> lk(_mutex);
-
- _active = false;
-
if (!fetchResult.isOK()) {
- _work(fetchResult.getStatus());
+ _finishCallback(nullptr, fetchResult.getStatus());
return;
}
@@ -248,7 +239,7 @@ namespace repl {
auto&& scheduleResult = _scheduleDbWorkFn(stdx::bind(
&CollectionCloner::_insertDocumentsCallback, this, stdx::placeholders::_1, lastBatch));
if (!scheduleResult.isOK()) {
- _work(scheduleResult.getStatus());
+ _finishCallback(nullptr, scheduleResult.getStatus());
return;
}
@@ -258,59 +249,62 @@ namespace repl {
getMoreBob->append("collection", batchData.nss.coll());
}
- _active = true;
_dbWorkCallbackHandle = scheduleResult.getValue();
}
void CollectionCloner::_beginCollectionCallback(const ReplicationExecutor::CallbackData& cbd) {
- boost::lock_guard<boost::mutex> lk(_mutex);
-
- _active = false;
-
+ OperationContext* txn = cbd.txn;
if (!cbd.status.isOK()) {
- _work(cbd.status);
+ _finishCallback(txn, cbd.status);
return;
}
- OperationContext* txn = cbd.txn;
Status status = _storageInterface->beginCollection(txn, _destNss, _options, _indexSpecs);
if (!status.isOK()) {
- _work(status);
+ _finishCallback(txn, status);
return;
}
Status scheduleStatus = _findFetcher.schedule();
if (!scheduleStatus.isOK()) {
- _work(scheduleStatus);
+ _finishCallback(txn, scheduleStatus);
return;
}
-
- _active = true;
}
void CollectionCloner::_insertDocumentsCallback(const ReplicationExecutor::CallbackData& cbd,
bool lastBatch) {
- boost::lock_guard<boost::mutex> lk(_mutex);
-
- _active = false;
-
+ OperationContext* txn = cbd.txn;
if (!cbd.status.isOK()) {
- _work(cbd.status);
+ _finishCallback(txn, cbd.status);
return;
}
- Status status = _storageInterface->insertDocuments(cbd.txn, _destNss, _documents);
+ Status status = _storageInterface->insertDocuments(txn, _destNss, _documents);
if (!status.isOK()) {
- _work(status);
+ _finishCallback(txn, status);
return;
}
if (!lastBatch) {
- _active = true;
return;
}
- _work(Status::OK());
+ _finishCallback(txn, Status::OK());
+ }
+
+ void CollectionCloner::_finishCallback(OperationContext* txn, const Status& status) {
+ if (status.isOK()) {
+ auto status = _storageInterface->commitCollection(txn, _destNss);
+ if (!status.isOK()) {
+ warning() << "Failed to commit changes to collection " << _destNss.ns()
+ << ": " << status;
+ }
+ }
+ _onCompletion(status);
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ _active = false;
+ _condition.notify_all();
}
} // namespace repl
diff --git a/src/mongo/db/repl/collection_cloner.h b/src/mongo/db/repl/collection_cloner.h
index e31d8d1f075..f4f9d9867ec 100644
--- a/src/mongo/db/repl/collection_cloner.h
+++ b/src/mongo/db/repl/collection_cloner.h
@@ -28,7 +28,6 @@
#pragma once
-#include <boost/thread/mutex.hpp>
#include <memory>
#include <string>
#include <vector>
@@ -42,6 +41,8 @@
#include "mongo/db/repl/fetcher.h"
#include "mongo/db/repl/replication_executor.h"
#include "mongo/stdx/functional.h"
+#include "mongo/stdx/condition_variable.h"
+#include "mongo/stdx/mutex.h"
#include "mongo/util/net/hostandport.h"
namespace mongo {
@@ -71,9 +72,9 @@ namespace repl {
/**
* Creates CollectionCloner task in inactive state. Use start() to activate cloner.
*
- * The cloner calls 'work' when the collection cloning has completed or failed.
+ * The cloner calls 'onCompletion' when the collection cloning has completed or failed.
*
- * 'work' will be called exactly once.
+ * 'onCompletion' will be called exactly once.
*
* Takes ownership of the passed StorageInterface object.
*/
@@ -81,10 +82,10 @@ namespace repl {
const HostAndPort& source,
const NamespaceString& sourceNss,
const CollectionOptions& options,
- const CallbackFn& work,
+ const CallbackFn& onCompletion,
StorageInterface* storageInterface);
- virtual ~CollectionCloner() = default;
+ virtual ~CollectionCloner();
const NamespaceString& getSourceNamespace() const;
@@ -96,12 +97,12 @@ namespace repl {
void cancel() override;
+ void wait() override;
+
//
// Testing only functions below.
//
- void wait() override;
-
/**
* Waits for database worker to complete.
* Returns immediately if collection cloner is not active.
@@ -154,6 +155,13 @@ namespace repl {
void _insertDocumentsCallback(const ReplicationExecutor::CallbackData& callbackData,
bool lastBatch);
+ /**
+ * Reports completion status.
+ * Commits/aborts collection building.
+ * Sets cloner to inactive.
+ */
+ void _finishCallback(OperationContext* txn, const Status& status);
+
// Not owned by us.
ReplicationExecutor* _executor;
@@ -163,13 +171,15 @@ namespace repl {
CollectionOptions _options;
// Invoked once when cloning completes or fails.
- CallbackFn _work;
+ CallbackFn _onCompletion;
- // Owned by us.
- std::unique_ptr<StorageInterface> _storageInterface;
+ // Not owned by us.
+ StorageInterface* _storageInterface;
// Protects member data of this collection cloner.
- mutable boost::mutex _mutex;
+ mutable stdx::mutex _mutex;
+
+ mutable stdx::condition_variable _condition;
// _active is true when Collection Cloner is started.
bool _active;
@@ -204,9 +214,6 @@ namespace repl {
class CollectionCloner::StorageInterface {
public:
- /**
- * When the storage interface is destroyed, it will commit the index builder.
- */
virtual ~StorageInterface() = default;
/**
@@ -230,6 +237,13 @@ namespace repl {
const NamespaceString& nss,
const std::vector<BSONObj>& documents) = 0;
+ /**
+ * Commits changes to collection. No effect if collection building has not begun.
+ * Operation context could be null.
+ */
+ virtual Status commitCollection(OperationContext* txn,
+ const NamespaceString& nss) = 0;
+
};
} // namespace repl
diff --git a/src/mongo/db/repl/collection_cloner_test.cpp b/src/mongo/db/repl/collection_cloner_test.cpp
index 58b23ab226d..a6841ad2467 100644
--- a/src/mongo/db/repl/collection_cloner_test.cpp
+++ b/src/mongo/db/repl/collection_cloner_test.cpp
@@ -45,7 +45,6 @@ namespace {
class CollectionClonerTest : public BaseClonerTest {
public:
- CollectionClonerTest();
void setUp() override;
void tearDown() override;
@@ -53,25 +52,18 @@ namespace {
protected:
CollectionOptions options;
- StorageInterfaceMock* storageInterface;
std::unique_ptr<CollectionCloner> collectionCloner;
};
- CollectionClonerTest::CollectionClonerTest()
- : options(),
- storageInterface(nullptr),
- collectionCloner() { }
-
void CollectionClonerTest::setUp() {
BaseClonerTest::setUp();
options.reset();
options.storageEngine = BSON("storageEngine1" << BSONObj());
- storageInterface = new StorageInterfaceMock();
collectionCloner.reset(new CollectionCloner(&getExecutor(), target, nss, options,
stdx::bind(&CollectionClonerTest::setStatus,
this,
stdx::placeholders::_1),
- storageInterface));
+ storageInterface.get()));
}
void CollectionClonerTest::tearDown() {
@@ -92,7 +84,7 @@ namespace {
// Null executor.
{
- CollectionCloner::StorageInterface* si = new StorageInterfaceMock();
+ CollectionCloner::StorageInterface* si = storageInterface.get();
ASSERT_THROWS(CollectionCloner(nullptr, target, nss, options, cb, si), UserException);
}
@@ -103,7 +95,7 @@ namespace {
// Invalid namespace.
{
NamespaceString badNss("db.");
- CollectionCloner::StorageInterface* si = new StorageInterfaceMock();
+ CollectionCloner::StorageInterface* si = storageInterface.get();
ASSERT_THROWS(CollectionCloner(&executor, target, badNss, options, cb, si),
UserException);
}
@@ -112,7 +104,7 @@ namespace {
{
CollectionOptions invalidOptions;
invalidOptions.storageEngine = BSON("storageEngine1" << "not a document");
- CollectionCloner::StorageInterface* si = new StorageInterfaceMock();
+ CollectionCloner::StorageInterface* si = storageInterface.get();
ASSERT_THROWS(CollectionCloner(&executor, target, nss, invalidOptions, cb, si),
UserException);
}
@@ -120,7 +112,7 @@ namespace {
// Callback function cannot be null.
{
CollectionCloner::CallbackFn nullCb;
- CollectionCloner::StorageInterface* si = new StorageInterfaceMock();
+ CollectionCloner::StorageInterface* si = storageInterface.get();
ASSERT_THROWS(CollectionCloner(&executor, target, nss, options, nullCb, si),
UserException);
}
diff --git a/src/mongo/db/repl/database_cloner.cpp b/src/mongo/db/repl/database_cloner.cpp
index 466944c02f9..dc0f6f8990a 100644
--- a/src/mongo/db/repl/database_cloner.cpp
+++ b/src/mongo/db/repl/database_cloner.cpp
@@ -32,7 +32,6 @@
#include "mongo/db/repl/database_cloner.h"
-#include <boost/thread/lock_guard.hpp>
#include <algorithm>
#include <iterator>
#include <set>
@@ -77,17 +76,17 @@ namespace {
const std::string& dbname,
const BSONObj& listCollectionsFilter,
const ListCollectionsPredicateFn& listCollectionsPred,
- const CreateStorageInterfaceFn& csi,
+ CollectionCloner::StorageInterface* si,
const CollectionCallbackFn& collWork,
- const CallbackFn& work)
+ const CallbackFn& onCompletion)
: _executor(executor),
_source(source),
_dbname(dbname),
_listCollectionsFilter(listCollectionsFilter),
_listCollectionsPredicate(listCollectionsPred ? listCollectionsPred : acceptAllPred),
- _createStorageInterface(csi),
+ _storageInterface(si),
_collectionWork(collWork),
- _work(work),
+ _onCompletion(onCompletion),
_active(false),
_listCollectionsFetcher(_executor,
_source,
@@ -98,26 +97,32 @@ namespace {
stdx::placeholders::_1,
stdx::placeholders::_2,
stdx::placeholders::_3)),
- // TODO: replace with executor database worker when it is available.
- _scheduleDbWorkFn(stdx::bind(&ReplicationExecutor::scheduleWorkWithGlobalExclusiveLock,
- _executor,
- stdx::placeholders::_1)),
+ _scheduleDbWorkFn([this](const ReplicationExecutor::CallbackFn& work) {
+ return _executor->scheduleDBWork(work);
+ }),
_startCollectionCloner([](CollectionCloner& cloner) { return cloner.start(); }) {
uassert(ErrorCodes::BadValue, "null replication executor", executor);
uassert(ErrorCodes::BadValue, "empty database name", !dbname.empty());
- uassert(ErrorCodes::BadValue, "storage interface creation function cannot be null", csi);
+ uassert(ErrorCodes::BadValue, "storage interface cannot be null", si);
uassert(ErrorCodes::BadValue, "collection callback function cannot be null", collWork);
- uassert(ErrorCodes::BadValue, "callback function cannot be null", work);
+ uassert(ErrorCodes::BadValue, "callback function cannot be null", onCompletion);
+ }
+
+ DatabaseCloner::~DatabaseCloner() {
+ DESTRUCTOR_GUARD(
+ cancel();
+ wait();
+ );
}
const std::vector<BSONObj>& DatabaseCloner::getCollectionInfos() const {
- boost::lock_guard<boost::mutex> lk(_mutex);
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
return _collectionInfos;
}
std::string DatabaseCloner::getDiagnosticString() const {
- boost::lock_guard<boost::mutex> lk(_mutex);
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
str::stream output;
output << "DatabaseCloner";
output << " executor: " << _executor->getDiagnosticString();
@@ -131,12 +136,12 @@ namespace {
}
bool DatabaseCloner::isActive() const {
- boost::lock_guard<boost::mutex> lk(_mutex);
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
return _active;
}
Status DatabaseCloner::start() {
- boost::lock_guard<boost::mutex> lk(_mutex);
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
if (_active) {
return Status(ErrorCodes::IllegalOperation, "database cloner already started");
@@ -154,7 +159,7 @@ namespace {
void DatabaseCloner::cancel() {
{
- boost::lock_guard<boost::mutex> lk(_mutex);
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
if (!_active) {
return;
@@ -165,10 +170,12 @@ namespace {
}
void DatabaseCloner::wait() {
+ stdx::unique_lock<stdx::mutex> lk(_mutex);
+ _condition.wait(lk, [this]() { return !_active; });
}
void DatabaseCloner::setScheduleDbWorkFn(const CollectionCloner::ScheduleDbWorkFn& work) {
- boost::lock_guard<boost::mutex> lk(_mutex);
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
_scheduleDbWorkFn = work;
}
@@ -183,12 +190,8 @@ namespace {
Fetcher::NextAction* nextAction,
BSONObjBuilder* getMoreBob) {
- boost::lock_guard<boost::mutex> lk(_mutex);
-
- _active = false;
-
if (!result.isOK()) {
- _work(result.getStatus());
+ _finishCallback(result.getStatus());
return;
}
@@ -206,14 +209,12 @@ namespace {
invariant(getMoreBob);
getMoreBob->append("getMore", batchData.cursorId);
getMoreBob->append("collection", batchData.nss.coll());
-
- _active = true;
return;
}
// Nothing to do for an empty database.
if (_collectionInfos.empty()) {
- _work(Status::OK());
+ _finishCallback(Status::OK());
return;
}
@@ -222,41 +223,42 @@ namespace {
for (auto&& info : _collectionInfos) {
BSONElement nameElement = info.getField(kNameFieldName);
if (nameElement.eoo()) {
- _work(Status(ErrorCodes::FailedToParse, str::stream() <<
+ _finishCallback(Status(ErrorCodes::FailedToParse, str::stream() <<
"collection info must contain '" << kNameFieldName << "' " <<
"field : " << info));
return;
}
if (nameElement.type() != mongo::String) {
- _work(Status(ErrorCodes::TypeMismatch, str::stream() <<
- "'" << kNameFieldName << "' field must be a string: " << info));
+ _finishCallback(Status(ErrorCodes::TypeMismatch, str::stream() <<
+ "'" << kNameFieldName << "' field must be a string: " << info));
return;
}
const std::string collectionName = nameElement.String();
if (seen.find(collectionName) != seen.end()) {
- _work(Status(ErrorCodes::DuplicateKey, str::stream() <<
- "collection info contains duplicate collection name " <<
- "'" << collectionName << "': " << info));
+ _finishCallback(Status(ErrorCodes::DuplicateKey, str::stream() <<
+ "collection info contains duplicate collection name " <<
+ "'" << collectionName << "': " << info));
return;
}
BSONElement optionsElement = info.getField(kOptionsFieldName);
if (optionsElement.eoo()) {
- _work(Status(ErrorCodes::FailedToParse, str::stream() <<
- "collection info must contain '" << kOptionsFieldName << "' " <<
- "field : " << info));
+ _finishCallback(Status(ErrorCodes::FailedToParse, str::stream() <<
+ "collection info must contain '" << kOptionsFieldName << "' " <<
+ "field : " << info));
return;
}
if (!optionsElement.isABSONObj()) {
- _work(Status(ErrorCodes::TypeMismatch, str::stream() <<
- "'" << kOptionsFieldName << "' field must be an object: " << info));
+ _finishCallback(Status(ErrorCodes::TypeMismatch, str::stream() <<
+ "'" << kOptionsFieldName << "' field must be an object: " <<
+ info));
return;
}
const BSONObj optionsObj = optionsElement.Obj();
CollectionOptions options;
Status parseStatus = options.parse(optionsObj);
if (!parseStatus.isOK()) {
- _work(parseStatus);
+ _finishCallback(parseStatus);
return;
}
seen.insert(collectionName);
@@ -274,10 +276,10 @@ namespace {
this,
stdx::placeholders::_1,
nss),
- _createStorageInterface());
+ _storageInterface);
}
catch (const UserException& ex) {
- _work(ex.toStatus());
+ _finishCallback(ex.toStatus());
return;
}
}
@@ -296,19 +298,13 @@ namespace {
LOG(1) << " failed to start collection cloning on "
<< _currentCollectionClonerIter->getSourceNamespace()
<< ": " << startStatus;
- _work(startStatus);
+ _finishCallback(startStatus);
return;
}
-
- _active = true;
}
void DatabaseCloner::_collectionClonerCallback(const Status& status,
const NamespaceString& nss) {
- boost::lock_guard<boost::mutex> lk(_mutex);
-
- _active = false;
-
// Forward collection cloner result to caller.
// Failure to clone a collection does not stop the database cloner
// from cloning the rest of the collections in the listCollections result.
@@ -324,14 +320,20 @@ namespace {
LOG(1) << " failed to start collection cloning on "
<< _currentCollectionClonerIter->getSourceNamespace()
<< ": " << startStatus;
- _work(startStatus);
+ _finishCallback(startStatus);
return;
}
- _active = true;
return;
}
- _work(Status::OK());
+ _finishCallback(Status::OK());
+ }
+
+ void DatabaseCloner::_finishCallback(const Status& status) {
+ _onCompletion(status);
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ _active = false;
+ _condition.notify_all();
}
} // namespace repl
diff --git a/src/mongo/db/repl/database_cloner.h b/src/mongo/db/repl/database_cloner.h
index 7660f8a1995..447d7ce07fd 100644
--- a/src/mongo/db/repl/database_cloner.h
+++ b/src/mongo/db/repl/database_cloner.h
@@ -28,7 +28,6 @@
#pragma once
-#include <boost/thread/mutex.hpp>
#include <list>
#include <string>
#include <vector>
@@ -41,6 +40,8 @@
#include "mongo/db/repl/base_cloner.h"
#include "mongo/db/repl/fetcher.h"
#include "mongo/db/repl/replication_executor.h"
+#include "mongo/stdx/condition_variable.h"
+#include "mongo/stdx/mutex.h"
#include "mongo/util/net/hostandport.h"
namespace mongo {
@@ -64,14 +65,8 @@ namespace repl {
using ListCollectionsPredicateFn = stdx::function<bool (const BSONObj&)>;
/**
- * Type of function to create a storage interface instance for
- * the collection cloner.
- */
- using CreateStorageInterfaceFn = stdx::function<CollectionCloner::StorageInterface* ()>;
-
- /**
* Callback function to report progress of collection cloning. Arguments are:
- * - status from the collection cloner's 'work' callback.
+ * - status from the collection cloner's 'onCompletion' callback.
* - source namespace of the collection cloner that completed (or failed).
*
* Called exactly once for every collection cloner started by the the database cloner.
@@ -86,9 +81,9 @@ namespace repl {
/**
* Creates DatabaseCloner task in inactive state. Use start() to activate cloner.
*
- * The cloner calls 'work' when the database cloning has completed or failed.
+ * The cloner calls 'onCompletion' when the database cloning has completed or failed.
*
- * 'work' will be called exactly once.
+ * 'onCompletion' will be called exactly once.
*
* Takes ownership of the passed StorageInterface object.
*/
@@ -97,11 +92,11 @@ namespace repl {
const std::string& dbname,
const BSONObj& listCollectionsFilter,
const ListCollectionsPredicateFn& listCollectionsPredicate,
- const CreateStorageInterfaceFn& createStorageInterface,
+ CollectionCloner::StorageInterface* storageInterface,
const CollectionCallbackFn& collectionWork,
- const CallbackFn& work);
+ const CallbackFn& onCompletion);
- virtual ~DatabaseCloner() = default;
+ virtual ~DatabaseCloner();
/**
* Returns collection info objects read from listCollections result.
@@ -118,12 +113,12 @@ namespace repl {
void cancel() override;
+ void wait() override;
+
//
// Testing only functions below.
//
- void wait() override;
-
/**
* Overrides how executor schedules database work.
*
@@ -153,6 +148,12 @@ namespace repl {
*/
void _collectionClonerCallback(const Status& status, const NamespaceString& nss);
+ /**
+ * Reports completion status.
+ * Sets cloner to inactive.
+ */
+ void _finishCallback(const Status& status);
+
// Not owned by us.
ReplicationExecutor* _executor;
@@ -160,16 +161,18 @@ namespace repl {
std::string _dbname;
BSONObj _listCollectionsFilter;
ListCollectionsPredicateFn _listCollectionsPredicate;
- CreateStorageInterfaceFn _createStorageInterface;
+ CollectionCloner::StorageInterface* _storageInterface;
// Invoked once for every successfully started collection cloner.
CollectionCallbackFn _collectionWork;
// Invoked once when cloning completes or fails.
- CallbackFn _work;
+ CallbackFn _onCompletion;
// Protects member data of this database cloner.
- mutable boost::mutex _mutex;
+ mutable stdx::mutex _mutex;
+
+ mutable stdx::condition_variable _condition;
// _active is true when database cloner is started.
bool _active;
diff --git a/src/mongo/db/repl/database_cloner_test.cpp b/src/mongo/db/repl/database_cloner_test.cpp
index ef1165bf6f1..22e0e452f3d 100644
--- a/src/mongo/db/repl/database_cloner_test.cpp
+++ b/src/mongo/db/repl/database_cloner_test.cpp
@@ -45,10 +45,6 @@ namespace {
const std::string dbname("db");
- CollectionCloner::StorageInterface* createStorageInterface() {
- return new StorageInterfaceMock();
- }
-
class DatabaseClonerTest : public BaseClonerTest {
public:
DatabaseClonerTest();
@@ -79,7 +75,7 @@ namespace {
dbname,
BSONObj(),
DatabaseCloner::ListCollectionsPredicateFn(),
- createStorageInterface,
+ storageInterface.get(),
stdx::bind(&DatabaseClonerTest::collectionWork,
this,
stdx::placeholders::_1,
@@ -107,7 +103,7 @@ namespace {
const BSONObj filter;
DatabaseCloner::ListCollectionsPredicateFn pred;
- const DatabaseCloner::CreateStorageInterfaceFn& csi = createStorageInterface;
+ CollectionCloner::StorageInterface* si = storageInterface.get();
namespace stdxph = stdx::placeholders;
const DatabaseCloner::CollectionCallbackFn ccb =
stdx::bind(&DatabaseClonerTest::collectionWork, this, stdxph::_1, stdxph::_2);
@@ -115,31 +111,31 @@ namespace {
const auto& cb = [](const Status&) { FAIL("should not reach here"); };
// Null executor.
- ASSERT_THROWS(DatabaseCloner(nullptr, target, dbname, filter, pred, csi, ccb, cb),
+ ASSERT_THROWS(DatabaseCloner(nullptr, target, dbname, filter, pred, si, ccb, cb),
UserException);
// Empty database name
- ASSERT_THROWS(DatabaseCloner(&executor, target, "", filter, pred, csi, ccb, cb),
+ ASSERT_THROWS(DatabaseCloner(&executor, target, "", filter, pred, si, ccb, cb),
UserException);
// Callback function cannot be null.
{
DatabaseCloner::CallbackFn ncb;
- ASSERT_THROWS(DatabaseCloner(&executor, target, dbname, filter, pred, csi, ccb, ncb),
+ ASSERT_THROWS(DatabaseCloner(&executor, target, dbname, filter, pred, si, ccb, ncb),
UserException);
}
- // CreateStorageInterfaceFn function cannot be null.
+ // Storage interface cannot be null.
{
- DatabaseCloner::CreateStorageInterfaceFn ncsi;
- ASSERT_THROWS(DatabaseCloner(&executor, target, dbname, filter, pred, ncsi, ccb, cb),
+ CollectionCloner::StorageInterface* nsi = nullptr;
+ ASSERT_THROWS(DatabaseCloner(&executor, target, dbname, filter, pred, nsi, ccb, cb),
UserException);
}
// CollectionCallbackFn function cannot be null.
{
DatabaseCloner::CollectionCallbackFn nccb;
- ASSERT_THROWS(DatabaseCloner(&executor, target, dbname, filter, pred, csi, nccb, cb),
+ ASSERT_THROWS(DatabaseCloner(&executor, target, dbname, filter, pred, si, nccb, cb),
UserException);
}
}
@@ -170,7 +166,7 @@ namespace {
dbname,
listCollectionsFilter,
DatabaseCloner::ListCollectionsPredicateFn(),
- createStorageInterface,
+ storageInterface.get(),
stdx::bind(&DatabaseClonerTest::collectionWork,
this,
stdx::placeholders::_1,
@@ -230,7 +226,7 @@ namespace {
dbname,
BSONObj(),
pred,
- createStorageInterface,
+ storageInterface.get(),
stdx::bind(&DatabaseClonerTest::collectionWork,
this,
stdx::placeholders::_1,
@@ -352,14 +348,13 @@ namespace {
ASSERT_FALSE(databaseCloner->isActive());
}
- TEST_F(DatabaseClonerTest, InvalidStorageInterface) {
- auto invalidCreateStorageInterface = []() { return nullptr; };
+ TEST_F(DatabaseClonerTest, ListCollectionsReturnsEmptyCollectionName) {
databaseCloner.reset(new DatabaseCloner(&getExecutor(),
target,
dbname,
BSONObj(),
DatabaseCloner::ListCollectionsPredicateFn(),
- invalidCreateStorageInterface,
+ storageInterface.get(),
stdx::bind(&DatabaseClonerTest::collectionWork,
this,
stdx::placeholders::_1,
@@ -370,10 +365,10 @@ namespace {
ASSERT_OK(databaseCloner->start());
processNetworkResponse(createListCollectionsResponse(0, BSON_ARRAY(
- BSON("name" << "a" << "options" << BSONObj()))));
+ BSON("name" << "" << "options" << BSONObj()))));
ASSERT_EQUALS(ErrorCodes::BadValue, getStatus().code());
- ASSERT_STRING_CONTAINS(getStatus().reason(), "null storage interface");
+ ASSERT_STRING_CONTAINS(getStatus().reason(), "invalid collection namespace: db.");
ASSERT_FALSE(databaseCloner->isActive());
}
diff --git a/src/mongo/db/repl/database_task_test.cpp b/src/mongo/db/repl/database_task_test.cpp
index dd8c17ef3e6..13cf1f79e71 100644
--- a/src/mongo/db/repl/database_task_test.cpp
+++ b/src/mongo/db/repl/database_task_test.cpp
@@ -28,12 +28,11 @@
#include "mongo/platform/basic.h"
-#include <boost/thread/lock_types.hpp>
-
#include "mongo/db/repl/database_task.h"
#include "mongo/db/repl/operation_context_repl_mock.h"
#include "mongo/db/repl/task_runner.h"
#include "mongo/db/repl/task_runner_test_fixture.h"
+#include "mongo/stdx/mutex.h"
#include "mongo/util/concurrency/thread_pool.h"
namespace {
@@ -71,14 +70,14 @@ namespace {
}
TEST_F(DatabaseTaskTest, RunGlobalExclusiveLockTask) {
- boost::mutex mutex;
+ stdx::mutex mutex;
bool called = false;
OperationContext* txn = nullptr;
bool lockIsW = false;
Status status = getDetectableErrorStatus();
// Task returning 'void' implies NextAction::NoAction.
auto task = [&](OperationContext* theTxn, const Status& theStatus) {
- boost::lock_guard<boost::mutex> lk(mutex);
+ stdx::lock_guard<stdx::mutex> lk(mutex);
called = true;
txn = theTxn;
lockIsW = txn->lockState()->isW();
@@ -89,7 +88,7 @@ namespace {
getThreadPool().join();
ASSERT_FALSE(getTaskRunner().isActive());
- boost::lock_guard<boost::mutex> lk(mutex);
+ stdx::lock_guard<stdx::mutex> lk(mutex);
ASSERT_TRUE(called);
ASSERT(txn);
ASSERT_TRUE(lockIsW);
@@ -97,14 +96,14 @@ namespace {
}
void _testRunDatabaseLockTask(DatabaseTaskTest& test, LockMode mode) {
- boost::mutex mutex;
+ stdx::mutex mutex;
bool called = false;
OperationContext* txn = nullptr;
bool isDatabaseLockedForMode = false;
Status status = test.getDetectableErrorStatus();
// Task returning 'void' implies NextAction::NoAction.
auto task = [&](OperationContext* theTxn, const Status& theStatus) {
- boost::lock_guard<boost::mutex> lk(mutex);
+ stdx::lock_guard<stdx::mutex> lk(mutex);
called = true;
txn = theTxn;
isDatabaseLockedForMode = txn->lockState()->isDbLockedForMode(databaseName, mode);
@@ -116,7 +115,7 @@ namespace {
test.getThreadPool().join();
ASSERT_FALSE(test.getTaskRunner().isActive());
- boost::lock_guard<boost::mutex> lk(mutex);
+ stdx::lock_guard<stdx::mutex> lk(mutex);
ASSERT_TRUE(called);
ASSERT(txn);
ASSERT_TRUE(isDatabaseLockedForMode);
@@ -140,14 +139,14 @@ namespace {
}
void _testRunCollectionLockTask(DatabaseTaskTest& test, LockMode mode) {
- boost::mutex mutex;
+ stdx::mutex mutex;
bool called = false;
OperationContext* txn = nullptr;
bool isCollectionLockedForMode = false;
Status status = test.getDetectableErrorStatus();
// Task returning 'void' implies NextAction::NoAction.
auto task = [&](OperationContext* theTxn, const Status& theStatus) {
- boost::lock_guard<boost::mutex> lk(mutex);
+ stdx::lock_guard<stdx::mutex> lk(mutex);
called = true;
txn = theTxn;
isCollectionLockedForMode =
@@ -160,7 +159,7 @@ namespace {
test.getThreadPool().join();
ASSERT_FALSE(test.getTaskRunner().isActive());
- boost::lock_guard<boost::mutex> lk(mutex);
+ stdx::lock_guard<stdx::mutex> lk(mutex);
ASSERT_TRUE(called);
ASSERT(txn);
ASSERT_TRUE(isCollectionLockedForMode);