summaryrefslogtreecommitdiff
path: root/src/mongo/db/repl/multiapplier.cpp
diff options
context:
space:
mode:
authorBenety Goh <benety@mongodb.com>2016-11-30 00:49:31 -0500
committerBenety Goh <benety@mongodb.com>2016-11-30 15:41:50 -0500
commitb1a5aefc4467c5cff6e7bcf3f4aef3cb15f3ef05 (patch)
tree866b24c9fceabf1c8d6d658784e9fc83162205cf /src/mongo/db/repl/multiapplier.cpp
parent6c3d10524f2cdf73cc15f3b0dacab8deca848271 (diff)
downloadmongo-b1a5aefc4467c5cff6e7bcf3f4aef3cb15f3ef05.tar.gz
SERVER-27052 MultiApplier clears MultiApplier::_onCompletion on completion to release any resources that might be held by function object
Diffstat (limited to 'src/mongo/db/repl/multiapplier.cpp')
-rw-r--r--src/mongo/db/repl/multiapplier.cpp93
1 files changed, 70 insertions, 23 deletions
diff --git a/src/mongo/db/repl/multiapplier.cpp b/src/mongo/db/repl/multiapplier.cpp
index 02ab0950d62..036ab4f0d4d 100644
--- a/src/mongo/db/repl/multiapplier.cpp
+++ b/src/mongo/db/repl/multiapplier.cpp
@@ -30,6 +30,8 @@
#include "mongo/db/repl/multiapplier.h"
+#include <utility>
+
#include "mongo/db/client.h"
#include "mongo/db/operation_context.h"
#include "mongo/db/repl/optime.h"
@@ -47,8 +49,7 @@ MultiApplier::MultiApplier(executor::TaskExecutor* executor,
_operations(operations),
_applyOperation(applyOperation),
_multiApply(multiApply),
- _onCompletion(onCompletion),
- _active(false) {
+ _onCompletion(onCompletion) {
uassert(ErrorCodes::BadValue, "null replication executor", executor);
uassert(ErrorCodes::BadValue, "empty list of operations", !operations.empty());
uassert(ErrorCodes::FailedToParse,
@@ -74,7 +75,7 @@ std::string MultiApplier::getDiagnosticString() const {
stdx::lock_guard<stdx::mutex> lk(_mutex);
str::stream output;
output << "MultiApplier";
- output << " active: " << _active;
+ output << " active: " << _isActive_inlock();
output << ", ops: " << _operations.front().ts.timestamp().toString();
output << " - " << _operations.back().ts.timestamp().toString();
output << ", executor: " << _executor->getDiagnosticString();
@@ -83,51 +84,69 @@ std::string MultiApplier::getDiagnosticString() const {
bool MultiApplier::isActive() const {
stdx::lock_guard<stdx::mutex> lk(_mutex);
- return _active;
+ return _isActive_inlock();
+}
+
+bool MultiApplier::_isActive_inlock() const {
+ return State::kRunning == _state || State::kShuttingDown == _state;
}
-Status MultiApplier::startup() {
+Status MultiApplier::startup() noexcept {
stdx::lock_guard<stdx::mutex> lk(_mutex);
- if (_active) {
- return Status(ErrorCodes::IllegalOperation, "applier already started");
+ switch (_state) {
+ case State::kPreStart:
+ _state = State::kRunning;
+ break;
+ case State::kRunning:
+ return Status(ErrorCodes::InternalError, "multi applier already started");
+ case State::kShuttingDown:
+ return Status(ErrorCodes::ShutdownInProgress, "multi applier shutting down");
+ case State::kComplete:
+ return Status(ErrorCodes::ShutdownInProgress, "multi applier completed");
}
auto scheduleResult =
_executor->scheduleWork(stdx::bind(&MultiApplier::_callback, this, stdx::placeholders::_1));
if (!scheduleResult.isOK()) {
+ _state = State::kComplete;
return scheduleResult.getStatus();
}
- _active = true;
_dbWorkCallbackHandle = scheduleResult.getValue();
return Status::OK();
}
void MultiApplier::shutdown() {
- executor::TaskExecutor::CallbackHandle dbWorkCallbackHandle;
- {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
-
- if (!_active) {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ switch (_state) {
+ case State::kPreStart:
+ // Transition directly from PreStart to Complete if not started yet.
+ _state = State::kComplete;
+ return;
+ case State::kRunning:
+ _state = State::kShuttingDown;
+ break;
+ case State::kShuttingDown:
+ case State::kComplete:
+ // Nothing to do if we are already in ShuttingDown or Complete state.
return;
- }
-
- dbWorkCallbackHandle = _dbWorkCallbackHandle;
}
- if (dbWorkCallbackHandle.isValid()) {
- _executor->cancel(dbWorkCallbackHandle);
+ if (_dbWorkCallbackHandle.isValid()) {
+ _executor->cancel(_dbWorkCallbackHandle);
}
}
void MultiApplier::join() {
stdx::unique_lock<stdx::mutex> lk(_mutex);
+ _condition.wait(lk, [this]() { return !_isActive_inlock(); });
+}
- while (_active) {
- _condition.wait(lk);
- }
+MultiApplier::State MultiApplier::getState_forTest() const {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ return _state;
}
void MultiApplier::_callback(const executor::TaskExecutor::CallbackArgs& cbd) {
@@ -149,12 +168,40 @@ void MultiApplier::_callback(const executor::TaskExecutor::CallbackArgs& cbd) {
}
void MultiApplier::_finishCallback(const Status& result) {
- _onCompletion(result);
+ // After running callback function, clear '_onCompletion' to release any resources that might be
+ // held by this function object.
+ // '_onCompletion' must be moved to a temporary copy and destroyed outside the lock in case
+ // there is any logic that's invoked at the function object's destruction that might call into
+ // this MultiApplier. 'onCompletion' must be declared before lock guard 'lock' so that it is
+ // destroyed outside the lock.
+ decltype(_onCompletion) onCompletion;
+ {
+ stdx::lock_guard<stdx::mutex> lock(_mutex);
+ invariant(_onCompletion);
+ std::swap(_onCompletion, onCompletion);
+ }
+
+ onCompletion(result);
stdx::lock_guard<stdx::mutex> lk(_mutex);
- _active = false;
+ invariant(State::kComplete != _state);
+ _state = State::kComplete;
_condition.notify_all();
}
+std::ostream& operator<<(std::ostream& os, const MultiApplier::State& state) {
+ switch (state) {
+ case MultiApplier::State::kPreStart:
+ return os << "PreStart";
+ case MultiApplier::State::kRunning:
+ return os << "Running";
+ case MultiApplier::State::kShuttingDown:
+ return os << "ShuttingDown";
+ case MultiApplier::State::kComplete:
+ return os << "Complete";
+ }
+ MONGO_UNREACHABLE;
+}
+
} // namespace repl
} // namespace mongo