summaryrefslogtreecommitdiff
path: root/src/mongo/db
diff options
context:
space:
mode:
authorBenety Goh <benety@mongodb.com>2016-11-30 00:49:31 -0500
committerBenety Goh <benety@mongodb.com>2016-11-30 15:41:50 -0500
commitb1a5aefc4467c5cff6e7bcf3f4aef3cb15f3ef05 (patch)
tree866b24c9fceabf1c8d6d658784e9fc83162205cf /src/mongo/db
parent6c3d10524f2cdf73cc15f3b0dacab8deca848271 (diff)
downloadmongo-b1a5aefc4467c5cff6e7bcf3f4aef3cb15f3ef05.tar.gz
SERVER-27052 MultiApplier clears MultiApplier::_onCompletion on completion to release any resources that might be held by function object
Diffstat (limited to 'src/mongo/db')
-rw-r--r--src/mongo/db/repl/multiapplier.cpp93
-rw-r--r--src/mongo/db/repl/multiapplier.h29
-rw-r--r--src/mongo/db/repl/multiapplier_test.cpp71
3 files changed, 167 insertions, 26 deletions
diff --git a/src/mongo/db/repl/multiapplier.cpp b/src/mongo/db/repl/multiapplier.cpp
index 02ab0950d62..036ab4f0d4d 100644
--- a/src/mongo/db/repl/multiapplier.cpp
+++ b/src/mongo/db/repl/multiapplier.cpp
@@ -30,6 +30,8 @@
#include "mongo/db/repl/multiapplier.h"
+#include <utility>
+
#include "mongo/db/client.h"
#include "mongo/db/operation_context.h"
#include "mongo/db/repl/optime.h"
@@ -47,8 +49,7 @@ MultiApplier::MultiApplier(executor::TaskExecutor* executor,
_operations(operations),
_applyOperation(applyOperation),
_multiApply(multiApply),
- _onCompletion(onCompletion),
- _active(false) {
+ _onCompletion(onCompletion) {
uassert(ErrorCodes::BadValue, "null replication executor", executor);
uassert(ErrorCodes::BadValue, "empty list of operations", !operations.empty());
uassert(ErrorCodes::FailedToParse,
@@ -74,7 +75,7 @@ std::string MultiApplier::getDiagnosticString() const {
stdx::lock_guard<stdx::mutex> lk(_mutex);
str::stream output;
output << "MultiApplier";
- output << " active: " << _active;
+ output << " active: " << _isActive_inlock();
output << ", ops: " << _operations.front().ts.timestamp().toString();
output << " - " << _operations.back().ts.timestamp().toString();
output << ", executor: " << _executor->getDiagnosticString();
@@ -83,51 +84,69 @@ std::string MultiApplier::getDiagnosticString() const {
bool MultiApplier::isActive() const {
stdx::lock_guard<stdx::mutex> lk(_mutex);
- return _active;
+ return _isActive_inlock();
+}
+
+bool MultiApplier::_isActive_inlock() const {
+ return State::kRunning == _state || State::kShuttingDown == _state;
}
-Status MultiApplier::startup() {
+Status MultiApplier::startup() noexcept {
stdx::lock_guard<stdx::mutex> lk(_mutex);
- if (_active) {
- return Status(ErrorCodes::IllegalOperation, "applier already started");
+ switch (_state) {
+ case State::kPreStart:
+ _state = State::kRunning;
+ break;
+ case State::kRunning:
+ return Status(ErrorCodes::InternalError, "multi applier already started");
+ case State::kShuttingDown:
+ return Status(ErrorCodes::ShutdownInProgress, "multi applier shutting down");
+ case State::kComplete:
+ return Status(ErrorCodes::ShutdownInProgress, "multi applier completed");
}
auto scheduleResult =
_executor->scheduleWork(stdx::bind(&MultiApplier::_callback, this, stdx::placeholders::_1));
if (!scheduleResult.isOK()) {
+ _state = State::kComplete;
return scheduleResult.getStatus();
}
- _active = true;
_dbWorkCallbackHandle = scheduleResult.getValue();
return Status::OK();
}
void MultiApplier::shutdown() {
- executor::TaskExecutor::CallbackHandle dbWorkCallbackHandle;
- {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
-
- if (!_active) {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ switch (_state) {
+ case State::kPreStart:
+ // Transition directly from PreStart to Complete if not started yet.
+ _state = State::kComplete;
+ return;
+ case State::kRunning:
+ _state = State::kShuttingDown;
+ break;
+ case State::kShuttingDown:
+ case State::kComplete:
+ // Nothing to do if we are already in ShuttingDown or Complete state.
return;
- }
-
- dbWorkCallbackHandle = _dbWorkCallbackHandle;
}
- if (dbWorkCallbackHandle.isValid()) {
- _executor->cancel(dbWorkCallbackHandle);
+ if (_dbWorkCallbackHandle.isValid()) {
+ _executor->cancel(_dbWorkCallbackHandle);
}
}
void MultiApplier::join() {
stdx::unique_lock<stdx::mutex> lk(_mutex);
+ _condition.wait(lk, [this]() { return !_isActive_inlock(); });
+}
- while (_active) {
- _condition.wait(lk);
- }
+MultiApplier::State MultiApplier::getState_forTest() const {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ return _state;
}
void MultiApplier::_callback(const executor::TaskExecutor::CallbackArgs& cbd) {
@@ -149,12 +168,40 @@ void MultiApplier::_callback(const executor::TaskExecutor::CallbackArgs& cbd) {
}
void MultiApplier::_finishCallback(const Status& result) {
- _onCompletion(result);
+ // After running callback function, clear '_onCompletion' to release any resources that might be
+ // held by this function object.
+ // '_onCompletion' must be moved to a temporary copy and destroyed outside the lock in case
+ // there is any logic that's invoked at the function object's destruction that might call into
+ // this MultiApplier. 'onCompletion' must be declared before lock guard 'lock' so that it is
+ // destroyed outside the lock.
+ decltype(_onCompletion) onCompletion;
+ {
+ stdx::lock_guard<stdx::mutex> lock(_mutex);
+ invariant(_onCompletion);
+ std::swap(_onCompletion, onCompletion);
+ }
+
+ onCompletion(result);
stdx::lock_guard<stdx::mutex> lk(_mutex);
- _active = false;
+ invariant(State::kComplete != _state);
+ _state = State::kComplete;
_condition.notify_all();
}
+std::ostream& operator<<(std::ostream& os, const MultiApplier::State& state) {
+ switch (state) {
+ case MultiApplier::State::kPreStart:
+ return os << "PreStart";
+ case MultiApplier::State::kRunning:
+ return os << "Running";
+ case MultiApplier::State::kShuttingDown:
+ return os << "ShuttingDown";
+ case MultiApplier::State::kComplete:
+ return os << "Complete";
+ }
+ MONGO_UNREACHABLE;
+}
+
} // namespace repl
} // namespace mongo
diff --git a/src/mongo/db/repl/multiapplier.h b/src/mongo/db/repl/multiapplier.h
index 3b994a00abd..37f094b0324 100644
--- a/src/mongo/db/repl/multiapplier.h
+++ b/src/mongo/db/repl/multiapplier.h
@@ -28,6 +28,7 @@
#pragma once
+#include <iosfwd>
#include <memory>
#include <string>
#include <utility>
@@ -117,7 +118,7 @@ public:
/**
* Starts applier by scheduling initial db work to be run by the executor.
*/
- Status startup();
+ Status startup() noexcept;
/**
* Cancels current db work request.
@@ -133,7 +134,23 @@ public:
*/
void join();
+ // State transitions:
+ // PreStart --> Running --> ShuttingDown --> Complete
+ // It is possible to skip intermediate states. For example,
+ // Calling shutdown() when the cloner has not started will transition from PreStart directly
+ // to Complete.
+ // This enum class is made public for testing.
+ enum class State { kPreStart, kRunning, kShuttingDown, kComplete };
+
+ /**
+ * Returns current MultiApplier state.
+ * For testing only.
+ */
+ State getState_forTest() const;
+
private:
+ bool _isActive_inlock() const;
+
/**
* DB worker callback function - applies all operations.
*/
@@ -153,11 +170,17 @@ private:
stdx::condition_variable _condition;
- // _active is true when MultiApplier is scheduled to be run by the executor.
- bool _active;
+ // Current multi applier state. See comments for State enum class for details.
+ State _state = State::kPreStart;
executor::TaskExecutor::CallbackHandle _dbWorkCallbackHandle;
};
+/**
+ * Insertion operator for MultiApplier::State. Formats fetcher state for output stream.
+ * For testing only.
+ */
+std::ostream& operator<<(std::ostream& os, const MultiApplier::State& state);
+
} // namespace repl
} // namespace mongo
diff --git a/src/mongo/db/repl/multiapplier_test.cpp b/src/mongo/db/repl/multiapplier_test.cpp
index cd445be0c6d..9df868384c2 100644
--- a/src/mongo/db/repl/multiapplier_test.cpp
+++ b/src/mongo/db/repl/multiapplier_test.cpp
@@ -137,6 +137,21 @@ TEST_F(MultiApplierTest, InvalidConstruction) {
"callback function cannot be null");
}
+TEST_F(MultiApplierTest, MultiApplierTransitionsDirectlyToCompleteIfShutdownBeforeStarting) {
+ const MultiApplier::Operations operations{OplogEntry(BSON("ts" << Timestamp(Seconds(123), 0)))};
+
+ auto multiApply = [](OperationContext*,
+ MultiApplier::Operations,
+ MultiApplier::ApplyOperationFn) -> StatusWith<OpTime> { return OpTime(); };
+ auto callback = [](const Status&) {};
+
+ MultiApplier multiApplier(&getExecutor(), operations, applyOperation, multiApply, callback);
+ ASSERT_EQUALS(MultiApplier::State::kPreStart, multiApplier.getState_forTest());
+
+ multiApplier.shutdown();
+ ASSERT_EQUALS(MultiApplier::State::kComplete, multiApplier.getState_forTest());
+}
+
TEST_F(MultiApplierTest, MultiApplierInvokesCallbackWithCallbackCanceledStatusUponCancellation) {
const MultiApplier::Operations operations{OplogEntry(BSON("ts" << Timestamp(Seconds(123), 0)))};
@@ -152,17 +167,22 @@ TEST_F(MultiApplierTest, MultiApplierInvokesCallbackWithCallbackCanceledStatusUp
auto callback = [&](const Status& result) { callbackResult = result; };
MultiApplier multiApplier(&getExecutor(), operations, applyOperation, multiApply, callback);
+ ASSERT_EQUALS(MultiApplier::State::kPreStart, multiApplier.getState_forTest());
{
auto net = getNet();
executor::NetworkInterfaceMock::InNetworkGuard guard(net);
// Executor cannot run multiApply callback while we are on the network thread.
ASSERT_OK(multiApplier.startup());
+ ASSERT_EQUALS(MultiApplier::State::kRunning, multiApplier.getState_forTest());
+
multiApplier.shutdown();
+ ASSERT_EQUALS(MultiApplier::State::kShuttingDown, multiApplier.getState_forTest());
net->runReadyNetworkOperations();
}
multiApplier.join();
+ ASSERT_EQUALS(MultiApplier::State::kComplete, multiApplier.getState_forTest());
ASSERT_FALSE(multiApplyInvoked);
@@ -267,4 +287,55 @@ TEST_F(
ASSERT_FALSE(callbackTxn);
}
+class SharedCallbackState {
+ MONGO_DISALLOW_COPYING(SharedCallbackState);
+
+public:
+ explicit SharedCallbackState(bool* sharedCallbackStateDestroyed)
+ : _sharedCallbackStateDestroyed(sharedCallbackStateDestroyed) {}
+ ~SharedCallbackState() {
+ *_sharedCallbackStateDestroyed = true;
+ }
+
+private:
+ bool* _sharedCallbackStateDestroyed;
+};
+
+TEST_F(MultiApplierTest, MultiApplierResetsOnCompletionCallbackFunctionPointerUponCompletion) {
+ bool sharedCallbackStateDestroyed = false;
+ auto sharedCallbackData = std::make_shared<SharedCallbackState>(&sharedCallbackStateDestroyed);
+
+ const MultiApplier::Operations operations{OplogEntry(BSON("ts" << Timestamp(Seconds(123), 0)))};
+
+ auto multiApply = [&](OperationContext*,
+ MultiApplier::Operations operations,
+ MultiApplier::ApplyOperationFn) -> StatusWith<OpTime> {
+ return operations.back().getOpTime();
+ };
+
+ auto callbackResult = getDetectableErrorStatus();
+
+ MultiApplier multiApplier(
+ &getExecutor(),
+ operations,
+ applyOperation,
+ multiApply,
+ [&callbackResult, sharedCallbackData](const Status& result) { callbackResult = result; });
+
+ sharedCallbackData.reset();
+ ASSERT_FALSE(sharedCallbackStateDestroyed);
+
+ ASSERT_OK(multiApplier.startup());
+ {
+ auto net = getNet();
+ executor::NetworkInterfaceMock::InNetworkGuard guard(net);
+ net->runReadyNetworkOperations();
+ }
+ multiApplier.join();
+
+ ASSERT_OK(callbackResult);
+ // Shared state should be destroyed when applier is finished.
+ ASSERT_TRUE(sharedCallbackStateDestroyed);
+}
+
} // namespace