author     Andy Schwerin <schwerin@mongodb.com>    2016-10-13 10:18:19 -0400
committer  Andy Schwerin <schwerin@mongodb.com>    2016-10-13 10:18:19 -0400
commit     9228a2b401b4af0adfa53a61053fba3a7df4f75c (patch)
tree       b9a66d8073dfa532c87dd04f2e48d90d75e4097e /src
parent     8794dd4bdcab131a4ecf0963505e7fb2a34bf5e6 (diff)
download   mongo-9228a2b401b4af0adfa53a61053fba3a7df4f75c.tar.gz
SERVER-26305 Use interruptible condition variables in ReplicationCoordinatorImpl instead of KillOpListener
While this change also improves the readability of _awaitReplication_inlock and stepDown, it resolves SERVER-26305 by breaking a deadlock cycle caused by the fact that KillOpListener methods are run while ServiceContext holds a mutex.
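
The core pattern the commit adopts is an interruptible wait on a condition variable with a deadline, in place of registering a KillOpListener that must be called back under ServiceContext's mutex. Below is a minimal standalone sketch of that waiting shape in plain standard C++; the WaitState type, the function name, and the "killed" flag are illustrative stand-ins, not the server's OperationContext::waitForConditionOrInterruptUntil API.

#include <chrono>
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <thread>

struct WaitState {
    std::mutex mtx;
    std::condition_variable cv;
    bool done = false;    // the condition the caller is really waiting for
    bool killed = false;  // set by a "killOp"-style interruption
};

// Returns true if the condition was satisfied, false on interruption or timeout.
bool waitForConditionOrInterruptUntil(WaitState& s,
                                      std::chrono::steady_clock::time_point deadline) {
    std::unique_lock<std::mutex> lk(s.mtx);
    // wait_until returns false only if the deadline passes with the predicate still false.
    if (!s.cv.wait_until(lk, deadline, [&] { return s.done || s.killed; })) {
        return false;  // timed out
    }
    return !s.killed;  // an interrupted wait reports failure, not success
}

int main() {
    WaitState s;
    std::thread killer([&] {
        std::this_thread::sleep_for(std::chrono::milliseconds(50));
        {
            std::lock_guard<std::mutex> lk(s.mtx);
            s.killed = true;  // analogous to marking the operation killed
        }
        s.cv.notify_all();  // one notification wakes the interruptible waiter
    });
    const bool ok = waitForConditionOrInterruptUntil(
        s, std::chrono::steady_clock::now() + std::chrono::seconds(5));
    std::cout << (ok ? "condition satisfied\n" : "interrupted or timed out\n");
    killer.join();
}

With this shape, a kill request only has to flip a flag and notify the condition variable the waiter already sleeps on, so no listener callback needs to run under ServiceContext's mutex.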
Diffstat (limited to 'src')
-rw-r--r--  src/mongo/db/db.cpp                                           |   1
-rw-r--r--  src/mongo/db/operation_context.cpp                            |  16
-rw-r--r--  src/mongo/db/repl/replication_coordinator_impl.cpp            | 334
-rw-r--r--  src/mongo/db/repl/replication_coordinator_impl.h              |  70
-rw-r--r--  src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp  |   7
-rw-r--r--  src/mongo/db/repl/replication_coordinator_impl_test.cpp       | 119
-rw-r--r--  src/mongo/db/repl/replication_coordinator_test_fixture.cpp    |   4
-rw-r--r--  src/mongo/db/write_concern_options.h                          |   3
-rw-r--r--  src/mongo/executor/network_interface_mock.cpp                 |   5
-rw-r--r--  src/mongo/executor/network_interface_mock.h                   |  19
10 files changed, 221 insertions(+), 357 deletions(-)
diff --git a/src/mongo/db/db.cpp b/src/mongo/db/db.cpp
index 1b67ab16783..a50cdb92faf 100644
--- a/src/mongo/db/db.cpp
+++ b/src/mongo/db/db.cpp
@@ -985,7 +985,6 @@ MONGO_INITIALIZER_WITH_PREREQUISITES(CreateReplicationManager,
new repl::TopologyCoordinatorImpl(topoCoordOptions),
storageInterface,
static_cast<int64_t>(curTimeMillis64()));
- serviceContext->registerKillOpListener(replCoord.get());
repl::ReplicationCoordinator::set(serviceContext, std::move(replCoord));
repl::setOplogCollectionName();
return Status::OK();
diff --git a/src/mongo/db/operation_context.cpp b/src/mongo/db/operation_context.cpp
index 2a5c8814bf3..8151fe97dfa 100644
--- a/src/mongo/db/operation_context.cpp
+++ b/src/mongo/db/operation_context.cpp
@@ -248,16 +248,28 @@ static NOINLINE_DECL stdx::cv_status cvWaitUntilWithClockSource(ClockSource* clo
auto alarmInfo = std::make_shared<AlarmInfo>();
alarmInfo->waitCV = &cv;
alarmInfo->waitMutex = m.mutex();
- invariantOK(clockSource->setAlarm(deadline, [alarmInfo] {
+ const auto waiterThreadId = stdx::this_thread::get_id();
+ bool invokedAlarmInline = false;
+ invariantOK(clockSource->setAlarm(deadline, [alarmInfo, waiterThreadId, &invokedAlarmInline] {
stdx::lock_guard<stdx::mutex> controlLk(alarmInfo->controlMutex);
alarmInfo->cvWaitResult = stdx::cv_status::timeout;
if (!alarmInfo->waitMutex) {
return;
}
+ if (stdx::this_thread::get_id() == waiterThreadId) {
+ // In NetworkInterfaceMock, setAlarm may invoke its callback immediately if the deadline
+ // has expired, so we detect that case and avoid self-deadlock by returning early here.
+ // It is safe to set invokedAlarmInline without synchronization here, because this is
+ // precisely the case where the same thread both writes and later reads the value.
+ invokedAlarmInline = true;
+ return;
+ }
stdx::lock_guard<stdx::mutex> waitLk(*alarmInfo->waitMutex);
alarmInfo->waitCV->notify_all();
}));
- cv.wait(m);
+ if (!invokedAlarmInline) {
+ cv.wait(m);
+ }
m.unlock();
stdx::lock_guard<stdx::mutex> controlLk(alarmInfo->controlMutex);
m.lock();
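
The operation_context.cpp hunk above guards against NetworkInterfaceMock::setAlarm running its callback synchronously on the waiting thread, which would self-deadlock on the wait mutex the waiter still holds. A standalone sketch of the same idea, using plain std:: primitives and a toy setAlarm that may fire inline (it omits the controlMutex bookkeeping of the real code):

#include <chrono>
#include <condition_variable>
#include <functional>
#include <iostream>
#include <memory>
#include <mutex>
#include <thread>

// Toy alarm source: if the deadline has already passed, the callback runs
// immediately on the caller's thread (as NetworkInterfaceMock::setAlarm can);
// otherwise it fires later from a background thread.
void setAlarm(std::chrono::steady_clock::time_point when, std::function<void()> action) {
    if (when <= std::chrono::steady_clock::now()) {
        action();  // inline invocation on the would-be waiter's thread
        return;
    }
    std::thread([when, action] {
        std::this_thread::sleep_until(when);
        action();
    }).detach();
}

struct AlarmInfo {
    std::mutex waitMutex;
    std::condition_variable waitCV;
};

void waitWithDeadline(std::chrono::steady_clock::time_point deadline) {
    auto alarmInfo = std::make_shared<AlarmInfo>();
    std::unique_lock<std::mutex> lk(alarmInfo->waitMutex);

    const auto waiterThreadId = std::this_thread::get_id();
    auto invokedAlarmInline = std::make_shared<bool>(false);
    setAlarm(deadline, [alarmInfo, waiterThreadId, invokedAlarmInline] {
        if (std::this_thread::get_id() == waiterThreadId) {
            // Inline case: the waiter still holds waitMutex, so locking it here
            // would self-deadlock. Just record that the alarm already fired.
            *invokedAlarmInline = true;
            return;
        }
        std::lock_guard<std::mutex> waitLk(alarmInfo->waitMutex);
        alarmInfo->waitCV.notify_all();
    });

    if (!*invokedAlarmInline) {
        alarmInfo->waitCV.wait_until(lk, deadline);  // normal path: sleep until woken
    }
    std::cout << "wait finished\n";
}

int main() {
    waitWithDeadline(std::chrono::steady_clock::now());  // expired deadline: inline alarm
    waitWithDeadline(std::chrono::steady_clock::now() + std::chrono::milliseconds(20));
    // Give the detached alarm thread from the second call time to finish.
    std::this_thread::sleep_for(std::chrono::milliseconds(100));
}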
diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp
index 028d02c55b4..dd8a57cd60f 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl.cpp
@@ -723,9 +723,15 @@ void ReplicationCoordinatorImpl::shutdown(OperationContext* txn) {
fassert(18823, _rsConfigState != kConfigStartingUp);
_replicationWaiterList.signalAndRemoveAll_inlock();
_opTimeWaiterList.signalAndRemoveAll_inlock();
+ _currentCommittedSnapshotCond.notify_all();
_dr.swap(drCopy);
}
+ {
+ stdx::lock_guard<stdx::mutex> topoLock(_topoMutex);
+ _signalStepDownWaiter_inlock();
+ }
+
// joining the replication executor is blocking so it must be run outside of the mutex
if (drCopy) {
drCopy->shutdown(txn);
@@ -1265,7 +1271,6 @@ Status ReplicationCoordinatorImpl::waitUntilOpTimeForRead(OperationContext* txn,
if (!waitStatus.isOK()) {
return waitStatus;
}
-
LOG(3) << "Got notified of new snapshot: " << _currentCommittedSnapshot->toString();
continue;
}
@@ -1464,27 +1469,6 @@ Status ReplicationCoordinatorImpl::_setLastOptime_inlock(const UpdatePositionArg
return Status::OK();
}
-void ReplicationCoordinatorImpl::interrupt(unsigned opId) {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
- // Wake ops waiting for a new committed snapshot.
- _currentCommittedSnapshotCond.notify_all();
-
- auto hasSameOpID = [opId](WaiterInfo* waiter) { return waiter->opID == opId; };
- _replicationWaiterList.signalAndRemoveIf_inlock(hasSameOpID);
- _opTimeWaiterList.signalAndRemoveIf_inlock(hasSameOpID);
- _signalStepDownWaiter_inlock();
-}
-
-void ReplicationCoordinatorImpl::interruptAll() {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
- // Wake ops waiting for a new committed snapshot.
- _currentCommittedSnapshotCond.notify_all();
-
- _replicationWaiterList.signalAndRemoveAll_inlock();
- _opTimeWaiterList.signalAndRemoveAll_inlock();
- _signalStepDownWaiter_inlock();
-}
-
bool ReplicationCoordinatorImpl::_doneWaitingForReplication_inlock(
const OpTime& opTime, SnapshotName minSnapshot, const WriteConcernOptions& writeConcern) {
// The syncMode cannot be unset.
@@ -1584,8 +1568,9 @@ ReplicationCoordinator::StatusAndDuration ReplicationCoordinatorImpl::awaitRepli
Timer timer;
WriteConcernOptions fixedWriteConcern = populateUnsetWriteConcernOptionsSyncMode(writeConcern);
stdx::unique_lock<stdx::mutex> lock(_mutex);
- return _awaitReplication_inlock(
- &timer, &lock, txn, opTime, SnapshotName::min(), fixedWriteConcern);
+ auto status =
+ _awaitReplication_inlock(&lock, txn, opTime, SnapshotName::min(), fixedWriteConcern);
+ return {std::move(status), duration_cast<Milliseconds>(timer.elapsed())};
}
ReplicationCoordinator::StatusAndDuration
@@ -1595,81 +1580,86 @@ ReplicationCoordinatorImpl::awaitReplicationOfLastOpForClient(
WriteConcernOptions fixedWriteConcern = populateUnsetWriteConcernOptionsSyncMode(writeConcern);
stdx::unique_lock<stdx::mutex> lock(_mutex);
const auto& clientInfo = ReplClientInfo::forClient(txn->getClient());
- return _awaitReplication_inlock(&timer,
- &lock,
- txn,
- clientInfo.getLastOp(),
- clientInfo.getLastSnapshot(),
- fixedWriteConcern);
+ auto status = _awaitReplication_inlock(
+ &lock, txn, clientInfo.getLastOp(), clientInfo.getLastSnapshot(), fixedWriteConcern);
+ return {std::move(status), duration_cast<Milliseconds>(timer.elapsed())};
}
-ReplicationCoordinator::StatusAndDuration ReplicationCoordinatorImpl::_awaitReplication_inlock(
- const Timer* timer,
+Status ReplicationCoordinatorImpl::_awaitReplication_inlock(
stdx::unique_lock<stdx::mutex>* lock,
OperationContext* txn,
const OpTime& opTime,
SnapshotName minSnapshot,
const WriteConcernOptions& writeConcern) {
+
// We should never wait for replication if we are holding any locks, because this can
// potentially block for long time while doing network activity.
if (txn->lockState()->isLocked()) {
- return StatusAndDuration({ErrorCodes::IllegalOperation,
- "Waiting for replication not allowed while holding a lock"},
- Milliseconds(timer->millis()));
+ return {ErrorCodes::IllegalOperation,
+ "Waiting for replication not allowed while holding a lock"};
}
const Mode replMode = getReplicationMode();
if (replMode == modeNone) {
// no replication check needed (validated above)
- return StatusAndDuration(Status::OK(), Milliseconds(timer->millis()));
+ return Status::OK();
}
if (replMode == modeMasterSlave && writeConcern.wMode == WriteConcernOptions::kMajority) {
// with master/slave, majority is equivalent to w=1
- return StatusAndDuration(Status::OK(), Milliseconds(timer->millis()));
+ return Status::OK();
}
if (opTime.isNull() && minSnapshot == SnapshotName::min()) {
// If waiting for the empty optime, always say it's been replicated.
- return StatusAndDuration(Status::OK(), Milliseconds(timer->millis()));
+ return Status::OK();
}
if (replMode == modeReplSet && !_memberState.primary()) {
- return StatusAndDuration(Status(ErrorCodes::PrimarySteppedDown,
- "Primary stepped down while waiting for replication"),
- Milliseconds(timer->millis()));
+ return {ErrorCodes::PrimarySteppedDown,
+ "Primary stepped down while waiting for replication"};
}
if (writeConcern.wMode.empty()) {
if (writeConcern.wNumNodes < 1) {
- return StatusAndDuration(Status::OK(), Milliseconds(timer->millis()));
+ return Status::OK();
} else if (writeConcern.wNumNodes == 1 && _getMyLastAppliedOpTime_inlock() >= opTime) {
- return StatusAndDuration(Status::OK(), Milliseconds(timer->millis()));
+ return Status::OK();
}
}
+ auto clockSource = txn->getServiceContext()->getFastClockSource();
+ const auto wTimeoutDate = [&]() -> const Date_t {
+ if (writeConcern.wDeadline != Date_t::max()) {
+ return writeConcern.wDeadline;
+ }
+ if (writeConcern.wTimeout == WriteConcernOptions::kNoTimeout) {
+ return Date_t::max();
+ }
+ return clockSource->now() + clockSource->getPrecision() +
+ Milliseconds{writeConcern.wTimeout};
+ }();
+
// Must hold _mutex before constructing waitInfo as it will modify _replicationWaiterList
stdx::condition_variable condVar;
WaiterInfoGuard waitInfo(
&_replicationWaiterList, txn->getOpID(), opTime, &writeConcern, &condVar);
while (!_doneWaitingForReplication_inlock(opTime, minSnapshot, writeConcern)) {
- const Milliseconds elapsed{timer->millis()};
-
- Status interruptedStatus = txn->checkForInterruptNoAssert();
- if (!interruptedStatus.isOK()) {
- return StatusAndDuration(interruptedStatus, elapsed);
+ if (replMode == modeReplSet && !_getMemberState_inlock().primary()) {
+ return {ErrorCodes::PrimarySteppedDown,
+ "Not primary anymore while waiting for replication - primary stepped down"};
}
- if (replMode == modeReplSet && !_getMemberState_inlock().primary()) {
- return StatusAndDuration(
- Status(ErrorCodes::PrimarySteppedDown,
- "Not primary anymore while waiting for replication - primary stepped down"),
- elapsed);
+ if (_inShutdown) {
+ return {ErrorCodes::ShutdownInProgress, "Replication is being shut down"};
}
- if (writeConcern.wTimeout != WriteConcernOptions::kNoTimeout &&
- elapsed > Milliseconds{writeConcern.wTimeout}) {
+ auto status = txn->waitForConditionOrInterruptNoAssertUntil(condVar, *lock, wTimeoutDate);
+ if (!status.isOK()) {
+ return status.getStatus();
+ }
+ if (status.getValue() == stdx::cv_status::timeout) {
if (Command::testCommandsEnabled) {
// log state of replica set on timeout to help with diagnosis.
BSONObjBuilder progress;
@@ -1678,45 +1668,17 @@ ReplicationCoordinator::StatusAndDuration ReplicationCoordinatorImpl::_awaitRepl
<< ", waitInfo:" << waitInfo.waiter.toBSON()
<< ", progress: " << progress.done();
}
- return StatusAndDuration(
- Status(ErrorCodes::WriteConcernFailed, "waiting for replication timed out"),
- elapsed);
+ return {ErrorCodes::WriteConcernFailed, "waiting for replication timed out"};
}
-
- if (_inShutdown) {
- return StatusAndDuration(
- Status(ErrorCodes::ShutdownInProgress, "Replication is being shut down"), elapsed);
- }
-
- Microseconds waitTime = txn->getRemainingMaxTimeMicros();
- if (writeConcern.wTimeout != WriteConcernOptions::kNoTimeout) {
- waitTime =
- std::min<Microseconds>(Milliseconds{writeConcern.wTimeout} - elapsed, waitTime);
- }
-
- const bool waitForever = waitTime == Microseconds::max();
- if (waitForever) {
- condVar.wait(*lock);
- } else {
- condVar.wait_for(*lock, waitTime.toSystemDuration());
- }
- }
-
- Status status = _checkIfWriteConcernCanBeSatisfied_inlock(writeConcern);
- if (!status.isOK()) {
- return StatusAndDuration(status, Milliseconds(timer->millis()));
}
- return StatusAndDuration(Status::OK(), Milliseconds(timer->millis()));
+ return _checkIfWriteConcernCanBeSatisfied_inlock(writeConcern);
}
-ReplicationCoordinatorImpl::StepDownNonBlockingResult
-ReplicationCoordinatorImpl::stepDown_nonBlocking(OperationContext* txn,
- bool force,
- const Milliseconds& waitTime,
- const Milliseconds& stepdownTime,
- Status* result) {
- invariant(result);
+Status ReplicationCoordinatorImpl::stepDown(OperationContext* txn,
+ const bool force,
+ const Milliseconds& waitTime,
+ const Milliseconds& stepdownTime) {
const Date_t startTime = _replExecutor.now();
const Date_t stepDownUntil = startTime + stepdownTime;
@@ -1726,166 +1688,87 @@ ReplicationCoordinatorImpl::stepDown_nonBlocking(OperationContext* txn,
// Note this check is inherently racy - it's always possible for the node to
// stepdown from some other path before we acquire the global shared lock, but
// that's okay because we are resilient to that happening in _stepDownContinue.
- *result = Status(ErrorCodes::NotMaster, "not primary so can't step down");
- return StepDownNonBlockingResult();
+ return {ErrorCodes::NotMaster, "not primary so can't step down"};
}
- auto globalReadLock = stdx::make_unique<Lock::GlobalLock>(
- txn->lockState(), MODE_S, Lock::GlobalLock::EnqueueOnly());
+ Lock::GlobalLock globalReadLock(txn->lockState(), MODE_S, Lock::GlobalLock::EnqueueOnly());
// We've requested the global shared lock which will stop new writes from coming in,
// but existing writes could take a long time to finish, so kill all user operations
// to help us get the global lock faster.
_externalState->killAllUserOperations(txn);
- globalReadLock->waitForLock(durationCount<Milliseconds>(stepdownTime));
-
- if (!globalReadLock->isLocked()) {
- *result = Status(ErrorCodes::ExceededTimeLimit,
- "Could not acquire the global shared lock within the amount of time "
- "specified that we should step down for");
- return StepDownNonBlockingResult();
- }
+ globalReadLock.waitForLock(durationCount<Milliseconds>(stepdownTime));
- StatusWith<ReplicationExecutor::EventHandle> finishedEvent = _replExecutor.makeEvent();
- if (finishedEvent.getStatus() == ErrorCodes::ShutdownInProgress) {
- *result = finishedEvent.getStatus();
- return StepDownNonBlockingResult();
+ if (!globalReadLock.isLocked()) {
+ return {ErrorCodes::ExceededTimeLimit,
+ "Could not acquire the global shared lock within the amount of time "
+ "specified that we should step down for"};
}
- fassert(26000, finishedEvent.getStatus());
- _stepDownContinue(finishedEvent.getValue(),
- txn,
- waitUntil,
- stepDownUntil,
- force,
- true, // restartHeartbeats
- result);
- auto signalStepDownWaiterInLock = [this](const CallbackArgs&) {
- LockGuard lk(_mutex);
- _signalStepDownWaiter_inlock();
- };
-
- _scheduleWorkAt(waitUntil, signalStepDownWaiterInLock);
- return std::make_pair(std::move(globalReadLock), finishedEvent.getValue());
-}
-
-Status ReplicationCoordinatorImpl::stepDown(OperationContext* txn,
- bool force,
- const Milliseconds& waitTime,
- const Milliseconds& stepdownTime) {
- Status result(ErrorCodes::InternalError, "didn't set status in _stepDownContinue");
- auto globalReadLockAndEventHandle =
- stepDown_nonBlocking(txn, force, waitTime, stepdownTime, &result);
- const auto& eventHandle = globalReadLockAndEventHandle.second;
- if (eventHandle.isValid()) {
- _replExecutor.waitForEvent(eventHandle);
+ try {
+ stdx::unique_lock<stdx::mutex> topoLock(_topoMutex);
+ bool restartHeartbeats = true;
+ txn->checkForInterrupt();
+ while (!_tryToStepDown(waitUntil, stepDownUntil, force)) {
+ if (restartHeartbeats) {
+ // We send out a fresh round of heartbeats because stepping down successfully
+ // without {force: true} is dependent on timely heartbeat data.
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ _restartHeartbeats_inlock();
+ restartHeartbeats = false;
+ }
+ txn->waitForConditionOrInterruptUntil(
+ _stepDownWaiters, topoLock, std::min(stepDownUntil, waitUntil));
+ }
+ } catch (const DBException& ex) {
+ return ex.toStatus();
}
- return result;
+ return Status::OK();
}
void ReplicationCoordinatorImpl::_signalStepDownWaiter_inlock() {
- if (_stepDownWaiter) {
- _replExecutor.signalEvent(_stepDownWaiter);
- _stepDownWaiter = EventHandle();
- }
+ _stepDownWaiters.notify_all();
}
-void ReplicationCoordinatorImpl::_stepDownContinue(
- const ReplicationExecutor::EventHandle finishedEvent,
- OperationContext* txn,
- const Date_t waitUntil,
- const Date_t stepDownUntil,
- bool force,
- bool restartHeartbeats,
- Status* result) {
- LockGuard topoLock(_topoMutex);
-
- ScopeGuard allFinishedGuard =
- MakeGuard(stdx::bind(&ReplicationExecutor::signalEvent, &_replExecutor, finishedEvent));
-
- Status interruptedStatus = txn->checkForInterruptNoAssert();
- if (!interruptedStatus.isOK()) {
- *result = interruptedStatus;
- return;
- }
+bool ReplicationCoordinatorImpl::_tryToStepDown(const Date_t waitUntil,
+ const Date_t stepDownUntil,
+ const bool force) {
if (_topCoord->getRole() != TopologyCoordinator::Role::leader) {
- *result = Status(ErrorCodes::NotMaster,
- "Already stepped down from primary while processing step down "
- "request");
- return;
+ uasserted(ErrorCodes::NotMaster,
+ "Already stepped down from primary while processing step down request");
}
const Date_t now = _replExecutor.now();
if (now >= stepDownUntil) {
- *result = Status(ErrorCodes::ExceededTimeLimit,
- "By the time we were ready to step down, we were already past the "
- "time we were supposed to step down until");
- return;
- }
- bool forceNow = now >= waitUntil ? force : false;
- if (_topCoord->stepDown(stepDownUntil, forceNow, getMyLastAppliedOpTime())) {
- // Schedule work to (potentially) step back up once the stepdown period has ended.
- _scheduleWorkAt(stepDownUntil,
- stdx::bind(&ReplicationCoordinatorImpl::_handleTimePassing,
- this,
- stdx::placeholders::_1));
-
- stdx::unique_lock<stdx::mutex> lk(_mutex);
- const PostMemberStateUpdateAction action =
- _updateMemberStateFromTopologyCoordinator_inlock();
- lk.unlock();
- _performPostMemberStateUpdateAction(action);
- *result = Status::OK();
- return;
- }
-
- // Step down failed. Keep waiting if we can, otherwise finish.
- if (now >= waitUntil) {
- *result = Status(ErrorCodes::ExceededTimeLimit,
- str::stream() << "No electable secondaries caught up as of "
- << dateToISOStringLocal(now)
- << ". Please use {force: true} to force node to step down.");
- return;
+ uasserted(ErrorCodes::ExceededTimeLimit,
+ "By the time we were ready to step down, we were already past the "
+ "time we were supposed to step down until");
}
- {
- LockGuard lk(_mutex);
- if (!_stepDownWaiter) {
- StatusWith<ReplicationExecutor::EventHandle> reschedEvent = _replExecutor.makeEvent();
- if (!reschedEvent.isOK()) {
- *result = reschedEvent.getStatus();
- return;
- }
- _stepDownWaiter = reschedEvent.getValue();
- }
- CBHStatus cbh =
- _replExecutor.onEvent(_stepDownWaiter,
- stdx::bind(&ReplicationCoordinatorImpl::_stepDownContinue,
- this,
- finishedEvent,
- txn,
- waitUntil,
- stepDownUntil,
- force,
- false, // restartHeartbeats
- result));
- if (!cbh.isOK()) {
- *result = cbh.getStatus();
- return;
+ const bool forceNow = now >= waitUntil ? force : false;
+ if (!_topCoord->stepDown(stepDownUntil, forceNow, getMyLastAppliedOpTime())) {
+ if (now >= waitUntil) {
+ uasserted(ErrorCodes::ExceededTimeLimit,
+ str::stream() << "No electable secondaries caught up as of "
+ << dateToISOStringLocal(now)
+ << ". Please use {force: true} to force node to step down.");
}
+ return false;
}
- allFinishedGuard.Dismiss();
- // We send out a fresh round of heartbeats because stepping down successfully without
- // {force: true} is dependent on timely heartbeat data.
- // This callback is invoked every time a heartbeat response is processed so restart heartbeats
- // only once.
- if (!restartHeartbeats) {
- return;
- }
- stdx::lock_guard<stdx::mutex> lk(_mutex);
- _restartHeartbeats_inlock();
+ const auto action = [&] {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ return _updateMemberStateFromTopologyCoordinator_inlock();
+ }();
+ _performPostMemberStateUpdateAction(action);
+
+ // Schedule work to (potentially) step back up once the stepdown period has ended.
+ _scheduleWorkAt(
+ stepDownUntil,
+ stdx::bind(&ReplicationCoordinatorImpl::_handleTimePassing, this, stdx::placeholders::_1));
+ return true;
}
void ReplicationCoordinatorImpl::_handleTimePassing(
@@ -3637,12 +3520,7 @@ void ReplicationCoordinatorImpl::waitUntilSnapshotCommitted(OperationContext* tx
stdx::unique_lock<stdx::mutex> lock(_mutex);
while (!_currentCommittedSnapshot || _currentCommittedSnapshot->name < untilSnapshot) {
- if (!txn->hasDeadline()) {
- _currentCommittedSnapshotCond.wait(lock);
- } else {
- _currentCommittedSnapshotCond.wait_until(lock, txn->getDeadline().toSystemTimePoint());
- }
- txn->checkForInterrupt();
+ txn->waitForConditionOrInterrupt(_currentCommittedSnapshotCond, lock);
}
}
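
Taken together, the stepDown changes above replace the _stepDownContinue event/callback chain with a straightforward retry loop: make one non-blocking attempt, and between failed attempts sleep interruptibly on a condition variable that heartbeat processing notifies, bounded by the caller's deadlines. A rough standalone sketch of that loop shape (tryToStepDown, stepDownWaiters, and caughtUpSecondaries are toy stand-ins; the interruption check done through the OperationContext in the real code is only noted in a comment):

#include <algorithm>
#include <chrono>
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <stdexcept>
#include <thread>

using Clock = std::chrono::steady_clock;

std::mutex topoMutex;
std::condition_variable stepDownWaiters;  // notified when new "heartbeat data" arrives
int caughtUpSecondaries = 0;              // toy stand-in for topology state

// One non-blocking attempt, mirroring the shape of _tryToStepDown: succeed, keep
// waiting, or give up by throwing (as the real code does with uasserted).
bool tryToStepDown(Clock::time_point waitUntil, Clock::time_point stepDownUntil) {
    const auto now = Clock::now();
    if (now >= stepDownUntil)
        throw std::runtime_error("already past the time we were supposed to step down until");
    if (caughtUpSecondaries > 0)
        return true;  // an electable secondary has caught up
    if (now >= waitUntil)
        throw std::runtime_error("no electable secondaries caught up; use {force: true}");
    return false;  // keep waiting for fresher heartbeat data
}

void stepDown(Clock::time_point waitUntil, Clock::time_point stepDownUntil) {
    std::unique_lock<std::mutex> topoLock(topoMutex);
    while (!tryToStepDown(waitUntil, stepDownUntil)) {
        // Sleep until heartbeat processing notifies us or the nearer deadline passes.
        // The real code waits via the OperationContext so the wait is also interruptible.
        stepDownWaiters.wait_until(topoLock, std::min(stepDownUntil, waitUntil));
    }
    std::cout << "stepped down\n";
}

int main() {
    std::thread heartbeat([] {
        std::this_thread::sleep_for(std::chrono::milliseconds(50));
        {
            std::lock_guard<std::mutex> lk(topoMutex);
            caughtUpSecondaries = 1;  // a secondary caught up
        }
        stepDownWaiters.notify_all();  // wake the retry loop in stepDown
    });
    const auto now = Clock::now();
    try {
        stepDown(now + std::chrono::seconds(2), now + std::chrono::seconds(5));
    } catch (const std::exception& ex) {
        std::cout << "step down failed: " << ex.what() << "\n";
    }
    heartbeat.join();
}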
diff --git a/src/mongo/db/repl/replication_coordinator_impl.h b/src/mongo/db/repl/replication_coordinator_impl.h
index d66dd431279..eb22e212e30 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.h
+++ b/src/mongo/db/repl/replication_coordinator_impl.h
@@ -46,7 +46,6 @@
#include "mongo/db/repl/sync_source_resolver.h"
#include "mongo/db/repl/topology_coordinator.h"
#include "mongo/db/repl/update_position_args.h"
-#include "mongo/db/service_context.h"
#include "mongo/db/storage/snapshot_name.h"
#include "mongo/platform/atomic_word.h"
#include "mongo/platform/unordered_map.h"
@@ -85,14 +84,10 @@ class StorageInterface;
class TopologyCoordinator;
class VoteRequester;
-class ReplicationCoordinatorImpl : public ReplicationCoordinator, public KillOpListenerInterface {
+class ReplicationCoordinatorImpl : public ReplicationCoordinator {
MONGO_DISALLOW_COPYING(ReplicationCoordinatorImpl);
public:
- // For testing only.
- using StepDownNonBlockingResult =
- std::pair<std::unique_ptr<mongo::Lock::GlobalLock>, ReplicationExecutor::EventHandle>;
-
// Takes ownership of the "externalState", "topCoord" and "network" objects.
ReplicationCoordinatorImpl(const ReplSettings& settings,
ReplicationCoordinatorExternalState* externalState,
@@ -134,18 +129,6 @@ public:
virtual void clearSyncSourceBlacklist() override;
- /*
- * Implementation of the KillOpListenerInterface interrupt method so that we can wake up
- * threads blocked in awaitReplication() when a killOp command comes in.
- */
- virtual void interrupt(unsigned opId);
-
- /*
- * Implementation of the KillOpListenerInterface interruptAll method so that we can wake up
- * threads blocked in awaitReplication() when we kill all operations.
- */
- virtual void interruptAll();
-
virtual ReplicationCoordinator::StatusAndDuration awaitReplication(
OperationContext* txn, const OpTime& opTime, const WriteConcernOptions& writeConcern);
@@ -373,22 +356,6 @@ public:
Status setLastDurableOptime_forTest(long long cfgVer, long long memberId, const OpTime& opTime);
/**
- * Non-blocking version of stepDown.
- * Returns a pair of global shared lock and event handle which are used to wait for the step
- * down operation to complete. The global shared lock prevents writes until the step down has
- * completed (or failed).
- * When the operation is complete (wait() returns), 'result' will be set to the
- * final status of the operation.
- * If the handle is invalid, step down failed before we could schedule the rest of
- * the step down processing and the error will be available immediately in 'result'.
- */
- StepDownNonBlockingResult stepDown_nonBlocking(OperationContext* txn,
- bool force,
- const Milliseconds& waitTime,
- const Milliseconds& stepdownTime,
- Status* result);
-
- /**
* Non-blocking version of setFollowerMode.
* Returns event handle that we can use to wait for the operation to complete.
* When the operation is complete (wait() returns), 'success' will be set to true
@@ -644,17 +611,14 @@ private:
void _handleTimePassing(const ReplicationExecutor::CallbackArgs& cbData);
/**
- * Helper method for _awaitReplication that takes an already locked unique_lock and a
- * Timer for timing the operation which has been counting since before the lock was
- * acquired.
+ * Helper method for _awaitReplication that takes an already locked unique_lock, but leaves
+ * operation timing to the caller.
*/
- ReplicationCoordinator::StatusAndDuration _awaitReplication_inlock(
- const Timer* timer,
- stdx::unique_lock<stdx::mutex>* lock,
- OperationContext* txn,
- const OpTime& opTime,
- SnapshotName minSnapshot,
- const WriteConcernOptions& writeConcern);
+ Status _awaitReplication_inlock(stdx::unique_lock<stdx::mutex>* lock,
+ OperationContext* txn,
+ const OpTime& opTime,
+ SnapshotName minSnapshot,
+ const WriteConcernOptions& writeConcern);
/**
* Returns true if the given writeConcern is satisfied up to "optime" or is unsatisfiable.
@@ -690,17 +654,11 @@ private:
void _signalStepDownWaiter_inlock();
/**
- * Helper for stepDown run within a ReplicationExecutor callback. This method assumes
- * it is running within a global shared lock, and thus that no writes are going on at the
- * same time.
+ * Non-blocking helper for stepDown that makes a single attempt to step down. See the
+ * implementations of this method and stepDown for details.
*/
- void _stepDownContinue(const ReplicationExecutor::EventHandle finishedEvent,
- OperationContext* txn,
- Date_t waitUntil,
- Date_t stepdownUntil,
- bool force,
- bool restartHeartbeats,
- Status* result);
+ bool _tryToStepDown(Date_t waitUntil, Date_t stepdownUntil, bool force);
OID _getMyRID_inlock() const;
@@ -1280,8 +1238,8 @@ private:
// This member's index position in the current config.
int _selfIndex; // (MX)
- // Event handle that should be signaled whenever new heartbeat data comes in.
- ReplicationExecutor::EventHandle _stepDownWaiter; // (M)
+ // Condition to signal when new heartbeat data comes in.
+ stdx::condition_variable _stepDownWaiters; // (X)
// State for conducting an election of this node.
// the presence of a non-null _freshnessChecker pointer indicates that an election is
diff --git a/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp b/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp
index 8679967fd90..68584255cfb 100644
--- a/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp
@@ -210,11 +210,8 @@ void ReplicationCoordinatorImpl::_handleHeartbeatResponse(
}
}
- {
- LockGuard lk(_mutex);
- // Wake the stepdown waiter when our updated OpTime allows it to finish stepping down.
- _signalStepDownWaiter_inlock();
- }
+ // Wake the stepdown waiter when our updated OpTime allows it to finish stepping down.
+ _signalStepDownWaiter_inlock();
_scheduleHeartbeatToTarget(
target, targetIndex, std::max(now, action.getNextHeartbeatStartDate()));
diff --git a/src/mongo/db/repl/replication_coordinator_impl_test.cpp b/src/mongo/db/repl/replication_coordinator_impl_test.cpp
index 659096ed571..11966b5bf94 100644
--- a/src/mongo/db/repl/replication_coordinator_impl_test.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl_test.cpp
@@ -65,6 +65,7 @@
#include "mongo/unittest/unittest.h"
#include "mongo/util/assert_util.h"
#include "mongo/util/log.h"
+#include "mongo/util/scopeguard.h"
#include "mongo/util/time_support.h"
#include "mongo/util/timer.h"
@@ -115,6 +116,14 @@ void runSingleNodeElection(ServiceContext::UniqueOperationContext txn,
replCoord->signalDrainComplete(txn.get());
}
+/**
+ * Helper that kills an operation, taking the necessary locks.
+ */
+void killOperation(OperationContext* txn) {
+ stdx::lock_guard<Client> lkClient(*txn->getClient());
+ txn->getServiceContext()->killOperation(txn);
+}
+
TEST_F(ReplCoordTest, NodeEntersStartup2StateWhenStartingUpWithValidLocalConfig) {
assertStartSuccess(BSON("_id"
<< "mySet"
@@ -1213,7 +1222,7 @@ TEST_F(ReplCoordTest, NodeReturnsWriteConcernFailedWhenAWriteConcernTimesOutBefo
OpTimeWithTermZero time2(100, 2);
WriteConcernOptions writeConcern;
- writeConcern.wTimeout = 50;
+ writeConcern.wDeadline = getNet()->now() + Milliseconds(50);
writeConcern.wNumNodes = 2;
// 2 nodes waiting for time2
@@ -1223,6 +1232,11 @@ TEST_F(ReplCoordTest, NodeReturnsWriteConcernFailedWhenAWriteConcernTimesOutBefo
getReplCoord()->setMyLastAppliedOpTime(time2);
getReplCoord()->setMyLastDurableOpTime(time2);
ASSERT_OK(getReplCoord()->setLastAppliedOptime_forTest(2, 1, time1));
+ {
+ NetworkInterfaceMock::InNetworkGuard inNet(getNet());
+ getNet()->runUntil(writeConcern.wDeadline);
+ ASSERT_EQUALS(writeConcern.wDeadline, getNet()->now());
+ }
ReplicationCoordinator::StatusAndDuration statusAndDur = awaiter.getResult();
ASSERT_EQUALS(ErrorCodes::WriteConcernFailed, statusAndDur.status);
awaiter.reset();
@@ -1319,7 +1333,7 @@ TEST_F(ReplCoordTest, NodeReturnsNotMasterWhenSteppingDownBeforeSatisfyingAWrite
awaiter.start();
ASSERT_OK(getReplCoord()->setLastAppliedOptime_forTest(2, 1, time1));
ASSERT_OK(getReplCoord()->setLastAppliedOptime_forTest(2, 2, time1));
- getReplCoord()->stepDown(txn.get(), true, Milliseconds(0), Milliseconds(1000));
+ ASSERT_OK(getReplCoord()->stepDown(txn.get(), true, Milliseconds(0), Milliseconds(1000)));
ReplicationCoordinator::StatusAndDuration statusAndDur = awaiter.getResult();
ASSERT_EQUALS(ErrorCodes::PrimarySteppedDown, statusAndDur.status);
awaiter.reset();
@@ -1362,13 +1376,7 @@ TEST_F(ReplCoordTest,
ASSERT_OK(getReplCoord()->setLastAppliedOptime_forTest(2, 1, time1));
ASSERT_OK(getReplCoord()->setLastAppliedOptime_forTest(2, 2, time1));
- OperationContext* txn = awaiter.getOperationContext();
- auto opID = txn->getOpID();
- {
- stdx::lock_guard<Client> lk(*txn->getClient());
- txn->markKilled(ErrorCodes::Interrupted);
- }
- getReplCoord()->interrupt(opID);
+ killOperation(awaiter.getOperationContext());
ReplicationCoordinator::StatusAndDuration statusAndDur = awaiter.getResult();
ASSERT_EQUALS(ErrorCodes::Interrupted, statusAndDur.status);
awaiter.reset();
@@ -1376,6 +1384,33 @@ TEST_F(ReplCoordTest,
class StepDownTest : public ReplCoordTest {
protected:
+ struct SharedClientAndOperation {
+ static SharedClientAndOperation make(ServiceContext* serviceContext) {
+ SharedClientAndOperation result;
+ result.client = serviceContext->makeClient("StepDownThread");
+ result.txn = result.client->makeOperationContext();
+ return result;
+ }
+ std::shared_ptr<Client> client;
+ std::shared_ptr<OperationContext> txn;
+ };
+
+ std::pair<SharedClientAndOperation, stdx::future<boost::optional<Status>>> stepDown_nonBlocking(
+ bool force, Milliseconds waitTime, Milliseconds stepDownTime) {
+ using PromisedClientAndOperation = stdx::promise<SharedClientAndOperation>;
+ auto task = stdx::packaged_task<boost::optional<Status>(PromisedClientAndOperation)>(
+ [=](PromisedClientAndOperation operationPromise) -> boost::optional<Status> {
+ auto result = SharedClientAndOperation::make(getServiceContext());
+ operationPromise.set_value(result);
+ return getReplCoord()->stepDown(result.txn.get(), force, waitTime, stepDownTime);
+ });
+ auto result = task.get_future();
+ PromisedClientAndOperation operationPromise;
+ auto operationFuture = operationPromise.get_future();
+ stdx::thread(std::move(task), std::move(operationPromise)).detach();
+ return std::make_pair(operationFuture.get(), std::move(result));
+ }
+
OID myRid;
OID rid2;
OID rid3;
@@ -1700,25 +1735,16 @@ TEST_F(StepDownTest,
simulateSuccessfulV1Election();
- const auto txn = makeOperationContext();
-
-
// Step down where the secondary actually has to catch up before the stepDown can succeed.
// On entering the network, _stepDownContinue should cancel the heartbeats scheduled for
// T + 2 seconds and send out a new round of heartbeats immediately.
// This makes it unnecessary to advance the clock after entering the network to process
// the heartbeat requests.
- Status result(ErrorCodes::InternalError, "not mutated");
- auto globalReadLockAndEventHandle = repl->stepDown_nonBlocking(
- txn.get(), false, Milliseconds(10000), Milliseconds(60000), &result);
- const auto& eventHandle = globalReadLockAndEventHandle.second;
- ASSERT_TRUE(eventHandle);
- ASSERT_TRUE(txn->lockState()->isReadLocked());
+ auto result = stepDown_nonBlocking(false, Seconds(10), Seconds(60));
// Make a secondary actually catch up
enterNetwork();
getNet()->runUntil(getNet()->now() + Milliseconds(1000));
- ASSERT(getNet()->hasReadyRequests());
NetworkInterfaceMock::NetworkOperationIterator noi = getNet()->getNextReadyRequest();
RemoteCommandRequest request = noi->getRequest();
log() << request.target.toString() << " processing " << request.cmdObj;
@@ -1743,8 +1769,7 @@ TEST_F(StepDownTest,
getNet()->runReadyNetworkOperations();
exitNetwork();
- getReplExec()->waitForEvent(eventHandle);
- ASSERT_OK(result);
+ ASSERT_OK(*result.second.get());
ASSERT_TRUE(repl->getMemberState().secondary());
}
@@ -1761,25 +1786,16 @@ TEST_F(StepDownTest,
simulateSuccessfulV1Election();
- const auto txn = makeOperationContext();
-
-
// Step down where the secondary actually has to catch up before the stepDown can succeed.
// On entering the network, _stepDownContinue should cancel the heartbeats scheduled for
// T + 2 seconds and send out a new round of heartbeats immediately.
// This makes it unnecessary to advance the clock after entering the network to process
// the heartbeat requests.
- Status result(ErrorCodes::InternalError, "not mutated");
- auto globalReadLockAndEventHandle = repl->stepDown_nonBlocking(
- txn.get(), false, Milliseconds(10000), Milliseconds(60000), &result);
- const auto& eventHandle = globalReadLockAndEventHandle.second;
- ASSERT_TRUE(eventHandle);
- ASSERT_TRUE(txn->lockState()->isReadLocked());
+ auto result = stepDown_nonBlocking(false, Seconds(10), Seconds(60));
// Secondary has not caught up on first round of heartbeats.
enterNetwork();
getNet()->runUntil(getNet()->now() + Milliseconds(1000));
- ASSERT(getNet()->hasReadyRequests());
NetworkInterfaceMock::NetworkOperationIterator noi = getNet()->getNextReadyRequest();
RemoteCommandRequest request = noi->getRequest();
log() << "HB1: " << request.target.toString() << " processing " << request.cmdObj;
@@ -1808,7 +1824,6 @@ TEST_F(StepDownTest,
auto until = getNet()->now() + heartbeatInterval;
getNet()->runUntil(until);
ASSERT_EQUALS(until, getNet()->now());
- ASSERT(getNet()->hasReadyRequests());
noi = getNet()->getNextReadyRequest();
request = noi->getRequest();
log() << "HB2: " << request.target.toString() << " processing " << request.cmdObj;
@@ -1830,8 +1845,7 @@ TEST_F(StepDownTest,
getNet()->runReadyNetworkOperations();
exitNetwork();
- getReplExec()->waitForEvent(eventHandle);
- ASSERT_OK(result);
+ ASSERT_OK(*result.second.get());
ASSERT_TRUE(repl->getMemberState().secondary());
}
@@ -1847,28 +1861,12 @@ TEST_F(StepDownTest, NodeReturnsInterruptedWhenInterruptedDuringStepDown) {
simulateSuccessfulV1Election();
- const auto txn = makeOperationContext();
-
- const unsigned int opID = txn->getOpID();
-
ASSERT_TRUE(repl->getMemberState().primary());
// stepDown where the secondary actually has to catch up before the stepDown can succeed.
- Status result(ErrorCodes::InternalError, "not mutated");
- auto globalReadLockAndEventHandle = repl->stepDown_nonBlocking(
- txn.get(), false, Milliseconds(10000), Milliseconds(60000), &result);
- const auto& eventHandle = globalReadLockAndEventHandle.second;
- ASSERT_TRUE(eventHandle);
- ASSERT_TRUE(txn->lockState()->isReadLocked());
-
- {
- stdx::lock_guard<Client> lk(*(txn->getClient()));
- txn->markKilled(ErrorCodes::Interrupted);
- }
- getReplCoord()->interrupt(opID);
-
- getReplExec()->waitForEvent(eventHandle);
- ASSERT_EQUALS(ErrorCodes::Interrupted, result);
+ auto result = stepDown_nonBlocking(false, Seconds(10), Seconds(60));
+ killOperation(result.first.txn.get());
+ ASSERT_EQUALS(ErrorCodes::Interrupted, *result.second.get());
ASSERT_TRUE(repl->getMemberState().primary());
}
@@ -3416,12 +3414,8 @@ TEST_F(ReplCoordTest, NodeReturnsInterruptedWhenWaitingUntilAnOpTimeIsInterrupte
getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermZero(10, 0));
getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermZero(10, 0));
- auto txn = makeOperationContext();
-
- {
- stdx::lock_guard<Client> lk(*(txn->getClient()));
- txn->markKilled(ErrorCodes::Interrupted);
- }
+ const auto txn = makeOperationContext();
+ killOperation(txn.get());
auto status = getReplCoord()->waitUntilOpTimeForRead(
txn.get(), ReadConcernArgs(OpTimeWithTermZero(50, 0), ReadConcernLevel::kLocalReadConcern));
@@ -3557,12 +3551,7 @@ TEST_F(ReplCoordTest, ReadAfterCommittedInterrupted) {
getReplCoord()->setMyLastAppliedOpTime(OpTime(Timestamp(10, 0), 0));
getReplCoord()->setMyLastDurableOpTime(OpTime(Timestamp(10, 0), 0));
-
- {
- stdx::lock_guard<Client> lk(*(txn->getClient()));
- txn->markKilled(ErrorCodes::Interrupted);
- }
-
+ killOperation(txn.get());
auto status = getReplCoord()->waitUntilOpTimeForRead(
txn.get(),
ReadConcernArgs(OpTime(Timestamp(50, 0), 0), ReadConcernLevel::kMajorityReadConcern));
diff --git a/src/mongo/db/repl/replication_coordinator_test_fixture.cpp b/src/mongo/db/repl/replication_coordinator_test_fixture.cpp
index c630ddca26d..d3425170736 100644
--- a/src/mongo/db/repl/replication_coordinator_test_fixture.cpp
+++ b/src/mongo/db/repl/replication_coordinator_test_fixture.cpp
@@ -129,6 +129,10 @@ void ReplCoordTest::init() {
_replExec.get(),
seed,
&_durablityLambda));
+ auto service = getGlobalServiceContext();
+ service->setFastClockSource(stdx::make_unique<executor::NetworkInterfaceMockClockSource>(_net));
+ service->setPreciseClockSource(
+ stdx::make_unique<executor::NetworkInterfaceMockClockSource>(_net));
}
void ReplCoordTest::init(const ReplSettings& settings) {
diff --git a/src/mongo/db/write_concern_options.h b/src/mongo/db/write_concern_options.h
index 85f9e0e0390..d66be64fdfb 100644
--- a/src/mongo/db/write_concern_options.h
+++ b/src/mongo/db/write_concern_options.h
@@ -105,6 +105,9 @@ public:
// Timeout in milliseconds.
int wTimeout;
+ // Deadline. If this is set to something other than Date_t::max(), this takes precedence over
+ // wTimeout.
+ Date_t wDeadline = Date_t::max();
// True if the default write concern was used.
bool usedDefault = false;
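
The new wDeadline field gives callers an absolute deadline that, when set, takes precedence over the relative wTimeout; _awaitReplication_inlock above turns whichever is in effect into a single wait deadline (the real computation also pads by the clock source's precision). A small sketch of that precedence rule with plain std::chrono types and a toy struct in place of mongo::WriteConcernOptions:

#include <chrono>
#include <iostream>

using Clock = std::chrono::system_clock;

// Toy stand-in for WriteConcernOptions; 0 plays the role of kNoTimeout here.
struct ToyWriteConcern {
    int wTimeoutMillis = 0;                                   // relative timeout; 0 = none
    Clock::time_point wDeadline = Clock::time_point::max();   // absolute deadline
};

Clock::time_point effectiveDeadline(const ToyWriteConcern& wc, Clock::time_point now) {
    if (wc.wDeadline != Clock::time_point::max())
        return wc.wDeadline;              // absolute deadline takes precedence
    if (wc.wTimeoutMillis == 0)
        return Clock::time_point::max();  // no timeout: wait indefinitely
    return now + std::chrono::milliseconds(wc.wTimeoutMillis);
}

int main() {
    ToyWriteConcern wc;
    wc.wTimeoutMillis = 50;
    wc.wDeadline = Clock::now() + std::chrono::seconds(1);
    // Even though wTimeout is 50ms, the absolute deadline about one second out wins.
    const auto remaining = effectiveDeadline(wc, Clock::now()) - Clock::now();
    std::cout << std::chrono::duration_cast<std::chrono::milliseconds>(remaining).count()
              << "ms until the effective deadline\n";
}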
diff --git a/src/mongo/executor/network_interface_mock.cpp b/src/mongo/executor/network_interface_mock.cpp
index 8cf36afc153..1eed68daea5 100644
--- a/src/mongo/executor/network_interface_mock.cpp
+++ b/src/mongo/executor/network_interface_mock.cpp
@@ -664,5 +664,10 @@ NetworkInterfaceMock::InNetworkGuard::~InNetworkGuard() {
_net->exitNetwork();
}
+NetworkInterfaceMockClockSource::NetworkInterfaceMockClockSource(NetworkInterfaceMock* net)
+ : _net(net) {
+ _tracksSystemClock = false;
+}
+
} // namespace executor
} // namespace mongo
diff --git a/src/mongo/executor/network_interface_mock.h b/src/mongo/executor/network_interface_mock.h
index f75330e7acf..0ba054f11fe 100644
--- a/src/mongo/executor/network_interface_mock.h
+++ b/src/mongo/executor/network_interface_mock.h
@@ -41,6 +41,7 @@
#include "mongo/stdx/mutex.h"
#include "mongo/stdx/unordered_map.h"
#include "mongo/stdx/unordered_set.h"
+#include "mongo/util/clock_source.h"
#include "mongo/util/time_support.h"
namespace mongo {
@@ -519,5 +520,23 @@ private:
bool _callExitNetwork = true;
};
+class NetworkInterfaceMockClockSource : public ClockSource {
+public:
+ explicit NetworkInterfaceMockClockSource(NetworkInterfaceMock* net);
+
+ Milliseconds getPrecision() override {
+ return Milliseconds{1};
+ }
+ Date_t now() override {
+ return _net->now();
+ }
+ Status setAlarm(Date_t when, stdx::function<void()> action) override {
+ return _net->setAlarm(when, action);
+ }
+
+private:
+ NetworkInterfaceMock* _net;
+};
+
} // namespace executor
} // namespace mongo
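
NetworkInterfaceMockClockSource is a thin adapter so code written against ClockSource (such as the new deadline-based waits) can be driven by the test's simulated network time. A toy sketch of that idea, with an illustrative ClockSourceLike interface and a manually advanced mock clock that fires alarms as time is advanced (none of these names are MongoDB's):

#include <chrono>
#include <functional>
#include <iostream>
#include <map>

using SimTime = std::chrono::milliseconds;  // simulated time since "start"

// Illustrative minimal clock-source interface (not mongo::ClockSource).
class ClockSourceLike {
public:
    virtual ~ClockSourceLike() = default;
    virtual SimTime now() = 0;
    virtual void setAlarm(SimTime when, std::function<void()> action) = 0;
};

class MockClockSource : public ClockSourceLike {
public:
    SimTime now() override {
        return _now;
    }

    void setAlarm(SimTime when, std::function<void()> action) override {
        if (when <= _now) {
            action();  // already expired: fire inline, like the mock network does
            return;
        }
        _alarms.emplace(when, std::move(action));
    }

    // Test-only: advance simulated time and fire any alarms that have come due.
    void advanceTo(SimTime t) {
        _now = t;
        const auto end = _alarms.upper_bound(_now);
        for (auto it = _alarms.begin(); it != end; ++it)
            it->second();
        _alarms.erase(_alarms.begin(), end);
    }

private:
    SimTime _now{0};
    std::multimap<SimTime, std::function<void()>> _alarms;
};

int main() {
    MockClockSource clock;
    clock.setAlarm(SimTime(100), [] { std::cout << "alarm at t=100 fired\n"; });
    clock.advanceTo(SimTime(50));   // nothing due yet
    clock.advanceTo(SimTime(150));  // fires the alarm deterministically
}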