author    Siyuan Zhou <siyuan.zhou@mongodb.com>    2017-04-20 15:28:38 -0400
committer Siyuan Zhou <siyuan.zhou@mongodb.com>    2017-04-20 22:36:43 -0400
commit    4680351e3fe6f8f47c04440f1c5d1232a4ab7b2d (patch)
tree      dae7b10842b2e1899a683adf4a545759182ce2ab /src/mongo/db/repl/replication_coordinator_impl.cpp
parent    8b437e7a762e3ef99848659dc0d68df1e2add0a4 (diff)
download  mongo-4680351e3fe6f8f47c04440f1c5d1232a4ab7b2d.tar.gz
SERVER-26848 Exit catchup mode when not syncing more data.
This reverts commit c08590a6ac9dc54c9d910822d47ea17140b56f89.
Diffstat (limited to 'src/mongo/db/repl/replication_coordinator_impl.cpp')
-rw-r--r--  src/mongo/db/repl/replication_coordinator_impl.cpp | 283
1 file changed, 195 insertions(+), 88 deletions(-)
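
The main refactoring, visible in the first hunk below, splits the old WaiterInfo struct into a Waiter base class with two notification strategies: ThreadWaiter wakes a blocked thread through a condition variable, while CallbackWaiter runs a finish callback (the variant catch-up mode uses). A minimal standalone sketch of that hierarchy follows, with OpTime reduced to an integer and the write-concern member omitted for brevity; both are simplifications, not the real types. The _inlock suffix in the real code signals that callers must hold ReplicationCoordinatorImpl::_mutex; the sketch leaves locking out.

    #include <condition_variable>
    #include <functional>
    #include <iostream>
    #include <utility>

    using OpTime = long long;  // stand-in; the real OpTime is a (term, timestamp) pair

    // Base waiter: holds the optime a caller is waiting on.
    struct Waiter {
        explicit Waiter(OpTime t) : opTime(t) {}
        virtual ~Waiter() = default;
        virtual void notify_inlock() = 0;  // real code requires _mutex to be held
        const OpTime opTime;
    };

    // Wakes a thread blocked on a condition variable (the _waitUntilOpTime path).
    struct ThreadWaiter : Waiter {
        ThreadWaiter(OpTime t, std::condition_variable* cv) : Waiter(t), condVar(cv) {}
        void notify_inlock() override { condVar->notify_all(); }
        std::condition_variable* condVar;
    };

    // Runs a callback instead of waking a thread (the catch-up mode path).
    struct CallbackWaiter : Waiter {
        CallbackWaiter(OpTime t, std::function<void()> cb)
            : Waiter(t), finishCallback(std::move(cb)) {}
        void notify_inlock() override { finishCallback(); }
        std::function<void()> finishCallback;
    };

    int main() {
        CallbackWaiter w(42, [] { std::cout << "caught up\n"; });
        w.notify_inlock();  // prints "caught up"
    }
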
diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp
index f42e712ffd3..301f2c92c7c 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl.cpp
@@ -155,54 +155,45 @@ std::string ReplicationCoordinatorImpl::SlaveInfo::toString() const {
return toBSON().toString();
}
-struct ReplicationCoordinatorImpl::WaiterInfo {
+ReplicationCoordinatorImpl::Waiter::Waiter(OpTime _opTime, const WriteConcernOptions* _writeConcern)
+ : opTime(std::move(_opTime)), writeConcern(_writeConcern) {}
- using FinishFunc = stdx::function<void()>;
+BSONObj ReplicationCoordinatorImpl::Waiter::toBSON() const {
+ BSONObjBuilder bob;
+ bob.append("opTime", opTime.toBSON());
+ if (writeConcern) {
+ bob.append("writeConcern", writeConcern->toBSON());
+ }
+ return bob.obj();
+};
- WaiterInfo(unsigned int _opID,
- const OpTime _opTime,
- const WriteConcernOptions* _writeConcern,
- stdx::condition_variable* _condVar)
- : opID(_opID), opTime(_opTime), writeConcern(_writeConcern), condVar(_condVar) {}
+std::string ReplicationCoordinatorImpl::Waiter::toString() const {
+ return toBSON().toString();
+};
- // When waiter is signaled, finishCallback will be called while holding replCoord _mutex
- // since WaiterLists are protected by _mutex.
- WaiterInfo(const OpTime _opTime, FinishFunc _finishCallback)
- : opTime(_opTime), finishCallback(_finishCallback) {}
- BSONObj toBSON() const {
- BSONObjBuilder bob;
- bob.append("opId", opID);
- bob.append("opTime", opTime.toBSON());
- if (writeConcern) {
- bob.append("writeConcern", writeConcern->toBSON());
- }
- return bob.obj();
- };
+ReplicationCoordinatorImpl::ThreadWaiter::ThreadWaiter(OpTime _opTime,
+ const WriteConcernOptions* _writeConcern,
+ stdx::condition_variable* _condVar)
+ : Waiter(_opTime, _writeConcern), condVar(_condVar) {}
- std::string toString() const {
- return toBSON().toString();
- };
+void ReplicationCoordinatorImpl::ThreadWaiter::notify_inlock() {
+ invariant(condVar);
+ condVar->notify_all();
+}
- // It is invalid to call notify() unless holding ReplicationCoordinatorImpl::_mutex.
- void notify() {
- if (condVar) {
- condVar->notify_all();
- }
- if (finishCallback) {
- finishCallback();
- }
- }
+ReplicationCoordinatorImpl::CallbackWaiter::CallbackWaiter(OpTime _opTime,
+ FinishFunc _finishCallback)
+ : Waiter(_opTime, nullptr), finishCallback(std::move(_finishCallback)) {}
+
+void ReplicationCoordinatorImpl::CallbackWaiter::notify_inlock() {
+ invariant(finishCallback);
+ finishCallback();
+}
- const unsigned int opID = 0;
- const OpTime opTime;
- const WriteConcernOptions* writeConcern = nullptr;
- stdx::condition_variable* condVar = nullptr;
- // The callback that will be called when this waiter is notified.
- FinishFunc finishCallback = nullptr;
-};
-struct ReplicationCoordinatorImpl::WaiterInfoGuard {
+class ReplicationCoordinatorImpl::WaiterGuard {
+public:
/**
* Constructor takes the list of waiters and enqueues itself on the list, removing itself
* in the destructor.
@@ -214,23 +205,17 @@ struct ReplicationCoordinatorImpl::WaiterInfoGuard {
* _list is guarded by ReplicationCoordinatorImpl::_mutex, thus it is illegal to construct one
* of these without holding _mutex
*/
- WaiterInfoGuard(WaiterList* list,
- unsigned int opID,
- const OpTime opTime,
- const WriteConcernOptions* writeConcern,
- stdx::condition_variable* condVar)
- : waiter(opID, opTime, writeConcern, condVar), _list(list) {
- list->add_inlock(&waiter);
+ WaiterGuard(WaiterList* list, Waiter* waiter) : _list(list), _waiter(waiter) {
+ list->add_inlock(_waiter);
}
- ~WaiterInfoGuard() {
- _list->remove_inlock(&waiter);
+ ~WaiterGuard() {
+ _list->remove_inlock(_waiter);
}
- WaiterInfo waiter;
-
private:
WaiterList* _list;
+ Waiter* _waiter;
};
void ReplicationCoordinatorImpl::WaiterList::add_inlock(WaiterType waiter) {
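
The guard above now borrows the waiter instead of owning it, which is what lets a ThreadWaiter live on the caller's stack. A minimal sketch of the same RAII shape over a plain std::vector<Waiter*> (an assumed simplification of WaiterList; locking again omitted):

    #include <algorithm>
    #include <utility>
    #include <vector>

    struct Waiter {};  // placeholder for the base class sketched earlier
    using WaiterList = std::vector<Waiter*>;

    // Enqueues the borrowed waiter on construction and dequeues it on
    // destruction, so early returns cannot leak a dangling pointer into
    // the list. The real class requires _mutex to be held for both.
    class WaiterGuard {
    public:
        WaiterGuard(WaiterList* list, Waiter* waiter) : _list(list), _waiter(waiter) {
            _list->push_back(_waiter);
        }
        ~WaiterGuard() {
            auto it = std::find(_list->begin(), _list->end(), _waiter);
            if (it != _list->end()) {
                std::swap(*it, _list->back());  // unordered swap-and-pop removal
                _list->pop_back();
            }
        }

    private:
        WaiterList* _list;
        Waiter* _waiter;
    };

    int main() {
        WaiterList list;
        Waiter w;
        WaiterGuard guard(&list, &w);  // removed again when guard goes out of scope
    }
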
@@ -239,33 +224,46 @@ void ReplicationCoordinatorImpl::WaiterList::add_inlock(WaiterType waiter) {
void ReplicationCoordinatorImpl::WaiterList::signalAndRemoveIf_inlock(
stdx::function<bool(WaiterType)> func) {
- std::vector<WaiterType>::iterator it = _list.end();
- while (true) {
- it = std::find_if(_list.begin(), _list.end(), func);
- if (it == _list.end()) {
- break;
+ // Only advance iterator when the element doesn't match.
+ for (auto it = _list.begin(); it != _list.end();) {
+ if (!func(*it)) {
+ ++it;
+ continue;
+ }
+
+ WaiterType waiter = std::move(*it);
+ if (it == std::prev(_list.end())) {
+ // Iterator will be invalid after erasing the last element, so set it to the
+ // next one (i.e. end()).
+ it = _list.erase(it);
+ } else {
+ // Iterator is still valid after pop_back().
+ std::swap(*it, _list.back());
+ _list.pop_back();
}
- (*it)->notify();
- std::swap(*it, _list.back());
- _list.pop_back();
+
+ // It's important to call notify() after the waiter has been removed from the list
+ // since notify() might remove the waiter itself.
+ waiter->notify_inlock();
}
}
void ReplicationCoordinatorImpl::WaiterList::signalAndRemoveAll_inlock() {
- for (auto& waiter : _list) {
- waiter->notify();
+ std::vector<WaiterType> list = std::move(_list);
+ // Call notify() after removing the waiters from the list.
+ for (auto& waiter : list) {
+ waiter->notify_inlock();
}
- _list.clear();
}
bool ReplicationCoordinatorImpl::WaiterList::remove_inlock(WaiterType waiter) {
auto it = std::find(_list.begin(), _list.end(), waiter);
- if (it != _list.end()) {
- std::swap(*it, _list.back());
- _list.pop_back();
- return true;
+ if (it == _list.end()) {
+ return false;
}
- return false;
+ std::swap(*it, _list.back());
+ _list.pop_back();
+ return true;
}
namespace {
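
The rewritten removal loop above is the subtle part of this hunk: erasing the last element invalidates the iterator, so that case goes through erase(), every other match uses unordered swap-and-pop without advancing, and notification is deferred until the waiter is off the list (since notify() may itself try to remove the waiter). A compilable sketch of the same loop over std::vector<int>, where the element type and the printed "notification" are stand-ins:

    #include <functional>
    #include <iostream>
    #include <iterator>
    #include <utility>
    #include <vector>

    // Same shape as signalAndRemoveIf_inlock: remove each matching element via
    // unordered swap-and-pop, then "notify" it only after it has left the list.
    void signalAndRemoveIf(std::vector<int>& list, std::function<bool(int)> pred) {
        for (auto it = list.begin(); it != list.end();) {
            if (!pred(*it)) {
                ++it;  // only advance past non-matching elements
                continue;
            }
            int elem = *it;
            if (it == std::prev(list.end())) {
                // Erasing the last element invalidates 'it'; erase() hands back
                // the next valid iterator (end() here).
                it = list.erase(it);
            } else {
                // Swap with the back and pop; 'it' stays valid and now points at
                // a not-yet-examined element, so it must not be advanced.
                std::swap(*it, list.back());
                list.pop_back();
            }
            std::cout << "notified " << elem << "\n";  // after removal, by design
        }
    }

    int main() {
        std::vector<int> v{1, 4, 2, 8, 3};
        signalAndRemoveIf(v, [](int x) { return x % 2 == 0; });  // removes 4, 2, 8
    }
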
@@ -1186,7 +1184,7 @@ void ReplicationCoordinatorImpl::_setMyLastAppliedOpTime_inlock(const OpTime& op
_updateSlaveInfoAppliedOpTime_inlock(mySlaveInfo, opTime);
_opTimeWaiterList.signalAndRemoveIf_inlock(
- [opTime](WaiterInfo* waiter) { return waiter->opTime <= opTime; });
+ [opTime](Waiter* waiter) { return waiter->opTime <= opTime; });
}
void ReplicationCoordinatorImpl::_setMyLastDurableOpTime_inlock(const OpTime& opTime,
@@ -1352,11 +1350,11 @@ Status ReplicationCoordinatorImpl::_waitUntilOpTimeForReadDeprecated(
// We just need to wait for the opTime to catch up to what we need (not majority RC).
stdx::condition_variable condVar;
- WaiterInfoGuard waitInfo(
- &_opTimeWaiterList, opCtx->getOpID(), targetOpTime, nullptr, &condVar);
+ ThreadWaiter waiter(targetOpTime, nullptr, &condVar);
+ WaiterGuard guard(&_opTimeWaiterList, &waiter);
- LOG(3) << "waituntilOpTime: waiting for OpTime " << waitInfo.waiter << " until "
- << opCtx->getDeadline();
+ LOG(3) << "waituntilOpTime: OpID " << opCtx->getOpID() << " is waiting for OpTime "
+ << waiter << " until " << opCtx->getDeadline();
auto waitStatus = opCtx->waitForConditionOrInterruptNoAssert(condVar, lock);
if (!waitStatus.isOK()) {
@@ -1744,8 +1742,8 @@ Status ReplicationCoordinatorImpl::_awaitReplication_inlock(
// Must hold _mutex before constructing waitInfo as it will modify _replicationWaiterList
stdx::condition_variable condVar;
- WaiterInfoGuard waitInfo(
- &_replicationWaiterList, opCtx->getOpID(), opTime, &writeConcern, &condVar);
+ ThreadWaiter waiter(opTime, &writeConcern, &condVar);
+ WaiterGuard guard(&_replicationWaiterList, &waiter);
while (!_doneWaitingForReplication_inlock(opTime, minSnapshot, writeConcern)) {
if (_inShutdown) {
@@ -1763,7 +1761,7 @@ Status ReplicationCoordinatorImpl::_awaitReplication_inlock(
BSONObjBuilder progress;
_appendSlaveInfoData_inlock(&progress);
log() << "Replication for failed WC: " << writeConcern.toBSON()
- << ", waitInfo:" << waitInfo.waiter.toBSON()
+ << ", waitInfo: " << waiter << ", opID: " << opCtx->getOpID()
<< ", progress: " << progress.done();
}
return {ErrorCodes::WriteConcernFailed, "waiting for replication timed out"};
@@ -2660,8 +2658,11 @@ ReplicationCoordinatorImpl::_updateMemberStateFromTopologyCoordinator_inlock() {
result = kActionFollowerModeStateChange;
}
- // Enable replication producer and applier on stepdown.
+ // Exit catchup mode if we're in it and enable replication producer and applier on stepdown.
if (_memberState.primary()) {
+ if (_catchupState) {
+ _catchupState->abort_inlock();
+ }
_applierState = ApplierState::Running;
_externalState->startProducerIfStopped();
}
@@ -2765,13 +2766,18 @@ void ReplicationCoordinatorImpl::_performPostMemberStateUpdateAction(
invariant(nextAction != kActionWinElection);
lk.unlock();
_performPostMemberStateUpdateAction(nextAction);
- // Notify all secondaries of the election win.
lk.lock();
- _scheduleElectionWinNotification_inlock();
+ if (!_getMemberState_inlock().primary()) {
+ break;
+ }
+ // Notify all secondaries of the election win.
+ _restartHeartbeats_inlock();
if (isV1ElectionProtocol()) {
- _scanOpTimeForCatchUp_inlock();
+ invariant(!_catchupState);
+ _catchupState = stdx::make_unique<CatchupState>(this);
+ _catchupState->start_inlock();
} else {
- _finishCatchingUpOplog_inlock();
+ _enterDrainMode_inlock();
}
break;
}
@@ -2786,13 +2792,114 @@ void ReplicationCoordinatorImpl::_performPostMemberStateUpdateAction(
}
}
+void ReplicationCoordinatorImpl::CatchupState::start_inlock() {
+ log() << "Entering primary catch-up mode.";
+
+ // No catchup in single node replica set.
+ if (_repl->_rsConfig.getNumMembers() == 1) {
+ abort_inlock();
+ return;
+ }
+
+ auto timeoutCB = [this](const CallbackArgs& cbData) {
+ if (!cbData.status.isOK()) {
+ return;
+ }
+ log() << "Catchup timed out after becoming primary.";
+ stdx::lock_guard<stdx::mutex> lk(_repl->_mutex);
+ abort_inlock();
+ };
+
+ // Schedule timeout callback.
+ auto catchupTimeout = _repl->_rsConfig.getCatchUpTimeoutPeriod();
+ // Deal with infinity and overflow - no timeout.
+ if (catchupTimeout == ReplSetConfig::kInfiniteCatchUpTimeout ||
+ Date_t::max() - _repl->_replExecutor->now() <= catchupTimeout) {
+ return;
+ }
+ auto timeoutDate = _repl->_replExecutor->now() + catchupTimeout;
+ auto status = _repl->_replExecutor->scheduleWorkAt(timeoutDate, timeoutCB);
+ if (!status.isOK()) {
+ log() << "Failed to schedule catchup timeout work.";
+ abort_inlock();
+ return;
+ }
+ _timeoutCbh = status.getValue();
+}
+
+void ReplicationCoordinatorImpl::CatchupState::abort_inlock() {
+ invariant(_repl->_getMemberState_inlock().primary());
+
+ log() << "Exited primary catch-up mode.";
+ // Clean up its own members.
+ if (_timeoutCbh) {
+ _repl->_replExecutor->cancel(_timeoutCbh);
+ }
+ if (_waiter) {
+ _repl->_opTimeWaiterList.remove_inlock(_waiter.get());
+ }
+
+ // Enter primary drain mode.
+ _repl->_enterDrainMode_inlock();
+ // Destruct the state itself.
+ _repl->_catchupState.reset(nullptr);
+}
+
+void ReplicationCoordinatorImpl::CatchupState::signalHeartbeatUpdate_inlock() {
+ auto targetOpTime = _repl->_topCoord->latestKnownOpTimeSinceHeartbeatRestart();
+ // Haven't collected all heartbeat responses.
+ if (!targetOpTime) {
+ return;
+ }
+
+ // We've caught up.
+ if (*targetOpTime <= _repl->_getMyLastAppliedOpTime_inlock()) {
+ log() << "Caught up to the latest known optime via heartbeats after becoming primary.";
+ abort_inlock();
+ return;
+ }
+
+ // Reset the target optime if it has changed.
+ if (_waiter && _waiter->opTime == *targetOpTime) {
+ return;
+ }
+
+ log() << "Heartbeats updated catchup target optime to " << *targetOpTime;
+ if (_waiter) {
+ _repl->_opTimeWaiterList.remove_inlock(_waiter.get());
+ }
+ auto targetOpTimeCB = [this, targetOpTime]() {
+ // Double check the target time since stepdown may signal us too.
+ if (*targetOpTime <= _repl->_getMyLastAppliedOpTime_inlock()) {
+ log() << "Caught up to the latest known optime successfully after becoming primary.";
+ abort_inlock();
+ }
+ };
+ _waiter = stdx::make_unique<CallbackWaiter>(*targetOpTime, targetOpTimeCB);
+ _repl->_opTimeWaiterList.add_inlock(_waiter.get());
+}
+
+Status ReplicationCoordinatorImpl::abortCatchupIfNeeded() {
+ if (!isV1ElectionProtocol()) {
+ return Status(ErrorCodes::CommandNotSupported,
+ "Primary catch-up is only supported by Protocol Version 1");
+ }
+
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ if (_catchupState) {
+ _catchupState->abort_inlock();
+ return Status::OK();
+ }
+ return Status(ErrorCodes::IllegalOperation, "The node is not in catch-up mode.");
+}
+
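
One easy-to-miss detail in start_inlock above: before scheduling the timeout it checks not only for the infinite-timeout sentinel but also for Date_t overflow, since now + timeout can wrap past Date_t::max(). A sketch of that guard using std::chrono, where the kInfiniteTimeout sentinel value is an assumption mirroring ReplSetConfig::kInfiniteCatchUpTimeout:

    #include <chrono>
    #include <iostream>
    #include <optional>

    using Clock = std::chrono::system_clock;
    using Millis = std::chrono::milliseconds;

    // Assumed sentinel mirroring ReplSetConfig::kInfiniteCatchUpTimeout.
    constexpr Millis kInfiniteTimeout{-1};

    // Mirrors the guard in start_inlock: an infinite timeout, or one whose
    // deadline would land past the clock's maximum, means no timer at all and
    // the node keeps catching up until aborted some other way.
    std::optional<Clock::time_point> catchupDeadline(Clock::time_point now, Millis timeout) {
        if (timeout == kInfiniteTimeout || Clock::time_point::max() - now <= timeout) {
            return std::nullopt;
        }
        return now + timeout;
    }

    int main() {
        auto deadline = catchupDeadline(Clock::now(), Millis{30000});
        std::cout << (deadline ? "timeout scheduled" : "no timeout") << "\n";
    }
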
void ReplicationCoordinatorImpl::_scanOpTimeForCatchUp_inlock() {
auto scanner = std::make_shared<FreshnessScanner>();
auto scanStartTime = _replExecutor->now();
auto evhStatus = scanner->start(
_replExecutor.get(), _rsConfig, _selfIndex, _rsConfig.getCatchUpTimeoutPeriod());
if (evhStatus == ErrorCodes::ShutdownInProgress) {
- _finishCatchingUpOplog_inlock();
+ _enterDrainMode_inlock();
return;
}
fassertStatusOK(40254, evhStatus.getStatus());
@@ -2801,7 +2908,7 @@ void ReplicationCoordinatorImpl::_scanOpTimeForCatchUp_inlock() {
evhStatus.getValue(), [this, scanner, scanStartTime, term](const CallbackArgs& cbData) {
stdx::lock_guard<stdx::mutex> lk(_mutex);
if (cbData.status == ErrorCodes::CallbackCanceled) {
- _finishCatchingUpOplog_inlock();
+ _enterDrainMode_inlock();
return;
}
auto totalTimeout = _rsConfig.getCatchUpTimeoutPeriod();
@@ -2830,7 +2937,7 @@ void ReplicationCoordinatorImpl::_catchUpOplogToLatest_inlock(const FreshnessSca
log() << "Could not access any nodes within timeout when checking for "
<< "additional ops to apply before finishing transition to primary. "
<< "Will move forward with becoming primary anyway.";
- _finishCatchingUpOplog_inlock();
+ _enterDrainMode_inlock();
return;
}
@@ -2839,7 +2946,7 @@ void ReplicationCoordinatorImpl::_catchUpOplogToLatest_inlock(const FreshnessSca
if (freshnessInfo.opTime <= _getMyLastAppliedOpTime_inlock()) {
log() << "My optime is most up-to-date, skipping catch-up "
<< "and completing transition to primary.";
- _finishCatchingUpOplog_inlock();
+ _enterDrainMode_inlock();
return;
}
@@ -2852,9 +2959,9 @@ void ReplicationCoordinatorImpl::_catchUpOplogToLatest_inlock(const FreshnessSca
log() << "Finished catch-up oplog after becoming primary.";
}
- _finishCatchingUpOplog_inlock();
+ _enterDrainMode_inlock();
};
- auto waiterInfo = std::make_shared<WaiterInfo>(freshnessInfo.opTime, finishCB);
+ auto waiterInfo = std::make_shared<CallbackWaiter>(freshnessInfo.opTime, finishCB);
_opTimeWaiterList.add_inlock(waiterInfo.get());
auto timeoutCB = [this, waiterInfo, finishCB](const CallbackArgs& cbData) {
@@ -2867,7 +2974,7 @@ void ReplicationCoordinatorImpl::_catchUpOplogToLatest_inlock(const FreshnessSca
_replExecutor->scheduleWorkAt(_replExecutor->now() + timeout, timeoutCB);
}
-void ReplicationCoordinatorImpl::_finishCatchingUpOplog_inlock() {
+void ReplicationCoordinatorImpl::_enterDrainMode_inlock() {
_applierState = ApplierState::Draining;
_externalState->stopProducer();
}
@@ -2958,7 +3065,7 @@ ReplicationCoordinatorImpl::_setCurrentRSConfig_inlock(const ReplSetConfig& newC
}
void ReplicationCoordinatorImpl::_wakeReadyWaiters_inlock() {
- _replicationWaiterList.signalAndRemoveIf_inlock([this](WaiterInfo* waiter) {
+ _replicationWaiterList.signalAndRemoveIf_inlock([this](Waiter* waiter) {
return _doneWaitingForReplication_inlock(
waiter->opTime, SnapshotName::min(), *waiter->writeConcern);
});
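
Stepping back, CatchupState::signalHeartbeatUpdate_inlock in the large hunk above makes a three-way decision every time fresh heartbeat data arrives: keep waiting, declare catch-up finished, or re-target its CallbackWaiter. A condensed model of that decision, where the optional target stands in for _topCoord->latestKnownOpTimeSinceHeartbeatRestart() (empty until every heartbeat response has been collected):

    #include <iostream>
    #include <optional>

    using OpTime = long long;  // stand-in, as in the earlier sketches

    enum class CatchupDecision { kWait, kDone, kRetarget };

    // Models the control flow of signalHeartbeatUpdate_inlock: no consensus
    // target yet -> keep waiting; my applied optime has reached the target ->
    // done (abort into drain mode); target unchanged -> waiter stays armed;
    // otherwise re-register a CallbackWaiter at the new target.
    CatchupDecision onHeartbeat(std::optional<OpTime> target,
                                OpTime myApplied,
                                std::optional<OpTime> waiterTarget) {
        if (!target) {
            return CatchupDecision::kWait;
        }
        if (*target <= myApplied) {
            return CatchupDecision::kDone;
        }
        if (waiterTarget && *waiterTarget == *target) {
            return CatchupDecision::kWait;
        }
        return CatchupDecision::kRetarget;
    }

    int main() {
        // All heartbeats collected, target 10, already applied 12: caught up.
        std::cout << static_cast<int>(onHeartbeat(OpTime{10}, 12, std::nullopt)) << "\n";
    }
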