author      Benety Goh <benety@mongodb.com>   2016-11-30 00:49:31 -0500
committer   Benety Goh <benety@mongodb.com>   2016-11-30 15:41:50 -0500
commit      b1a5aefc4467c5cff6e7bcf3f4aef3cb15f3ef05 (patch)
tree        866b24c9fceabf1c8d6d658784e9fc83162205cf
parent      6c3d10524f2cdf73cc15f3b0dacab8deca848271 (diff)
download    mongo-b1a5aefc4467c5cff6e7bcf3f4aef3cb15f3ef05.tar.gz
SERVER-27052 MultiApplier clears MultiApplier::_onCompletion on completion to release any resources that might be held by function object
-rw-r--r--   src/mongo/db/repl/multiapplier.cpp       |  93
-rw-r--r--   src/mongo/db/repl/multiapplier.h         |  29
-rw-r--r--   src/mongo/db/repl/multiapplier_test.cpp  |  71
3 files changed, 167 insertions, 26 deletions
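The fix described in the commit message comes down to one idiom: move the std::function member into a local variable under the mutex, invoke it, and let the local's destruction outside the lock release whatever the callback captured. The sketch below illustrates that idiom in isolation; the Worker class, its finish() method, and the int payload are invented for this example and are not MongoDB code. The main() mirrors the new SharedCallbackState test added in this commit: a shared_ptr captured by the callback is watched through a weak_ptr to confirm it is released as soon as the callback has run.

#include <cassert>
#include <functional>
#include <iostream>
#include <memory>
#include <mutex>
#include <utility>

// Worker is a stand-in for a component like MultiApplier that owns a completion
// callback. finish() moves the callback out of the member under the lock, then
// invokes and destroys it outside the lock, so anything the std::function captured
// (for example a shared_ptr) is released as soon as the work completes.
class Worker {
public:
    explicit Worker(std::function<void(int)> onCompletion)
        : _onCompletion(std::move(onCompletion)) {}

    void finish(int result) {
        // Declared before the lock guard so it is destroyed after the lock is
        // released, in case its destructor calls back into this object.
        decltype(_onCompletion) onCompletion;
        {
            std::lock_guard<std::mutex> lock(_mutex);
            std::swap(_onCompletion, onCompletion);  // leaves _onCompletion empty
        }
        onCompletion(result);
        // onCompletion (and everything it captured) is destroyed when it goes out
        // of scope here, outside the lock.
    }

private:
    std::mutex _mutex;
    std::function<void(int)> _onCompletion;
};

int main() {
    auto shared = std::make_shared<int>(42);
    std::weak_ptr<int> watch = shared;

    Worker worker([shared](int result) { std::cout << "result: " << result << '\n'; });
    shared.reset();
    assert(!watch.expired());  // still alive: captured by the callback held in Worker

    worker.finish(7);
    assert(watch.expired());  // released: the callback was cleared after running
    return 0;
}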
diff --git a/src/mongo/db/repl/multiapplier.cpp b/src/mongo/db/repl/multiapplier.cpp
index 02ab0950d62..036ab4f0d4d 100644
--- a/src/mongo/db/repl/multiapplier.cpp
+++ b/src/mongo/db/repl/multiapplier.cpp
@@ -30,6 +30,8 @@
 
 #include "mongo/db/repl/multiapplier.h"
 
+#include <utility>
+
 #include "mongo/db/client.h"
 #include "mongo/db/operation_context.h"
 #include "mongo/db/repl/optime.h"
@@ -47,8 +49,7 @@ MultiApplier::MultiApplier(executor::TaskExecutor* executor,
       _operations(operations),
       _applyOperation(applyOperation),
       _multiApply(multiApply),
-      _onCompletion(onCompletion),
-      _active(false) {
+      _onCompletion(onCompletion) {
     uassert(ErrorCodes::BadValue, "null replication executor", executor);
     uassert(ErrorCodes::BadValue, "empty list of operations", !operations.empty());
     uassert(ErrorCodes::FailedToParse,
@@ -74,7 +75,7 @@ std::string MultiApplier::getDiagnosticString() const {
     stdx::lock_guard<stdx::mutex> lk(_mutex);
     str::stream output;
     output << "MultiApplier";
-    output << " active: " << _active;
+    output << " active: " << _isActive_inlock();
     output << ", ops: " << _operations.front().ts.timestamp().toString();
     output << " - " << _operations.back().ts.timestamp().toString();
     output << ", executor: " << _executor->getDiagnosticString();
@@ -83,51 +84,69 @@ std::string MultiApplier::getDiagnosticString() const {
 
 bool MultiApplier::isActive() const {
     stdx::lock_guard<stdx::mutex> lk(_mutex);
-    return _active;
+    return _isActive_inlock();
+}
+
+bool MultiApplier::_isActive_inlock() const {
+    return State::kRunning == _state || State::kShuttingDown == _state;
 }
 
-Status MultiApplier::startup() {
+Status MultiApplier::startup() noexcept {
     stdx::lock_guard<stdx::mutex> lk(_mutex);
 
-    if (_active) {
-        return Status(ErrorCodes::IllegalOperation, "applier already started");
+    switch (_state) {
+        case State::kPreStart:
+            _state = State::kRunning;
+            break;
+        case State::kRunning:
+            return Status(ErrorCodes::InternalError, "multi applier already started");
+        case State::kShuttingDown:
+            return Status(ErrorCodes::ShutdownInProgress, "multi applier shutting down");
+        case State::kComplete:
+            return Status(ErrorCodes::ShutdownInProgress, "multi applier completed");
     }
 
     auto scheduleResult =
         _executor->scheduleWork(stdx::bind(&MultiApplier::_callback, this, stdx::placeholders::_1));
     if (!scheduleResult.isOK()) {
+        _state = State::kComplete;
        return scheduleResult.getStatus();
     }
 
-    _active = true;
     _dbWorkCallbackHandle = scheduleResult.getValue();
 
     return Status::OK();
 }
 
 void MultiApplier::shutdown() {
-    executor::TaskExecutor::CallbackHandle dbWorkCallbackHandle;
-    {
-        stdx::lock_guard<stdx::mutex> lk(_mutex);
-
-        if (!_active) {
+    stdx::lock_guard<stdx::mutex> lk(_mutex);
+    switch (_state) {
+        case State::kPreStart:
+            // Transition directly from PreStart to Complete if not started yet.
+            _state = State::kComplete;
+            return;
+        case State::kRunning:
+            _state = State::kShuttingDown;
+            break;
+        case State::kShuttingDown:
+        case State::kComplete:
+            // Nothing to do if we are already in ShuttingDown or Complete state.
             return;
-        }
-
-        dbWorkCallbackHandle = _dbWorkCallbackHandle;
     }
 
-    if (dbWorkCallbackHandle.isValid()) {
-        _executor->cancel(dbWorkCallbackHandle);
+    if (_dbWorkCallbackHandle.isValid()) {
+        _executor->cancel(_dbWorkCallbackHandle);
     }
 }
 
 void MultiApplier::join() {
     stdx::unique_lock<stdx::mutex> lk(_mutex);
+    _condition.wait(lk, [this]() { return !_isActive_inlock(); });
+}
 
-    while (_active) {
-        _condition.wait(lk);
-    }
+MultiApplier::State MultiApplier::getState_forTest() const {
+    stdx::lock_guard<stdx::mutex> lk(_mutex);
+    return _state;
 }
 
 void MultiApplier::_callback(const executor::TaskExecutor::CallbackArgs& cbd) {
@@ -149,12 +168,40 @@ void MultiApplier::_callback(const executor::TaskExecutor::CallbackArgs& cbd) {
 }
 
 void MultiApplier::_finishCallback(const Status& result) {
-    _onCompletion(result);
+    // After running the callback function, clear '_onCompletion' to release any resources that
+    // might be held by this function object.
+    // '_onCompletion' must be moved to a temporary copy and destroyed outside the lock in case
+    // there is any logic that's invoked at the function object's destruction that might call into
+    // this MultiApplier. 'onCompletion' must be declared before lock guard 'lock' so that it is
+    // destroyed outside the lock.
+    decltype(_onCompletion) onCompletion;
+    {
+        stdx::lock_guard<stdx::mutex> lock(_mutex);
+        invariant(_onCompletion);
+        std::swap(_onCompletion, onCompletion);
+    }
+
+    onCompletion(result);
 
     stdx::lock_guard<stdx::mutex> lk(_mutex);
-    _active = false;
+    invariant(State::kComplete != _state);
+    _state = State::kComplete;
     _condition.notify_all();
 }
 
+std::ostream& operator<<(std::ostream& os, const MultiApplier::State& state) {
+    switch (state) {
+        case MultiApplier::State::kPreStart:
+            return os << "PreStart";
+        case MultiApplier::State::kRunning:
+            return os << "Running";
+        case MultiApplier::State::kShuttingDown:
+            return os << "ShuttingDown";
+        case MultiApplier::State::kComplete:
+            return os << "Complete";
+    }
+    MONGO_UNREACHABLE;
+}
+
 }  // namespace repl
 }  // namespace mongo
diff --git a/src/mongo/db/repl/multiapplier.h b/src/mongo/db/repl/multiapplier.h
index 3b994a00abd..37f094b0324 100644
--- a/src/mongo/db/repl/multiapplier.h
+++ b/src/mongo/db/repl/multiapplier.h
@@ -28,6 +28,7 @@
 
 #pragma once
 
+#include <iosfwd>
 #include <memory>
 #include <string>
 #include <utility>
@@ -117,7 +118,7 @@ public:
     /**
      * Starts applier by scheduling initial db work to be run by the executor.
      */
-    Status startup();
+    Status startup() noexcept;
 
     /**
      * Cancels current db work request.
@@ -133,7 +134,23 @@ public:
      */
     void join();
 
+    // State transitions:
+    // PreStart --> Running --> ShuttingDown --> Complete
+    // It is possible to skip intermediate states. For example,
+    // calling shutdown() when the applier has not started will transition from PreStart directly
+    // to Complete.
+    // This enum class is made public for testing.
+    enum class State { kPreStart, kRunning, kShuttingDown, kComplete };
+
+    /**
+     * Returns current MultiApplier state.
+     * For testing only.
+     */
+    State getState_forTest() const;
+
 private:
+    bool _isActive_inlock() const;
+
     /**
      * DB worker callback function - applies all operations.
      */
@@ -153,11 +170,17 @@ private:
 
     stdx::condition_variable _condition;
 
-    // _active is true when MultiApplier is scheduled to be run by the executor.
-    bool _active;
+    // Current multi applier state. See comments for State enum class for details.
+    State _state = State::kPreStart;
 
     executor::TaskExecutor::CallbackHandle _dbWorkCallbackHandle;
 };
 
+/**
+ * Insertion operator for MultiApplier::State. Formats applier state for output stream.
+ * For testing only.
+ */
+std::ostream& operator<<(std::ostream& os, const MultiApplier::State& state);
+
 }  // namespace repl
 }  // namespace mongo
diff --git a/src/mongo/db/repl/multiapplier_test.cpp b/src/mongo/db/repl/multiapplier_test.cpp
index cd445be0c6d..9df868384c2 100644
--- a/src/mongo/db/repl/multiapplier_test.cpp
+++ b/src/mongo/db/repl/multiapplier_test.cpp
@@ -137,6 +137,21 @@ TEST_F(MultiApplierTest, InvalidConstruction) {
                                 "callback function cannot be null");
 }
 
+TEST_F(MultiApplierTest, MultiApplierTransitionsDirectlyToCompleteIfShutdownBeforeStarting) {
+    const MultiApplier::Operations operations{OplogEntry(BSON("ts" << Timestamp(Seconds(123), 0)))};
+
+    auto multiApply = [](OperationContext*,
+                         MultiApplier::Operations,
+                         MultiApplier::ApplyOperationFn) -> StatusWith<OpTime> { return OpTime(); };
+    auto callback = [](const Status&) {};
+
+    MultiApplier multiApplier(&getExecutor(), operations, applyOperation, multiApply, callback);
+    ASSERT_EQUALS(MultiApplier::State::kPreStart, multiApplier.getState_forTest());
+
+    multiApplier.shutdown();
+    ASSERT_EQUALS(MultiApplier::State::kComplete, multiApplier.getState_forTest());
+}
+
 TEST_F(MultiApplierTest, MultiApplierInvokesCallbackWithCallbackCanceledStatusUponCancellation) {
     const MultiApplier::Operations operations{OplogEntry(BSON("ts" << Timestamp(Seconds(123), 0)))};
 
@@ -152,17 +167,22 @@ TEST_F(MultiApplierTest, MultiApplierInvokesCallbackWithCallbackCanceledStatusUp
     auto callback = [&](const Status& result) { callbackResult = result; };
 
     MultiApplier multiApplier(&getExecutor(), operations, applyOperation, multiApply, callback);
+    ASSERT_EQUALS(MultiApplier::State::kPreStart, multiApplier.getState_forTest());
 
     {
         auto net = getNet();
         executor::NetworkInterfaceMock::InNetworkGuard guard(net);
 
         // Executor cannot run multiApply callback while we are on the network thread.
         ASSERT_OK(multiApplier.startup());
+        ASSERT_EQUALS(MultiApplier::State::kRunning, multiApplier.getState_forTest());
+
         multiApplier.shutdown();
+        ASSERT_EQUALS(MultiApplier::State::kShuttingDown, multiApplier.getState_forTest());
         net->runReadyNetworkOperations();
     }
     multiApplier.join();
+    ASSERT_EQUALS(MultiApplier::State::kComplete, multiApplier.getState_forTest());
 
     ASSERT_FALSE(multiApplyInvoked);
 
@@ -267,4 +287,55 @@ TEST_F(
     ASSERT_FALSE(callbackTxn);
 }
 
+class SharedCallbackState {
+    MONGO_DISALLOW_COPYING(SharedCallbackState);
+
+public:
+    explicit SharedCallbackState(bool* sharedCallbackStateDestroyed)
+        : _sharedCallbackStateDestroyed(sharedCallbackStateDestroyed) {}
+    ~SharedCallbackState() {
+        *_sharedCallbackStateDestroyed = true;
+    }
+
+private:
+    bool* _sharedCallbackStateDestroyed;
+};
+
+TEST_F(MultiApplierTest, MultiApplierResetsOnCompletionCallbackFunctionPointerUponCompletion) {
+    bool sharedCallbackStateDestroyed = false;
+    auto sharedCallbackData = std::make_shared<SharedCallbackState>(&sharedCallbackStateDestroyed);
+
+    const MultiApplier::Operations operations{OplogEntry(BSON("ts" << Timestamp(Seconds(123), 0)))};
+
+    auto multiApply = [&](OperationContext*,
+                          MultiApplier::Operations operations,
+                          MultiApplier::ApplyOperationFn) -> StatusWith<OpTime> {
+        return operations.back().getOpTime();
+    };
+
+    auto callbackResult = getDetectableErrorStatus();
+
+    MultiApplier multiApplier(
+        &getExecutor(),
+        operations,
+        applyOperation,
+        multiApply,
+        [&callbackResult, sharedCallbackData](const Status& result) { callbackResult = result; });
+
+    sharedCallbackData.reset();
+    ASSERT_FALSE(sharedCallbackStateDestroyed);
+
+    ASSERT_OK(multiApplier.startup());
+    {
+        auto net = getNet();
+        executor::NetworkInterfaceMock::InNetworkGuard guard(net);
+        net->runReadyNetworkOperations();
+    }
+    multiApplier.join();
+
+    ASSERT_OK(callbackResult);
+    // Shared state should be destroyed when applier is finished.
+    ASSERT_TRUE(sharedCallbackStateDestroyed);
+}
+
 }  // namespace
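The header diff documents the state machine that replaces the old _active flag: PreStart --> Running --> ShuttingDown --> Complete, with shutdown() before startup() jumping straight to Complete. The class below is a rough, self-contained illustration of that kind of lifecycle guard using only the standard library; LifecycleGuard, its bool-returning startup(), and the separate complete() hook are inventions of this sketch, not the MongoDB implementation.

#include <condition_variable>
#include <mutex>

// LifecycleGuard is a generic illustration of the four-state lifecycle described in
// the header comment above (PreStart --> Running --> ShuttingDown --> Complete).
// It is not the MongoDB implementation: it only shows why isActive() treats both
// Running and ShuttingDown as active, and why join() waits for Complete.
class LifecycleGuard {
public:
    enum class State { kPreStart, kRunning, kShuttingDown, kComplete };

    // PreStart -> Running. Fails if the component was already started or shut down.
    bool startup() {
        std::lock_guard<std::mutex> lk(_mutex);
        if (_state != State::kPreStart) {
            return false;
        }
        _state = State::kRunning;
        return true;
    }

    // PreStart -> Complete (never started) or Running -> ShuttingDown (work in flight).
    void shutdown() {
        std::lock_guard<std::mutex> lk(_mutex);
        if (_state == State::kPreStart) {
            _state = State::kComplete;
            _cv.notify_all();
        } else if (_state == State::kRunning) {
            _state = State::kShuttingDown;
        }
        // ShuttingDown / Complete: nothing to do.
    }

    // Called by the worker once its completion callback has run: transition to Complete.
    void complete() {
        std::lock_guard<std::mutex> lk(_mutex);
        _state = State::kComplete;
        _cv.notify_all();
    }

    // Blocks the caller until the lifecycle reaches Complete.
    void join() {
        std::unique_lock<std::mutex> lk(_mutex);
        _cv.wait(lk, [this] { return _state == State::kComplete; });
    }

    bool isActive() const {
        std::lock_guard<std::mutex> lk(_mutex);
        return _state == State::kRunning || _state == State::kShuttingDown;
    }

private:
    mutable std::mutex _mutex;
    std::condition_variable _cv;
    State _state = State::kPreStart;
};

In the actual MultiApplier, the transition to Complete happens in _finishCallback() after the moved-out completion callback has been invoked, so once the applier has been started, join() returning implies the completion callback has already run.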