diff options
author | Andy Schwerin <schwerin@mongodb.com> | 2016-10-13 10:18:19 -0400 |
---|---|---|
committer | Andy Schwerin <schwerin@mongodb.com> | 2016-10-13 10:18:19 -0400 |
commit | 9228a2b401b4af0adfa53a61053fba3a7df4f75c (patch) | |
tree | b9a66d8073dfa532c87dd04f2e48d90d75e4097e /src | |
parent | 8794dd4bdcab131a4ecf0963505e7fb2a34bf5e6 (diff) | |
download | mongo-9228a2b401b4af0adfa53a61053fba3a7df4f75c.tar.gz |
SERVER-26305 Use interruptible condition variables in ReplicationCoordinatorImpl instead of KillOpListener
While this change also improves the readability of _awaitReplication_inlock and
stepDown, it resovles SERVER-26305 by breaking a deadlock cycle caused by the
fact that KillOpListener methods get run under a mutex in ServiceContext.
Diffstat (limited to 'src')
-rw-r--r-- | src/mongo/db/db.cpp | 1 | ||||
-rw-r--r-- | src/mongo/db/operation_context.cpp | 16 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_impl.cpp | 334 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_impl.h | 70 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp | 7 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_impl_test.cpp | 119 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_test_fixture.cpp | 4 | ||||
-rw-r--r-- | src/mongo/db/write_concern_options.h | 3 | ||||
-rw-r--r-- | src/mongo/executor/network_interface_mock.cpp | 5 | ||||
-rw-r--r-- | src/mongo/executor/network_interface_mock.h | 19 |
10 files changed, 221 insertions, 357 deletions
diff --git a/src/mongo/db/db.cpp b/src/mongo/db/db.cpp index 1b67ab16783..a50cdb92faf 100644 --- a/src/mongo/db/db.cpp +++ b/src/mongo/db/db.cpp @@ -985,7 +985,6 @@ MONGO_INITIALIZER_WITH_PREREQUISITES(CreateReplicationManager, new repl::TopologyCoordinatorImpl(topoCoordOptions), storageInterface, static_cast<int64_t>(curTimeMillis64())); - serviceContext->registerKillOpListener(replCoord.get()); repl::ReplicationCoordinator::set(serviceContext, std::move(replCoord)); repl::setOplogCollectionName(); return Status::OK(); diff --git a/src/mongo/db/operation_context.cpp b/src/mongo/db/operation_context.cpp index 2a5c8814bf3..8151fe97dfa 100644 --- a/src/mongo/db/operation_context.cpp +++ b/src/mongo/db/operation_context.cpp @@ -248,16 +248,28 @@ static NOINLINE_DECL stdx::cv_status cvWaitUntilWithClockSource(ClockSource* clo auto alarmInfo = std::make_shared<AlarmInfo>(); alarmInfo->waitCV = &cv; alarmInfo->waitMutex = m.mutex(); - invariantOK(clockSource->setAlarm(deadline, [alarmInfo] { + const auto waiterThreadId = stdx::this_thread::get_id(); + bool invokedAlarmInline = false; + invariantOK(clockSource->setAlarm(deadline, [alarmInfo, waiterThreadId, &invokedAlarmInline] { stdx::lock_guard<stdx::mutex> controlLk(alarmInfo->controlMutex); alarmInfo->cvWaitResult = stdx::cv_status::timeout; if (!alarmInfo->waitMutex) { return; } + if (stdx::this_thread::get_id() == waiterThreadId) { + // In NetworkInterfaceMock, setAlarm may invoke its callback immediately if the deadline + // has expired, so we detect that case and avoid self-deadlock by returning early, here. + // It is safe to set invokedAlarmInline without synchronization in this case, because it + // is exactly the case where the same thread is writing and consulting the value. + invokedAlarmInline = true; + return; + } stdx::lock_guard<stdx::mutex> waitLk(*alarmInfo->waitMutex); alarmInfo->waitCV->notify_all(); })); - cv.wait(m); + if (!invokedAlarmInline) { + cv.wait(m); + } m.unlock(); stdx::lock_guard<stdx::mutex> controlLk(alarmInfo->controlMutex); m.lock(); diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp index 028d02c55b4..dd8a57cd60f 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl.cpp @@ -723,9 +723,15 @@ void ReplicationCoordinatorImpl::shutdown(OperationContext* txn) { fassert(18823, _rsConfigState != kConfigStartingUp); _replicationWaiterList.signalAndRemoveAll_inlock(); _opTimeWaiterList.signalAndRemoveAll_inlock(); + _currentCommittedSnapshotCond.notify_all(); _dr.swap(drCopy); } + { + stdx::lock_guard<stdx::mutex> topoLock(_topoMutex); + _signalStepDownWaiter_inlock(); + } + // joining the replication executor is blocking so it must be run outside of the mutex if (drCopy) { drCopy->shutdown(txn); @@ -1265,7 +1271,6 @@ Status ReplicationCoordinatorImpl::waitUntilOpTimeForRead(OperationContext* txn, if (!waitStatus.isOK()) { return waitStatus; } - LOG(3) << "Got notified of new snapshot: " << _currentCommittedSnapshot->toString(); continue; } @@ -1464,27 +1469,6 @@ Status ReplicationCoordinatorImpl::_setLastOptime_inlock(const UpdatePositionArg return Status::OK(); } -void ReplicationCoordinatorImpl::interrupt(unsigned opId) { - stdx::lock_guard<stdx::mutex> lk(_mutex); - // Wake ops waiting for a new committed snapshot. - _currentCommittedSnapshotCond.notify_all(); - - auto hasSameOpID = [opId](WaiterInfo* waiter) { return waiter->opID == opId; }; - _replicationWaiterList.signalAndRemoveIf_inlock(hasSameOpID); - _opTimeWaiterList.signalAndRemoveIf_inlock(hasSameOpID); - _signalStepDownWaiter_inlock(); -} - -void ReplicationCoordinatorImpl::interruptAll() { - stdx::lock_guard<stdx::mutex> lk(_mutex); - // Wake ops waiting for a new committed snapshot. - _currentCommittedSnapshotCond.notify_all(); - - _replicationWaiterList.signalAndRemoveAll_inlock(); - _opTimeWaiterList.signalAndRemoveAll_inlock(); - _signalStepDownWaiter_inlock(); -} - bool ReplicationCoordinatorImpl::_doneWaitingForReplication_inlock( const OpTime& opTime, SnapshotName minSnapshot, const WriteConcernOptions& writeConcern) { // The syncMode cannot be unset. @@ -1584,8 +1568,9 @@ ReplicationCoordinator::StatusAndDuration ReplicationCoordinatorImpl::awaitRepli Timer timer; WriteConcernOptions fixedWriteConcern = populateUnsetWriteConcernOptionsSyncMode(writeConcern); stdx::unique_lock<stdx::mutex> lock(_mutex); - return _awaitReplication_inlock( - &timer, &lock, txn, opTime, SnapshotName::min(), fixedWriteConcern); + auto status = + _awaitReplication_inlock(&lock, txn, opTime, SnapshotName::min(), fixedWriteConcern); + return {std::move(status), duration_cast<Milliseconds>(timer.elapsed())}; } ReplicationCoordinator::StatusAndDuration @@ -1595,81 +1580,86 @@ ReplicationCoordinatorImpl::awaitReplicationOfLastOpForClient( WriteConcernOptions fixedWriteConcern = populateUnsetWriteConcernOptionsSyncMode(writeConcern); stdx::unique_lock<stdx::mutex> lock(_mutex); const auto& clientInfo = ReplClientInfo::forClient(txn->getClient()); - return _awaitReplication_inlock(&timer, - &lock, - txn, - clientInfo.getLastOp(), - clientInfo.getLastSnapshot(), - fixedWriteConcern); + auto status = _awaitReplication_inlock( + &lock, txn, clientInfo.getLastOp(), clientInfo.getLastSnapshot(), fixedWriteConcern); + return {std::move(status), duration_cast<Milliseconds>(timer.elapsed())}; } -ReplicationCoordinator::StatusAndDuration ReplicationCoordinatorImpl::_awaitReplication_inlock( - const Timer* timer, +Status ReplicationCoordinatorImpl::_awaitReplication_inlock( stdx::unique_lock<stdx::mutex>* lock, OperationContext* txn, const OpTime& opTime, SnapshotName minSnapshot, const WriteConcernOptions& writeConcern) { + // We should never wait for replication if we are holding any locks, because this can // potentially block for long time while doing network activity. if (txn->lockState()->isLocked()) { - return StatusAndDuration({ErrorCodes::IllegalOperation, - "Waiting for replication not allowed while holding a lock"}, - Milliseconds(timer->millis())); + return {ErrorCodes::IllegalOperation, + "Waiting for replication not allowed while holding a lock"}; } const Mode replMode = getReplicationMode(); if (replMode == modeNone) { // no replication check needed (validated above) - return StatusAndDuration(Status::OK(), Milliseconds(timer->millis())); + return Status::OK(); } if (replMode == modeMasterSlave && writeConcern.wMode == WriteConcernOptions::kMajority) { // with master/slave, majority is equivalent to w=1 - return StatusAndDuration(Status::OK(), Milliseconds(timer->millis())); + return Status::OK(); } if (opTime.isNull() && minSnapshot == SnapshotName::min()) { // If waiting for the empty optime, always say it's been replicated. - return StatusAndDuration(Status::OK(), Milliseconds(timer->millis())); + return Status::OK(); } if (replMode == modeReplSet && !_memberState.primary()) { - return StatusAndDuration(Status(ErrorCodes::PrimarySteppedDown, - "Primary stepped down while waiting for replication"), - Milliseconds(timer->millis())); + return {ErrorCodes::PrimarySteppedDown, + "Primary stepped down while waiting for replication"}; } if (writeConcern.wMode.empty()) { if (writeConcern.wNumNodes < 1) { - return StatusAndDuration(Status::OK(), Milliseconds(timer->millis())); + return Status::OK(); } else if (writeConcern.wNumNodes == 1 && _getMyLastAppliedOpTime_inlock() >= opTime) { - return StatusAndDuration(Status::OK(), Milliseconds(timer->millis())); + return Status::OK(); } } + auto clockSource = txn->getServiceContext()->getFastClockSource(); + const auto wTimeoutDate = [&]() -> const Date_t { + if (writeConcern.wDeadline != Date_t::max()) { + return writeConcern.wDeadline; + } + if (writeConcern.wTimeout == WriteConcernOptions::kNoTimeout) { + return Date_t::max(); + } + return clockSource->now() + clockSource->getPrecision() + + Milliseconds{writeConcern.wTimeout}; + }(); + // Must hold _mutex before constructing waitInfo as it will modify _replicationWaiterList stdx::condition_variable condVar; WaiterInfoGuard waitInfo( &_replicationWaiterList, txn->getOpID(), opTime, &writeConcern, &condVar); while (!_doneWaitingForReplication_inlock(opTime, minSnapshot, writeConcern)) { - const Milliseconds elapsed{timer->millis()}; - - Status interruptedStatus = txn->checkForInterruptNoAssert(); - if (!interruptedStatus.isOK()) { - return StatusAndDuration(interruptedStatus, elapsed); + if (replMode == modeReplSet && !_getMemberState_inlock().primary()) { + return {ErrorCodes::PrimarySteppedDown, + "Not primary anymore while waiting for replication - primary stepped down"}; } - if (replMode == modeReplSet && !_getMemberState_inlock().primary()) { - return StatusAndDuration( - Status(ErrorCodes::PrimarySteppedDown, - "Not primary anymore while waiting for replication - primary stepped down"), - elapsed); + if (_inShutdown) { + return {ErrorCodes::ShutdownInProgress, "Replication is being shut down"}; } - if (writeConcern.wTimeout != WriteConcernOptions::kNoTimeout && - elapsed > Milliseconds{writeConcern.wTimeout}) { + auto status = txn->waitForConditionOrInterruptNoAssertUntil(condVar, *lock, wTimeoutDate); + if (!status.isOK()) { + return status.getStatus(); + } + if (status.getValue() == stdx::cv_status::timeout) { if (Command::testCommandsEnabled) { // log state of replica set on timeout to help with diagnosis. BSONObjBuilder progress; @@ -1678,45 +1668,17 @@ ReplicationCoordinator::StatusAndDuration ReplicationCoordinatorImpl::_awaitRepl << ", waitInfo:" << waitInfo.waiter.toBSON() << ", progress: " << progress.done(); } - return StatusAndDuration( - Status(ErrorCodes::WriteConcernFailed, "waiting for replication timed out"), - elapsed); + return {ErrorCodes::WriteConcernFailed, "waiting for replication timed out"}; } - - if (_inShutdown) { - return StatusAndDuration( - Status(ErrorCodes::ShutdownInProgress, "Replication is being shut down"), elapsed); - } - - Microseconds waitTime = txn->getRemainingMaxTimeMicros(); - if (writeConcern.wTimeout != WriteConcernOptions::kNoTimeout) { - waitTime = - std::min<Microseconds>(Milliseconds{writeConcern.wTimeout} - elapsed, waitTime); - } - - const bool waitForever = waitTime == Microseconds::max(); - if (waitForever) { - condVar.wait(*lock); - } else { - condVar.wait_for(*lock, waitTime.toSystemDuration()); - } - } - - Status status = _checkIfWriteConcernCanBeSatisfied_inlock(writeConcern); - if (!status.isOK()) { - return StatusAndDuration(status, Milliseconds(timer->millis())); } - return StatusAndDuration(Status::OK(), Milliseconds(timer->millis())); + return _checkIfWriteConcernCanBeSatisfied_inlock(writeConcern); } -ReplicationCoordinatorImpl::StepDownNonBlockingResult -ReplicationCoordinatorImpl::stepDown_nonBlocking(OperationContext* txn, - bool force, - const Milliseconds& waitTime, - const Milliseconds& stepdownTime, - Status* result) { - invariant(result); +Status ReplicationCoordinatorImpl::stepDown(OperationContext* txn, + const bool force, + const Milliseconds& waitTime, + const Milliseconds& stepdownTime) { const Date_t startTime = _replExecutor.now(); const Date_t stepDownUntil = startTime + stepdownTime; @@ -1726,166 +1688,87 @@ ReplicationCoordinatorImpl::stepDown_nonBlocking(OperationContext* txn, // Note this check is inherently racy - it's always possible for the node to // stepdown from some other path before we acquire the global shared lock, but // that's okay because we are resiliant to that happening in _stepDownContinue. - *result = Status(ErrorCodes::NotMaster, "not primary so can't step down"); - return StepDownNonBlockingResult(); + return {ErrorCodes::NotMaster, "not primary so can't step down"}; } - auto globalReadLock = stdx::make_unique<Lock::GlobalLock>( - txn->lockState(), MODE_S, Lock::GlobalLock::EnqueueOnly()); + Lock::GlobalLock globalReadLock(txn->lockState(), MODE_S, Lock::GlobalLock::EnqueueOnly()); // We've requested the global shared lock which will stop new writes from coming in, // but existing writes could take a long time to finish, so kill all user operations // to help us get the global lock faster. _externalState->killAllUserOperations(txn); - globalReadLock->waitForLock(durationCount<Milliseconds>(stepdownTime)); - - if (!globalReadLock->isLocked()) { - *result = Status(ErrorCodes::ExceededTimeLimit, - "Could not acquire the global shared lock within the amount of time " - "specified that we should step down for"); - return StepDownNonBlockingResult(); - } + globalReadLock.waitForLock(durationCount<Milliseconds>(stepdownTime)); - StatusWith<ReplicationExecutor::EventHandle> finishedEvent = _replExecutor.makeEvent(); - if (finishedEvent.getStatus() == ErrorCodes::ShutdownInProgress) { - *result = finishedEvent.getStatus(); - return StepDownNonBlockingResult(); + if (!globalReadLock.isLocked()) { + return {ErrorCodes::ExceededTimeLimit, + "Could not acquire the global shared lock within the amount of time " + "specified that we should step down for"}; } - fassert(26000, finishedEvent.getStatus()); - _stepDownContinue(finishedEvent.getValue(), - txn, - waitUntil, - stepDownUntil, - force, - true, // restartHeartbeats - result); - auto signalStepDownWaiterInLock = [this](const CallbackArgs&) { - LockGuard lk(_mutex); - _signalStepDownWaiter_inlock(); - }; - - _scheduleWorkAt(waitUntil, signalStepDownWaiterInLock); - return std::make_pair(std::move(globalReadLock), finishedEvent.getValue()); -} - -Status ReplicationCoordinatorImpl::stepDown(OperationContext* txn, - bool force, - const Milliseconds& waitTime, - const Milliseconds& stepdownTime) { - Status result(ErrorCodes::InternalError, "didn't set status in _stepDownContinue"); - auto globalReadLockAndEventHandle = - stepDown_nonBlocking(txn, force, waitTime, stepdownTime, &result); - const auto& eventHandle = globalReadLockAndEventHandle.second; - if (eventHandle.isValid()) { - _replExecutor.waitForEvent(eventHandle); + try { + stdx::unique_lock<stdx::mutex> topoLock(_topoMutex); + bool restartHeartbeats = true; + txn->checkForInterrupt(); + while (!_tryToStepDown(waitUntil, stepDownUntil, force)) { + if (restartHeartbeats) { + // We send out a fresh round of heartbeats because stepping down successfully + // without + // {force: true} is dependent on timely heartbeat data. + stdx::lock_guard<stdx::mutex> lk(_mutex); + _restartHeartbeats_inlock(); + restartHeartbeats = false; + } + txn->waitForConditionOrInterruptUntil( + _stepDownWaiters, topoLock, std::min(stepDownUntil, waitUntil)); + } + } catch (const DBException& ex) { + return ex.toStatus(); } - return result; + return Status::OK(); } void ReplicationCoordinatorImpl::_signalStepDownWaiter_inlock() { - if (_stepDownWaiter) { - _replExecutor.signalEvent(_stepDownWaiter); - _stepDownWaiter = EventHandle(); - } + _stepDownWaiters.notify_all(); } -void ReplicationCoordinatorImpl::_stepDownContinue( - const ReplicationExecutor::EventHandle finishedEvent, - OperationContext* txn, - const Date_t waitUntil, - const Date_t stepDownUntil, - bool force, - bool restartHeartbeats, - Status* result) { - LockGuard topoLock(_topoMutex); - - ScopeGuard allFinishedGuard = - MakeGuard(stdx::bind(&ReplicationExecutor::signalEvent, &_replExecutor, finishedEvent)); - - Status interruptedStatus = txn->checkForInterruptNoAssert(); - if (!interruptedStatus.isOK()) { - *result = interruptedStatus; - return; - } +bool ReplicationCoordinatorImpl::_tryToStepDown(const Date_t waitUntil, + const Date_t stepDownUntil, + const bool force) { if (_topCoord->getRole() != TopologyCoordinator::Role::leader) { - *result = Status(ErrorCodes::NotMaster, - "Already stepped down from primary while processing step down " - "request"); - return; + uasserted(ErrorCodes::NotMaster, + "Already stepped down from primary while processing step down request"); } const Date_t now = _replExecutor.now(); if (now >= stepDownUntil) { - *result = Status(ErrorCodes::ExceededTimeLimit, - "By the time we were ready to step down, we were already past the " - "time we were supposed to step down until"); - return; - } - bool forceNow = now >= waitUntil ? force : false; - if (_topCoord->stepDown(stepDownUntil, forceNow, getMyLastAppliedOpTime())) { - // Schedule work to (potentially) step back up once the stepdown period has ended. - _scheduleWorkAt(stepDownUntil, - stdx::bind(&ReplicationCoordinatorImpl::_handleTimePassing, - this, - stdx::placeholders::_1)); - - stdx::unique_lock<stdx::mutex> lk(_mutex); - const PostMemberStateUpdateAction action = - _updateMemberStateFromTopologyCoordinator_inlock(); - lk.unlock(); - _performPostMemberStateUpdateAction(action); - *result = Status::OK(); - return; - } - - // Step down failed. Keep waiting if we can, otherwise finish. - if (now >= waitUntil) { - *result = Status(ErrorCodes::ExceededTimeLimit, - str::stream() << "No electable secondaries caught up as of " - << dateToISOStringLocal(now) - << ". Please use {force: true} to force node to step down."); - return; + uasserted(ErrorCodes::ExceededTimeLimit, + "By the time we were ready to step down, we were already past the " + "time we were supposed to step down until"); } - { - LockGuard lk(_mutex); - if (!_stepDownWaiter) { - StatusWith<ReplicationExecutor::EventHandle> reschedEvent = _replExecutor.makeEvent(); - if (!reschedEvent.isOK()) { - *result = reschedEvent.getStatus(); - return; - } - _stepDownWaiter = reschedEvent.getValue(); - } - CBHStatus cbh = - _replExecutor.onEvent(_stepDownWaiter, - stdx::bind(&ReplicationCoordinatorImpl::_stepDownContinue, - this, - finishedEvent, - txn, - waitUntil, - stepDownUntil, - force, - false, // restartHeartbeats - result)); - if (!cbh.isOK()) { - *result = cbh.getStatus(); - return; + const bool forceNow = now >= waitUntil ? force : false; + if (!_topCoord->stepDown(stepDownUntil, forceNow, getMyLastAppliedOpTime())) { + if (now >= waitUntil) { + uasserted(ErrorCodes::ExceededTimeLimit, + str::stream() << "No electable secondaries caught up as of " + << dateToISOStringLocal(now) + << ". Please use {force: true} to force node to step down."); } + return false; } - allFinishedGuard.Dismiss(); - // We send out a fresh round of heartbeats because stepping down successfully without - // {force: true} is dependent on timely heartbeat data. - // This callback is invoked every time a heartbeat response is processed so restart heartbeats - // only once. - if (!restartHeartbeats) { - return; - } - stdx::lock_guard<stdx::mutex> lk(_mutex); - _restartHeartbeats_inlock(); + const auto action = [&] { + stdx::lock_guard<stdx::mutex> lk(_mutex); + return _updateMemberStateFromTopologyCoordinator_inlock(); + }(); + _performPostMemberStateUpdateAction(action); + + // Schedule work to (potentially) step back up once the stepdown period has ended. + _scheduleWorkAt( + stepDownUntil, + stdx::bind(&ReplicationCoordinatorImpl::_handleTimePassing, this, stdx::placeholders::_1)); + return true; } void ReplicationCoordinatorImpl::_handleTimePassing( @@ -3637,12 +3520,7 @@ void ReplicationCoordinatorImpl::waitUntilSnapshotCommitted(OperationContext* tx stdx::unique_lock<stdx::mutex> lock(_mutex); while (!_currentCommittedSnapshot || _currentCommittedSnapshot->name < untilSnapshot) { - if (!txn->hasDeadline()) { - _currentCommittedSnapshotCond.wait(lock); - } else { - _currentCommittedSnapshotCond.wait_until(lock, txn->getDeadline().toSystemTimePoint()); - } - txn->checkForInterrupt(); + txn->waitForConditionOrInterrupt(_currentCommittedSnapshotCond, lock); } } diff --git a/src/mongo/db/repl/replication_coordinator_impl.h b/src/mongo/db/repl/replication_coordinator_impl.h index d66dd431279..eb22e212e30 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.h +++ b/src/mongo/db/repl/replication_coordinator_impl.h @@ -46,7 +46,6 @@ #include "mongo/db/repl/sync_source_resolver.h" #include "mongo/db/repl/topology_coordinator.h" #include "mongo/db/repl/update_position_args.h" -#include "mongo/db/service_context.h" #include "mongo/db/storage/snapshot_name.h" #include "mongo/platform/atomic_word.h" #include "mongo/platform/unordered_map.h" @@ -85,14 +84,10 @@ class StorageInterface; class TopologyCoordinator; class VoteRequester; -class ReplicationCoordinatorImpl : public ReplicationCoordinator, public KillOpListenerInterface { +class ReplicationCoordinatorImpl : public ReplicationCoordinator { MONGO_DISALLOW_COPYING(ReplicationCoordinatorImpl); public: - // For testing only. - using StepDownNonBlockingResult = - std::pair<std::unique_ptr<mongo::Lock::GlobalLock>, ReplicationExecutor::EventHandle>; - // Takes ownership of the "externalState", "topCoord" and "network" objects. ReplicationCoordinatorImpl(const ReplSettings& settings, ReplicationCoordinatorExternalState* externalState, @@ -134,18 +129,6 @@ public: virtual void clearSyncSourceBlacklist() override; - /* - * Implementation of the KillOpListenerInterface interrupt method so that we can wake up - * threads blocked in awaitReplication() when a killOp command comes in. - */ - virtual void interrupt(unsigned opId); - - /* - * Implementation of the KillOpListenerInterface interruptAll method so that we can wake up - * threads blocked in awaitReplication() when we kill all operations. - */ - virtual void interruptAll(); - virtual ReplicationCoordinator::StatusAndDuration awaitReplication( OperationContext* txn, const OpTime& opTime, const WriteConcernOptions& writeConcern); @@ -373,22 +356,6 @@ public: Status setLastDurableOptime_forTest(long long cfgVer, long long memberId, const OpTime& opTime); /** - * Non-blocking version of stepDown. - * Returns a pair of global shared lock and event handle which are used to wait for the step - * down operation to complete. The global shared lock prevents writes until the step down has - * completed (or failed). - * When the operation is complete (wait() returns), 'result' will be set to the - * final status of the operation. - * If the handle is invalid, step down failed before we could schedule the rest of - * the step down processing and the error will be available immediately in 'result'. - */ - StepDownNonBlockingResult stepDown_nonBlocking(OperationContext* txn, - bool force, - const Milliseconds& waitTime, - const Milliseconds& stepdownTime, - Status* result); - - /** * Non-blocking version of setFollowerMode. * Returns event handle that we can use to wait for the operation to complete. * When the operation is complete (wait() returns), 'success' will be set to true @@ -644,17 +611,14 @@ private: void _handleTimePassing(const ReplicationExecutor::CallbackArgs& cbData); /** - * Helper method for _awaitReplication that takes an already locked unique_lock and a - * Timer for timing the operation which has been counting since before the lock was - * acquired. + * Helper method for _awaitReplication that takes an already locked unique_lock, but leaves + * operation timing to the caller. */ - ReplicationCoordinator::StatusAndDuration _awaitReplication_inlock( - const Timer* timer, - stdx::unique_lock<stdx::mutex>* lock, - OperationContext* txn, - const OpTime& opTime, - SnapshotName minSnapshot, - const WriteConcernOptions& writeConcern); + Status _awaitReplication_inlock(stdx::unique_lock<stdx::mutex>* lock, + OperationContext* txn, + const OpTime& opTime, + SnapshotName minSnapshot, + const WriteConcernOptions& writeConcern); /** * Returns true if the given writeConcern is satisfied up to "optime" or is unsatisfiable. @@ -690,17 +654,11 @@ private: void _signalStepDownWaiter_inlock(); /** - * Helper for stepDown run within a ReplicationExecutor callback. This method assumes - * it is running within a global shared lock, and thus that no writes are going on at the - * same time. + * Non-blocking helper method for the stepDown method, that represents executing + * one attempt to step down. See implementation of this method and stepDown for + * details. */ - void _stepDownContinue(const ReplicationExecutor::EventHandle finishedEvent, - OperationContext* txn, - Date_t waitUntil, - Date_t stepdownUntil, - bool force, - bool restartHeartbeats, - Status* result); + bool _tryToStepDown(Date_t waitUntil, Date_t stepdownUntil, bool force); OID _getMyRID_inlock() const; @@ -1280,8 +1238,8 @@ private: // This member's index position in the current config. int _selfIndex; // (MX) - // Event handle that should be signaled whenever new heartbeat data comes in. - ReplicationExecutor::EventHandle _stepDownWaiter; // (M) + // Condition to signal when new heartbeat data comes in. + stdx::condition_variable _stepDownWaiters; // (X) // State for conducting an election of this node. // the presence of a non-null _freshnessChecker pointer indicates that an election is diff --git a/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp b/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp index 8679967fd90..68584255cfb 100644 --- a/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp @@ -210,11 +210,8 @@ void ReplicationCoordinatorImpl::_handleHeartbeatResponse( } } - { - LockGuard lk(_mutex); - // Wake the stepdown waiter when our updated OpTime allows it to finish stepping down. - _signalStepDownWaiter_inlock(); - } + // Wake the stepdown waiter when our updated OpTime allows it to finish stepping down. + _signalStepDownWaiter_inlock(); _scheduleHeartbeatToTarget( target, targetIndex, std::max(now, action.getNextHeartbeatStartDate())); diff --git a/src/mongo/db/repl/replication_coordinator_impl_test.cpp b/src/mongo/db/repl/replication_coordinator_impl_test.cpp index 659096ed571..11966b5bf94 100644 --- a/src/mongo/db/repl/replication_coordinator_impl_test.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl_test.cpp @@ -65,6 +65,7 @@ #include "mongo/unittest/unittest.h" #include "mongo/util/assert_util.h" #include "mongo/util/log.h" +#include "mongo/util/scopeguard.h" #include "mongo/util/time_support.h" #include "mongo/util/timer.h" @@ -115,6 +116,14 @@ void runSingleNodeElection(ServiceContext::UniqueOperationContext txn, replCoord->signalDrainComplete(txn.get()); } +/** + * Helper that kills an operation, taking the necessary locks. + */ +void killOperation(OperationContext* txn) { + stdx::lock_guard<Client> lkClient(*txn->getClient()); + txn->getServiceContext()->killOperation(txn); +} + TEST_F(ReplCoordTest, NodeEntersStartup2StateWhenStartingUpWithValidLocalConfig) { assertStartSuccess(BSON("_id" << "mySet" @@ -1213,7 +1222,7 @@ TEST_F(ReplCoordTest, NodeReturnsWriteConcernFailedWhenAWriteConcernTimesOutBefo OpTimeWithTermZero time2(100, 2); WriteConcernOptions writeConcern; - writeConcern.wTimeout = 50; + writeConcern.wDeadline = getNet()->now() + Milliseconds(50); writeConcern.wNumNodes = 2; // 2 nodes waiting for time2 @@ -1223,6 +1232,11 @@ TEST_F(ReplCoordTest, NodeReturnsWriteConcernFailedWhenAWriteConcernTimesOutBefo getReplCoord()->setMyLastAppliedOpTime(time2); getReplCoord()->setMyLastDurableOpTime(time2); ASSERT_OK(getReplCoord()->setLastAppliedOptime_forTest(2, 1, time1)); + { + NetworkInterfaceMock::InNetworkGuard inNet(getNet()); + getNet()->runUntil(writeConcern.wDeadline); + ASSERT_EQUALS(writeConcern.wDeadline, getNet()->now()); + } ReplicationCoordinator::StatusAndDuration statusAndDur = awaiter.getResult(); ASSERT_EQUALS(ErrorCodes::WriteConcernFailed, statusAndDur.status); awaiter.reset(); @@ -1319,7 +1333,7 @@ TEST_F(ReplCoordTest, NodeReturnsNotMasterWhenSteppingDownBeforeSatisfyingAWrite awaiter.start(); ASSERT_OK(getReplCoord()->setLastAppliedOptime_forTest(2, 1, time1)); ASSERT_OK(getReplCoord()->setLastAppliedOptime_forTest(2, 2, time1)); - getReplCoord()->stepDown(txn.get(), true, Milliseconds(0), Milliseconds(1000)); + ASSERT_OK(getReplCoord()->stepDown(txn.get(), true, Milliseconds(0), Milliseconds(1000))); ReplicationCoordinator::StatusAndDuration statusAndDur = awaiter.getResult(); ASSERT_EQUALS(ErrorCodes::PrimarySteppedDown, statusAndDur.status); awaiter.reset(); @@ -1362,13 +1376,7 @@ TEST_F(ReplCoordTest, ASSERT_OK(getReplCoord()->setLastAppliedOptime_forTest(2, 1, time1)); ASSERT_OK(getReplCoord()->setLastAppliedOptime_forTest(2, 2, time1)); - OperationContext* txn = awaiter.getOperationContext(); - auto opID = txn->getOpID(); - { - stdx::lock_guard<Client> lk(*txn->getClient()); - txn->markKilled(ErrorCodes::Interrupted); - } - getReplCoord()->interrupt(opID); + killOperation(awaiter.getOperationContext()); ReplicationCoordinator::StatusAndDuration statusAndDur = awaiter.getResult(); ASSERT_EQUALS(ErrorCodes::Interrupted, statusAndDur.status); awaiter.reset(); @@ -1376,6 +1384,33 @@ TEST_F(ReplCoordTest, class StepDownTest : public ReplCoordTest { protected: + struct SharedClientAndOperation { + static SharedClientAndOperation make(ServiceContext* serviceContext) { + SharedClientAndOperation result; + result.client = serviceContext->makeClient("StepDownThread"); + result.txn = result.client->makeOperationContext(); + return result; + } + std::shared_ptr<Client> client; + std::shared_ptr<OperationContext> txn; + }; + + std::pair<SharedClientAndOperation, stdx::future<boost::optional<Status>>> stepDown_nonBlocking( + bool force, Milliseconds waitTime, Milliseconds stepDownTime) { + using PromisedClientAndOperation = stdx::promise<SharedClientAndOperation>; + auto task = stdx::packaged_task<boost::optional<Status>(PromisedClientAndOperation)>( + [=](PromisedClientAndOperation operationPromise) -> boost::optional<Status> { + auto result = SharedClientAndOperation::make(getServiceContext()); + operationPromise.set_value(result); + return getReplCoord()->stepDown(result.txn.get(), force, waitTime, stepDownTime); + }); + auto result = task.get_future(); + PromisedClientAndOperation operationPromise; + auto operationFuture = operationPromise.get_future(); + stdx::thread(std::move(task), std::move(operationPromise)).detach(); + return std::make_pair(operationFuture.get(), std::move(result)); + } + OID myRid; OID rid2; OID rid3; @@ -1700,25 +1735,16 @@ TEST_F(StepDownTest, simulateSuccessfulV1Election(); - const auto txn = makeOperationContext(); - - // Step down where the secondary actually has to catch up before the stepDown can succeed. // On entering the network, _stepDownContinue should cancel the heartbeats scheduled for // T + 2 seconds and send out a new round of heartbeats immediately. // This makes it unnecessary to advance the clock after entering the network to process // the heartbeat requests. - Status result(ErrorCodes::InternalError, "not mutated"); - auto globalReadLockAndEventHandle = repl->stepDown_nonBlocking( - txn.get(), false, Milliseconds(10000), Milliseconds(60000), &result); - const auto& eventHandle = globalReadLockAndEventHandle.second; - ASSERT_TRUE(eventHandle); - ASSERT_TRUE(txn->lockState()->isReadLocked()); + auto result = stepDown_nonBlocking(false, Seconds(10), Seconds(60)); // Make a secondary actually catch up enterNetwork(); getNet()->runUntil(getNet()->now() + Milliseconds(1000)); - ASSERT(getNet()->hasReadyRequests()); NetworkInterfaceMock::NetworkOperationIterator noi = getNet()->getNextReadyRequest(); RemoteCommandRequest request = noi->getRequest(); log() << request.target.toString() << " processing " << request.cmdObj; @@ -1743,8 +1769,7 @@ TEST_F(StepDownTest, getNet()->runReadyNetworkOperations(); exitNetwork(); - getReplExec()->waitForEvent(eventHandle); - ASSERT_OK(result); + ASSERT_OK(*result.second.get()); ASSERT_TRUE(repl->getMemberState().secondary()); } @@ -1761,25 +1786,16 @@ TEST_F(StepDownTest, simulateSuccessfulV1Election(); - const auto txn = makeOperationContext(); - - // Step down where the secondary actually has to catch up before the stepDown can succeed. // On entering the network, _stepDownContinue should cancel the heartbeats scheduled for // T + 2 seconds and send out a new round of heartbeats immediately. // This makes it unnecessary to advance the clock after entering the network to process // the heartbeat requests. - Status result(ErrorCodes::InternalError, "not mutated"); - auto globalReadLockAndEventHandle = repl->stepDown_nonBlocking( - txn.get(), false, Milliseconds(10000), Milliseconds(60000), &result); - const auto& eventHandle = globalReadLockAndEventHandle.second; - ASSERT_TRUE(eventHandle); - ASSERT_TRUE(txn->lockState()->isReadLocked()); + auto result = stepDown_nonBlocking(false, Seconds(10), Seconds(60)); // Secondary has not caught up on first round of heartbeats. enterNetwork(); getNet()->runUntil(getNet()->now() + Milliseconds(1000)); - ASSERT(getNet()->hasReadyRequests()); NetworkInterfaceMock::NetworkOperationIterator noi = getNet()->getNextReadyRequest(); RemoteCommandRequest request = noi->getRequest(); log() << "HB1: " << request.target.toString() << " processing " << request.cmdObj; @@ -1808,7 +1824,6 @@ TEST_F(StepDownTest, auto until = getNet()->now() + heartbeatInterval; getNet()->runUntil(until); ASSERT_EQUALS(until, getNet()->now()); - ASSERT(getNet()->hasReadyRequests()); noi = getNet()->getNextReadyRequest(); request = noi->getRequest(); log() << "HB2: " << request.target.toString() << " processing " << request.cmdObj; @@ -1830,8 +1845,7 @@ TEST_F(StepDownTest, getNet()->runReadyNetworkOperations(); exitNetwork(); - getReplExec()->waitForEvent(eventHandle); - ASSERT_OK(result); + ASSERT_OK(*result.second.get()); ASSERT_TRUE(repl->getMemberState().secondary()); } @@ -1847,28 +1861,12 @@ TEST_F(StepDownTest, NodeReturnsInterruptedWhenInterruptedDuringStepDown) { simulateSuccessfulV1Election(); - const auto txn = makeOperationContext(); - - const unsigned int opID = txn->getOpID(); - ASSERT_TRUE(repl->getMemberState().primary()); // stepDown where the secondary actually has to catch up before the stepDown can succeed. - Status result(ErrorCodes::InternalError, "not mutated"); - auto globalReadLockAndEventHandle = repl->stepDown_nonBlocking( - txn.get(), false, Milliseconds(10000), Milliseconds(60000), &result); - const auto& eventHandle = globalReadLockAndEventHandle.second; - ASSERT_TRUE(eventHandle); - ASSERT_TRUE(txn->lockState()->isReadLocked()); - - { - stdx::lock_guard<Client> lk(*(txn->getClient())); - txn->markKilled(ErrorCodes::Interrupted); - } - getReplCoord()->interrupt(opID); - - getReplExec()->waitForEvent(eventHandle); - ASSERT_EQUALS(ErrorCodes::Interrupted, result); + auto result = stepDown_nonBlocking(false, Seconds(10), Seconds(60)); + killOperation(result.first.txn.get()); + ASSERT_EQUALS(ErrorCodes::Interrupted, *result.second.get()); ASSERT_TRUE(repl->getMemberState().primary()); } @@ -3416,12 +3414,8 @@ TEST_F(ReplCoordTest, NodeReturnsInterruptedWhenWaitingUntilAnOpTimeIsInterrupte getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermZero(10, 0)); getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermZero(10, 0)); - auto txn = makeOperationContext(); - - { - stdx::lock_guard<Client> lk(*(txn->getClient())); - txn->markKilled(ErrorCodes::Interrupted); - } + const auto txn = makeOperationContext(); + killOperation(txn.get()); auto status = getReplCoord()->waitUntilOpTimeForRead( txn.get(), ReadConcernArgs(OpTimeWithTermZero(50, 0), ReadConcernLevel::kLocalReadConcern)); @@ -3557,12 +3551,7 @@ TEST_F(ReplCoordTest, ReadAfterCommittedInterrupted) { getReplCoord()->setMyLastAppliedOpTime(OpTime(Timestamp(10, 0), 0)); getReplCoord()->setMyLastDurableOpTime(OpTime(Timestamp(10, 0), 0)); - - { - stdx::lock_guard<Client> lk(*(txn->getClient())); - txn->markKilled(ErrorCodes::Interrupted); - } - + killOperation(txn.get()); auto status = getReplCoord()->waitUntilOpTimeForRead( txn.get(), ReadConcernArgs(OpTime(Timestamp(50, 0), 0), ReadConcernLevel::kMajorityReadConcern)); diff --git a/src/mongo/db/repl/replication_coordinator_test_fixture.cpp b/src/mongo/db/repl/replication_coordinator_test_fixture.cpp index c630ddca26d..d3425170736 100644 --- a/src/mongo/db/repl/replication_coordinator_test_fixture.cpp +++ b/src/mongo/db/repl/replication_coordinator_test_fixture.cpp @@ -129,6 +129,10 @@ void ReplCoordTest::init() { _replExec.get(), seed, &_durablityLambda)); + auto service = getGlobalServiceContext(); + service->setFastClockSource(stdx::make_unique<executor::NetworkInterfaceMockClockSource>(_net)); + service->setPreciseClockSource( + stdx::make_unique<executor::NetworkInterfaceMockClockSource>(_net)); } void ReplCoordTest::init(const ReplSettings& settings) { diff --git a/src/mongo/db/write_concern_options.h b/src/mongo/db/write_concern_options.h index 85f9e0e0390..d66be64fdfb 100644 --- a/src/mongo/db/write_concern_options.h +++ b/src/mongo/db/write_concern_options.h @@ -105,6 +105,9 @@ public: // Timeout in milliseconds. int wTimeout; + // Deadline. If this is set to something other than Date_t::max(), this takes precedence over + // wTimeout. + Date_t wDeadline = Date_t::max(); // True if the default write concern was used. bool usedDefault = false; diff --git a/src/mongo/executor/network_interface_mock.cpp b/src/mongo/executor/network_interface_mock.cpp index 8cf36afc153..1eed68daea5 100644 --- a/src/mongo/executor/network_interface_mock.cpp +++ b/src/mongo/executor/network_interface_mock.cpp @@ -664,5 +664,10 @@ NetworkInterfaceMock::InNetworkGuard::~InNetworkGuard() { _net->exitNetwork(); } +NetworkInterfaceMockClockSource::NetworkInterfaceMockClockSource(NetworkInterfaceMock* net) + : _net(net) { + _tracksSystemClock = false; +} + } // namespace executor } // namespace mongo diff --git a/src/mongo/executor/network_interface_mock.h b/src/mongo/executor/network_interface_mock.h index f75330e7acf..0ba054f11fe 100644 --- a/src/mongo/executor/network_interface_mock.h +++ b/src/mongo/executor/network_interface_mock.h @@ -41,6 +41,7 @@ #include "mongo/stdx/mutex.h" #include "mongo/stdx/unordered_map.h" #include "mongo/stdx/unordered_set.h" +#include "mongo/util/clock_source.h" #include "mongo/util/time_support.h" namespace mongo { @@ -519,5 +520,23 @@ private: bool _callExitNetwork = true; }; +class NetworkInterfaceMockClockSource : public ClockSource { +public: + explicit NetworkInterfaceMockClockSource(NetworkInterfaceMock* net); + + Milliseconds getPrecision() override { + return Milliseconds{1}; + } + Date_t now() override { + return _net->now(); + } + Status setAlarm(Date_t when, stdx::function<void()> action) override { + return _net->setAlarm(when, action); + } + +private: + NetworkInterfaceMock* _net; +}; + } // namespace executor } // namespace mongo |