diff options
author | Benety Goh <benety@mongodb.com> | 2016-11-30 00:49:31 -0500 |
---|---|---|
committer | Benety Goh <benety@mongodb.com> | 2016-11-30 15:41:50 -0500 |
commit | b1a5aefc4467c5cff6e7bcf3f4aef3cb15f3ef05 (patch) | |
tree | 866b24c9fceabf1c8d6d658784e9fc83162205cf /src/mongo/db/repl/multiapplier.cpp | |
parent | 6c3d10524f2cdf73cc15f3b0dacab8deca848271 (diff) | |
download | mongo-b1a5aefc4467c5cff6e7bcf3f4aef3cb15f3ef05.tar.gz |
SERVER-27052 MultiApplier clears MultiApplier::_onCompletion on completion to release any resources that might be held by function object
Diffstat (limited to 'src/mongo/db/repl/multiapplier.cpp')
-rw-r--r-- | src/mongo/db/repl/multiapplier.cpp | 93 |
1 files changed, 70 insertions, 23 deletions
diff --git a/src/mongo/db/repl/multiapplier.cpp b/src/mongo/db/repl/multiapplier.cpp index 02ab0950d62..036ab4f0d4d 100644 --- a/src/mongo/db/repl/multiapplier.cpp +++ b/src/mongo/db/repl/multiapplier.cpp @@ -30,6 +30,8 @@ #include "mongo/db/repl/multiapplier.h" +#include <utility> + #include "mongo/db/client.h" #include "mongo/db/operation_context.h" #include "mongo/db/repl/optime.h" @@ -47,8 +49,7 @@ MultiApplier::MultiApplier(executor::TaskExecutor* executor, _operations(operations), _applyOperation(applyOperation), _multiApply(multiApply), - _onCompletion(onCompletion), - _active(false) { + _onCompletion(onCompletion) { uassert(ErrorCodes::BadValue, "null replication executor", executor); uassert(ErrorCodes::BadValue, "empty list of operations", !operations.empty()); uassert(ErrorCodes::FailedToParse, @@ -74,7 +75,7 @@ std::string MultiApplier::getDiagnosticString() const { stdx::lock_guard<stdx::mutex> lk(_mutex); str::stream output; output << "MultiApplier"; - output << " active: " << _active; + output << " active: " << _isActive_inlock(); output << ", ops: " << _operations.front().ts.timestamp().toString(); output << " - " << _operations.back().ts.timestamp().toString(); output << ", executor: " << _executor->getDiagnosticString(); @@ -83,51 +84,69 @@ std::string MultiApplier::getDiagnosticString() const { bool MultiApplier::isActive() const { stdx::lock_guard<stdx::mutex> lk(_mutex); - return _active; + return _isActive_inlock(); +} + +bool MultiApplier::_isActive_inlock() const { + return State::kRunning == _state || State::kShuttingDown == _state; } -Status MultiApplier::startup() { +Status MultiApplier::startup() noexcept { stdx::lock_guard<stdx::mutex> lk(_mutex); - if (_active) { - return Status(ErrorCodes::IllegalOperation, "applier already started"); + switch (_state) { + case State::kPreStart: + _state = State::kRunning; + break; + case State::kRunning: + return Status(ErrorCodes::InternalError, "multi applier already started"); + case State::kShuttingDown: + return Status(ErrorCodes::ShutdownInProgress, "multi applier shutting down"); + case State::kComplete: + return Status(ErrorCodes::ShutdownInProgress, "multi applier completed"); } auto scheduleResult = _executor->scheduleWork(stdx::bind(&MultiApplier::_callback, this, stdx::placeholders::_1)); if (!scheduleResult.isOK()) { + _state = State::kComplete; return scheduleResult.getStatus(); } - _active = true; _dbWorkCallbackHandle = scheduleResult.getValue(); return Status::OK(); } void MultiApplier::shutdown() { - executor::TaskExecutor::CallbackHandle dbWorkCallbackHandle; - { - stdx::lock_guard<stdx::mutex> lk(_mutex); - - if (!_active) { + stdx::lock_guard<stdx::mutex> lk(_mutex); + switch (_state) { + case State::kPreStart: + // Transition directly from PreStart to Complete if not started yet. + _state = State::kComplete; + return; + case State::kRunning: + _state = State::kShuttingDown; + break; + case State::kShuttingDown: + case State::kComplete: + // Nothing to do if we are already in ShuttingDown or Complete state. return; - } - - dbWorkCallbackHandle = _dbWorkCallbackHandle; } - if (dbWorkCallbackHandle.isValid()) { - _executor->cancel(dbWorkCallbackHandle); + if (_dbWorkCallbackHandle.isValid()) { + _executor->cancel(_dbWorkCallbackHandle); } } void MultiApplier::join() { stdx::unique_lock<stdx::mutex> lk(_mutex); + _condition.wait(lk, [this]() { return !_isActive_inlock(); }); +} - while (_active) { - _condition.wait(lk); - } +MultiApplier::State MultiApplier::getState_forTest() const { + stdx::lock_guard<stdx::mutex> lk(_mutex); + return _state; } void MultiApplier::_callback(const executor::TaskExecutor::CallbackArgs& cbd) { @@ -149,12 +168,40 @@ void MultiApplier::_callback(const executor::TaskExecutor::CallbackArgs& cbd) { } void MultiApplier::_finishCallback(const Status& result) { - _onCompletion(result); + // After running callback function, clear '_onCompletion' to release any resources that might be + // held by this function object. + // '_onCompletion' must be moved to a temporary copy and destroyed outside the lock in case + // there is any logic that's invoked at the function object's destruction that might call into + // this MultiApplier. 'onCompletion' must be declared before lock guard 'lock' so that it is + // destroyed outside the lock. + decltype(_onCompletion) onCompletion; + { + stdx::lock_guard<stdx::mutex> lock(_mutex); + invariant(_onCompletion); + std::swap(_onCompletion, onCompletion); + } + + onCompletion(result); stdx::lock_guard<stdx::mutex> lk(_mutex); - _active = false; + invariant(State::kComplete != _state); + _state = State::kComplete; _condition.notify_all(); } +std::ostream& operator<<(std::ostream& os, const MultiApplier::State& state) { + switch (state) { + case MultiApplier::State::kPreStart: + return os << "PreStart"; + case MultiApplier::State::kRunning: + return os << "Running"; + case MultiApplier::State::kShuttingDown: + return os << "ShuttingDown"; + case MultiApplier::State::kComplete: + return os << "Complete"; + } + MONGO_UNREACHABLE; +} + } // namespace repl } // namespace mongo |