summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBenety Goh <benety@mongodb.com>2016-11-28 21:30:41 -0500
committerBenety Goh <benety@mongodb.com>2016-11-28 21:30:41 -0500
commit19a7178e9b457d75b0431f384b85d69d7309a563 (patch)
treef70c0cc8acc09c89a39d651068fceb1d4af89874
parent06da357873b3500f39832dee914c06b1968d05ca (diff)
downloadmongo-19a7178e9b457d75b0431f384b85d69d7309a563.tar.gz
SERVER-27052 clarified DatabasesCloner's startup and shutdown contract.
DatabasesCloner::startup() does not invoke completion callback on failing to schedule first remote command DatabasesCloner::shutdown() leaves _active state unchanged DatabasesCloner clears DatabasesCloner::_finishFn on completion to release any resources that might be held by function object make DatabasesCloner single-use only - cannot be restarted once completed
-rw-r--r--src/mongo/db/repl/data_replicator.cpp6
-rw-r--r--src/mongo/db/repl/databases_cloner.cpp82
-rw-r--r--src/mongo/db/repl/databases_cloner.h15
-rw-r--r--src/mongo/db/repl/databases_cloner_test.cpp160
4 files changed, 237 insertions, 26 deletions
diff --git a/src/mongo/db/repl/data_replicator.cpp b/src/mongo/db/repl/data_replicator.cpp
index 114c07b9219..79cd4f1d4ac 100644
--- a/src/mongo/db/repl/data_replicator.cpp
+++ b/src/mongo/db/repl/data_replicator.cpp
@@ -523,7 +523,11 @@ Status DataReplicator::_runInitialSyncAttempt_inlock(OperationContext* txn,
}
}
- cloner->startup(); // When the cloner is done applier starts.
+ auto clonerStartupStatus = cloner->startup(); // When the cloner is done applier starts.
+ if (!clonerStartupStatus.isOK()) {
+ return clonerStartupStatus;
+ }
+
_exec->waitForEvent(initialSyncFinishEvent);
log() << "Initial sync attempt finishing up.";
diff --git a/src/mongo/db/repl/databases_cloner.cpp b/src/mongo/db/repl/databases_cloner.cpp
index 4a4e013de0e..9b8fcbeb568 100644
--- a/src/mongo/db/repl/databases_cloner.cpp
+++ b/src/mongo/db/repl/databases_cloner.cpp
@@ -94,7 +94,7 @@ DatabasesCloner::~DatabasesCloner() {
std::string DatabasesCloner::toString() const {
LockGuard lk(_mutex);
return str::stream() << "initial sync --"
- << " active:" << _active << " status:" << _status.toString()
+ << " active:" << _isActive_inlock() << " status:" << _status.toString()
<< " source:" << _source.toString()
<< " db cloners completed:" << _stats.databasesCloned
<< " db count:" << _databaseCloners.size();
@@ -112,6 +112,23 @@ void DatabasesCloner::join() {
}
void DatabasesCloner::shutdown() {
+ {
+ LockGuard lock(_mutex);
+ switch (_state) {
+ case State::kPreStart:
+ // Transition directly from PreStart to Complete if not started yet.
+ _state = State::kComplete;
+ return;
+ case State::kRunning:
+ _state = State::kShuttingDown;
+ break;
+ case State::kShuttingDown:
+ case State::kComplete:
+ // Nothing to do if we are already in ShuttingDown or Complete state.
+ return;
+ }
+ }
+
if (auto listDatabaseScheduler = _getListDatabasesScheduler()) {
listDatabaseScheduler->shutdown();
}
@@ -120,18 +137,15 @@ void DatabasesCloner::shutdown() {
for (auto&& cloner : databaseCloners) {
cloner->shutdown();
}
-
- LockGuard lk(_mutex);
- if (!_active) {
- return;
- }
- _active = false;
- _setStatus_inlock({ErrorCodes::CallbackCanceled, "Initial Sync Cancelled."});
}
bool DatabasesCloner::isActive() {
LockGuard lk(_mutex);
- return _active;
+ return _isActive_inlock();
+}
+
+bool DatabasesCloner::_isActive_inlock() const {
+ return State::kRunning == _state || State::kShuttingDown == _state;
}
Status DatabasesCloner::getStatus() {
@@ -167,17 +181,25 @@ void DatabasesCloner::Stats::append(BSONObjBuilder* builder) const {
}
}
-Status DatabasesCloner::startup() {
- UniqueLock lk(_mutex);
- invariant(!_active);
- _active = true;
+Status DatabasesCloner::startup() noexcept {
+ LockGuard lk(_mutex);
+
+ switch (_state) {
+ case State::kPreStart:
+ _state = State::kRunning;
+ break;
+ case State::kRunning:
+ return Status(ErrorCodes::InternalError, "databases cloner already started");
+ case State::kShuttingDown:
+ return Status(ErrorCodes::ShutdownInProgress, "databases cloner shutting down");
+ case State::kComplete:
+ return Status(ErrorCodes::ShutdownInProgress, "databases cloner completed");
+ }
if (!_status.isOK() && _status.code() != ErrorCodes::NotYetInitialized) {
return _status;
}
- _status = Status::OK();
-
// Schedule listDatabase command which will kick off the database cloner per result db.
Request listDBsReq(_source,
"admin",
@@ -192,12 +214,14 @@ Status DatabasesCloner::startup() {
numInitialSyncListDatabasesAttempts,
executor::RemoteCommandRequest::kNoTimeout,
RemoteCommandRetryScheduler::kAllRetriableErrors));
- auto s = _listDBsScheduler->startup();
- if (!s.isOK()) {
- _fail_inlock(&lk, s);
+ _status = _listDBsScheduler->startup();
+
+ if (!_status.isOK()) {
+ _state = State::kComplete;
+ return _status;
}
- return _status;
+ return Status::OK();
}
void DatabasesCloner::setScheduleDbWorkFn_forTest(const CollectionCloner::ScheduleDbWorkFn& work) {
@@ -366,34 +390,48 @@ void DatabasesCloner::_onEachDBCloneFinish(const Status& status, const std::stri
void DatabasesCloner::_fail_inlock(UniqueLock* lk, Status status) {
LOG(3) << "DatabasesCloner::_fail_inlock called";
- if (!_active) {
+ if (!_isActive_inlock()) {
return;
}
_setStatus_inlock(status);
// TODO: shutdown outstanding work, like any cloners active
+ invariant(_finishFn);
auto finish = _finishFn;
+ _finishFn = {};
lk->unlock();
LOG(3) << "DatabasesCloner - calling _finishFn with status: " << _status;
finish(status);
+ // Release any resources that might be held by the '_finishFn' (moved to 'finish') function
+ // object.
+ finish = OnFinishFn();
+
lk->lock();
- _active = false;
+ invariant(_state != State::kComplete);
+ _state = State::kComplete;
}
void DatabasesCloner::_succeed_inlock(UniqueLock* lk) {
LOG(3) << "DatabasesCloner::_succeed_inlock called";
const auto status = Status::OK();
_setStatus_inlock(status);
+ invariant(_finishFn);
auto finish = _finishFn;
+ _finishFn = OnFinishFn();
lk->unlock();
LOG(3) << "DatabasesCloner - calling _finishFn with status OK";
finish(status);
+ // Release any resources that might be held by the '_finishFn' (moved to 'finish') function
+ // object.
+ finish = OnFinishFn();
+
lk->lock();
- _active = false;
+ invariant(_state != State::kComplete);
+ _state = State::kComplete;
}
void DatabasesCloner::_setStatus_inlock(Status s) {
diff --git a/src/mongo/db/repl/databases_cloner.h b/src/mongo/db/repl/databases_cloner.h
index 99748af768f..53cced9ee4f 100644
--- a/src/mongo/db/repl/databases_cloner.h
+++ b/src/mongo/db/repl/databases_cloner.h
@@ -83,7 +83,7 @@ public:
~DatabasesCloner();
- Status startup();
+ Status startup() noexcept;
bool isActive();
void join();
void shutdown();
@@ -105,6 +105,8 @@ public:
void setScheduleDbWorkFn_forTest(const CollectionCloner::ScheduleDbWorkFn& scheduleDbWorkFn);
private:
+ bool _isActive_inlock() const;
+
/**
* Returns a copy of the database cloners.
*/
@@ -146,16 +148,23 @@ private:
executor::TaskExecutor* _exec; // (R) executor to schedule things with
OldThreadPool* _dbWorkThreadPool; // (R) db worker thread pool for collection cloning.
const HostAndPort _source; // (R) The source to use.
- bool _active = false; // (M) false until we start, and true until finished.
CollectionCloner::ScheduleDbWorkFn _scheduleDbWorkFn; // (M)
const IncludeDbFilterFn _includeDbFn; // (R) function which decides which dbs are cloned.
- const OnFinishFn _finishFn; // (R) function called when finished.
+ OnFinishFn _finishFn; // (M) function called when finished.
StorageInterface* _storage; // (R)
std::unique_ptr<RemoteCommandRetryScheduler> _listDBsScheduler; // (M) scheduler for listDBs.
std::vector<std::shared_ptr<DatabaseCloner>> _databaseCloners; // (M) database cloners by name
Stats _stats; // (M)
+
+ // State transitions:
+ // PreStart --> Running --> ShuttingDown --> Complete
+ // It is possible to skip intermediate states. For example,
+ // Calling shutdown() when the cloner has not started will transition from PreStart directly
+ // to Complete.
+ enum class State { kPreStart, kRunning, kShuttingDown, kComplete };
+ State _state = State::kPreStart;
};
diff --git a/src/mongo/db/repl/databases_cloner_test.cpp b/src/mongo/db/repl/databases_cloner_test.cpp
index 740a194e490..81552201485 100644
--- a/src/mongo/db/repl/databases_cloner_test.cpp
+++ b/src/mongo/db/repl/databases_cloner_test.cpp
@@ -46,6 +46,7 @@
#include "mongo/util/concurrency/old_thread_pool.h"
#include "mongo/util/concurrency/thread_name.h"
#include "mongo/util/mongoutils/str.h"
+#include "mongo/util/scopeguard.h"
namespace {
using namespace mongo;
@@ -388,6 +389,67 @@ TEST_F(DBsClonerTest, InvalidConstruction) {
"finishFn must be provided.");
}
+TEST_F(DBsClonerTest, StartupReturnsListDatabasesScheduleErrorButDoesNotInvokeCompletionCallback) {
+ Status result = getDetectableErrorStatus();
+ Status expectedResult{ErrorCodes::BadValue, "foo"};
+ DatabasesCloner cloner{&getStorage(),
+ &getExecutor(),
+ &getDbWorkThreadPool(),
+ HostAndPort{"local:1234"},
+ [](const BSONObj&) { return true; },
+ [&result](const Status& status) {
+ log() << "setting result to " << status;
+ result = status;
+ }};
+
+ getExecutor().shutdown();
+ ASSERT_EQUALS(ErrorCodes::ShutdownInProgress, cloner.startup());
+ ASSERT_FALSE(cloner.isActive());
+
+ ASSERT_EQUALS(getDetectableErrorStatus(), result);
+}
+
+TEST_F(DBsClonerTest, StartupReturnsShuttingDownInProgressAfterShutdownIsCalled) {
+ Status result = getDetectableErrorStatus();
+ Status expectedResult{ErrorCodes::BadValue, "foo"};
+ DatabasesCloner cloner{&getStorage(),
+ &getExecutor(),
+ &getDbWorkThreadPool(),
+ HostAndPort{"local:1234"},
+ [](const BSONObj&) { return true; },
+ [&result](const Status& status) {
+ log() << "setting result to " << status;
+ result = status;
+ }};
+ ON_BLOCK_EXIT([this] { getExecutor().shutdown(); });
+
+ cloner.shutdown();
+ ASSERT_EQUALS(ErrorCodes::ShutdownInProgress, cloner.startup());
+ ASSERT_FALSE(cloner.isActive());
+
+ ASSERT_EQUALS(getDetectableErrorStatus(), result);
+}
+
+TEST_F(DBsClonerTest, StartupReturnsInternalErrorAfterSuccessfulStartup) {
+ Status result = getDetectableErrorStatus();
+ Status expectedResult{ErrorCodes::BadValue, "foo"};
+ DatabasesCloner cloner{&getStorage(),
+ &getExecutor(),
+ &getDbWorkThreadPool(),
+ HostAndPort{"local:1234"},
+ [](const BSONObj&) { return true; },
+ [&result](const Status& status) {
+ log() << "setting result to " << status;
+ result = status;
+ }};
+ ON_BLOCK_EXIT([this] { getExecutor().shutdown(); });
+
+ ASSERT_OK(cloner.startup());
+
+ ASSERT_EQUALS(ErrorCodes::InternalError, cloner.startup());
+ ASSERT_TRUE(cloner.isActive());
+}
+
TEST_F(DBsClonerTest, FailsOnListDatabases) {
Status result{Status::OK()};
Status expectedResult{ErrorCodes::BadValue, "foo"};
@@ -410,6 +472,104 @@ TEST_F(DBsClonerTest, FailsOnListDatabases) {
ASSERT_EQ(result, expectedResult);
}
+TEST_F(DBsClonerTest, DatabasesClonerReturnsCallbackCanceledIfShutdownDuringListDatabasesCommand) {
+ Status result{Status::OK()};
+ DatabasesCloner cloner{&getStorage(),
+ &getExecutor(),
+ &getDbWorkThreadPool(),
+ HostAndPort{"local:1234"},
+ [](const BSONObj&) { return true; },
+ [&result](const Status& status) {
+ log() << "setting result to " << status;
+ result = status;
+ }};
+
+ ASSERT_OK(cloner.startup());
+ ASSERT_TRUE(cloner.isActive());
+
+ cloner.shutdown();
+ executor::NetworkInterfaceMock::InNetworkGuard(getNet())->runReadyNetworkOperations();
+
+ cloner.join();
+ ASSERT_EQUALS(ErrorCodes::CallbackCanceled, result);
+}
+
+bool sharedCallbackStateDestroyed = false;
+class SharedCallbackState {
+ MONGO_DISALLOW_COPYING(SharedCallbackState);
+
+public:
+ SharedCallbackState() {}
+ ~SharedCallbackState() {
+ sharedCallbackStateDestroyed = true;
+ }
+};
+
+TEST_F(DBsClonerTest, DatabasesClonerResetsOnFinishCallbackFunctionAfterCompletionDueToFailure) {
+ sharedCallbackStateDestroyed = false;
+ auto sharedCallbackData = std::make_shared<SharedCallbackState>();
+
+ Status result = getDetectableErrorStatus();
+ DatabasesCloner cloner{&getStorage(),
+ &getExecutor(),
+ &getDbWorkThreadPool(),
+ HostAndPort{"local:1234"},
+ [](const BSONObj&) { return true; },
+ [&result, sharedCallbackData](const Status& status) {
+ log() << "setting result to " << status;
+ result = status;
+ }};
+
+ ASSERT_OK(cloner.startup());
+ ASSERT_TRUE(cloner.isActive());
+
+ sharedCallbackData.reset();
+ ASSERT_FALSE(sharedCallbackStateDestroyed);
+
+ auto net = getNet();
+ {
+ executor::NetworkInterfaceMock::InNetworkGuard guard(net);
+ processNetworkResponse("listDatabases",
+ Status(ErrorCodes::OperationFailed, "listDatabases failed"));
+ }
+
+ cloner.join();
+ ASSERT_EQUALS(ErrorCodes::OperationFailed, result);
+ ASSERT_TRUE(sharedCallbackStateDestroyed);
+}
+
+TEST_F(DBsClonerTest, DatabasesClonerResetsOnFinishCallbackFunctionAfterCompletionDueToSuccess) {
+ sharedCallbackStateDestroyed = false;
+ auto sharedCallbackData = std::make_shared<SharedCallbackState>();
+
+ Status result = getDetectableErrorStatus();
+ DatabasesCloner cloner{&getStorage(),
+ &getExecutor(),
+ &getDbWorkThreadPool(),
+ HostAndPort{"local:1234"},
+ [](const BSONObj&) { return true; },
+ [&result, sharedCallbackData](const Status& status) {
+ log() << "setting result to " << status;
+ result = status;
+ }};
+
+ ASSERT_OK(cloner.startup());
+ ASSERT_TRUE(cloner.isActive());
+
+ sharedCallbackData.reset();
+ ASSERT_FALSE(sharedCallbackStateDestroyed);
+
+ auto net = getNet();
+ {
+ executor::NetworkInterfaceMock::InNetworkGuard guard(net);
+ processNetworkResponse("listDatabases", fromjson("{ok:1, databases:[]}")); // listDatabases
+ }
+
+ cloner.join();
+ ASSERT_OK(result);
+ ASSERT_TRUE(sharedCallbackStateDestroyed);
+}
+
TEST_F(DBsClonerTest, FailsOnListCollectionsOnOnlyDatabase) {
Status result{Status::OK()};
DatabasesCloner cloner{&getStorage(),