Diffstat (limited to 'src/mongo/db/repl/replication_coordinator_impl.cpp')
-rw-r--r--  src/mongo/db/repl/replication_coordinator_impl.cpp  283
1 file changed, 88 insertions(+), 195 deletions(-)
diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp
index 301f2c92c7c..f42e712ffd3 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl.cpp
@@ -155,45 +155,54 @@ std::string ReplicationCoordinatorImpl::SlaveInfo::toString() const {
return toBSON().toString();
}
-ReplicationCoordinatorImpl::Waiter::Waiter(OpTime _opTime, const WriteConcernOptions* _writeConcern)
- : opTime(std::move(_opTime)), writeConcern(_writeConcern) {}
+struct ReplicationCoordinatorImpl::WaiterInfo {
-BSONObj ReplicationCoordinatorImpl::Waiter::toBSON() const {
- BSONObjBuilder bob;
- bob.append("opTime", opTime.toBSON());
- if (writeConcern) {
- bob.append("writeConcern", writeConcern->toBSON());
- }
- return bob.obj();
-};
-
-std::string ReplicationCoordinatorImpl::Waiter::toString() const {
- return toBSON().toString();
-};
+ using FinishFunc = stdx::function<void()>;
+ WaiterInfo(unsigned int _opID,
+ const OpTime _opTime,
+ const WriteConcernOptions* _writeConcern,
+ stdx::condition_variable* _condVar)
+ : opID(_opID), opTime(_opTime), writeConcern(_writeConcern), condVar(_condVar) {}
-ReplicationCoordinatorImpl::ThreadWaiter::ThreadWaiter(OpTime _opTime,
- const WriteConcernOptions* _writeConcern,
- stdx::condition_variable* _condVar)
- : Waiter(_opTime, _writeConcern), condVar(_condVar) {}
+ // When a waiter is signaled, finishCallback will be called while holding replCoord _mutex
+ // since WaiterLists are protected by _mutex.
+ WaiterInfo(const OpTime _opTime, FinishFunc _finishCallback)
+ : opTime(_opTime), finishCallback(_finishCallback) {}
-void ReplicationCoordinatorImpl::ThreadWaiter::notify_inlock() {
- invariant(condVar);
- condVar->notify_all();
-}
+ BSONObj toBSON() const {
+ BSONObjBuilder bob;
+ bob.append("opId", opID);
+ bob.append("opTime", opTime.toBSON());
+ if (writeConcern) {
+ bob.append("writeConcern", writeConcern->toBSON());
+ }
+ return bob.obj();
+ };
-ReplicationCoordinatorImpl::CallbackWaiter::CallbackWaiter(OpTime _opTime,
- FinishFunc _finishCallback)
- : Waiter(_opTime, nullptr), finishCallback(std::move(_finishCallback)) {}
+ std::string toString() const {
+ return toBSON().toString();
+ };
-void ReplicationCoordinatorImpl::CallbackWaiter::notify_inlock() {
- invariant(finishCallback);
- finishCallback();
-}
+ // It is invalid to call notify() unless holding ReplicationCoordinatorImpl::_mutex.
+ void notify() {
+ if (condVar) {
+ condVar->notify_all();
+ }
+ if (finishCallback) {
+ finishCallback();
+ }
+ }
+ const unsigned int opID = 0;
+ const OpTime opTime;
+ const WriteConcernOptions* writeConcern = nullptr;
+ stdx::condition_variable* condVar = nullptr;
+ // The callback that will be called when this waiter is notified.
+ FinishFunc finishCallback = nullptr;
+};
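
For orientation: the restored WaiterInfo folds the condition-variable waiter and the callback waiter into a single struct, and notify() fires whichever mechanism is set. A minimal standalone sketch of that dual-notification shape, using stand-in names and std:: in place of stdx:: (not the actual MongoDB types), is:

    #include <condition_variable>
    #include <functional>

    struct Waiter {
        long opTime = 0;                           // stand-in for repl::OpTime
        std::condition_variable* condVar = nullptr;
        std::function<void()> finishCallback;      // may be empty

        // Caller must hold the mutex guarding the waiter list, mirroring
        // the _mutex requirement documented in the diff above.
        void notify() {
            if (condVar) condVar->notify_all();
            if (finishCallback) finishCallback();
        }
    };
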
-class ReplicationCoordinatorImpl::WaiterGuard {
-public:
+struct ReplicationCoordinatorImpl::WaiterInfoGuard {
/**
* Constructor takes the list of waiters and enqueues itself on the list, removing itself
* in the destructor.
@@ -205,17 +214,23 @@ public:
* _list is guarded by ReplicationCoordinatorImpl::_mutex, thus it is illegal to construct one
* of these without holding _mutex
*/
- WaiterGuard(WaiterList* list, Waiter* waiter) : _list(list), _waiter(waiter) {
- list->add_inlock(_waiter);
+ WaiterInfoGuard(WaiterList* list,
+ unsigned int opID,
+ const OpTime opTime,
+ const WriteConcernOptions* writeConcern,
+ stdx::condition_variable* condVar)
+ : waiter(opID, opTime, writeConcern, condVar), _list(list) {
+ list->add_inlock(&waiter);
}
- ~WaiterGuard() {
- _list->remove_inlock(_waiter);
+ ~WaiterInfoGuard() {
+ _list->remove_inlock(&waiter);
}
+ WaiterInfo waiter;
+
private:
WaiterList* _list;
- Waiter* _waiter;
};
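
WaiterInfoGuard is a standard RAII scope guard: list membership lasts exactly as long as the guard's stack frame, so a waiter pointer can never dangle in the shared list. A generic sketch of the idiom (hypothetical names; it assumes the caller holds the lock guarding the list, as the comment above requires):

    #include <algorithm>
    #include <vector>

    template <typename T>
    class ScopedListEntry {
    public:
        ScopedListEntry(std::vector<T*>* list, T* item) : _list(list), _item(item) {
            _list->push_back(_item);  // enqueue on construction
        }
        ~ScopedListEntry() {
            // Dequeue on destruction, even on early return or exception.
            _list->erase(std::remove(_list->begin(), _list->end(), _item), _list->end());
        }
        ScopedListEntry(const ScopedListEntry&) = delete;
        ScopedListEntry& operator=(const ScopedListEntry&) = delete;

    private:
        std::vector<T*>* _list;
        T* _item;
    };
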
void ReplicationCoordinatorImpl::WaiterList::add_inlock(WaiterType waiter) {
@@ -224,46 +239,33 @@ void ReplicationCoordinatorImpl::WaiterList::add_inlock(WaiterType waiter) {
void ReplicationCoordinatorImpl::WaiterList::signalAndRemoveIf_inlock(
stdx::function<bool(WaiterType)> func) {
- // Only advance iterator when the element doesn't match.
- for (auto it = _list.begin(); it != _list.end();) {
- if (!func(*it)) {
- ++it;
- continue;
- }
-
- WaiterType waiter = std::move(*it);
- if (it == std::prev(_list.end())) {
- // Iterator will be invalid after erasing the last element, so set it to the
- // next one (i.e. end()).
- it = _list.erase(it);
- } else {
- // Iterator is still valid after pop_back().
- std::swap(*it, _list.back());
- _list.pop_back();
+ std::vector<WaiterType>::iterator it = _list.end();
+ while (true) {
+ it = std::find_if(_list.begin(), _list.end(), func);
+ if (it == _list.end()) {
+ break;
}
-
- // It's important to call notify() after the waiter has been removed from the list
- // since notify() might remove the waiter itself.
- waiter->notify_inlock();
+ (*it)->notify();
+ std::swap(*it, _list.back());
+ _list.pop_back();
}
}
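
Both versions of signalAndRemoveIf_inlock erase from the vector with swap-and-pop, which is O(1) per removal at the cost of element order; the '-' side additionally documents why notify() must run only after the waiter has left the list (the callback might try to remove the waiter itself). A distilled sketch of that single-pass variant, with hypothetical types:

    #include <iterator>
    #include <utility>
    #include <vector>

    template <typename T, typename Pred>
    void signalAndRemoveIf(std::vector<T*>& list, Pred pred) {
        for (auto it = list.begin(); it != list.end();) {
            if (!pred(*it)) {
                ++it;
                continue;
            }
            T* waiter = *it;
            if (it == std::prev(list.end())) {
                // Erasing the back invalidates 'it'; erase() returns end().
                it = list.erase(it);
            } else {
                // O(1) removal; 'it' now holds an unchecked element, so do
                // not advance.
                std::swap(*it, list.back());
                list.pop_back();
            }
            waiter->notify();  // only after removal, per the comment above
        }
    }
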
void ReplicationCoordinatorImpl::WaiterList::signalAndRemoveAll_inlock() {
- std::vector<WaiterType> list = std::move(_list);
- // Call notify() after removing the waiters from the list.
- for (auto& waiter : list) {
- waiter->notify_inlock();
+ for (auto& waiter : _list) {
+ waiter->notify();
}
+ _list.clear();
}
bool ReplicationCoordinatorImpl::WaiterList::remove_inlock(WaiterType waiter) {
auto it = std::find(_list.begin(), _list.end(), waiter);
- if (it == _list.end()) {
- return false;
+ if (it != _list.end()) {
+ std::swap(*it, _list.back());
+ _list.pop_back();
+ return true;
}
- std::swap(*it, _list.back());
- _list.pop_back();
- return true;
+ return false;
}
namespace {
@@ -1184,7 +1186,7 @@ void ReplicationCoordinatorImpl::_setMyLastAppliedOpTime_inlock(const OpTime& op
_updateSlaveInfoAppliedOpTime_inlock(mySlaveInfo, opTime);
_opTimeWaiterList.signalAndRemoveIf_inlock(
- [opTime](Waiter* waiter) { return waiter->opTime <= opTime; });
+ [opTime](WaiterInfo* waiter) { return waiter->opTime <= opTime; });
}
void ReplicationCoordinatorImpl::_setMyLastDurableOpTime_inlock(const OpTime& opTime,
@@ -1350,11 +1352,11 @@ Status ReplicationCoordinatorImpl::_waitUntilOpTimeForReadDeprecated(
// We just need to wait for the opTime to catch up to what we need (not majority RC).
stdx::condition_variable condVar;
- ThreadWaiter waiter(targetOpTime, nullptr, &condVar);
- WaiterGuard guard(&_opTimeWaiterList, &waiter);
+ WaiterInfoGuard waitInfo(
+ &_opTimeWaiterList, opCtx->getOpID(), targetOpTime, nullptr, &condVar);
- LOG(3) << "waituntilOpTime: OpID " << opCtx->getOpID() << " is waiting for OpTime "
- << waiter << " until " << opCtx->getDeadline();
+ LOG(3) << "waituntilOpTime: waiting for OpTime " << waitInfo.waiter << " until "
+ << opCtx->getDeadline();
auto waitStatus = opCtx->waitForConditionOrInterruptNoAssert(condVar, lock);
if (!waitStatus.isOK()) {
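
waitForConditionOrInterruptNoAssert blocks on the condition variable until it is notified, the operation is interrupted, or the deadline passes, and reports the outcome as a Status rather than throwing. A rough standalone analogue using plain std::condition_variable (hypothetical names, no interruption support):

    #include <chrono>
    #include <condition_variable>
    #include <mutex>

    // Returns true once 'done' is observed under 'lock', false on timeout.
    bool waitUntilDeadline(std::condition_variable& cv,
                           std::unique_lock<std::mutex>& lock,
                           const bool& done,
                           std::chrono::steady_clock::time_point deadline) {
        return cv.wait_until(lock, deadline, [&] { return done; });
    }
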
@@ -1742,8 +1744,8 @@ Status ReplicationCoordinatorImpl::_awaitReplication_inlock(
// Must hold _mutex before constructing waitInfo as it will modify _replicationWaiterList
stdx::condition_variable condVar;
- ThreadWaiter waiter(opTime, &writeConcern, &condVar);
- WaiterGuard guard(&_replicationWaiterList, &waiter);
+ WaiterInfoGuard waitInfo(
+ &_replicationWaiterList, opCtx->getOpID(), opTime, &writeConcern, &condVar);
while (!_doneWaitingForReplication_inlock(opTime, minSnapshot, writeConcern)) {
if (_inShutdown) {
@@ -1761,7 +1763,7 @@ Status ReplicationCoordinatorImpl::_awaitReplication_inlock(
BSONObjBuilder progress;
_appendSlaveInfoData_inlock(&progress);
log() << "Replication for failed WC: " << writeConcern.toBSON()
- << ", waitInfo: " << waiter << ", opID: " << opCtx->getOpID()
+ << ", waitInfo:" << waitInfo.waiter.toBSON()
<< ", progress: " << progress.done();
}
return {ErrorCodes::WriteConcernFailed, "waiting for replication timed out"};
@@ -2658,11 +2660,8 @@ ReplicationCoordinatorImpl::_updateMemberStateFromTopologyCoordinator_inlock() {
result = kActionFollowerModeStateChange;
}
- // Exit catchup mode if we're in it and enable replication producer and applier on stepdown.
+ // Enable replication producer and applier on stepdown.
if (_memberState.primary()) {
- if (_catchupState) {
- _catchupState->abort_inlock();
- }
_applierState = ApplierState::Running;
_externalState->startProducerIfStopped();
}
@@ -2766,18 +2765,13 @@ void ReplicationCoordinatorImpl::_performPostMemberStateUpdateAction(
invariant(nextAction != kActionWinElection);
lk.unlock();
_performPostMemberStateUpdateAction(nextAction);
- lk.lock();
- if (!_getMemberState_inlock().primary()) {
- break;
- }
// Notify all secondaries of the election win.
- _restartHeartbeats_inlock();
+ lk.lock();
+ _scheduleElectionWinNotification_inlock();
if (isV1ElectionProtocol()) {
- invariant(!_catchupState);
- _catchupState = stdx::make_unique<CatchupState>(this);
- _catchupState->start_inlock();
+ _scanOpTimeForCatchUp_inlock();
} else {
- _enterDrainMode_inlock();
+ _finishCatchingUpOplog_inlock();
}
break;
}
@@ -2792,114 +2786,13 @@ void ReplicationCoordinatorImpl::_performPostMemberStateUpdateAction(
}
}
-void ReplicationCoordinatorImpl::CatchupState::start_inlock() {
- log() << "Entering primary catch-up mode.";
-
- // No catchup in single node replica set.
- if (_repl->_rsConfig.getNumMembers() == 1) {
- abort_inlock();
- return;
- }
-
- auto timeoutCB = [this](const CallbackArgs& cbData) {
- if (!cbData.status.isOK()) {
- return;
- }
- log() << "Catchup timed out after becoming primary.";
- stdx::lock_guard<stdx::mutex> lk(_repl->_mutex);
- abort_inlock();
- };
-
- // Schedule timeout callback.
- auto catchupTimeout = _repl->_rsConfig.getCatchUpTimeoutPeriod();
- // Deal with infinity and overflow - no timeout.
- if (catchupTimeout == ReplSetConfig::kInfiniteCatchUpTimeout ||
- Date_t::max() - _repl->_replExecutor->now() <= catchupTimeout) {
- return;
- }
- auto timeoutDate = _repl->_replExecutor->now() + catchupTimeout;
- auto status = _repl->_replExecutor->scheduleWorkAt(timeoutDate, timeoutCB);
- if (!status.isOK()) {
- log() << "Failed to schedule catchup timeout work.";
- abort_inlock();
- return;
- }
- _timeoutCbh = status.getValue();
-}
-
-void ReplicationCoordinatorImpl::CatchupState::abort_inlock() {
- invariant(_repl->_getMemberState_inlock().primary());
-
- log() << "Exited primary catch-up mode.";
- // Clean up its own members.
- if (_timeoutCbh) {
- _repl->_replExecutor->cancel(_timeoutCbh);
- }
- if (_waiter) {
- _repl->_opTimeWaiterList.remove_inlock(_waiter.get());
- }
-
- // Enter primary drain mode.
- _repl->_enterDrainMode_inlock();
- // Destruct the state itself.
- _repl->_catchupState.reset(nullptr);
-}
-
-void ReplicationCoordinatorImpl::CatchupState::signalHeartbeatUpdate_inlock() {
- auto targetOpTime = _repl->_topCoord->latestKnownOpTimeSinceHeartbeatRestart();
- // Haven't collected all heartbeat responses.
- if (!targetOpTime) {
- return;
- }
-
- // We've caught up.
- if (*targetOpTime <= _repl->_getMyLastAppliedOpTime_inlock()) {
- log() << "Caught up to the latest known optime via heartbeats after becoming primary.";
- abort_inlock();
- return;
- }
-
- // Reset the target optime if it has changed.
- if (_waiter && _waiter->opTime == *targetOpTime) {
- return;
- }
-
- log() << "Heartbeats updated catchup target optime to " << *targetOpTime;
- if (_waiter) {
- _repl->_opTimeWaiterList.remove_inlock(_waiter.get());
- }
- auto targetOpTimeCB = [this, targetOpTime]() {
- // Double check the target time since stepdown may signal us too.
- if (*targetOpTime <= _repl->_getMyLastAppliedOpTime_inlock()) {
- log() << "Caught up to the latest known optime successfully after becoming primary.";
- abort_inlock();
- }
- };
- _waiter = stdx::make_unique<CallbackWaiter>(*targetOpTime, targetOpTimeCB);
- _repl->_opTimeWaiterList.add_inlock(_waiter.get());
-}
-
-Status ReplicationCoordinatorImpl::abortCatchupIfNeeded() {
- if (!isV1ElectionProtocol()) {
- return Status(ErrorCodes::CommandNotSupported,
- "Primary catch-up is only supported by Protocol Version 1");
- }
-
- stdx::lock_guard<stdx::mutex> lk(_mutex);
- if (_catchupState) {
- _catchupState->abort_inlock();
- return Status::OK();
- }
- return Status(ErrorCodes::IllegalOperation, "The node is not in catch-up mode.");
-}
-
void ReplicationCoordinatorImpl::_scanOpTimeForCatchUp_inlock() {
auto scanner = std::make_shared<FreshnessScanner>();
auto scanStartTime = _replExecutor->now();
auto evhStatus = scanner->start(
_replExecutor.get(), _rsConfig, _selfIndex, _rsConfig.getCatchUpTimeoutPeriod());
if (evhStatus == ErrorCodes::ShutdownInProgress) {
- _enterDrainMode_inlock();
+ _finishCatchingUpOplog_inlock();
return;
}
fassertStatusOK(40254, evhStatus.getStatus());
@@ -2908,7 +2801,7 @@ void ReplicationCoordinatorImpl::_scanOpTimeForCatchUp_inlock() {
evhStatus.getValue(), [this, scanner, scanStartTime, term](const CallbackArgs& cbData) {
stdx::lock_guard<stdx::mutex> lk(_mutex);
if (cbData.status == ErrorCodes::CallbackCanceled) {
- _enterDrainMode_inlock();
+ _finishCatchingUpOplog_inlock();
return;
}
auto totalTimeout = _rsConfig.getCatchUpTimeoutPeriod();
@@ -2937,7 +2830,7 @@ void ReplicationCoordinatorImpl::_catchUpOplogToLatest_inlock(const FreshnessSca
log() << "Could not access any nodes within timeout when checking for "
<< "additional ops to apply before finishing transition to primary. "
<< "Will move forward with becoming primary anyway.";
- _enterDrainMode_inlock();
+ _finishCatchingUpOplog_inlock();
return;
}
@@ -2946,7 +2839,7 @@ void ReplicationCoordinatorImpl::_catchUpOplogToLatest_inlock(const FreshnessSca
if (freshnessInfo.opTime <= _getMyLastAppliedOpTime_inlock()) {
log() << "My optime is most up-to-date, skipping catch-up "
<< "and completing transition to primary.";
- _enterDrainMode_inlock();
+ _finishCatchingUpOplog_inlock();
return;
}
@@ -2959,9 +2852,9 @@ void ReplicationCoordinatorImpl::_catchUpOplogToLatest_inlock(const FreshnessSca
log() << "Finished catch-up oplog after becoming primary.";
}
- _enterDrainMode_inlock();
+ _finishCatchingUpOplog_inlock();
};
- auto waiterInfo = std::make_shared<CallbackWaiter>(freshnessInfo.opTime, finishCB);
+ auto waiterInfo = std::make_shared<WaiterInfo>(freshnessInfo.opTime, finishCB);
_opTimeWaiterList.add_inlock(waiterInfo.get());
auto timeoutCB = [this, waiterInfo, finishCB](const CallbackArgs& cbData) {
@@ -2974,7 +2867,7 @@ void ReplicationCoordinatorImpl::_catchUpOplogToLatest_inlock(const FreshnessSca
_replExecutor->scheduleWorkAt(_replExecutor->now() + timeout, timeoutCB);
}
-void ReplicationCoordinatorImpl::_enterDrainMode_inlock() {
+void ReplicationCoordinatorImpl::_finishCatchingUpOplog_inlock() {
_applierState = ApplierState::Draining;
_externalState->stopProducer();
}
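
The timeout path in the hunks above hangs off _replExecutor->scheduleWorkAt(now + timeout, timeoutCB). As a rough stand-in for what such a one-shot scheduler does (the real executor also supports cancellation, which this sketch omits):

    #include <chrono>
    #include <functional>
    #include <thread>

    // Hypothetical one-shot scheduler: run 'work' after 'delay' on a
    // detached thread.
    void scheduleWorkAfter(std::chrono::milliseconds delay,
                           std::function<void()> work) {
        std::thread([delay, work = std::move(work)] {
            std::this_thread::sleep_for(delay);
            work();
        }).detach();
    }
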
@@ -3065,7 +2958,7 @@ ReplicationCoordinatorImpl::_setCurrentRSConfig_inlock(const ReplSetConfig& newC
}
void ReplicationCoordinatorImpl::_wakeReadyWaiters_inlock() {
- _replicationWaiterList.signalAndRemoveIf_inlock([this](Waiter* waiter) {
+ _replicationWaiterList.signalAndRemoveIf_inlock([this](WaiterInfo* waiter) {
return _doneWaitingForReplication_inlock(
waiter->opTime, SnapshotName::min(), *waiter->writeConcern);
});
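
To close the loop on the predicate-driven wakeup in this final hunk: assuming the Waiter and signalAndRemoveIf sketches from earlier, the equivalent call shape is:

    #include <iostream>
    #include <vector>

    int main() {
        Waiter w1, w2;
        w1.opTime = 10;
        w1.finishCallback = [] { std::cout << "w1 notified\n"; };
        w2.opTime = 100;

        std::vector<Waiter*> waiters{&w1, &w2};
        const long lastApplied = 42;
        // Wake every waiter whose target opTime has been reached, as the
        // lambda above does with WaiterInfo and _doneWaitingForReplication.
        signalAndRemoveIf(waiters,
                          [&](Waiter* w) { return w->opTime <= lastApplied; });
        // waiters now holds only w2 (100 > 42).
    }
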