diff options
Diffstat (limited to 'src/mongo/db/repl/replication_coordinator_impl.cpp')
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_impl.cpp | 4668 |
1 files changed, 2274 insertions, 2394 deletions
diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp index bd1378699ad..e3ba34932de 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl.cpp @@ -77,716 +77,683 @@ namespace mongo { namespace repl { namespace { - using executor::NetworkInterface; +using executor::NetworkInterface; - void lockAndCall(stdx::unique_lock<stdx::mutex>* lk, const stdx::function<void ()>& fn) { - if (!lk->owns_lock()) { - lk->lock(); - } - fn(); +void lockAndCall(stdx::unique_lock<stdx::mutex>* lk, const stdx::function<void()>& fn) { + if (!lk->owns_lock()) { + lk->lock(); } + fn(); +} - /** - * Implements the force-reconfig behavior of incrementing config version by a large random - * number. - */ - BSONObj incrementConfigVersionByRandom(BSONObj config) { - BSONObjBuilder builder; - for (BSONObjIterator iter(config); iter.more(); iter.next()) { - BSONElement elem = *iter; - if (elem.fieldNameStringData() == ReplicaSetConfig::kVersionFieldName && - elem.isNumber()) { - - std::unique_ptr<SecureRandom> generator(SecureRandom::create()); - const int random = std::abs(static_cast<int>(generator->nextInt64()) % 100000); - builder.appendIntOrLL(ReplicaSetConfig::kVersionFieldName, - elem.numberLong() + 10000 + random); - } - else { - builder.append(elem); - } - } - return builder.obj(); - } - -} //namespace - - struct ReplicationCoordinatorImpl::WaiterInfo { - - /** - * Constructor takes the list of waiters and enqueues itself on the list, removing itself - * in the destructor. - */ - WaiterInfo(std::vector<WaiterInfo*>* _list, - unsigned int _opID, - const OpTime* _opTime, - const WriteConcernOptions* _writeConcern, - stdx::condition_variable* _condVar) : list(_list), - master(true), - opID(_opID), - opTime(_opTime), - writeConcern(_writeConcern), - condVar(_condVar) { - list->push_back(this); - } - - ~WaiterInfo() { - list->erase(std::remove(list->begin(), list->end(), this), list->end()); - } - - std::vector<WaiterInfo*>* list; - bool master; // Set to false to indicate that stepDown was called while waiting - const unsigned int opID; - const OpTime* opTime; - const WriteConcernOptions* writeConcern; - stdx::condition_variable* condVar; - }; - -namespace { - ReplicationCoordinator::Mode getReplicationModeFromSettings(const ReplSettings& settings) { - if (settings.usingReplSets()) { - return ReplicationCoordinator::modeReplSet; - } - if (settings.master || settings.slave) { - return ReplicationCoordinator::modeMasterSlave; +/** + * Implements the force-reconfig behavior of incrementing config version by a large random + * number. + */ +BSONObj incrementConfigVersionByRandom(BSONObj config) { + BSONObjBuilder builder; + for (BSONObjIterator iter(config); iter.more(); iter.next()) { + BSONElement elem = *iter; + if (elem.fieldNameStringData() == ReplicaSetConfig::kVersionFieldName && elem.isNumber()) { + std::unique_ptr<SecureRandom> generator(SecureRandom::create()); + const int random = std::abs(static_cast<int>(generator->nextInt64()) % 100000); + builder.appendIntOrLL(ReplicaSetConfig::kVersionFieldName, + elem.numberLong() + 10000 + random); + } else { + builder.append(elem); } - return ReplicationCoordinator::modeNone; } -} // namespace - - ReplicationCoordinatorImpl::ReplicationCoordinatorImpl( - const ReplSettings& settings, - ReplicationCoordinatorExternalState* externalState, - TopologyCoordinator* topCoord, - int64_t prngSeed, - NetworkInterface* network, - StorageInterface* storage, - ReplicationExecutor* replExec) : - _settings(settings), - _replMode(getReplicationModeFromSettings(settings)), - _topCoord(topCoord), - _replExecutorIfOwned(replExec ? nullptr : - new ReplicationExecutor(network, - storage, - prngSeed)), - _replExecutor(replExec ? *replExec : *_replExecutorIfOwned), - _externalState(externalState), - _inShutdown(false), - _memberState(MemberState::RS_STARTUP), - _isWaitingForDrainToComplete(false), - _rsConfigState(kConfigPreStart), - _selfIndex(-1), - _sleptLastElection(false), - _canAcceptNonLocalWrites(!(settings.usingReplSets() || settings.slave)), - _canServeNonLocalReads(0U), - _dr(DataReplicatorOptions(), &_replExecutor, this) { - - if (!isReplEnabled()) { - return; - } + return builder.obj(); +} - std::unique_ptr<SecureRandom> rbidGenerator(SecureRandom::create()); - _rbid = static_cast<int>(rbidGenerator->nextInt64()); - if (_rbid < 0) { - // Ensure _rbid is always positive - _rbid = -_rbid; - } +} // namespace - // Make sure there is always an entry in _slaveInfo for ourself. - SlaveInfo selfInfo; - selfInfo.self = true; - _slaveInfo.push_back(selfInfo); - } - - ReplicationCoordinatorImpl::ReplicationCoordinatorImpl( - const ReplSettings& settings, - ReplicationCoordinatorExternalState* externalState, - NetworkInterface* network, - StorageInterface* storage, - TopologyCoordinator* topCoord, - int64_t prngSeed) : ReplicationCoordinatorImpl(settings, - externalState, - topCoord, - prngSeed, - network, - storage, - nullptr) { } - - ReplicationCoordinatorImpl::ReplicationCoordinatorImpl( - const ReplSettings& settings, - ReplicationCoordinatorExternalState* externalState, - TopologyCoordinator* topCoord, - ReplicationExecutor* replExec, - int64_t prngSeed) : ReplicationCoordinatorImpl(settings, - externalState, - topCoord, - prngSeed, - nullptr, - nullptr, - replExec) { } - - ReplicationCoordinatorImpl::~ReplicationCoordinatorImpl() {} - - void ReplicationCoordinatorImpl::waitForStartUpComplete() { - stdx::unique_lock<stdx::mutex> lk(_mutex); - while (_rsConfigState == kConfigPreStart || _rsConfigState == kConfigStartingUp) { - _rsConfigStateChange.wait(lk); - } - } +struct ReplicationCoordinatorImpl::WaiterInfo { + /** + * Constructor takes the list of waiters and enqueues itself on the list, removing itself + * in the destructor. + */ + WaiterInfo(std::vector<WaiterInfo*>* _list, + unsigned int _opID, + const OpTime* _opTime, + const WriteConcernOptions* _writeConcern, + stdx::condition_variable* _condVar) + : list(_list), + master(true), + opID(_opID), + opTime(_opTime), + writeConcern(_writeConcern), + condVar(_condVar) { + list->push_back(this); + } + + ~WaiterInfo() { + list->erase(std::remove(list->begin(), list->end(), this), list->end()); + } + + std::vector<WaiterInfo*>* list; + bool master; // Set to false to indicate that stepDown was called while waiting + const unsigned int opID; + const OpTime* opTime; + const WriteConcernOptions* writeConcern; + stdx::condition_variable* condVar; +}; - ReplicaSetConfig ReplicationCoordinatorImpl::getReplicaSetConfig_forTest() { - stdx::lock_guard<stdx::mutex> lk(_mutex); - return _rsConfig; +namespace { +ReplicationCoordinator::Mode getReplicationModeFromSettings(const ReplSettings& settings) { + if (settings.usingReplSets()) { + return ReplicationCoordinator::modeReplSet; } - - void ReplicationCoordinatorImpl::_updateLastVote(const LastVote& lastVote) { - _topCoord->loadLastVote(lastVote); + if (settings.master || settings.slave) { + return ReplicationCoordinator::modeMasterSlave; } + return ReplicationCoordinator::modeNone; +} +} // namespace - bool ReplicationCoordinatorImpl::_startLoadLocalConfig(OperationContext* txn) { - - StatusWith<LastVote> lastVote = _externalState->loadLocalLastVoteDocument(txn); - if (!lastVote.isOK()) { - log() << "Did not find local voted for document at startup; " << lastVote.getStatus(); - } - else { - LastVote vote = lastVote.getValue(); - _replExecutor.scheduleWork( - stdx::bind(&ReplicationCoordinatorImpl::_updateLastVote, - this, - vote)); - } - - StatusWith<BSONObj> cfg = _externalState->loadLocalConfigDocument(txn); - if (!cfg.isOK()) { - log() << "Did not find local replica set configuration document at startup; " << - cfg.getStatus(); - return true; - } - ReplicaSetConfig localConfig; - Status status = localConfig.initialize(cfg.getValue()); - if (!status.isOK()) { - error() << "Locally stored replica set configuration does not parse; See " - "http://www.mongodb.org/dochub/core/recover-replica-set-from-invalid-config " - "for information on how to recover from this. Got \"" << - status << "\" while parsing " << cfg.getValue(); - fassertFailedNoTrace(28545); - } - - StatusWith<OpTime> lastOpTimeStatus = _externalState->loadLastOpTime(txn); - - // Use a callback here, because _finishLoadLocalConfig calls isself() which requires - // that the server's networking layer be up and running and accepting connections, which - // doesn't happen until startReplication finishes. +ReplicationCoordinatorImpl::ReplicationCoordinatorImpl( + const ReplSettings& settings, + ReplicationCoordinatorExternalState* externalState, + TopologyCoordinator* topCoord, + int64_t prngSeed, + NetworkInterface* network, + StorageInterface* storage, + ReplicationExecutor* replExec) + : _settings(settings), + _replMode(getReplicationModeFromSettings(settings)), + _topCoord(topCoord), + _replExecutorIfOwned(replExec ? nullptr + : new ReplicationExecutor(network, storage, prngSeed)), + _replExecutor(replExec ? *replExec : *_replExecutorIfOwned), + _externalState(externalState), + _inShutdown(false), + _memberState(MemberState::RS_STARTUP), + _isWaitingForDrainToComplete(false), + _rsConfigState(kConfigPreStart), + _selfIndex(-1), + _sleptLastElection(false), + _canAcceptNonLocalWrites(!(settings.usingReplSets() || settings.slave)), + _canServeNonLocalReads(0U), + _dr(DataReplicatorOptions(), &_replExecutor, this) { + if (!isReplEnabled()) { + return; + } + + std::unique_ptr<SecureRandom> rbidGenerator(SecureRandom::create()); + _rbid = static_cast<int>(rbidGenerator->nextInt64()); + if (_rbid < 0) { + // Ensure _rbid is always positive + _rbid = -_rbid; + } + + // Make sure there is always an entry in _slaveInfo for ourself. + SlaveInfo selfInfo; + selfInfo.self = true; + _slaveInfo.push_back(selfInfo); +} + +ReplicationCoordinatorImpl::ReplicationCoordinatorImpl( + const ReplSettings& settings, + ReplicationCoordinatorExternalState* externalState, + NetworkInterface* network, + StorageInterface* storage, + TopologyCoordinator* topCoord, + int64_t prngSeed) + : ReplicationCoordinatorImpl( + settings, externalState, topCoord, prngSeed, network, storage, nullptr) {} + +ReplicationCoordinatorImpl::ReplicationCoordinatorImpl( + const ReplSettings& settings, + ReplicationCoordinatorExternalState* externalState, + TopologyCoordinator* topCoord, + ReplicationExecutor* replExec, + int64_t prngSeed) + : ReplicationCoordinatorImpl( + settings, externalState, topCoord, prngSeed, nullptr, nullptr, replExec) {} + +ReplicationCoordinatorImpl::~ReplicationCoordinatorImpl() {} + +void ReplicationCoordinatorImpl::waitForStartUpComplete() { + stdx::unique_lock<stdx::mutex> lk(_mutex); + while (_rsConfigState == kConfigPreStart || _rsConfigState == kConfigStartingUp) { + _rsConfigStateChange.wait(lk); + } +} + +ReplicaSetConfig ReplicationCoordinatorImpl::getReplicaSetConfig_forTest() { + stdx::lock_guard<stdx::mutex> lk(_mutex); + return _rsConfig; +} + +void ReplicationCoordinatorImpl::_updateLastVote(const LastVote& lastVote) { + _topCoord->loadLastVote(lastVote); +} + +bool ReplicationCoordinatorImpl::_startLoadLocalConfig(OperationContext* txn) { + StatusWith<LastVote> lastVote = _externalState->loadLocalLastVoteDocument(txn); + if (!lastVote.isOK()) { + log() << "Did not find local voted for document at startup; " << lastVote.getStatus(); + } else { + LastVote vote = lastVote.getValue(); _replExecutor.scheduleWork( - stdx::bind(&ReplicationCoordinatorImpl::_finishLoadLocalConfig, - this, - stdx::placeholders::_1, - localConfig, - lastOpTimeStatus)); - return false; + stdx::bind(&ReplicationCoordinatorImpl::_updateLastVote, this, vote)); } - void ReplicationCoordinatorImpl::_finishLoadLocalConfig( - const ReplicationExecutor::CallbackArgs& cbData, - const ReplicaSetConfig& localConfig, - const StatusWith<OpTime>& lastOpTimeStatus) { - if (!cbData.status.isOK()) { - LOG(1) << "Loading local replica set configuration failed due to " << cbData.status; - return; - } - - StatusWith<int> myIndex = validateConfigForStartUp(_externalState.get(), - _rsConfig, - localConfig); - if (!myIndex.isOK()) { - if (myIndex.getStatus() == ErrorCodes::NodeNotFound || - myIndex.getStatus() == ErrorCodes::DuplicateKey) { - warning() << "Locally stored replica set configuration does not have a valid entry " - "for the current node; waiting for reconfig or remote heartbeat; Got \"" << - myIndex.getStatus() << "\" while validating " << localConfig.toBSON(); - myIndex = StatusWith<int>(-1); - } - else { - error() << "Locally stored replica set configuration is invalid; See " - "http://www.mongodb.org/dochub/core/recover-replica-set-from-invalid-config" - " for information on how to recover from this. Got \"" << - myIndex.getStatus() << "\" while validating " << localConfig.toBSON(); - fassertFailedNoTrace(28544); - } - } - - if (localConfig.getReplSetName() != _settings.ourSetName()) { - warning() << "Local replica set configuration document reports set name of " << - localConfig.getReplSetName() << ", but command line reports " << - _settings.ourSetName() << "; waitng for reconfig or remote heartbeat"; + StatusWith<BSONObj> cfg = _externalState->loadLocalConfigDocument(txn); + if (!cfg.isOK()) { + log() << "Did not find local replica set configuration document at startup; " + << cfg.getStatus(); + return true; + } + ReplicaSetConfig localConfig; + Status status = localConfig.initialize(cfg.getValue()); + if (!status.isOK()) { + error() << "Locally stored replica set configuration does not parse; See " + "http://www.mongodb.org/dochub/core/recover-replica-set-from-invalid-config " + "for information on how to recover from this. Got \"" << status + << "\" while parsing " << cfg.getValue(); + fassertFailedNoTrace(28545); + } + + StatusWith<OpTime> lastOpTimeStatus = _externalState->loadLastOpTime(txn); + + // Use a callback here, because _finishLoadLocalConfig calls isself() which requires + // that the server's networking layer be up and running and accepting connections, which + // doesn't happen until startReplication finishes. + _replExecutor.scheduleWork(stdx::bind(&ReplicationCoordinatorImpl::_finishLoadLocalConfig, + this, + stdx::placeholders::_1, + localConfig, + lastOpTimeStatus)); + return false; +} + +void ReplicationCoordinatorImpl::_finishLoadLocalConfig( + const ReplicationExecutor::CallbackArgs& cbData, + const ReplicaSetConfig& localConfig, + const StatusWith<OpTime>& lastOpTimeStatus) { + if (!cbData.status.isOK()) { + LOG(1) << "Loading local replica set configuration failed due to " << cbData.status; + return; + } + + StatusWith<int> myIndex = + validateConfigForStartUp(_externalState.get(), _rsConfig, localConfig); + if (!myIndex.isOK()) { + if (myIndex.getStatus() == ErrorCodes::NodeNotFound || + myIndex.getStatus() == ErrorCodes::DuplicateKey) { + warning() << "Locally stored replica set configuration does not have a valid entry " + "for the current node; waiting for reconfig or remote heartbeat; Got \"" + << myIndex.getStatus() << "\" while validating " << localConfig.toBSON(); myIndex = StatusWith<int>(-1); + } else { + error() << "Locally stored replica set configuration is invalid; See " + "http://www.mongodb.org/dochub/core/recover-replica-set-from-invalid-config" + " for information on how to recover from this. Got \"" << myIndex.getStatus() + << "\" while validating " << localConfig.toBSON(); + fassertFailedNoTrace(28544); } - - // Do not check optime, if this node is an arbiter. - bool isArbiter = myIndex.getValue() != -1 && - localConfig.getMemberAt(myIndex.getValue()).isArbiter(); - OpTime lastOpTime; - if (!isArbiter) { - if (!lastOpTimeStatus.isOK()) { - warning() << "Failed to load timestamp of most recently applied operation; " << - lastOpTimeStatus.getStatus(); - } - else { - lastOpTime = lastOpTimeStatus.getValue(); - } - } - - stdx::unique_lock<stdx::mutex> lk(_mutex); - invariant(_rsConfigState == kConfigStartingUp); - const PostMemberStateUpdateAction action = - _setCurrentRSConfig_inlock(localConfig, myIndex.getValue()); - _setMyLastOptime_inlock(&lk, lastOpTime, false); - _externalState->setGlobalTimestamp(lastOpTime.getTimestamp()); - if (lk.owns_lock()) { - lk.unlock(); - } - _performPostMemberStateUpdateAction(action); - _externalState->startThreads(); } - void ReplicationCoordinatorImpl::startReplication(OperationContext* txn) { - if (!isReplEnabled()) { - stdx::lock_guard<stdx::mutex> lk(_mutex); - _setConfigState_inlock(kConfigReplicationDisabled); - return; - } - - { - OID rid = _externalState->ensureMe(txn); - - stdx::lock_guard<stdx::mutex> lk(_mutex); - fassert(18822, !_inShutdown); - _setConfigState_inlock(kConfigStartingUp); - _myRID = rid; - _slaveInfo[_getMyIndexInSlaveInfo_inlock()].rid = rid; - } - - if (!_settings.usingReplSets()) { - // Must be Master/Slave - invariant(_settings.master || _settings.slave); - _externalState->startMasterSlave(txn); - return; - } - - _topCoordDriverThread.reset(new stdx::thread(stdx::bind(&ReplicationExecutor::run, - &_replExecutor))); + if (localConfig.getReplSetName() != _settings.ourSetName()) { + warning() << "Local replica set configuration document reports set name of " + << localConfig.getReplSetName() << ", but command line reports " + << _settings.ourSetName() << "; waitng for reconfig or remote heartbeat"; + myIndex = StatusWith<int>(-1); + } - bool doneLoadingConfig = _startLoadLocalConfig(txn); - if (doneLoadingConfig) { - // If we're not done loading the config, then the config state will be set by - // _finishLoadLocalConfig. - stdx::lock_guard<stdx::mutex> lk(_mutex); - invariant(!_rsConfig.isInitialized()); - _setConfigState_inlock(kConfigUninitialized); + // Do not check optime, if this node is an arbiter. + bool isArbiter = + myIndex.getValue() != -1 && localConfig.getMemberAt(myIndex.getValue()).isArbiter(); + OpTime lastOpTime; + if (!isArbiter) { + if (!lastOpTimeStatus.isOK()) { + warning() << "Failed to load timestamp of most recently applied operation; " + << lastOpTimeStatus.getStatus(); + } else { + lastOpTime = lastOpTimeStatus.getValue(); } } - void ReplicationCoordinatorImpl::shutdown() { - // Shutdown must: - // * prevent new threads from blocking in awaitReplication - // * wake up all existing threads blocking in awaitReplication - // * tell the ReplicationExecutor to shut down - // * wait for the thread running the ReplicationExecutor to finish + stdx::unique_lock<stdx::mutex> lk(_mutex); + invariant(_rsConfigState == kConfigStartingUp); + const PostMemberStateUpdateAction action = + _setCurrentRSConfig_inlock(localConfig, myIndex.getValue()); + _setMyLastOptime_inlock(&lk, lastOpTime, false); + _externalState->setGlobalTimestamp(lastOpTime.getTimestamp()); + if (lk.owns_lock()) { + lk.unlock(); + } + _performPostMemberStateUpdateAction(action); + _externalState->startThreads(); +} - if (!_settings.usingReplSets()) { - return; - } +void ReplicationCoordinatorImpl::startReplication(OperationContext* txn) { + if (!isReplEnabled()) { + stdx::lock_guard<stdx::mutex> lk(_mutex); + _setConfigState_inlock(kConfigReplicationDisabled); + return; + } - { - stdx::lock_guard<stdx::mutex> lk(_mutex); - fassert(28533, !_inShutdown); - _inShutdown = true; - if (_rsConfigState == kConfigPreStart) { - warning() << "ReplicationCoordinatorImpl::shutdown() called before " - "startReplication() finished. Shutting down without cleaning up the " - "replication system"; - return; - } - fassert(18823, _rsConfigState != kConfigStartingUp); - for (std::vector<WaiterInfo*>::iterator it = _replicationWaiterList.begin(); - it != _replicationWaiterList.end(); ++it) { - WaiterInfo* waiter = *it; - waiter->condVar->notify_all(); - } - } + { + OID rid = _externalState->ensureMe(txn); - _replExecutor.shutdown(); - _topCoordDriverThread->join(); // must happen outside _mutex - _externalState->shutdown(); + stdx::lock_guard<stdx::mutex> lk(_mutex); + fassert(18822, !_inShutdown); + _setConfigState_inlock(kConfigStartingUp); + _myRID = rid; + _slaveInfo[_getMyIndexInSlaveInfo_inlock()].rid = rid; } - const ReplSettings& ReplicationCoordinatorImpl::getSettings() const { - return _settings; + if (!_settings.usingReplSets()) { + // Must be Master/Slave + invariant(_settings.master || _settings.slave); + _externalState->startMasterSlave(txn); + return; } - ReplicationCoordinator::Mode ReplicationCoordinatorImpl::getReplicationMode() const { - return _replMode; - } + _topCoordDriverThread.reset( + new stdx::thread(stdx::bind(&ReplicationExecutor::run, &_replExecutor))); - MemberState ReplicationCoordinatorImpl::getMemberState() const { + bool doneLoadingConfig = _startLoadLocalConfig(txn); + if (doneLoadingConfig) { + // If we're not done loading the config, then the config state will be set by + // _finishLoadLocalConfig. stdx::lock_guard<stdx::mutex> lk(_mutex); - return _getMemberState_inlock(); + invariant(!_rsConfig.isInitialized()); + _setConfigState_inlock(kConfigUninitialized); } +} - MemberState ReplicationCoordinatorImpl::_getMemberState_inlock() const { - return _memberState; +void ReplicationCoordinatorImpl::shutdown() { + // Shutdown must: + // * prevent new threads from blocking in awaitReplication + // * wake up all existing threads blocking in awaitReplication + // * tell the ReplicationExecutor to shut down + // * wait for the thread running the ReplicationExecutor to finish + + if (!_settings.usingReplSets()) { + return; } - Seconds ReplicationCoordinatorImpl::getSlaveDelaySecs() const { + { stdx::lock_guard<stdx::mutex> lk(_mutex); - invariant(_rsConfig.isInitialized()); - uassert(28524, - "Node not a member of the current set configuration", - _selfIndex != -1); - return _rsConfig.getMemberAt(_selfIndex).getSlaveDelay(); - } - - void ReplicationCoordinatorImpl::clearSyncSourceBlacklist() { - CBHStatus cbh = _replExecutor.scheduleWork( - stdx::bind(&ReplicationCoordinatorImpl::_clearSyncSourceBlacklist_finish, - this, - stdx::placeholders::_1)); - if (cbh.getStatus() == ErrorCodes::ShutdownInProgress) { + fassert(28533, !_inShutdown); + _inShutdown = true; + if (_rsConfigState == kConfigPreStart) { + warning() << "ReplicationCoordinatorImpl::shutdown() called before " + "startReplication() finished. Shutting down without cleaning up the " + "replication system"; return; } - fassert(18907, cbh.getStatus()); - _replExecutor.wait(cbh.getValue()); + fassert(18823, _rsConfigState != kConfigStartingUp); + for (std::vector<WaiterInfo*>::iterator it = _replicationWaiterList.begin(); + it != _replicationWaiterList.end(); + ++it) { + WaiterInfo* waiter = *it; + waiter->condVar->notify_all(); + } + } + + _replExecutor.shutdown(); + _topCoordDriverThread->join(); // must happen outside _mutex + _externalState->shutdown(); +} + +const ReplSettings& ReplicationCoordinatorImpl::getSettings() const { + return _settings; +} + +ReplicationCoordinator::Mode ReplicationCoordinatorImpl::getReplicationMode() const { + return _replMode; +} + +MemberState ReplicationCoordinatorImpl::getMemberState() const { + stdx::lock_guard<stdx::mutex> lk(_mutex); + return _getMemberState_inlock(); +} + +MemberState ReplicationCoordinatorImpl::_getMemberState_inlock() const { + return _memberState; +} + +Seconds ReplicationCoordinatorImpl::getSlaveDelaySecs() const { + stdx::lock_guard<stdx::mutex> lk(_mutex); + invariant(_rsConfig.isInitialized()); + uassert(28524, "Node not a member of the current set configuration", _selfIndex != -1); + return _rsConfig.getMemberAt(_selfIndex).getSlaveDelay(); +} + +void ReplicationCoordinatorImpl::clearSyncSourceBlacklist() { + CBHStatus cbh = _replExecutor.scheduleWork( + stdx::bind(&ReplicationCoordinatorImpl::_clearSyncSourceBlacklist_finish, + this, + stdx::placeholders::_1)); + if (cbh.getStatus() == ErrorCodes::ShutdownInProgress) { + return; + } + fassert(18907, cbh.getStatus()); + _replExecutor.wait(cbh.getValue()); +} + +void ReplicationCoordinatorImpl::_clearSyncSourceBlacklist_finish( + const ReplicationExecutor::CallbackArgs& cbData) { + if (cbData.status == ErrorCodes::CallbackCanceled) + return; + _topCoord->clearSyncSourceBlacklist(); +} + +bool ReplicationCoordinatorImpl::setFollowerMode(const MemberState& newState) { + StatusWith<ReplicationExecutor::EventHandle> finishedSettingFollowerMode = + _replExecutor.makeEvent(); + if (finishedSettingFollowerMode.getStatus() == ErrorCodes::ShutdownInProgress) { + return false; } - - void ReplicationCoordinatorImpl::_clearSyncSourceBlacklist_finish( - const ReplicationExecutor::CallbackArgs& cbData) { - if (cbData.status == ErrorCodes::CallbackCanceled) - return; - _topCoord->clearSyncSourceBlacklist(); + fassert(18812, finishedSettingFollowerMode.getStatus()); + bool success = false; + CBHStatus cbh = + _replExecutor.scheduleWork(stdx::bind(&ReplicationCoordinatorImpl::_setFollowerModeFinish, + this, + stdx::placeholders::_1, + newState, + finishedSettingFollowerMode.getValue(), + &success)); + if (cbh.getStatus() == ErrorCodes::ShutdownInProgress) { + return false; } + fassert(18699, cbh.getStatus()); + _replExecutor.waitForEvent(finishedSettingFollowerMode.getValue()); + return success; +} - bool ReplicationCoordinatorImpl::setFollowerMode(const MemberState& newState) { - StatusWith<ReplicationExecutor::EventHandle> finishedSettingFollowerMode = - _replExecutor.makeEvent(); - if (finishedSettingFollowerMode.getStatus() == ErrorCodes::ShutdownInProgress) { - return false; - } - fassert(18812, finishedSettingFollowerMode.getStatus()); - bool success = false; - CBHStatus cbh = _replExecutor.scheduleWork( - stdx::bind(&ReplicationCoordinatorImpl::_setFollowerModeFinish, - this, - stdx::placeholders::_1, - newState, - finishedSettingFollowerMode.getValue(), - &success)); - if (cbh.getStatus() == ErrorCodes::ShutdownInProgress) { - return false; - } - fassert(18699, cbh.getStatus()); - _replExecutor.waitForEvent(finishedSettingFollowerMode.getValue()); - return success; +void ReplicationCoordinatorImpl::_setFollowerModeFinish( + const ReplicationExecutor::CallbackArgs& cbData, + const MemberState& newState, + const ReplicationExecutor::EventHandle& finishedSettingFollowerMode, + bool* success) { + if (cbData.status == ErrorCodes::CallbackCanceled) { + return; } - - void ReplicationCoordinatorImpl::_setFollowerModeFinish( - const ReplicationExecutor::CallbackArgs& cbData, - const MemberState& newState, - const ReplicationExecutor::EventHandle& finishedSettingFollowerMode, - bool* success) { - - if (cbData.status == ErrorCodes::CallbackCanceled) { - return; - } - if (newState == _topCoord->getMemberState()) { - *success = true; - _replExecutor.signalEvent(finishedSettingFollowerMode); - return; - } - if (_topCoord->getRole() == TopologyCoordinator::Role::leader) { - *success = false; - _replExecutor.signalEvent(finishedSettingFollowerMode); - return; - } - - if (_topCoord->getRole() == TopologyCoordinator::Role::candidate) { - // We are a candidate, which means _topCoord believs us to be in state RS_SECONDARY, and - // we know that newState != RS_SECONDARY because we would have returned early, above if - // the old and new state were equal. So, cancel the running election and try again to - // finish setting the follower mode. - invariant(_freshnessChecker); - _freshnessChecker->cancel(&_replExecutor); - if (_electCmdRunner) { - _electCmdRunner->cancel(&_replExecutor); - } - _replExecutor.onEvent( - _electionFinishedEvent, - stdx::bind(&ReplicationCoordinatorImpl::_setFollowerModeFinish, - this, - stdx::placeholders::_1, - newState, - finishedSettingFollowerMode, - success)); - return; - } - - stdx::unique_lock<stdx::mutex> lk(_mutex); - _topCoord->setFollowerMode(newState.s); - - const PostMemberStateUpdateAction action = - _updateMemberStateFromTopologyCoordinator_inlock(); - lk.unlock(); - _performPostMemberStateUpdateAction(action); + if (newState == _topCoord->getMemberState()) { *success = true; _replExecutor.signalEvent(finishedSettingFollowerMode); + return; } - - bool ReplicationCoordinatorImpl::isWaitingForApplierToDrain() { - stdx::lock_guard<stdx::mutex> lk(_mutex); - return _isWaitingForDrainToComplete; - } - - void ReplicationCoordinatorImpl::signalDrainComplete(OperationContext* txn) { - // This logic is a little complicated in order to avoid acquiring the global exclusive lock - // unnecessarily. This is important because the applier may call signalDrainComplete() - // whenever it wants, not only when the ReplicationCoordinator is expecting it. - // - // The steps are: - // 1.) Check to see if we're waiting for this signal. If not, return early. - // 2.) Otherwise, release the mutex while acquiring the global exclusive lock, - // since that might take a while (NB there's a deadlock cycle otherwise, too). - // 3.) Re-check to see if we've somehow left drain mode. If we have not, clear - // _isWaitingForDrainToComplete, set the flag allowing non-local database writes and - // drop the mutex. At this point, no writes can occur from other threads, due to the - // global exclusive lock. - // 4.) Drop all temp collections. - // 5.) Drop the global exclusive lock. - // - // Because replicatable writes are forbidden while in drain mode, and we don't exit drain - // mode until we have the global exclusive lock, which forbids all other threads from making - // writes, we know that from the time that _isWaitingForDrainToComplete is set in - // _performPostMemberStateUpdateAction(kActionWinElection) until this method returns, no - // external writes will be processed. This is important so that a new temp collection isn't - // introduced on the new primary before we drop all the temp collections. - - stdx::unique_lock<stdx::mutex> lk(_mutex); - if (!_isWaitingForDrainToComplete) { - return; - } - lk.unlock(); - ScopedTransaction transaction(txn, MODE_X); - Lock::GlobalWrite globalWriteLock(txn->lockState()); - lk.lock(); - if (!_isWaitingForDrainToComplete) { - return; + if (_topCoord->getRole() == TopologyCoordinator::Role::leader) { + *success = false; + _replExecutor.signalEvent(finishedSettingFollowerMode); + return; + } + + if (_topCoord->getRole() == TopologyCoordinator::Role::candidate) { + // We are a candidate, which means _topCoord believs us to be in state RS_SECONDARY, and + // we know that newState != RS_SECONDARY because we would have returned early, above if + // the old and new state were equal. So, cancel the running election and try again to + // finish setting the follower mode. + invariant(_freshnessChecker); + _freshnessChecker->cancel(&_replExecutor); + if (_electCmdRunner) { + _electCmdRunner->cancel(&_replExecutor); + } + _replExecutor.onEvent(_electionFinishedEvent, + stdx::bind(&ReplicationCoordinatorImpl::_setFollowerModeFinish, + this, + stdx::placeholders::_1, + newState, + finishedSettingFollowerMode, + success)); + return; + } + + stdx::unique_lock<stdx::mutex> lk(_mutex); + _topCoord->setFollowerMode(newState.s); + + const PostMemberStateUpdateAction action = _updateMemberStateFromTopologyCoordinator_inlock(); + lk.unlock(); + _performPostMemberStateUpdateAction(action); + *success = true; + _replExecutor.signalEvent(finishedSettingFollowerMode); +} + +bool ReplicationCoordinatorImpl::isWaitingForApplierToDrain() { + stdx::lock_guard<stdx::mutex> lk(_mutex); + return _isWaitingForDrainToComplete; +} + +void ReplicationCoordinatorImpl::signalDrainComplete(OperationContext* txn) { + // This logic is a little complicated in order to avoid acquiring the global exclusive lock + // unnecessarily. This is important because the applier may call signalDrainComplete() + // whenever it wants, not only when the ReplicationCoordinator is expecting it. + // + // The steps are: + // 1.) Check to see if we're waiting for this signal. If not, return early. + // 2.) Otherwise, release the mutex while acquiring the global exclusive lock, + // since that might take a while (NB there's a deadlock cycle otherwise, too). + // 3.) Re-check to see if we've somehow left drain mode. If we have not, clear + // _isWaitingForDrainToComplete, set the flag allowing non-local database writes and + // drop the mutex. At this point, no writes can occur from other threads, due to the + // global exclusive lock. + // 4.) Drop all temp collections. + // 5.) Drop the global exclusive lock. + // + // Because replicatable writes are forbidden while in drain mode, and we don't exit drain + // mode until we have the global exclusive lock, which forbids all other threads from making + // writes, we know that from the time that _isWaitingForDrainToComplete is set in + // _performPostMemberStateUpdateAction(kActionWinElection) until this method returns, no + // external writes will be processed. This is important so that a new temp collection isn't + // introduced on the new primary before we drop all the temp collections. + + stdx::unique_lock<stdx::mutex> lk(_mutex); + if (!_isWaitingForDrainToComplete) { + return; + } + lk.unlock(); + ScopedTransaction transaction(txn, MODE_X); + Lock::GlobalWrite globalWriteLock(txn->lockState()); + lk.lock(); + if (!_isWaitingForDrainToComplete) { + return; + } + _isWaitingForDrainToComplete = false; + _canAcceptNonLocalWrites = true; + lk.unlock(); + _externalState->dropAllTempCollections(txn); + log() << "transition to primary complete; database writes are now permitted" << rsLog; +} + +void ReplicationCoordinatorImpl::signalUpstreamUpdater() { + _externalState->forwardSlaveProgress(); +} + +ReplicationCoordinatorImpl::SlaveInfo* ReplicationCoordinatorImpl::_findSlaveInfoByMemberID_inlock( + int memberId) { + for (SlaveInfoVector::iterator it = _slaveInfo.begin(); it != _slaveInfo.end(); ++it) { + if (it->memberId == memberId) { + return &(*it); + } + } + return NULL; +} + +ReplicationCoordinatorImpl::SlaveInfo* ReplicationCoordinatorImpl::_findSlaveInfoByRID_inlock( + const OID& rid) { + for (SlaveInfoVector::iterator it = _slaveInfo.begin(); it != _slaveInfo.end(); ++it) { + if (it->rid == rid) { + return &(*it); + } + } + return NULL; +} + +void ReplicationCoordinatorImpl::_addSlaveInfo_inlock(const SlaveInfo& slaveInfo) { + invariant(getReplicationMode() == modeMasterSlave); + _slaveInfo.push_back(slaveInfo); + + // Wake up any threads waiting for replication that now have their replication + // check satisfied + _wakeReadyWaiters_inlock(); +} + +void ReplicationCoordinatorImpl::_updateSlaveInfoOptime_inlock(SlaveInfo* slaveInfo, + const OpTime& opTime) { + slaveInfo->opTime = opTime; + + // Wake up any threads waiting for replication that now have their replication + // check satisfied + _wakeReadyWaiters_inlock(); +} + +void ReplicationCoordinatorImpl::_updateSlaveInfoFromConfig_inlock() { + invariant(_settings.usingReplSets()); + + SlaveInfoVector oldSlaveInfos; + _slaveInfo.swap(oldSlaveInfos); + + if (_selfIndex == -1) { + // If we aren't in the config then the only data we care about is for ourself + for (SlaveInfoVector::const_iterator it = oldSlaveInfos.begin(); it != oldSlaveInfos.end(); + ++it) { + if (it->self) { + SlaveInfo slaveInfo = *it; + slaveInfo.memberId = -1; + _slaveInfo.push_back(slaveInfo); + return; + } } - _isWaitingForDrainToComplete = false; - _canAcceptNonLocalWrites = true; - lk.unlock(); - _externalState->dropAllTempCollections(txn); - log() << "transition to primary complete; database writes are now permitted" << rsLog; + invariant(false); // There should always have been an entry for ourself } - void ReplicationCoordinatorImpl::signalUpstreamUpdater() { - _externalState->forwardSlaveProgress(); - } + for (int i = 0; i < _rsConfig.getNumMembers(); ++i) { + const MemberConfig& memberConfig = _rsConfig.getMemberAt(i); + int memberId = memberConfig.getId(); + const HostAndPort& memberHostAndPort = memberConfig.getHostAndPort(); - ReplicationCoordinatorImpl::SlaveInfo* - ReplicationCoordinatorImpl::_findSlaveInfoByMemberID_inlock(int memberId) { - for (SlaveInfoVector::iterator it = _slaveInfo.begin(); it != _slaveInfo.end(); ++it) { - if (it->memberId == memberId) { - return &(*it); - } - } - return NULL; - } + SlaveInfo slaveInfo; - ReplicationCoordinatorImpl::SlaveInfo* - ReplicationCoordinatorImpl::_findSlaveInfoByRID_inlock(const OID& rid) { - for (SlaveInfoVector::iterator it = _slaveInfo.begin(); it != _slaveInfo.end(); ++it) { - if (it->rid == rid) { - return &(*it); + // Check if the node existed with the same member ID and hostname in the old data + for (SlaveInfoVector::const_iterator it = oldSlaveInfos.begin(); it != oldSlaveInfos.end(); + ++it) { + if ((it->memberId == memberId && it->hostAndPort == memberHostAndPort) || + (i == _selfIndex && it->self)) { + slaveInfo = *it; } } - return NULL; - } - void ReplicationCoordinatorImpl::_addSlaveInfo_inlock(const SlaveInfo& slaveInfo) { - invariant(getReplicationMode() == modeMasterSlave); + // Make sure you have the most up-to-date info for member ID and hostAndPort. + slaveInfo.memberId = memberId; + slaveInfo.hostAndPort = memberHostAndPort; _slaveInfo.push_back(slaveInfo); - - // Wake up any threads waiting for replication that now have their replication - // check satisfied - _wakeReadyWaiters_inlock(); } + invariant(static_cast<int>(_slaveInfo.size()) == _rsConfig.getNumMembers()); +} - void ReplicationCoordinatorImpl::_updateSlaveInfoOptime_inlock(SlaveInfo* slaveInfo, - const OpTime& opTime) { - - slaveInfo->opTime = opTime; - - // Wake up any threads waiting for replication that now have their replication - // check satisfied - _wakeReadyWaiters_inlock(); - } - - void ReplicationCoordinatorImpl::_updateSlaveInfoFromConfig_inlock() { +size_t ReplicationCoordinatorImpl::_getMyIndexInSlaveInfo_inlock() const { + if (getReplicationMode() == modeMasterSlave) { + // Self data always lives in the first entry in _slaveInfo for master/slave + return 0; + } else { invariant(_settings.usingReplSets()); - - SlaveInfoVector oldSlaveInfos; - _slaveInfo.swap(oldSlaveInfos); - if (_selfIndex == -1) { - // If we aren't in the config then the only data we care about is for ourself - for (SlaveInfoVector::const_iterator it = oldSlaveInfos.begin(); - it != oldSlaveInfos.end(); ++it) { - if (it->self) { - SlaveInfo slaveInfo = *it; - slaveInfo.memberId = -1; - _slaveInfo.push_back(slaveInfo); - return; - } - } - invariant(false); // There should always have been an entry for ourself - } - - for (int i = 0; i < _rsConfig.getNumMembers(); ++i) { - const MemberConfig& memberConfig = _rsConfig.getMemberAt(i); - int memberId = memberConfig.getId(); - const HostAndPort& memberHostAndPort = memberConfig.getHostAndPort(); - - SlaveInfo slaveInfo; - - // Check if the node existed with the same member ID and hostname in the old data - for (SlaveInfoVector::const_iterator it = oldSlaveInfos.begin(); - it != oldSlaveInfos.end(); ++it) { - if ((it->memberId == memberId && it->hostAndPort == memberHostAndPort) - || (i == _selfIndex && it->self)) { - slaveInfo = *it; - } - } - - // Make sure you have the most up-to-date info for member ID and hostAndPort. - slaveInfo.memberId = memberId; - slaveInfo.hostAndPort = memberHostAndPort; - _slaveInfo.push_back(slaveInfo); - } - invariant(static_cast<int>(_slaveInfo.size()) == _rsConfig.getNumMembers()); - } - - size_t ReplicationCoordinatorImpl::_getMyIndexInSlaveInfo_inlock() const { - if (getReplicationMode() == modeMasterSlave) { - // Self data always lives in the first entry in _slaveInfo for master/slave + invariant(_slaveInfo.size() == 1); return 0; - } - else { - invariant(_settings.usingReplSets()); - if (_selfIndex == -1) { - invariant(_slaveInfo.size() == 1); - return 0; - } - else { - return _selfIndex; - } + } else { + return _selfIndex; } } +} - Status ReplicationCoordinatorImpl::setLastOptimeForSlave(const OID& rid, - const Timestamp& ts) { - stdx::unique_lock<stdx::mutex> lock(_mutex); - massert(28576, - "Received an old style replication progress update, which is only used for Master/" - "Slave replication now, but this node is not using Master/Slave replication. " - "This is likely caused by an old (pre-2.6) member syncing from this node.", - getReplicationMode() == modeMasterSlave); - - // Term == 0 for master-slave - OpTime opTime(ts, OpTime::kDefaultTerm); - SlaveInfo* slaveInfo = _findSlaveInfoByRID_inlock(rid); - if (slaveInfo) { - if (slaveInfo->opTime < opTime) { - _updateSlaveInfoOptime_inlock(slaveInfo, opTime); - } - } - else { - SlaveInfo newSlaveInfo; - newSlaveInfo.rid = rid; - newSlaveInfo.opTime = opTime; - _addSlaveInfo_inlock(newSlaveInfo); - } - return Status::OK(); - } +Status ReplicationCoordinatorImpl::setLastOptimeForSlave(const OID& rid, const Timestamp& ts) { + stdx::unique_lock<stdx::mutex> lock(_mutex); + massert(28576, + "Received an old style replication progress update, which is only used for Master/" + "Slave replication now, but this node is not using Master/Slave replication. " + "This is likely caused by an old (pre-2.6) member syncing from this node.", + getReplicationMode() == modeMasterSlave); - void ReplicationCoordinatorImpl::setMyHeartbeatMessage(const std::string& msg) { - CBHStatus cbh = _replExecutor.scheduleWork( - stdx::bind(&TopologyCoordinator::setMyHeartbeatMessage, - _topCoord.get(), - _replExecutor.now(), - msg)); - if (cbh.getStatus() == ErrorCodes::ShutdownInProgress) { - return; + // Term == 0 for master-slave + OpTime opTime(ts, OpTime::kDefaultTerm); + SlaveInfo* slaveInfo = _findSlaveInfoByRID_inlock(rid); + if (slaveInfo) { + if (slaveInfo->opTime < opTime) { + _updateSlaveInfoOptime_inlock(slaveInfo, opTime); + } + } else { + SlaveInfo newSlaveInfo; + newSlaveInfo.rid = rid; + newSlaveInfo.opTime = opTime; + _addSlaveInfo_inlock(newSlaveInfo); + } + return Status::OK(); +} + +void ReplicationCoordinatorImpl::setMyHeartbeatMessage(const std::string& msg) { + CBHStatus cbh = _replExecutor.scheduleWork(stdx::bind( + &TopologyCoordinator::setMyHeartbeatMessage, _topCoord.get(), _replExecutor.now(), msg)); + if (cbh.getStatus() == ErrorCodes::ShutdownInProgress) { + return; + } + fassert(28540, cbh.getStatus()); + _replExecutor.wait(cbh.getValue()); +} + +void ReplicationCoordinatorImpl::setMyLastOptime(const OpTime& opTime) { + stdx::unique_lock<stdx::mutex> lock(_mutex); + _setMyLastOptime_inlock(&lock, opTime, false); +} + +void ReplicationCoordinatorImpl::resetMyLastOptime() { + stdx::unique_lock<stdx::mutex> lock(_mutex); + // Reset to uninitialized OpTime + _setMyLastOptime_inlock(&lock, OpTime(), true); +} + +void ReplicationCoordinatorImpl::_setMyLastOptime_inlock(stdx::unique_lock<stdx::mutex>* lock, + const OpTime& opTime, + bool isRollbackAllowed) { + invariant(lock->owns_lock()); + SlaveInfo* mySlaveInfo = &_slaveInfo[_getMyIndexInSlaveInfo_inlock()]; + invariant(isRollbackAllowed || mySlaveInfo->opTime <= opTime); + _updateSlaveInfoOptime_inlock(mySlaveInfo, opTime); + + if (getReplicationMode() != modeReplSet) { + return; + } + + for (auto& opTimeWaiter : _opTimeWaiterList) { + if (*(opTimeWaiter->opTime) <= opTime) { + opTimeWaiter->condVar->notify_all(); } - fassert(28540, cbh.getStatus()); - _replExecutor.wait(cbh.getValue()); - } - - void ReplicationCoordinatorImpl::setMyLastOptime(const OpTime& opTime) { - stdx::unique_lock<stdx::mutex> lock(_mutex); - _setMyLastOptime_inlock(&lock, opTime, false); } - void ReplicationCoordinatorImpl::resetMyLastOptime() { - stdx::unique_lock<stdx::mutex> lock(_mutex); - // Reset to uninitialized OpTime - _setMyLastOptime_inlock(&lock, OpTime(), true); + if (_getMemberState_inlock().primary()) { + return; } - void ReplicationCoordinatorImpl::_setMyLastOptime_inlock( - stdx::unique_lock<stdx::mutex>* lock, const OpTime& opTime, bool isRollbackAllowed) { - invariant(lock->owns_lock()); - SlaveInfo* mySlaveInfo = &_slaveInfo[_getMyIndexInSlaveInfo_inlock()]; - invariant(isRollbackAllowed || mySlaveInfo->opTime <= opTime); - _updateSlaveInfoOptime_inlock(mySlaveInfo, opTime); - - if (getReplicationMode() != modeReplSet) { - return; - } + lock->unlock(); - for (auto& opTimeWaiter : _opTimeWaiterList) { - if (*(opTimeWaiter->opTime) <= opTime) { - opTimeWaiter->condVar->notify_all(); - } - } + _externalState->forwardSlaveProgress(); // Must do this outside _mutex +} - if (_getMemberState_inlock().primary()) { - return; - } +OpTime ReplicationCoordinatorImpl::getMyLastOptime() const { + stdx::lock_guard<stdx::mutex> lock(_mutex); + return _getMyLastOptime_inlock(); +} - lock->unlock(); +ReadAfterOpTimeResponse ReplicationCoordinatorImpl::waitUntilOpTime( + OperationContext* txn, const ReadAfterOpTimeArgs& settings) { + const auto& ts = settings.getOpTime(); - _externalState->forwardSlaveProgress(); // Must do this outside _mutex + if (ts.isNull()) { + return ReadAfterOpTimeResponse(); } - OpTime ReplicationCoordinatorImpl::getMyLastOptime() const { - stdx::lock_guard<stdx::mutex> lock(_mutex); - return _getMyLastOptime_inlock(); + if (getReplicationMode() != repl::ReplicationCoordinator::modeReplSet) { + return ReadAfterOpTimeResponse( + Status(ErrorCodes::NotAReplicaSet, + "node needs to be a replica set member to use read after opTime")); } - ReadAfterOpTimeResponse ReplicationCoordinatorImpl::waitUntilOpTime( - OperationContext* txn, - const ReadAfterOpTimeArgs& settings) { - const auto& ts = settings.getOpTime(); - - if (ts.isNull()) { - return ReadAfterOpTimeResponse(); - } - - if (getReplicationMode() != repl::ReplicationCoordinator::modeReplSet) { - return ReadAfterOpTimeResponse(Status(ErrorCodes::NotAReplicaSet, - "node needs to be a replica set member to use read after opTime")); - } - - // TODO: SERVER-18298 enable code once V1 protocol is fully implemented. +// TODO: SERVER-18298 enable code once V1 protocol is fully implemented. #if 0 if (!isV1ElectionProtocol()) { return ReadAfterOpTimeResponse(Status(ErrorCodes::IncompatibleElectionProtocol, @@ -795,1013 +762,976 @@ namespace { } #endif - Timer timer; - stdx::unique_lock<stdx::mutex> lock(_mutex); - - while (ts > _getMyLastOptime_inlock()) { - Status interruptedStatus = txn->checkForInterruptNoAssert(); - if (!interruptedStatus.isOK()) { - return ReadAfterOpTimeResponse(interruptedStatus, Milliseconds(timer.millis())); - } - - if (_inShutdown) { - return ReadAfterOpTimeResponse( - Status(ErrorCodes::ShutdownInProgress, "shutting down"), - Milliseconds(timer.millis())); - } - - stdx::condition_variable condVar; - WaiterInfo waitInfo(&_opTimeWaiterList, - txn->getOpID(), - &ts, - nullptr, // Don't care about write concern. - &condVar); + Timer timer; + stdx::unique_lock<stdx::mutex> lock(_mutex); - if (CurOp::get(txn)->isMaxTimeSet()) { - condVar.wait_for(lock, Microseconds(txn->getRemainingMaxTimeMicros())); - } - else { - condVar.wait(lock); - } + while (ts > _getMyLastOptime_inlock()) { + Status interruptedStatus = txn->checkForInterruptNoAssert(); + if (!interruptedStatus.isOK()) { + return ReadAfterOpTimeResponse(interruptedStatus, Milliseconds(timer.millis())); } - return ReadAfterOpTimeResponse(Status::OK(), Milliseconds(timer.millis())); - } + if (_inShutdown) { + return ReadAfterOpTimeResponse(Status(ErrorCodes::ShutdownInProgress, "shutting down"), + Milliseconds(timer.millis())); + } - OpTime ReplicationCoordinatorImpl::_getMyLastOptime_inlock() const { - return _slaveInfo[_getMyIndexInSlaveInfo_inlock()].opTime; + stdx::condition_variable condVar; + WaiterInfo waitInfo(&_opTimeWaiterList, + txn->getOpID(), + &ts, + nullptr, // Don't care about write concern. + &condVar); + + if (CurOp::get(txn)->isMaxTimeSet()) { + condVar.wait_for(lock, Microseconds(txn->getRemainingMaxTimeMicros())); + } else { + condVar.wait(lock); + } } - Status ReplicationCoordinatorImpl::setLastOptime_forTest(long long cfgVer, - long long memberId, - const OpTime& opTime) { - stdx::lock_guard<stdx::mutex> lock(_mutex); - invariant(getReplicationMode() == modeReplSet); + return ReadAfterOpTimeResponse(Status::OK(), Milliseconds(timer.millis())); +} - const UpdatePositionArgs::UpdateInfo update(OID(), opTime, cfgVer, memberId); - long long configVersion; - return _setLastOptime_inlock(update, &configVersion); - } +OpTime ReplicationCoordinatorImpl::_getMyLastOptime_inlock() const { + return _slaveInfo[_getMyIndexInSlaveInfo_inlock()].opTime; +} - Status ReplicationCoordinatorImpl::_setLastOptime_inlock( - const UpdatePositionArgs::UpdateInfo& args, long long* configVersion) { +Status ReplicationCoordinatorImpl::setLastOptime_forTest(long long cfgVer, + long long memberId, + const OpTime& opTime) { + stdx::lock_guard<stdx::mutex> lock(_mutex); + invariant(getReplicationMode() == modeReplSet); - if (_selfIndex == -1) { - // Ignore updates when we're in state REMOVED - return Status(ErrorCodes::NotMasterOrSecondaryCode, - "Received replSetUpdatePosition command but we are in state REMOVED"); - } - invariant(getReplicationMode() == modeReplSet); - - if (args.memberId < 0) { - std::string errmsg = str::stream() - << "Received replSetUpdatePosition for node with memberId " - << args.memberId << " which is negative and therefore invalid"; - LOG(1) << errmsg; - return Status(ErrorCodes::NodeNotFound, errmsg); - } + const UpdatePositionArgs::UpdateInfo update(OID(), opTime, cfgVer, memberId); + long long configVersion; + return _setLastOptime_inlock(update, &configVersion); +} - if (args.rid == _getMyRID_inlock() || - args.memberId == _rsConfig.getMemberAt(_selfIndex).getId()) { - // Do not let remote nodes tell us what our optime is. - return Status::OK(); - } +Status ReplicationCoordinatorImpl::_setLastOptime_inlock(const UpdatePositionArgs::UpdateInfo& args, + long long* configVersion) { + if (_selfIndex == -1) { + // Ignore updates when we're in state REMOVED + return Status(ErrorCodes::NotMasterOrSecondaryCode, + "Received replSetUpdatePosition command but we are in state REMOVED"); + } + invariant(getReplicationMode() == modeReplSet); - LOG(2) << "received notification that node with memberID " << args.memberId << - " in config with version " << args.cfgver << " has reached optime: " << args.ts; - - SlaveInfo* slaveInfo = NULL; - if (args.cfgver != _rsConfig.getConfigVersion()) { - std::string errmsg = str::stream() - << "Received replSetUpdatePosition for node with memberId " - << args.memberId << " whose config version of " << args.cfgver - << " doesn't match our config version of " - << _rsConfig.getConfigVersion(); - LOG(1) << errmsg; - *configVersion = _rsConfig.getConfigVersion(); - return Status(ErrorCodes::InvalidReplicaSetConfig, errmsg); - } + if (args.memberId < 0) { + std::string errmsg = str::stream() + << "Received replSetUpdatePosition for node with memberId " << args.memberId + << " which is negative and therefore invalid"; + LOG(1) << errmsg; + return Status(ErrorCodes::NodeNotFound, errmsg); + } - slaveInfo = _findSlaveInfoByMemberID_inlock(args.memberId); - if (!slaveInfo) { - invariant(!_rsConfig.findMemberByID(args.memberId)); + if (args.rid == _getMyRID_inlock() || + args.memberId == _rsConfig.getMemberAt(_selfIndex).getId()) { + // Do not let remote nodes tell us what our optime is. + return Status::OK(); + } - std::string errmsg = str::stream() - << "Received replSetUpdatePosition for node with memberId " - << args.memberId << " which doesn't exist in our config"; - LOG(1) << errmsg; - return Status(ErrorCodes::NodeNotFound, errmsg); - } + LOG(2) << "received notification that node with memberID " << args.memberId + << " in config with version " << args.cfgver << " has reached optime: " << args.ts; - invariant(args.memberId == slaveInfo->memberId); + SlaveInfo* slaveInfo = NULL; + if (args.cfgver != _rsConfig.getConfigVersion()) { + std::string errmsg = str::stream() + << "Received replSetUpdatePosition for node with memberId " << args.memberId + << " whose config version of " << args.cfgver << " doesn't match our config version of " + << _rsConfig.getConfigVersion(); + LOG(1) << errmsg; + *configVersion = _rsConfig.getConfigVersion(); + return Status(ErrorCodes::InvalidReplicaSetConfig, errmsg); + } - LOG(3) << "Node with memberID " << args.memberId << " currently has optime " << - slaveInfo->opTime << "; updating to " << args.ts; + slaveInfo = _findSlaveInfoByMemberID_inlock(args.memberId); + if (!slaveInfo) { + invariant(!_rsConfig.findMemberByID(args.memberId)); - // Only update remote optimes if they increase. - if (slaveInfo->opTime < args.ts) { - _updateSlaveInfoOptime_inlock(slaveInfo, args.ts); - } - _updateLastCommittedOpTime_inlock(); - return Status::OK(); + std::string errmsg = str::stream() + << "Received replSetUpdatePosition for node with memberId " << args.memberId + << " which doesn't exist in our config"; + LOG(1) << errmsg; + return Status(ErrorCodes::NodeNotFound, errmsg); } - void ReplicationCoordinatorImpl::interrupt(unsigned opId) { - stdx::lock_guard<stdx::mutex> lk(_mutex); - for (std::vector<WaiterInfo*>::iterator it = _replicationWaiterList.begin(); - it != _replicationWaiterList.end(); ++it) { - WaiterInfo* info = *it; - if (info->opID == opId) { - info->condVar->notify_all(); - return; - } - } + invariant(args.memberId == slaveInfo->memberId); - for (auto& opTimeWaiter : _opTimeWaiterList) { - if (opTimeWaiter->opID == opId) { - opTimeWaiter->condVar->notify_all(); - return; - } - } + LOG(3) << "Node with memberID " << args.memberId << " currently has optime " + << slaveInfo->opTime << "; updating to " << args.ts; - _replExecutor.scheduleWork( - stdx::bind(&ReplicationCoordinatorImpl::_signalStepDownWaitersFromCallback, - this, - stdx::placeholders::_1)); + // Only update remote optimes if they increase. + if (slaveInfo->opTime < args.ts) { + _updateSlaveInfoOptime_inlock(slaveInfo, args.ts); } + _updateLastCommittedOpTime_inlock(); + return Status::OK(); +} - void ReplicationCoordinatorImpl::interruptAll() { - stdx::lock_guard<stdx::mutex> lk(_mutex); - for (std::vector<WaiterInfo*>::iterator it = _replicationWaiterList.begin(); - it != _replicationWaiterList.end(); ++it) { - WaiterInfo* info = *it; +void ReplicationCoordinatorImpl::interrupt(unsigned opId) { + stdx::lock_guard<stdx::mutex> lk(_mutex); + for (std::vector<WaiterInfo*>::iterator it = _replicationWaiterList.begin(); + it != _replicationWaiterList.end(); + ++it) { + WaiterInfo* info = *it; + if (info->opID == opId) { info->condVar->notify_all(); + return; } + } - for (auto& opTimeWaiter : _opTimeWaiterList) { + for (auto& opTimeWaiter : _opTimeWaiterList) { + if (opTimeWaiter->opID == opId) { opTimeWaiter->condVar->notify_all(); + return; } - - _replExecutor.scheduleWork( - stdx::bind(&ReplicationCoordinatorImpl::_signalStepDownWaitersFromCallback, - this, - stdx::placeholders::_1)); } - bool ReplicationCoordinatorImpl::_doneWaitingForReplication_inlock( - const OpTime& opTime, const WriteConcernOptions& writeConcern) { - Status status = _checkIfWriteConcernCanBeSatisfied_inlock(writeConcern); - if (!status.isOK()) { - return true; - } + _replExecutor.scheduleWork( + stdx::bind(&ReplicationCoordinatorImpl::_signalStepDownWaitersFromCallback, + this, + stdx::placeholders::_1)); +} - if (!writeConcern.wMode.empty()) { - StringData patternName; - if (writeConcern.wMode == WriteConcernOptions::kMajority) { - patternName = ReplicaSetConfig::kMajorityWriteConcernModeName; - } - else { - patternName = writeConcern.wMode; - } - StatusWith<ReplicaSetTagPattern> tagPattern = - _rsConfig.findCustomWriteMode(patternName); - if (!tagPattern.isOK()) { - return true; - } - return _haveTaggedNodesReachedOpTime_inlock(opTime, tagPattern.getValue()); - } - else { - return _haveNumNodesReachedOpTime_inlock(opTime, writeConcern.wNumNodes); - } +void ReplicationCoordinatorImpl::interruptAll() { + stdx::lock_guard<stdx::mutex> lk(_mutex); + for (std::vector<WaiterInfo*>::iterator it = _replicationWaiterList.begin(); + it != _replicationWaiterList.end(); + ++it) { + WaiterInfo* info = *it; + info->condVar->notify_all(); } - bool ReplicationCoordinatorImpl::_haveNumNodesReachedOpTime_inlock(const OpTime& opTime, - int numNodes) { - if (_getMyLastOptime_inlock() < opTime) { - // Secondaries that are for some reason ahead of us should not allow us to - // satisfy a write concern if we aren't caught up ourselves. - return false; - } + for (auto& opTimeWaiter : _opTimeWaiterList) { + opTimeWaiter->condVar->notify_all(); + } - for (SlaveInfoVector::iterator it = _slaveInfo.begin(); - it != _slaveInfo.end(); ++it) { + _replExecutor.scheduleWork( + stdx::bind(&ReplicationCoordinatorImpl::_signalStepDownWaitersFromCallback, + this, + stdx::placeholders::_1)); +} - const OpTime& slaveTime = it->opTime; - if (slaveTime >= opTime) { - --numNodes; - } +bool ReplicationCoordinatorImpl::_doneWaitingForReplication_inlock( + const OpTime& opTime, const WriteConcernOptions& writeConcern) { + Status status = _checkIfWriteConcernCanBeSatisfied_inlock(writeConcern); + if (!status.isOK()) { + return true; + } - if (numNodes <= 0) { - return true; - } + if (!writeConcern.wMode.empty()) { + StringData patternName; + if (writeConcern.wMode == WriteConcernOptions::kMajority) { + patternName = ReplicaSetConfig::kMajorityWriteConcernModeName; + } else { + patternName = writeConcern.wMode; } - return false; + StatusWith<ReplicaSetTagPattern> tagPattern = _rsConfig.findCustomWriteMode(patternName); + if (!tagPattern.isOK()) { + return true; + } + return _haveTaggedNodesReachedOpTime_inlock(opTime, tagPattern.getValue()); + } else { + return _haveNumNodesReachedOpTime_inlock(opTime, writeConcern.wNumNodes); } +} - bool ReplicationCoordinatorImpl::_haveTaggedNodesReachedOpTime_inlock( - const OpTime& opTime, const ReplicaSetTagPattern& tagPattern) { - - ReplicaSetTagMatch matcher(tagPattern); - for (SlaveInfoVector::iterator it = _slaveInfo.begin(); - it != _slaveInfo.end(); ++it) { - - const OpTime& slaveTime = it->opTime; - if (slaveTime >= opTime) { - // This node has reached the desired optime, now we need to check if it is a part - // of the tagPattern. - const MemberConfig* memberConfig = _rsConfig.findMemberByID(it->memberId); - invariant(memberConfig); - for (MemberConfig::TagIterator it = memberConfig->tagsBegin(); - it != memberConfig->tagsEnd(); ++it) { - if (matcher.update(*it)) { - return true; - } - } - } - } +bool ReplicationCoordinatorImpl::_haveNumNodesReachedOpTime_inlock(const OpTime& opTime, + int numNodes) { + if (_getMyLastOptime_inlock() < opTime) { + // Secondaries that are for some reason ahead of us should not allow us to + // satisfy a write concern if we aren't caught up ourselves. return false; } - ReplicationCoordinator::StatusAndDuration ReplicationCoordinatorImpl::awaitReplication( - OperationContext* txn, - const OpTime& opTime, - const WriteConcernOptions& writeConcern) { - Timer timer; - stdx::unique_lock<stdx::mutex> lock(_mutex); - return _awaitReplication_inlock(&timer, &lock, txn, opTime, writeConcern); - } - - ReplicationCoordinator::StatusAndDuration - ReplicationCoordinatorImpl::awaitReplicationOfLastOpForClient( - OperationContext* txn, - const WriteConcernOptions& writeConcern) { - Timer timer; - stdx::unique_lock<stdx::mutex> lock(_mutex); - return _awaitReplication_inlock( - &timer, - &lock, - txn, - ReplClientInfo::forClient(txn->getClient()).getLastOp(), - writeConcern); - } - - ReplicationCoordinator::StatusAndDuration ReplicationCoordinatorImpl::_awaitReplication_inlock( - const Timer* timer, - stdx::unique_lock<stdx::mutex>* lock, - OperationContext* txn, - const OpTime& opTime, - const WriteConcernOptions& writeConcern) { - - const Mode replMode = getReplicationMode(); - if (replMode == modeNone || serverGlobalParams.configsvr) { - // no replication check needed (validated above) - return StatusAndDuration(Status::OK(), Milliseconds(timer->millis())); - } - - if (replMode == modeMasterSlave && writeConcern.wMode == WriteConcernOptions::kMajority) { - // with master/slave, majority is equivalent to w=1 - return StatusAndDuration(Status::OK(), Milliseconds(timer->millis())); - } - - if (opTime.isNull()) { - // If waiting for the empty optime, always say it's been replicated. - return StatusAndDuration(Status::OK(), Milliseconds(timer->millis())); - } - - if (replMode == modeReplSet && !_memberState.primary()) { - return StatusAndDuration(Status(ErrorCodes::NotMaster, - "Not master while waiting for replication"), - Milliseconds(timer->millis())); + for (SlaveInfoVector::iterator it = _slaveInfo.begin(); it != _slaveInfo.end(); ++it) { + const OpTime& slaveTime = it->opTime; + if (slaveTime >= opTime) { + --numNodes; } - if (writeConcern.wMode.empty()) { - if (writeConcern.wNumNodes < 1) { - return StatusAndDuration(Status::OK(), Milliseconds(timer->millis())); - } - else if (writeConcern.wNumNodes == 1 && _getMyLastOptime_inlock() >= opTime) { - return StatusAndDuration(Status::OK(), Milliseconds(timer->millis())); - } + if (numNodes <= 0) { + return true; } + } + return false; +} - // Must hold _mutex before constructing waitInfo as it will modify _replicationWaiterList - stdx::condition_variable condVar; - WaiterInfo waitInfo( - &_replicationWaiterList, txn->getOpID(), &opTime, &writeConcern, &condVar); - while (!_doneWaitingForReplication_inlock(opTime, writeConcern)) { - const Milliseconds elapsed{timer->millis()}; - - Status interruptedStatus = txn->checkForInterruptNoAssert(); - if (!interruptedStatus.isOK()) { - return StatusAndDuration(interruptedStatus, elapsed); - } - - if (!waitInfo.master) { - return StatusAndDuration(Status(ErrorCodes::NotMaster, - "Not master anymore while waiting for replication" - " - this most likely means that a step down" - " occurred while waiting for replication"), - elapsed); - } - - if (writeConcern.wTimeout != WriteConcernOptions::kNoTimeout && - elapsed > Milliseconds{writeConcern.wTimeout}) { - return StatusAndDuration(Status(ErrorCodes::WriteConcernFailed, - "waiting for replication timed out"), - elapsed); - } - - if (_inShutdown) { - return StatusAndDuration(Status(ErrorCodes::ShutdownInProgress, - "Replication is being shut down"), - elapsed); - } - - const Microseconds maxTimeMicrosRemaining{txn->getRemainingMaxTimeMicros()}; - Microseconds waitTime = Microseconds::max(); - if (maxTimeMicrosRemaining != Microseconds::zero()) { - waitTime = maxTimeMicrosRemaining; - } - if (writeConcern.wTimeout != WriteConcernOptions::kNoTimeout) { - waitTime = std::min<Microseconds>(Milliseconds{writeConcern.wTimeout} - elapsed, - waitTime); - } - - if (waitTime == Microseconds::max()) { - condVar.wait(*lock); - } - else { - condVar.wait_for(*lock, waitTime); +bool ReplicationCoordinatorImpl::_haveTaggedNodesReachedOpTime_inlock( + const OpTime& opTime, const ReplicaSetTagPattern& tagPattern) { + ReplicaSetTagMatch matcher(tagPattern); + for (SlaveInfoVector::iterator it = _slaveInfo.begin(); it != _slaveInfo.end(); ++it) { + const OpTime& slaveTime = it->opTime; + if (slaveTime >= opTime) { + // This node has reached the desired optime, now we need to check if it is a part + // of the tagPattern. + const MemberConfig* memberConfig = _rsConfig.findMemberByID(it->memberId); + invariant(memberConfig); + for (MemberConfig::TagIterator it = memberConfig->tagsBegin(); + it != memberConfig->tagsEnd(); + ++it) { + if (matcher.update(*it)) { + return true; + } } } - - Status status = _checkIfWriteConcernCanBeSatisfied_inlock(writeConcern); - if (!status.isOK()) { - return StatusAndDuration(status, Milliseconds(timer->millis())); - } - + } + return false; +} + +ReplicationCoordinator::StatusAndDuration ReplicationCoordinatorImpl::awaitReplication( + OperationContext* txn, const OpTime& opTime, const WriteConcernOptions& writeConcern) { + Timer timer; + stdx::unique_lock<stdx::mutex> lock(_mutex); + return _awaitReplication_inlock(&timer, &lock, txn, opTime, writeConcern); +} + +ReplicationCoordinator::StatusAndDuration +ReplicationCoordinatorImpl::awaitReplicationOfLastOpForClient( + OperationContext* txn, const WriteConcernOptions& writeConcern) { + Timer timer; + stdx::unique_lock<stdx::mutex> lock(_mutex); + return _awaitReplication_inlock( + &timer, &lock, txn, ReplClientInfo::forClient(txn->getClient()).getLastOp(), writeConcern); +} + +ReplicationCoordinator::StatusAndDuration ReplicationCoordinatorImpl::_awaitReplication_inlock( + const Timer* timer, + stdx::unique_lock<stdx::mutex>* lock, + OperationContext* txn, + const OpTime& opTime, + const WriteConcernOptions& writeConcern) { + const Mode replMode = getReplicationMode(); + if (replMode == modeNone || serverGlobalParams.configsvr) { + // no replication check needed (validated above) return StatusAndDuration(Status::OK(), Milliseconds(timer->millis())); } - Status ReplicationCoordinatorImpl::stepDown(OperationContext* txn, - bool force, - const Milliseconds& waitTime, - const Milliseconds& stepdownTime) { - const Date_t startTime = _replExecutor.now(); - const Date_t stepDownUntil = startTime + stepdownTime; - const Date_t waitUntil = startTime + waitTime; - - if (!getMemberState().primary()) { - // Note this check is inherently racy - it's always possible for the node to - // stepdown from some other path before we acquire the global shared lock, but - // that's okay because we are resiliant to that happening in _stepDownContinue. - return Status(ErrorCodes::NotMaster, "not primary so can't step down"); - } - - LockResult lockState = txn->lockState()->lockGlobalBegin(MODE_S); - // We've requested the global shared lock which will stop new writes from coming in, - // but existing writes could take a long time to finish, so kill all user operations - // to help us get the global lock faster. - _externalState->killAllUserOperations(txn); - - if (lockState == LOCK_WAITING) { - lockState = txn->lockState()->lockGlobalComplete( - durationCount<Milliseconds>(stepdownTime)); - if (lockState == LOCK_TIMEOUT) { - return Status(ErrorCodes::ExceededTimeLimit, - "Could not acquire the global shared lock within the amount of time " - "specified that we should step down for"); - } - } - invariant(lockState == LOCK_OK); - ON_BLOCK_EXIT(&Locker::unlockAll, txn->lockState()); - // From this point onward we are guaranteed to be holding the global shared lock. + if (replMode == modeMasterSlave && writeConcern.wMode == WriteConcernOptions::kMajority) { + // with master/slave, majority is equivalent to w=1 + return StatusAndDuration(Status::OK(), Milliseconds(timer->millis())); + } - StatusWith<ReplicationExecutor::EventHandle> finishedEvent = _replExecutor.makeEvent(); - if (finishedEvent.getStatus() == ErrorCodes::ShutdownInProgress) { - return finishedEvent.getStatus(); - } - fassert(26000, finishedEvent.getStatus()); - Status result(ErrorCodes::InternalError, "didn't set status in _stepDownContinue"); - CBHStatus cbh = _replExecutor.scheduleWork( - stdx::bind(&ReplicationCoordinatorImpl::_stepDownContinue, - this, - stdx::placeholders::_1, - finishedEvent.getValue(), - txn, - waitUntil, - stepDownUntil, - force, - &result)); - if (cbh.getStatus() == ErrorCodes::ShutdownInProgress) { - return cbh.getStatus(); - } - fassert(18809, cbh.getStatus()); - cbh = _replExecutor.scheduleWorkAt( - waitUntil, - stdx::bind(&ReplicationCoordinatorImpl::_signalStepDownWaitersFromCallback, - this, - stdx::placeholders::_1)); - if (cbh.getStatus() == ErrorCodes::ShutdownInProgress) { - return cbh.getStatus(); - } - fassert(26001, cbh.getStatus()); - _replExecutor.waitForEvent(finishedEvent.getValue()); - return result; + if (opTime.isNull()) { + // If waiting for the empty optime, always say it's been replicated. + return StatusAndDuration(Status::OK(), Milliseconds(timer->millis())); } - void ReplicationCoordinatorImpl::_signalStepDownWaitersFromCallback( - const ReplicationExecutor::CallbackArgs& cbData) { - if (!cbData.status.isOK()) { - return; - } + if (replMode == modeReplSet && !_memberState.primary()) { + return StatusAndDuration( + Status(ErrorCodes::NotMaster, "Not master while waiting for replication"), + Milliseconds(timer->millis())); + } - _signalStepDownWaiters(); - } - - void ReplicationCoordinatorImpl::_signalStepDownWaiters() { - std::for_each(_stepDownWaiters.begin(), - _stepDownWaiters.end(), - stdx::bind(&ReplicationExecutor::signalEvent, - &_replExecutor, - stdx::placeholders::_1)); - _stepDownWaiters.clear(); - } - - void ReplicationCoordinatorImpl::_stepDownContinue( - const ReplicationExecutor::CallbackArgs& cbData, - const ReplicationExecutor::EventHandle finishedEvent, - OperationContext* txn, - const Date_t waitUntil, - const Date_t stepDownUntil, - bool force, - Status* result) { - if (cbData.status == ErrorCodes::CallbackCanceled) { - // Cancelation only occurs on shutdown, which will also handle signaling the event. - *result = Status(ErrorCodes::ShutdownInProgress, "Shutting down replication"); - return; + if (writeConcern.wMode.empty()) { + if (writeConcern.wNumNodes < 1) { + return StatusAndDuration(Status::OK(), Milliseconds(timer->millis())); + } else if (writeConcern.wNumNodes == 1 && _getMyLastOptime_inlock() >= opTime) { + return StatusAndDuration(Status::OK(), Milliseconds(timer->millis())); } + } - ScopeGuard allFinishedGuard = MakeGuard( - stdx::bind(&ReplicationExecutor::signalEvent, &_replExecutor, finishedEvent)); - if (!cbData.status.isOK()) { - *result = cbData.status; - return; - } + // Must hold _mutex before constructing waitInfo as it will modify _replicationWaiterList + stdx::condition_variable condVar; + WaiterInfo waitInfo(&_replicationWaiterList, txn->getOpID(), &opTime, &writeConcern, &condVar); + while (!_doneWaitingForReplication_inlock(opTime, writeConcern)) { + const Milliseconds elapsed{timer->millis()}; Status interruptedStatus = txn->checkForInterruptNoAssert(); if (!interruptedStatus.isOK()) { - *result = interruptedStatus; - return; + return StatusAndDuration(interruptedStatus, elapsed); } - if (_topCoord->getRole() != TopologyCoordinator::Role::leader) { - *result = Status(ErrorCodes::NotMaster, - "Already stepped down from primary while processing step down " - "request"); - return; - } - const Date_t now = _replExecutor.now(); - if (now >= stepDownUntil) { - *result = Status(ErrorCodes::ExceededTimeLimit, - "By the time we were ready to step down, we were already past the " - "time we were supposed to step down until"); - return; + if (!waitInfo.master) { + return StatusAndDuration(Status(ErrorCodes::NotMaster, + "Not master anymore while waiting for replication" + " - this most likely means that a step down" + " occurred while waiting for replication"), + elapsed); } - bool forceNow = now >= waitUntil ? force : false; - if (_topCoord->stepDown(stepDownUntil, forceNow, getMyLastOptime())) { - // Schedule work to (potentially) step back up once the stepdown period has ended. - _replExecutor.scheduleWorkAt(stepDownUntil, - stdx::bind(&ReplicationCoordinatorImpl::_handleTimePassing, - this, - stdx::placeholders::_1)); - stdx::unique_lock<stdx::mutex> lk(_mutex); - const PostMemberStateUpdateAction action = - _updateMemberStateFromTopologyCoordinator_inlock(); - lk.unlock(); - _performPostMemberStateUpdateAction(action); - *result = Status::OK(); - return; + if (writeConcern.wTimeout != WriteConcernOptions::kNoTimeout && + elapsed > Milliseconds{writeConcern.wTimeout}) { + return StatusAndDuration( + Status(ErrorCodes::WriteConcernFailed, "waiting for replication timed out"), + elapsed); } - // Step down failed. Keep waiting if we can, otherwise finish. - if (now >= waitUntil) { - *result = Status(ErrorCodes::ExceededTimeLimit, str::stream() << - "No electable secondaries caught up as of " << - dateToISOStringLocal(now) << - ". Please use {force: true} to force node to step down."); - return; + if (_inShutdown) { + return StatusAndDuration( + Status(ErrorCodes::ShutdownInProgress, "Replication is being shut down"), elapsed); } - if (_stepDownWaiters.empty()) { - StatusWith<ReplicationExecutor::EventHandle> reschedEvent = - _replExecutor.makeEvent(); - if (!reschedEvent.isOK()) { - *result = reschedEvent.getStatus(); - return; - } - _stepDownWaiters.push_back(reschedEvent.getValue()); + const Microseconds maxTimeMicrosRemaining{txn->getRemainingMaxTimeMicros()}; + Microseconds waitTime = Microseconds::max(); + if (maxTimeMicrosRemaining != Microseconds::zero()) { + waitTime = maxTimeMicrosRemaining; } - CBHStatus cbh = _replExecutor.onEvent( - _stepDownWaiters.back(), - stdx::bind(&ReplicationCoordinatorImpl::_stepDownContinue, - this, - stdx::placeholders::_1, - finishedEvent, - txn, - waitUntil, - stepDownUntil, - force, - result)); - if (!cbh.isOK()) { - *result = cbh.getStatus(); - return; + if (writeConcern.wTimeout != WriteConcernOptions::kNoTimeout) { + waitTime = + std::min<Microseconds>(Milliseconds{writeConcern.wTimeout} - elapsed, waitTime); } - allFinishedGuard.Dismiss(); - } - void ReplicationCoordinatorImpl::_handleTimePassing( - const ReplicationExecutor::CallbackArgs& cbData) { - if (!cbData.status.isOK()) { - return; - } + if (waitTime == Microseconds::max()) { + condVar.wait(*lock); + } else { + condVar.wait_for(*lock, waitTime); + } + } + + Status status = _checkIfWriteConcernCanBeSatisfied_inlock(writeConcern); + if (!status.isOK()) { + return StatusAndDuration(status, Milliseconds(timer->millis())); + } + + return StatusAndDuration(Status::OK(), Milliseconds(timer->millis())); +} + +Status ReplicationCoordinatorImpl::stepDown(OperationContext* txn, + bool force, + const Milliseconds& waitTime, + const Milliseconds& stepdownTime) { + const Date_t startTime = _replExecutor.now(); + const Date_t stepDownUntil = startTime + stepdownTime; + const Date_t waitUntil = startTime + waitTime; + + if (!getMemberState().primary()) { + // Note this check is inherently racy - it's always possible for the node to + // stepdown from some other path before we acquire the global shared lock, but + // that's okay because we are resiliant to that happening in _stepDownContinue. + return Status(ErrorCodes::NotMaster, "not primary so can't step down"); + } + + LockResult lockState = txn->lockState()->lockGlobalBegin(MODE_S); + // We've requested the global shared lock which will stop new writes from coming in, + // but existing writes could take a long time to finish, so kill all user operations + // to help us get the global lock faster. + _externalState->killAllUserOperations(txn); + + if (lockState == LOCK_WAITING) { + lockState = txn->lockState()->lockGlobalComplete(durationCount<Milliseconds>(stepdownTime)); + if (lockState == LOCK_TIMEOUT) { + return Status(ErrorCodes::ExceededTimeLimit, + "Could not acquire the global shared lock within the amount of time " + "specified that we should step down for"); + } + } + invariant(lockState == LOCK_OK); + ON_BLOCK_EXIT(&Locker::unlockAll, txn->lockState()); + // From this point onward we are guaranteed to be holding the global shared lock. + + StatusWith<ReplicationExecutor::EventHandle> finishedEvent = _replExecutor.makeEvent(); + if (finishedEvent.getStatus() == ErrorCodes::ShutdownInProgress) { + return finishedEvent.getStatus(); + } + fassert(26000, finishedEvent.getStatus()); + Status result(ErrorCodes::InternalError, "didn't set status in _stepDownContinue"); + CBHStatus cbh = + _replExecutor.scheduleWork(stdx::bind(&ReplicationCoordinatorImpl::_stepDownContinue, + this, + stdx::placeholders::_1, + finishedEvent.getValue(), + txn, + waitUntil, + stepDownUntil, + force, + &result)); + if (cbh.getStatus() == ErrorCodes::ShutdownInProgress) { + return cbh.getStatus(); + } + fassert(18809, cbh.getStatus()); + cbh = _replExecutor.scheduleWorkAt( + waitUntil, + stdx::bind(&ReplicationCoordinatorImpl::_signalStepDownWaitersFromCallback, + this, + stdx::placeholders::_1)); + if (cbh.getStatus() == ErrorCodes::ShutdownInProgress) { + return cbh.getStatus(); + } + fassert(26001, cbh.getStatus()); + _replExecutor.waitForEvent(finishedEvent.getValue()); + return result; +} + +void ReplicationCoordinatorImpl::_signalStepDownWaitersFromCallback( + const ReplicationExecutor::CallbackArgs& cbData) { + if (!cbData.status.isOK()) { + return; + } + + _signalStepDownWaiters(); +} + +void ReplicationCoordinatorImpl::_signalStepDownWaiters() { + std::for_each( + _stepDownWaiters.begin(), + _stepDownWaiters.end(), + stdx::bind(&ReplicationExecutor::signalEvent, &_replExecutor, stdx::placeholders::_1)); + _stepDownWaiters.clear(); +} + +void ReplicationCoordinatorImpl::_stepDownContinue( + const ReplicationExecutor::CallbackArgs& cbData, + const ReplicationExecutor::EventHandle finishedEvent, + OperationContext* txn, + const Date_t waitUntil, + const Date_t stepDownUntil, + bool force, + Status* result) { + if (cbData.status == ErrorCodes::CallbackCanceled) { + // Cancelation only occurs on shutdown, which will also handle signaling the event. + *result = Status(ErrorCodes::ShutdownInProgress, "Shutting down replication"); + return; + } + + ScopeGuard allFinishedGuard = + MakeGuard(stdx::bind(&ReplicationExecutor::signalEvent, &_replExecutor, finishedEvent)); + if (!cbData.status.isOK()) { + *result = cbData.status; + return; + } + + Status interruptedStatus = txn->checkForInterruptNoAssert(); + if (!interruptedStatus.isOK()) { + *result = interruptedStatus; + return; + } + + if (_topCoord->getRole() != TopologyCoordinator::Role::leader) { + *result = Status(ErrorCodes::NotMaster, + "Already stepped down from primary while processing step down " + "request"); + return; + } + const Date_t now = _replExecutor.now(); + if (now >= stepDownUntil) { + *result = Status(ErrorCodes::ExceededTimeLimit, + "By the time we were ready to step down, we were already past the " + "time we were supposed to step down until"); + return; + } + bool forceNow = now >= waitUntil ? force : false; + if (_topCoord->stepDown(stepDownUntil, forceNow, getMyLastOptime())) { + // Schedule work to (potentially) step back up once the stepdown period has ended. + _replExecutor.scheduleWorkAt(stepDownUntil, + stdx::bind(&ReplicationCoordinatorImpl::_handleTimePassing, + this, + stdx::placeholders::_1)); - if (_topCoord->becomeCandidateIfStepdownPeriodOverAndSingleNodeSet(_replExecutor.now())) { - _performPostMemberStateUpdateAction(kActionWinElection); - } + stdx::unique_lock<stdx::mutex> lk(_mutex); + const PostMemberStateUpdateAction action = + _updateMemberStateFromTopologyCoordinator_inlock(); + lk.unlock(); + _performPostMemberStateUpdateAction(action); + *result = Status::OK(); + return; } - bool ReplicationCoordinatorImpl::isMasterForReportingPurposes() { - if (_settings.usingReplSets()) { - stdx::lock_guard<stdx::mutex> lock(_mutex); - if (getReplicationMode() == modeReplSet && _getMemberState_inlock().primary()) { - return true; - } - return false; - } - - if (!_settings.slave) - return true; - - - // TODO(dannenberg) replAllDead is bad and should be removed when master slave is removed - if (replAllDead) { - return false; - } - - if (_settings.master) { - // if running with --master --slave, allow. - return true; - } - - return false; + // Step down failed. Keep waiting if we can, otherwise finish. + if (now >= waitUntil) { + *result = Status(ErrorCodes::ExceededTimeLimit, + str::stream() << "No electable secondaries caught up as of " + << dateToISOStringLocal(now) + << ". Please use {force: true} to force node to step down."); + return; } - bool ReplicationCoordinatorImpl::canAcceptWritesForDatabase(StringData dbName) { - // _canAcceptNonLocalWrites is always true for standalone nodes, always false for nodes - // started with --slave, and adjusted based on primary+drain state in replica sets. - // - // That is, stand-alone nodes, non-slave nodes and drained replica set primaries can always - // accept writes. Similarly, writes are always permitted to the "local" database. Finally, - // in the event that a node is started with --slave and --master, we allow writes unless the - // master/slave system has set the replAllDead flag. - if (_canAcceptNonLocalWrites) { - return true; - } - if (dbName == "local") { - return true; + if (_stepDownWaiters.empty()) { + StatusWith<ReplicationExecutor::EventHandle> reschedEvent = _replExecutor.makeEvent(); + if (!reschedEvent.isOK()) { + *result = reschedEvent.getStatus(); + return; } - return !replAllDead && _settings.master; + _stepDownWaiters.push_back(reschedEvent.getValue()); } - - bool ReplicationCoordinatorImpl::canAcceptWritesFor(const NamespaceString& ns) { - if (_memberState.rollback() && ns.isOplog()) { - return false; - } - StringData dbName = ns.db(); - return canAcceptWritesForDatabase(dbName); + CBHStatus cbh = _replExecutor.onEvent(_stepDownWaiters.back(), + stdx::bind(&ReplicationCoordinatorImpl::_stepDownContinue, + this, + stdx::placeholders::_1, + finishedEvent, + txn, + waitUntil, + stepDownUntil, + force, + result)); + if (!cbh.isOK()) { + *result = cbh.getStatus(); + return; } + allFinishedGuard.Dismiss(); +} - Status ReplicationCoordinatorImpl::checkCanServeReadsFor(OperationContext* txn, - const NamespaceString& ns, - bool slaveOk) { - if (_memberState.rollback() && ns.isOplog()) { - return Status(ErrorCodes::NotMasterOrSecondaryCode, - "cannot read from oplog collection while in rollback"); - } - if (txn->getClient()->isInDirectClient()) { - return Status::OK(); - } - if (canAcceptWritesFor(ns)) { - return Status::OK(); - } - if (_settings.slave || _settings.master) { - return Status::OK(); - } - if (slaveOk) { - if (_canServeNonLocalReads.loadRelaxed()) { - return Status::OK(); - } - return Status( - ErrorCodes::NotMasterOrSecondaryCode, - "not master or secondary; cannot currently read from this replSet member"); - } - return Status(ErrorCodes::NotMasterNoSlaveOkCode, "not master and slaveOk=false"); +void ReplicationCoordinatorImpl::_handleTimePassing( + const ReplicationExecutor::CallbackArgs& cbData) { + if (!cbData.status.isOK()) { + return; } - bool ReplicationCoordinatorImpl::isInPrimaryOrSecondaryState() const { - return _canServeNonLocalReads.loadRelaxed(); + if (_topCoord->becomeCandidateIfStepdownPeriodOverAndSingleNodeSet(_replExecutor.now())) { + _performPostMemberStateUpdateAction(kActionWinElection); } +} - bool ReplicationCoordinatorImpl::shouldIgnoreUniqueIndex(const IndexDescriptor* idx) { - if (!idx->unique()) { - return false; - } - // Never ignore _id index - if (idx->isIdIndex()) { - return false; - } - if (nsToDatabaseSubstring(idx->parentNS()) == "local" ) { - // always enforce on local - return false; - } +bool ReplicationCoordinatorImpl::isMasterForReportingPurposes() { + if (_settings.usingReplSets()) { stdx::lock_guard<stdx::mutex> lock(_mutex); - if (getReplicationMode() != modeReplSet) { - return false; - } - // see SERVER-6671 - MemberState ms = _getMemberState_inlock(); - switch ( ms.s ) { - case MemberState::RS_SECONDARY: - case MemberState::RS_RECOVERING: - case MemberState::RS_ROLLBACK: - case MemberState::RS_STARTUP2: + if (getReplicationMode() == modeReplSet && _getMemberState_inlock().primary()) { return true; - default: - return false; } + return false; } - OID ReplicationCoordinatorImpl::getElectionId() { - stdx::lock_guard<stdx::mutex> lock(_mutex); - return _electionId; - } - - OID ReplicationCoordinatorImpl::getMyRID() const { - stdx::lock_guard<stdx::mutex> lock(_mutex); - return _getMyRID_inlock(); - } - - OID ReplicationCoordinatorImpl::_getMyRID_inlock() const { - return _myRID; - } + if (!_settings.slave) + return true; - int ReplicationCoordinatorImpl::getMyId() const { - stdx::lock_guard<stdx::mutex> lock(_mutex); - return _getMyId_inlock(); - } - int ReplicationCoordinatorImpl::_getMyId_inlock() const { - const MemberConfig& self = _rsConfig.getMemberAt(_selfIndex); - return self.getId(); + // TODO(dannenberg) replAllDead is bad and should be removed when master slave is removed + if (replAllDead) { + return false; } - bool ReplicationCoordinatorImpl::prepareReplSetUpdatePositionCommand( - BSONObjBuilder* cmdBuilder) { - stdx::lock_guard<stdx::mutex> lock(_mutex); - invariant(_rsConfig.isInitialized()); - // do not send updates if we have been removed from the config - if (_selfIndex == -1) { - return false; - } - cmdBuilder->append("replSetUpdatePosition", 1); - // create an array containing objects each member connected to us and for ourself - BSONArrayBuilder arrayBuilder(cmdBuilder->subarrayStart("optimes")); - { - for (SlaveInfoVector::const_iterator itr = _slaveInfo.begin(); - itr != _slaveInfo.end(); ++itr) { - if (itr->opTime.isNull()) { - // Don't include info on members we haven't heard from yet. - continue; - } - BSONObjBuilder entry(arrayBuilder.subobjStart()); - entry.append("_id", itr->rid); - entry.append("optime", itr->opTime.getTimestamp()); - entry.append("memberId", itr->memberId); - entry.append("cfgver", _rsConfig.getConfigVersion()); - // SERVER-14550 Even though the "config" field isn't used on the other end in 3.0, - // we need to keep sending it for 2.6 compatibility. - // TODO(spencer): Remove this after 3.0 is released. - const MemberConfig* member = _rsConfig.findMemberByID(itr->memberId); - fassert(18651, member); - entry.append("config", member->toBSON(_rsConfig.getTagConfig())); - } - } + if (_settings.master) { + // if running with --master --slave, allow. return true; } - Status ReplicationCoordinatorImpl::processReplSetGetStatus(BSONObjBuilder* response) { - Status result(ErrorCodes::InternalError, "didn't set status in prepareStatusResponse"); - CBHStatus cbh = _replExecutor.scheduleWork( - stdx::bind(&TopologyCoordinator::prepareStatusResponse, - _topCoord.get(), - stdx::placeholders::_1, - _replExecutor.now(), - time(0) - serverGlobalParams.started, - getMyLastOptime(), - response, - &result)); - if (cbh.getStatus() == ErrorCodes::ShutdownInProgress) { - return Status(ErrorCodes::ShutdownInProgress, "replication shutdown in progress"); - } - fassert(18640, cbh.getStatus()); - _replExecutor.wait(cbh.getValue()); + return false; +} - return result; +bool ReplicationCoordinatorImpl::canAcceptWritesForDatabase(StringData dbName) { + // _canAcceptNonLocalWrites is always true for standalone nodes, always false for nodes + // started with --slave, and adjusted based on primary+drain state in replica sets. + // + // That is, stand-alone nodes, non-slave nodes and drained replica set primaries can always + // accept writes. Similarly, writes are always permitted to the "local" database. Finally, + // in the event that a node is started with --slave and --master, we allow writes unless the + // master/slave system has set the replAllDead flag. + if (_canAcceptNonLocalWrites) { + return true; } - - void ReplicationCoordinatorImpl::fillIsMasterForReplSet(IsMasterResponse* response) { - invariant(getSettings().usingReplSets()); - - CBHStatus cbh = _replExecutor.scheduleWork( - stdx::bind(&ReplicationCoordinatorImpl::_fillIsMasterForReplSet_finish, - this, - stdx::placeholders::_1, - response)); - if (cbh.getStatus() == ErrorCodes::ShutdownInProgress) { - response->markAsShutdownInProgress(); - return; - } - fassert(28602, cbh.getStatus()); - - _replExecutor.wait(cbh.getValue()); - if (isWaitingForApplierToDrain()) { - // Report that we are secondary to ismaster callers until drain completes. - response->setIsMaster(false); - response->setIsSecondary(true); - } + if (dbName == "local") { + return true; } + return !replAllDead && _settings.master; +} - void ReplicationCoordinatorImpl::_fillIsMasterForReplSet_finish( - const ReplicationExecutor::CallbackArgs& cbData, IsMasterResponse* response) { - if (cbData.status == ErrorCodes::CallbackCanceled) { - response->markAsShutdownInProgress(); - return; - } - _topCoord->fillIsMasterForReplSet(response); +bool ReplicationCoordinatorImpl::canAcceptWritesFor(const NamespaceString& ns) { + if (_memberState.rollback() && ns.isOplog()) { + return false; } + StringData dbName = ns.db(); + return canAcceptWritesForDatabase(dbName); +} - void ReplicationCoordinatorImpl::appendSlaveInfoData(BSONObjBuilder* result) { - stdx::lock_guard<stdx::mutex> lock(_mutex); - BSONArrayBuilder replicationProgress(result->subarrayStart("replicationProgress")); - { - for (SlaveInfoVector::const_iterator itr = _slaveInfo.begin(); - itr != _slaveInfo.end(); ++itr) { - BSONObjBuilder entry(replicationProgress.subobjStart()); - entry.append("rid", itr->rid); - // TODO(siyuan) Output term of OpTime - entry.append("optime", itr->opTime.getTimestamp()); - entry.append("host", itr->hostAndPort.toString()); - if (getReplicationMode() == modeReplSet) { - if (_selfIndex == -1) { - continue; - } - invariant(itr->memberId >= 0); - entry.append("memberId", itr->memberId); - } - } - } +Status ReplicationCoordinatorImpl::checkCanServeReadsFor(OperationContext* txn, + const NamespaceString& ns, + bool slaveOk) { + if (_memberState.rollback() && ns.isOplog()) { + return Status(ErrorCodes::NotMasterOrSecondaryCode, + "cannot read from oplog collection while in rollback"); } - - ReplicaSetConfig ReplicationCoordinatorImpl::getConfig() const { - stdx::lock_guard<stdx::mutex> lock(_mutex); - return _rsConfig; + if (txn->getClient()->isInDirectClient()) { + return Status::OK(); } - - void ReplicationCoordinatorImpl::processReplSetGetConfig(BSONObjBuilder* result) { - stdx::lock_guard<stdx::mutex> lock(_mutex); - result->append("config", _rsConfig.toBSON()); + if (canAcceptWritesFor(ns)) { + return Status::OK(); } - - bool ReplicationCoordinatorImpl::getMaintenanceMode() { - bool maintenanceMode(false); - CBHStatus cbh = _replExecutor.scheduleWork( - stdx::bind(&ReplicationCoordinatorImpl::_getMaintenanceMode_helper, - this, - stdx::placeholders::_1, - &maintenanceMode)); - if (cbh.getStatus() == ErrorCodes::ShutdownInProgress) { - return false; - } - fassert(18811, cbh.getStatus()); - _replExecutor.wait(cbh.getValue()); - return maintenanceMode; + if (_settings.slave || _settings.master) { + return Status::OK(); } - - void ReplicationCoordinatorImpl::_getMaintenanceMode_helper( - const ReplicationExecutor::CallbackArgs& cbData, - bool* maintenanceMode) { - if (cbData.status == ErrorCodes::CallbackCanceled) { - return; + if (slaveOk) { + if (_canServeNonLocalReads.loadRelaxed()) { + return Status::OK(); } - *maintenanceMode = _topCoord->getMaintenanceCount() > 0; + return Status(ErrorCodes::NotMasterOrSecondaryCode, + "not master or secondary; cannot currently read from this replSet member"); } + return Status(ErrorCodes::NotMasterNoSlaveOkCode, "not master and slaveOk=false"); +} - Status ReplicationCoordinatorImpl::setMaintenanceMode(bool activate) { - if (getReplicationMode() != modeReplSet) { - return Status(ErrorCodes::NoReplicationEnabled, - "can only set maintenance mode on replica set members"); - } +bool ReplicationCoordinatorImpl::isInPrimaryOrSecondaryState() const { + return _canServeNonLocalReads.loadRelaxed(); +} - Status result(ErrorCodes::InternalError, "didn't set status in _setMaintenanceMode_helper"); - CBHStatus cbh = _replExecutor.scheduleWork( - stdx::bind(&ReplicationCoordinatorImpl::_setMaintenanceMode_helper, - this, - stdx::placeholders::_1, - activate, - &result)); - if (cbh.getStatus() == ErrorCodes::ShutdownInProgress) { - return cbh.getStatus(); - } - fassert(18698, cbh.getStatus()); - _replExecutor.wait(cbh.getValue()); - return result; +bool ReplicationCoordinatorImpl::shouldIgnoreUniqueIndex(const IndexDescriptor* idx) { + if (!idx->unique()) { + return false; } - - void ReplicationCoordinatorImpl::_setMaintenanceMode_helper( - const ReplicationExecutor::CallbackArgs& cbData, - bool activate, - Status* result) { - if (cbData.status == ErrorCodes::CallbackCanceled) { - *result = Status(ErrorCodes::ShutdownInProgress, "replication system is shutting down"); - return; - } - - stdx::unique_lock<stdx::mutex> lk(_mutex); - if (_getMemberState_inlock().primary()) { - *result = Status(ErrorCodes::NotSecondary, "primaries can't modify maintenance mode"); - return; - } - - int curMaintenanceCalls = _topCoord->getMaintenanceCount(); - if (activate) { - log() << "going into maintenance mode with " << curMaintenanceCalls - << " other maintenance mode tasks in progress" << rsLog; - _topCoord->adjustMaintenanceCountBy(1); - } - else if (curMaintenanceCalls > 0) { - invariant(_topCoord->getRole() == TopologyCoordinator::Role::follower); - - _topCoord->adjustMaintenanceCountBy(-1); - - log() << "leaving maintenance mode (" << curMaintenanceCalls-1 - << " other maintenance mode tasks ongoing)" << rsLog; - } else { - warning() << "Attempted to leave maintenance mode but it is not currently active"; - *result = Status(ErrorCodes::OperationFailed, "already out of maintenance mode"); - return; - } - - const PostMemberStateUpdateAction action = - _updateMemberStateFromTopologyCoordinator_inlock(); - *result = Status::OK(); - lk.unlock(); - _performPostMemberStateUpdateAction(action); + // Never ignore _id index + if (idx->isIdIndex()) { + return false; } - - Status ReplicationCoordinatorImpl::processReplSetSyncFrom(const HostAndPort& target, - BSONObjBuilder* resultObj) { - Status result(ErrorCodes::InternalError, "didn't set status in prepareSyncFromResponse"); - CBHStatus cbh = _replExecutor.scheduleWork( - stdx::bind(&TopologyCoordinator::prepareSyncFromResponse, - _topCoord.get(), - stdx::placeholders::_1, - target, - _getMyLastOptime_inlock(), - resultObj, - &result)); - if (cbh.getStatus() == ErrorCodes::ShutdownInProgress) { - return Status(ErrorCodes::ShutdownInProgress, "replication shutdown in progress"); - } - fassert(18649, cbh.getStatus()); - _replExecutor.wait(cbh.getValue()); - return result; - } - - Status ReplicationCoordinatorImpl::processReplSetFreeze(int secs, BSONObjBuilder* resultObj) { - Status result(ErrorCodes::InternalError, "didn't set status in prepareFreezeResponse"); - CBHStatus cbh = _replExecutor.scheduleWork( - stdx::bind(&ReplicationCoordinatorImpl::_processReplSetFreeze_finish, - this, - stdx::placeholders::_1, - secs, - resultObj, - &result)); - if (cbh.getStatus() == ErrorCodes::ShutdownInProgress) { - return cbh.getStatus(); - } - fassert(18641, cbh.getStatus()); - _replExecutor.wait(cbh.getValue()); - return result; - } - - void ReplicationCoordinatorImpl::_processReplSetFreeze_finish( - const ReplicationExecutor::CallbackArgs& cbData, - int secs, - BSONObjBuilder* response, - Status* result) { - if (cbData.status == ErrorCodes::CallbackCanceled) { - *result = Status(ErrorCodes::ShutdownInProgress, "replication system is shutting down"); - return; - } - - _topCoord->prepareFreezeResponse(_replExecutor.now(), secs, response); - - if (_topCoord->getRole() == TopologyCoordinator::Role::candidate) { - // If we just unfroze and ended our stepdown period and we are a one node replica set, - // the topology coordinator will have gone into the candidate role to signal that we - // need to elect ourself. - _performPostMemberStateUpdateAction(kActionWinElection); - } - *result = Status::OK(); + if (nsToDatabaseSubstring(idx->parentNS()) == "local") { + // always enforce on local + return false; } - - Status ReplicationCoordinatorImpl::processHeartbeat(const ReplSetHeartbeatArgs& args, - ReplSetHeartbeatResponse* response) { - { - stdx::lock_guard<stdx::mutex> lock(_mutex); - if (_rsConfigState == kConfigPreStart || _rsConfigState == kConfigStartingUp) { - return Status(ErrorCodes::NotYetInitialized, - "Received heartbeat while still initializing replication system"); - } - } - - Status result(ErrorCodes::InternalError, "didn't set status in prepareHeartbeatResponse"); - CBHStatus cbh = _replExecutor.scheduleWork( - stdx::bind(&ReplicationCoordinatorImpl::_processHeartbeatFinish, - this, - stdx::placeholders::_1, - args, - response, - &result)); - if (cbh.getStatus() == ErrorCodes::ShutdownInProgress) { - return Status(ErrorCodes::ShutdownInProgress, "replication shutdown in progress"); - } - fassert(18508, cbh.getStatus()); - _replExecutor.wait(cbh.getValue()); - return result; + stdx::lock_guard<stdx::mutex> lock(_mutex); + if (getReplicationMode() != modeReplSet) { + return false; } - - void ReplicationCoordinatorImpl::_processHeartbeatFinish( - const ReplicationExecutor::CallbackArgs& cbData, - const ReplSetHeartbeatArgs& args, - ReplSetHeartbeatResponse* response, - Status* outStatus) { - - if (cbData.status == ErrorCodes::CallbackCanceled) { - *outStatus = Status(ErrorCodes::ShutdownInProgress, "Replication shutdown in progress"); - return; - } - fassert(18910, cbData.status); - const Date_t now = _replExecutor.now(); - *outStatus = _topCoord->prepareHeartbeatResponse( - now, - args, - _settings.ourSetName(), - getMyLastOptime(), - response); - if ((outStatus->isOK() || *outStatus == ErrorCodes::InvalidReplicaSetConfig) && - _selfIndex < 0) { - // If this node does not belong to the configuration it knows about, send heartbeats - // back to any node that sends us a heartbeat, in case one of those remote nodes has - // a configuration that contains us. Chances are excellent that it will, since that - // is the only reason for a remote node to send this node a heartbeat request. - if (!args.getSenderHost().empty() && _seedList.insert(args.getSenderHost()).second) { - _scheduleHeartbeatToTarget(args.getSenderHost(), -1, now); + // see SERVER-6671 + MemberState ms = _getMemberState_inlock(); + switch (ms.s) { + case MemberState::RS_SECONDARY: + case MemberState::RS_RECOVERING: + case MemberState::RS_ROLLBACK: + case MemberState::RS_STARTUP2: + return true; + default: + return false; + } +} + +OID ReplicationCoordinatorImpl::getElectionId() { + stdx::lock_guard<stdx::mutex> lock(_mutex); + return _electionId; +} + +OID ReplicationCoordinatorImpl::getMyRID() const { + stdx::lock_guard<stdx::mutex> lock(_mutex); + return _getMyRID_inlock(); +} + +OID ReplicationCoordinatorImpl::_getMyRID_inlock() const { + return _myRID; +} + +int ReplicationCoordinatorImpl::getMyId() const { + stdx::lock_guard<stdx::mutex> lock(_mutex); + return _getMyId_inlock(); +} + +int ReplicationCoordinatorImpl::_getMyId_inlock() const { + const MemberConfig& self = _rsConfig.getMemberAt(_selfIndex); + return self.getId(); +} + +bool ReplicationCoordinatorImpl::prepareReplSetUpdatePositionCommand(BSONObjBuilder* cmdBuilder) { + stdx::lock_guard<stdx::mutex> lock(_mutex); + invariant(_rsConfig.isInitialized()); + // do not send updates if we have been removed from the config + if (_selfIndex == -1) { + return false; + } + cmdBuilder->append("replSetUpdatePosition", 1); + // create an array containing objects each member connected to us and for ourself + BSONArrayBuilder arrayBuilder(cmdBuilder->subarrayStart("optimes")); + { + for (SlaveInfoVector::const_iterator itr = _slaveInfo.begin(); itr != _slaveInfo.end(); + ++itr) { + if (itr->opTime.isNull()) { + // Don't include info on members we haven't heard from yet. + continue; + } + BSONObjBuilder entry(arrayBuilder.subobjStart()); + entry.append("_id", itr->rid); + entry.append("optime", itr->opTime.getTimestamp()); + entry.append("memberId", itr->memberId); + entry.append("cfgver", _rsConfig.getConfigVersion()); + // SERVER-14550 Even though the "config" field isn't used on the other end in 3.0, + // we need to keep sending it for 2.6 compatibility. + // TODO(spencer): Remove this after 3.0 is released. + const MemberConfig* member = _rsConfig.findMemberByID(itr->memberId); + fassert(18651, member); + entry.append("config", member->toBSON(_rsConfig.getTagConfig())); + } + } + return true; +} + +Status ReplicationCoordinatorImpl::processReplSetGetStatus(BSONObjBuilder* response) { + Status result(ErrorCodes::InternalError, "didn't set status in prepareStatusResponse"); + CBHStatus cbh = + _replExecutor.scheduleWork(stdx::bind(&TopologyCoordinator::prepareStatusResponse, + _topCoord.get(), + stdx::placeholders::_1, + _replExecutor.now(), + time(0) - serverGlobalParams.started, + getMyLastOptime(), + response, + &result)); + if (cbh.getStatus() == ErrorCodes::ShutdownInProgress) { + return Status(ErrorCodes::ShutdownInProgress, "replication shutdown in progress"); + } + fassert(18640, cbh.getStatus()); + _replExecutor.wait(cbh.getValue()); + + return result; +} + +void ReplicationCoordinatorImpl::fillIsMasterForReplSet(IsMasterResponse* response) { + invariant(getSettings().usingReplSets()); + + CBHStatus cbh = _replExecutor.scheduleWork( + stdx::bind(&ReplicationCoordinatorImpl::_fillIsMasterForReplSet_finish, + this, + stdx::placeholders::_1, + response)); + if (cbh.getStatus() == ErrorCodes::ShutdownInProgress) { + response->markAsShutdownInProgress(); + return; + } + fassert(28602, cbh.getStatus()); + + _replExecutor.wait(cbh.getValue()); + if (isWaitingForApplierToDrain()) { + // Report that we are secondary to ismaster callers until drain completes. + response->setIsMaster(false); + response->setIsSecondary(true); + } +} + +void ReplicationCoordinatorImpl::_fillIsMasterForReplSet_finish( + const ReplicationExecutor::CallbackArgs& cbData, IsMasterResponse* response) { + if (cbData.status == ErrorCodes::CallbackCanceled) { + response->markAsShutdownInProgress(); + return; + } + _topCoord->fillIsMasterForReplSet(response); +} + +void ReplicationCoordinatorImpl::appendSlaveInfoData(BSONObjBuilder* result) { + stdx::lock_guard<stdx::mutex> lock(_mutex); + BSONArrayBuilder replicationProgress(result->subarrayStart("replicationProgress")); + { + for (SlaveInfoVector::const_iterator itr = _slaveInfo.begin(); itr != _slaveInfo.end(); + ++itr) { + BSONObjBuilder entry(replicationProgress.subobjStart()); + entry.append("rid", itr->rid); + // TODO(siyuan) Output term of OpTime + entry.append("optime", itr->opTime.getTimestamp()); + entry.append("host", itr->hostAndPort.toString()); + if (getReplicationMode() == modeReplSet) { + if (_selfIndex == -1) { + continue; + } + invariant(itr->memberId >= 0); + entry.append("memberId", itr->memberId); } } } +} - Status ReplicationCoordinatorImpl::processReplSetReconfig(OperationContext* txn, - const ReplSetReconfigArgs& args, - BSONObjBuilder* resultObj) { - - log() << "replSetReconfig admin command received from client"; +ReplicaSetConfig ReplicationCoordinatorImpl::getConfig() const { + stdx::lock_guard<stdx::mutex> lock(_mutex); + return _rsConfig; +} - stdx::unique_lock<stdx::mutex> lk(_mutex); +void ReplicationCoordinatorImpl::processReplSetGetConfig(BSONObjBuilder* result) { + stdx::lock_guard<stdx::mutex> lock(_mutex); + result->append("config", _rsConfig.toBSON()); +} - while (_rsConfigState == kConfigPreStart || _rsConfigState == kConfigStartingUp) { - _rsConfigStateChange.wait(lk); - } - - switch (_rsConfigState) { +bool ReplicationCoordinatorImpl::getMaintenanceMode() { + bool maintenanceMode(false); + CBHStatus cbh = _replExecutor.scheduleWork( + stdx::bind(&ReplicationCoordinatorImpl::_getMaintenanceMode_helper, + this, + stdx::placeholders::_1, + &maintenanceMode)); + if (cbh.getStatus() == ErrorCodes::ShutdownInProgress) { + return false; + } + fassert(18811, cbh.getStatus()); + _replExecutor.wait(cbh.getValue()); + return maintenanceMode; +} + +void ReplicationCoordinatorImpl::_getMaintenanceMode_helper( + const ReplicationExecutor::CallbackArgs& cbData, bool* maintenanceMode) { + if (cbData.status == ErrorCodes::CallbackCanceled) { + return; + } + *maintenanceMode = _topCoord->getMaintenanceCount() > 0; +} + +Status ReplicationCoordinatorImpl::setMaintenanceMode(bool activate) { + if (getReplicationMode() != modeReplSet) { + return Status(ErrorCodes::NoReplicationEnabled, + "can only set maintenance mode on replica set members"); + } + + Status result(ErrorCodes::InternalError, "didn't set status in _setMaintenanceMode_helper"); + CBHStatus cbh = _replExecutor.scheduleWork( + stdx::bind(&ReplicationCoordinatorImpl::_setMaintenanceMode_helper, + this, + stdx::placeholders::_1, + activate, + &result)); + if (cbh.getStatus() == ErrorCodes::ShutdownInProgress) { + return cbh.getStatus(); + } + fassert(18698, cbh.getStatus()); + _replExecutor.wait(cbh.getValue()); + return result; +} + +void ReplicationCoordinatorImpl::_setMaintenanceMode_helper( + const ReplicationExecutor::CallbackArgs& cbData, bool activate, Status* result) { + if (cbData.status == ErrorCodes::CallbackCanceled) { + *result = Status(ErrorCodes::ShutdownInProgress, "replication system is shutting down"); + return; + } + + stdx::unique_lock<stdx::mutex> lk(_mutex); + if (_getMemberState_inlock().primary()) { + *result = Status(ErrorCodes::NotSecondary, "primaries can't modify maintenance mode"); + return; + } + + int curMaintenanceCalls = _topCoord->getMaintenanceCount(); + if (activate) { + log() << "going into maintenance mode with " << curMaintenanceCalls + << " other maintenance mode tasks in progress" << rsLog; + _topCoord->adjustMaintenanceCountBy(1); + } else if (curMaintenanceCalls > 0) { + invariant(_topCoord->getRole() == TopologyCoordinator::Role::follower); + + _topCoord->adjustMaintenanceCountBy(-1); + + log() << "leaving maintenance mode (" << curMaintenanceCalls - 1 + << " other maintenance mode tasks ongoing)" << rsLog; + } else { + warning() << "Attempted to leave maintenance mode but it is not currently active"; + *result = Status(ErrorCodes::OperationFailed, "already out of maintenance mode"); + return; + } + + const PostMemberStateUpdateAction action = _updateMemberStateFromTopologyCoordinator_inlock(); + *result = Status::OK(); + lk.unlock(); + _performPostMemberStateUpdateAction(action); +} + +Status ReplicationCoordinatorImpl::processReplSetSyncFrom(const HostAndPort& target, + BSONObjBuilder* resultObj) { + Status result(ErrorCodes::InternalError, "didn't set status in prepareSyncFromResponse"); + CBHStatus cbh = + _replExecutor.scheduleWork(stdx::bind(&TopologyCoordinator::prepareSyncFromResponse, + _topCoord.get(), + stdx::placeholders::_1, + target, + _getMyLastOptime_inlock(), + resultObj, + &result)); + if (cbh.getStatus() == ErrorCodes::ShutdownInProgress) { + return Status(ErrorCodes::ShutdownInProgress, "replication shutdown in progress"); + } + fassert(18649, cbh.getStatus()); + _replExecutor.wait(cbh.getValue()); + return result; +} + +Status ReplicationCoordinatorImpl::processReplSetFreeze(int secs, BSONObjBuilder* resultObj) { + Status result(ErrorCodes::InternalError, "didn't set status in prepareFreezeResponse"); + CBHStatus cbh = _replExecutor.scheduleWork( + stdx::bind(&ReplicationCoordinatorImpl::_processReplSetFreeze_finish, + this, + stdx::placeholders::_1, + secs, + resultObj, + &result)); + if (cbh.getStatus() == ErrorCodes::ShutdownInProgress) { + return cbh.getStatus(); + } + fassert(18641, cbh.getStatus()); + _replExecutor.wait(cbh.getValue()); + return result; +} + +void ReplicationCoordinatorImpl::_processReplSetFreeze_finish( + const ReplicationExecutor::CallbackArgs& cbData, + int secs, + BSONObjBuilder* response, + Status* result) { + if (cbData.status == ErrorCodes::CallbackCanceled) { + *result = Status(ErrorCodes::ShutdownInProgress, "replication system is shutting down"); + return; + } + + _topCoord->prepareFreezeResponse(_replExecutor.now(), secs, response); + + if (_topCoord->getRole() == TopologyCoordinator::Role::candidate) { + // If we just unfroze and ended our stepdown period and we are a one node replica set, + // the topology coordinator will have gone into the candidate role to signal that we + // need to elect ourself. + _performPostMemberStateUpdateAction(kActionWinElection); + } + *result = Status::OK(); +} + +Status ReplicationCoordinatorImpl::processHeartbeat(const ReplSetHeartbeatArgs& args, + ReplSetHeartbeatResponse* response) { + { + stdx::lock_guard<stdx::mutex> lock(_mutex); + if (_rsConfigState == kConfigPreStart || _rsConfigState == kConfigStartingUp) { + return Status(ErrorCodes::NotYetInitialized, + "Received heartbeat while still initializing replication system"); + } + } + + Status result(ErrorCodes::InternalError, "didn't set status in prepareHeartbeatResponse"); + CBHStatus cbh = + _replExecutor.scheduleWork(stdx::bind(&ReplicationCoordinatorImpl::_processHeartbeatFinish, + this, + stdx::placeholders::_1, + args, + response, + &result)); + if (cbh.getStatus() == ErrorCodes::ShutdownInProgress) { + return Status(ErrorCodes::ShutdownInProgress, "replication shutdown in progress"); + } + fassert(18508, cbh.getStatus()); + _replExecutor.wait(cbh.getValue()); + return result; +} + +void ReplicationCoordinatorImpl::_processHeartbeatFinish( + const ReplicationExecutor::CallbackArgs& cbData, + const ReplSetHeartbeatArgs& args, + ReplSetHeartbeatResponse* response, + Status* outStatus) { + if (cbData.status == ErrorCodes::CallbackCanceled) { + *outStatus = Status(ErrorCodes::ShutdownInProgress, "Replication shutdown in progress"); + return; + } + fassert(18910, cbData.status); + const Date_t now = _replExecutor.now(); + *outStatus = _topCoord->prepareHeartbeatResponse( + now, args, _settings.ourSetName(), getMyLastOptime(), response); + if ((outStatus->isOK() || *outStatus == ErrorCodes::InvalidReplicaSetConfig) && + _selfIndex < 0) { + // If this node does not belong to the configuration it knows about, send heartbeats + // back to any node that sends us a heartbeat, in case one of those remote nodes has + // a configuration that contains us. Chances are excellent that it will, since that + // is the only reason for a remote node to send this node a heartbeat request. + if (!args.getSenderHost().empty() && _seedList.insert(args.getSenderHost()).second) { + _scheduleHeartbeatToTarget(args.getSenderHost(), -1, now); + } + } +} + +Status ReplicationCoordinatorImpl::processReplSetReconfig(OperationContext* txn, + const ReplSetReconfigArgs& args, + BSONObjBuilder* resultObj) { + log() << "replSetReconfig admin command received from client"; + + stdx::unique_lock<stdx::mutex> lk(_mutex); + + while (_rsConfigState == kConfigPreStart || _rsConfigState == kConfigStartingUp) { + _rsConfigStateChange.wait(lk); + } + + switch (_rsConfigState) { case kConfigSteady: break; case kConfigUninitialized: return Status(ErrorCodes::NotYetInitialized, "Node not yet initialized; use the replSetInitiate command"); case kConfigReplicationDisabled: - invariant(false); // should be unreachable due to !_settings.usingReplSets() check above + invariant( + false); // should be unreachable due to !_settings.usingReplSets() check above case kConfigInitiating: case kConfigReconfiguring: case kConfigHBReconfiguring: @@ -1811,283 +1741,264 @@ namespace { default: severe() << "Unexpected _rsConfigState " << int(_rsConfigState); fassertFailed(18914); - } - - invariant(_rsConfig.isInitialized()); - - if (!args.force && !_getMemberState_inlock().primary()) { - return Status(ErrorCodes::NotMaster, str::stream() << - "replSetReconfig should only be run on PRIMARY, but my state is " << - _getMemberState_inlock().toString() << - "; use the \"force\" argument to override"); - } - - _setConfigState_inlock(kConfigReconfiguring); - ScopeGuard configStateGuard = MakeGuard( - lockAndCall, - &lk, - stdx::bind(&ReplicationCoordinatorImpl::_setConfigState_inlock, - this, - kConfigSteady)); - - ReplicaSetConfig oldConfig = _rsConfig; - lk.unlock(); - - ReplicaSetConfig newConfig; - BSONObj newConfigObj = args.newConfigObj; - if (args.force) { - newConfigObj = incrementConfigVersionByRandom(newConfigObj); - } - Status status = newConfig.initialize(newConfigObj); - if (!status.isOK()) { - error() << "replSetReconfig got " << status << " while parsing " << newConfigObj; - return Status(ErrorCodes::InvalidReplicaSetConfig, status.reason());; - } - if (newConfig.getReplSetName() != _settings.ourSetName()) { - str::stream errmsg; - errmsg << "Attempting to reconfigure a replica set with name " << - newConfig.getReplSetName() << ", but command line reports " << - _settings.ourSetName() << "; rejecting"; - error() << std::string(errmsg); - return Status(ErrorCodes::InvalidReplicaSetConfig, errmsg); - } - - StatusWith<int> myIndex = validateConfigForReconfig( - _externalState.get(), - oldConfig, - newConfig, - args.force); - if (!myIndex.isOK()) { - error() << "replSetReconfig got " << myIndex.getStatus() << " while validating " << - newConfigObj; - return Status(ErrorCodes::NewReplicaSetConfigurationIncompatible, - myIndex.getStatus().reason()); - } - - log() << "replSetReconfig config object with " << newConfig.getNumMembers() << - " members parses ok"; - - if (!args.force) { - status = checkQuorumForReconfig(&_replExecutor, - newConfig, - myIndex.getValue()); - if (!status.isOK()) { - error() << "replSetReconfig failed; " << status; - return status; - } - } - - status = _externalState->storeLocalConfigDocument(txn, newConfig.toBSON()); - if (!status.isOK()) { - error() << "replSetReconfig failed to store config document; " << status; - return status; - } - - const stdx::function<void (const ReplicationExecutor::CallbackArgs&)> reconfigFinishFn( - stdx::bind(&ReplicationCoordinatorImpl::_finishReplSetReconfig, - this, - stdx::placeholders::_1, - newConfig, - myIndex.getValue())); - - // If it's a force reconfig, the primary node may not be electable after the configuration - // change. In case we are that primary node, finish the reconfig under the global lock, - // so that the step down occurs safely. - CBHStatus cbh = - args.force ? - _replExecutor.scheduleWorkWithGlobalExclusiveLock(reconfigFinishFn) : - _replExecutor.scheduleWork(reconfigFinishFn); - if (cbh.getStatus() == ErrorCodes::ShutdownInProgress) { - return status; - } - fassert(18824, cbh.getStatus()); - configStateGuard.Dismiss(); - _replExecutor.wait(cbh.getValue()); - return Status::OK(); } - void ReplicationCoordinatorImpl::_finishReplSetReconfig( - const ReplicationExecutor::CallbackArgs& cbData, - const ReplicaSetConfig& newConfig, - int myIndex) { + invariant(_rsConfig.isInitialized()); - stdx::unique_lock<stdx::mutex> lk(_mutex); - invariant(_rsConfigState == kConfigReconfiguring); - invariant(_rsConfig.isInitialized()); - const PostMemberStateUpdateAction action = _setCurrentRSConfig_inlock(newConfig, myIndex); - lk.unlock(); - _performPostMemberStateUpdateAction(action); + if (!args.force && !_getMemberState_inlock().primary()) { + return Status(ErrorCodes::NotMaster, + str::stream() + << "replSetReconfig should only be run on PRIMARY, but my state is " + << _getMemberState_inlock().toString() + << "; use the \"force\" argument to override"); } - Status ReplicationCoordinatorImpl::processReplSetInitiate(OperationContext* txn, - const BSONObj& configObj, - BSONObjBuilder* resultObj) { - log() << "replSetInitiate admin command received from client"; + _setConfigState_inlock(kConfigReconfiguring); + ScopeGuard configStateGuard = MakeGuard( + lockAndCall, + &lk, + stdx::bind(&ReplicationCoordinatorImpl::_setConfigState_inlock, this, kConfigSteady)); - stdx::unique_lock<stdx::mutex> lk(_mutex); - if (!_settings.usingReplSets()) { - return Status(ErrorCodes::NoReplicationEnabled, "server is not running with --replSet"); - } - - while (_rsConfigState == kConfigPreStart || _rsConfigState == kConfigStartingUp) { - _rsConfigStateChange.wait(lk); - } + ReplicaSetConfig oldConfig = _rsConfig; + lk.unlock(); - if (_rsConfigState != kConfigUninitialized) { - resultObj->append("info", - "try querying local.system.replset to see current configuration"); - return Status(ErrorCodes::AlreadyInitialized, "already initialized"); - } - invariant(!_rsConfig.isInitialized()); - _setConfigState_inlock(kConfigInitiating); - ScopeGuard configStateGuard = MakeGuard( - lockAndCall, - &lk, - stdx::bind(&ReplicationCoordinatorImpl::_setConfigState_inlock, - this, - kConfigUninitialized)); - lk.unlock(); - - ReplicaSetConfig newConfig; - Status status = newConfig.initialize(configObj); - if (!status.isOK()) { - error() << "replSet initiate got " << status << " while parsing " << configObj; - return Status(ErrorCodes::InvalidReplicaSetConfig, status.reason());; - } - if (newConfig.getReplSetName() != _settings.ourSetName()) { - str::stream errmsg; - errmsg << "Attempting to initiate a replica set with name " << - newConfig.getReplSetName() << ", but command line reports " << - _settings.ourSetName() << "; rejecting"; - error() << std::string(errmsg); - return Status(ErrorCodes::InvalidReplicaSetConfig, errmsg); - } - - StatusWith<int> myIndex = validateConfigForInitiate(_externalState.get(), newConfig); - if (!myIndex.isOK()) { - error() << "replSet initiate got " << myIndex.getStatus() << " while validating " << - configObj; - return Status(ErrorCodes::InvalidReplicaSetConfig, myIndex.getStatus().reason()); - } - - log() << "replSetInitiate config object with " << newConfig.getNumMembers() << - " members parses ok"; + ReplicaSetConfig newConfig; + BSONObj newConfigObj = args.newConfigObj; + if (args.force) { + newConfigObj = incrementConfigVersionByRandom(newConfigObj); + } + Status status = newConfig.initialize(newConfigObj); + if (!status.isOK()) { + error() << "replSetReconfig got " << status << " while parsing " << newConfigObj; + return Status(ErrorCodes::InvalidReplicaSetConfig, status.reason()); + ; + } + if (newConfig.getReplSetName() != _settings.ourSetName()) { + str::stream errmsg; + errmsg << "Attempting to reconfigure a replica set with name " << newConfig.getReplSetName() + << ", but command line reports " << _settings.ourSetName() << "; rejecting"; + error() << std::string(errmsg); + return Status(ErrorCodes::InvalidReplicaSetConfig, errmsg); + } - status = checkQuorumForInitiate( - &_replExecutor, - newConfig, - myIndex.getValue()); + StatusWith<int> myIndex = + validateConfigForReconfig(_externalState.get(), oldConfig, newConfig, args.force); + if (!myIndex.isOK()) { + error() << "replSetReconfig got " << myIndex.getStatus() << " while validating " + << newConfigObj; + return Status(ErrorCodes::NewReplicaSetConfigurationIncompatible, + myIndex.getStatus().reason()); + } - if (!status.isOK()) { - error() << "replSetInitiate failed; " << status; - return status; - } + log() << "replSetReconfig config object with " << newConfig.getNumMembers() + << " members parses ok"; - status = _externalState->storeLocalConfigDocument(txn, newConfig.toBSON()); + if (!args.force) { + status = checkQuorumForReconfig(&_replExecutor, newConfig, myIndex.getValue()); if (!status.isOK()) { - error() << "replSetInitiate failed to store config document; " << status; + error() << "replSetReconfig failed; " << status; return status; } + } - CBHStatus cbh = _replExecutor.scheduleWork( - stdx::bind(&ReplicationCoordinatorImpl::_finishReplSetInitiate, - this, - stdx::placeholders::_1, - newConfig, - myIndex.getValue())); - if (cbh.getStatus() == ErrorCodes::ShutdownInProgress) { - return status; - } - configStateGuard.Dismiss(); - fassert(18654, cbh.getStatus()); - _replExecutor.wait(cbh.getValue()); - - if (status.isOK()) { - // Create the oplog with the first entry, and start repl threads. - _externalState->initiateOplog(txn); - _externalState->startThreads(); - } + status = _externalState->storeLocalConfigDocument(txn, newConfig.toBSON()); + if (!status.isOK()) { + error() << "replSetReconfig failed to store config document; " << status; return status; } - void ReplicationCoordinatorImpl::_finishReplSetInitiate( - const ReplicationExecutor::CallbackArgs& cbData, - const ReplicaSetConfig& newConfig, - int myIndex) { + const stdx::function<void(const ReplicationExecutor::CallbackArgs&)> reconfigFinishFn( + stdx::bind(&ReplicationCoordinatorImpl::_finishReplSetReconfig, + this, + stdx::placeholders::_1, + newConfig, + myIndex.getValue())); - stdx::unique_lock<stdx::mutex> lk(_mutex); - invariant(_rsConfigState == kConfigInitiating); - invariant(!_rsConfig.isInitialized()); - const PostMemberStateUpdateAction action = _setCurrentRSConfig_inlock(newConfig, myIndex); - lk.unlock(); - _performPostMemberStateUpdateAction(action); + // If it's a force reconfig, the primary node may not be electable after the configuration + // change. In case we are that primary node, finish the reconfig under the global lock, + // so that the step down occurs safely. + CBHStatus cbh = args.force ? _replExecutor.scheduleWorkWithGlobalExclusiveLock(reconfigFinishFn) + : _replExecutor.scheduleWork(reconfigFinishFn); + if (cbh.getStatus() == ErrorCodes::ShutdownInProgress) { + return status; + } + fassert(18824, cbh.getStatus()); + configStateGuard.Dismiss(); + _replExecutor.wait(cbh.getValue()); + return Status::OK(); +} + +void ReplicationCoordinatorImpl::_finishReplSetReconfig( + const ReplicationExecutor::CallbackArgs& cbData, + const ReplicaSetConfig& newConfig, + int myIndex) { + stdx::unique_lock<stdx::mutex> lk(_mutex); + invariant(_rsConfigState == kConfigReconfiguring); + invariant(_rsConfig.isInitialized()); + const PostMemberStateUpdateAction action = _setCurrentRSConfig_inlock(newConfig, myIndex); + lk.unlock(); + _performPostMemberStateUpdateAction(action); +} + +Status ReplicationCoordinatorImpl::processReplSetInitiate(OperationContext* txn, + const BSONObj& configObj, + BSONObjBuilder* resultObj) { + log() << "replSetInitiate admin command received from client"; + + stdx::unique_lock<stdx::mutex> lk(_mutex); + if (!_settings.usingReplSets()) { + return Status(ErrorCodes::NoReplicationEnabled, "server is not running with --replSet"); + } + + while (_rsConfigState == kConfigPreStart || _rsConfigState == kConfigStartingUp) { + _rsConfigStateChange.wait(lk); + } + + if (_rsConfigState != kConfigUninitialized) { + resultObj->append("info", "try querying local.system.replset to see current configuration"); + return Status(ErrorCodes::AlreadyInitialized, "already initialized"); + } + invariant(!_rsConfig.isInitialized()); + _setConfigState_inlock(kConfigInitiating); + ScopeGuard configStateGuard = MakeGuard( + lockAndCall, + &lk, + stdx::bind( + &ReplicationCoordinatorImpl::_setConfigState_inlock, this, kConfigUninitialized)); + lk.unlock(); + + ReplicaSetConfig newConfig; + Status status = newConfig.initialize(configObj); + if (!status.isOK()) { + error() << "replSet initiate got " << status << " while parsing " << configObj; + return Status(ErrorCodes::InvalidReplicaSetConfig, status.reason()); + ; + } + if (newConfig.getReplSetName() != _settings.ourSetName()) { + str::stream errmsg; + errmsg << "Attempting to initiate a replica set with name " << newConfig.getReplSetName() + << ", but command line reports " << _settings.ourSetName() << "; rejecting"; + error() << std::string(errmsg); + return Status(ErrorCodes::InvalidReplicaSetConfig, errmsg); + } + + StatusWith<int> myIndex = validateConfigForInitiate(_externalState.get(), newConfig); + if (!myIndex.isOK()) { + error() << "replSet initiate got " << myIndex.getStatus() << " while validating " + << configObj; + return Status(ErrorCodes::InvalidReplicaSetConfig, myIndex.getStatus().reason()); + } + + log() << "replSetInitiate config object with " << newConfig.getNumMembers() + << " members parses ok"; + + status = checkQuorumForInitiate(&_replExecutor, newConfig, myIndex.getValue()); + + if (!status.isOK()) { + error() << "replSetInitiate failed; " << status; + return status; } - void ReplicationCoordinatorImpl::_setConfigState_inlock(ConfigState newState) { - if (newState != _rsConfigState) { - _rsConfigState = newState; - _rsConfigStateChange.notify_all(); - } + status = _externalState->storeLocalConfigDocument(txn, newConfig.toBSON()); + if (!status.isOK()) { + error() << "replSetInitiate failed to store config document; " << status; + return status; } - ReplicationCoordinatorImpl::PostMemberStateUpdateAction - ReplicationCoordinatorImpl::_updateMemberStateFromTopologyCoordinator_inlock() { - const MemberState newState = _topCoord->getMemberState(); - if (newState == _memberState) { - if (_topCoord->getRole() == TopologyCoordinator::Role::candidate) { - invariant(_rsConfig.getNumMembers() == 1 && - _selfIndex == 0 && - _rsConfig.getMemberAt(0).isElectable()); - return kActionWinElection; - } - return kActionNone; - } + CBHStatus cbh = + _replExecutor.scheduleWork(stdx::bind(&ReplicationCoordinatorImpl::_finishReplSetInitiate, + this, + stdx::placeholders::_1, + newConfig, + myIndex.getValue())); + if (cbh.getStatus() == ErrorCodes::ShutdownInProgress) { + return status; + } + configStateGuard.Dismiss(); + fassert(18654, cbh.getStatus()); + _replExecutor.wait(cbh.getValue()); - PostMemberStateUpdateAction result; - if (_memberState.primary() || newState.removed() || newState.rollback()) { - // Wake up any threads blocked in awaitReplication, close connections, etc. - for (std::vector<WaiterInfo*>::iterator it = _replicationWaiterList.begin(); - it != _replicationWaiterList.end(); ++it) { - WaiterInfo* info = *it; - info->master = false; - info->condVar->notify_all(); - } - _isWaitingForDrainToComplete = false; - _canAcceptNonLocalWrites = false; - result = kActionCloseAllConnections; - } - else { - result = kActionFollowerModeStateChange; + if (status.isOK()) { + // Create the oplog with the first entry, and start repl threads. + _externalState->initiateOplog(txn); + _externalState->startThreads(); + } + return status; +} + +void ReplicationCoordinatorImpl::_finishReplSetInitiate( + const ReplicationExecutor::CallbackArgs& cbData, + const ReplicaSetConfig& newConfig, + int myIndex) { + stdx::unique_lock<stdx::mutex> lk(_mutex); + invariant(_rsConfigState == kConfigInitiating); + invariant(!_rsConfig.isInitialized()); + const PostMemberStateUpdateAction action = _setCurrentRSConfig_inlock(newConfig, myIndex); + lk.unlock(); + _performPostMemberStateUpdateAction(action); +} + +void ReplicationCoordinatorImpl::_setConfigState_inlock(ConfigState newState) { + if (newState != _rsConfigState) { + _rsConfigState = newState; + _rsConfigStateChange.notify_all(); + } +} + +ReplicationCoordinatorImpl::PostMemberStateUpdateAction +ReplicationCoordinatorImpl::_updateMemberStateFromTopologyCoordinator_inlock() { + const MemberState newState = _topCoord->getMemberState(); + if (newState == _memberState) { + if (_topCoord->getRole() == TopologyCoordinator::Role::candidate) { + invariant(_rsConfig.getNumMembers() == 1 && _selfIndex == 0 && + _rsConfig.getMemberAt(0).isElectable()); + return kActionWinElection; } + return kActionNone; + } - if (_memberState.secondary() && !newState.primary()) { - // Switching out of SECONDARY, but not to PRIMARY. - _canServeNonLocalReads.store(0U); - } - else if (!_memberState.primary() && newState.secondary()) { - // Switching into SECONDARY, but not from PRIMARY. - _canServeNonLocalReads.store(1U); + PostMemberStateUpdateAction result; + if (_memberState.primary() || newState.removed() || newState.rollback()) { + // Wake up any threads blocked in awaitReplication, close connections, etc. + for (std::vector<WaiterInfo*>::iterator it = _replicationWaiterList.begin(); + it != _replicationWaiterList.end(); + ++it) { + WaiterInfo* info = *it; + info->master = false; + info->condVar->notify_all(); } + _isWaitingForDrainToComplete = false; + _canAcceptNonLocalWrites = false; + result = kActionCloseAllConnections; + } else { + result = kActionFollowerModeStateChange; + } - if (newState.secondary() && _topCoord->getRole() == TopologyCoordinator::Role::candidate) { - // When transitioning to SECONDARY, the only way for _topCoord to report the candidate - // role is if the configuration represents a single-node replica set. In that case, the - // overriding requirement is to elect this singleton node primary. - invariant(_rsConfig.getNumMembers() == 1 && - _selfIndex == 0 && - _rsConfig.getMemberAt(0).isElectable()); - result = kActionWinElection; - } + if (_memberState.secondary() && !newState.primary()) { + // Switching out of SECONDARY, but not to PRIMARY. + _canServeNonLocalReads.store(0U); + } else if (!_memberState.primary() && newState.secondary()) { + // Switching into SECONDARY, but not from PRIMARY. + _canServeNonLocalReads.store(1U); + } - _memberState = newState; - log() << "transition to " << newState.toString() << rsLog; - return result; + if (newState.secondary() && _topCoord->getRole() == TopologyCoordinator::Role::candidate) { + // When transitioning to SECONDARY, the only way for _topCoord to report the candidate + // role is if the configuration represents a single-node replica set. In that case, the + // overriding requirement is to elect this singleton node primary. + invariant(_rsConfig.getNumMembers() == 1 && _selfIndex == 0 && + _rsConfig.getMemberAt(0).isElectable()); + result = kActionWinElection; } - void ReplicationCoordinatorImpl::_performPostMemberStateUpdateAction( - PostMemberStateUpdateAction action) { + _memberState = newState; + log() << "transition to " << newState.toString() << rsLog; + return result; +} - switch (action) { +void ReplicationCoordinatorImpl::_performPostMemberStateUpdateAction( + PostMemberStateUpdateAction action) { + switch (action) { case kActionNone: break; case kActionFollowerModeStateChange: @@ -2114,702 +2025,671 @@ namespace { default: severe() << "Unknown post member state update action " << static_cast<int>(action); fassertFailed(26010); - } } - - Status ReplicationCoordinatorImpl::processReplSetGetRBID(BSONObjBuilder* resultObj) { - stdx::lock_guard<stdx::mutex> lk(_mutex); - resultObj->append("rbid", _rbid); - return Status::OK(); - } - - void ReplicationCoordinatorImpl::incrementRollbackID() { - stdx::lock_guard<stdx::mutex> lk(_mutex); - ++_rbid; - } - - Status ReplicationCoordinatorImpl::processReplSetFresh(const ReplSetFreshArgs& args, - BSONObjBuilder* resultObj) { - - Status result(ErrorCodes::InternalError, "didn't set status in prepareFreshResponse"); - CBHStatus cbh = _replExecutor.scheduleWork( - stdx::bind(&ReplicationCoordinatorImpl::_processReplSetFresh_finish, - this, - stdx::placeholders::_1, - args, - resultObj, - &result)); - if (cbh.getStatus() == ErrorCodes::ShutdownInProgress) { - return Status(ErrorCodes::ShutdownInProgress, "replication shutdown in progress"); - } - fassert(18652, cbh.getStatus()); - _replExecutor.wait(cbh.getValue()); - return result; - } - - void ReplicationCoordinatorImpl::_processReplSetFresh_finish( - const ReplicationExecutor::CallbackArgs& cbData, - const ReplSetFreshArgs& args, - BSONObjBuilder* response, - Status* result) { - if (cbData.status == ErrorCodes::CallbackCanceled) { - *result = Status(ErrorCodes::ShutdownInProgress, "replication shutdown in progress"); - return; +} + +Status ReplicationCoordinatorImpl::processReplSetGetRBID(BSONObjBuilder* resultObj) { + stdx::lock_guard<stdx::mutex> lk(_mutex); + resultObj->append("rbid", _rbid); + return Status::OK(); +} + +void ReplicationCoordinatorImpl::incrementRollbackID() { + stdx::lock_guard<stdx::mutex> lk(_mutex); + ++_rbid; +} + +Status ReplicationCoordinatorImpl::processReplSetFresh(const ReplSetFreshArgs& args, + BSONObjBuilder* resultObj) { + Status result(ErrorCodes::InternalError, "didn't set status in prepareFreshResponse"); + CBHStatus cbh = _replExecutor.scheduleWork( + stdx::bind(&ReplicationCoordinatorImpl::_processReplSetFresh_finish, + this, + stdx::placeholders::_1, + args, + resultObj, + &result)); + if (cbh.getStatus() == ErrorCodes::ShutdownInProgress) { + return Status(ErrorCodes::ShutdownInProgress, "replication shutdown in progress"); + } + fassert(18652, cbh.getStatus()); + _replExecutor.wait(cbh.getValue()); + return result; +} + +void ReplicationCoordinatorImpl::_processReplSetFresh_finish( + const ReplicationExecutor::CallbackArgs& cbData, + const ReplSetFreshArgs& args, + BSONObjBuilder* response, + Status* result) { + if (cbData.status == ErrorCodes::CallbackCanceled) { + *result = Status(ErrorCodes::ShutdownInProgress, "replication shutdown in progress"); + return; + } + + _topCoord->prepareFreshResponse(args, _replExecutor.now(), getMyLastOptime(), response, result); +} + +Status ReplicationCoordinatorImpl::processReplSetElect(const ReplSetElectArgs& args, + BSONObjBuilder* responseObj) { + Status result = Status(ErrorCodes::InternalError, "status not set by callback"); + CBHStatus cbh = _replExecutor.scheduleWork( + stdx::bind(&ReplicationCoordinatorImpl::_processReplSetElect_finish, + this, + stdx::placeholders::_1, + args, + responseObj, + &result)); + if (cbh.getStatus() == ErrorCodes::ShutdownInProgress) { + return Status(ErrorCodes::ShutdownInProgress, "replication shutdown in progress"); + } + fassert(18657, cbh.getStatus()); + _replExecutor.wait(cbh.getValue()); + return result; +} + +void ReplicationCoordinatorImpl::_processReplSetElect_finish( + const ReplicationExecutor::CallbackArgs& cbData, + const ReplSetElectArgs& args, + BSONObjBuilder* response, + Status* result) { + if (cbData.status == ErrorCodes::CallbackCanceled) { + *result = Status(ErrorCodes::ShutdownInProgress, "replication shutdown in progress"); + return; + } + + _topCoord->prepareElectResponse(args, _replExecutor.now(), getMyLastOptime(), response, result); +} + +ReplicationCoordinatorImpl::PostMemberStateUpdateAction +ReplicationCoordinatorImpl::_setCurrentRSConfig_inlock(const ReplicaSetConfig& newConfig, + int myIndex) { + invariant(_settings.usingReplSets()); + _cancelHeartbeats(); + _setConfigState_inlock(kConfigSteady); + // Must get this before changing our config. + OpTime myOptime = _getMyLastOptime_inlock(); + _topCoord->updateConfig(newConfig, myIndex, _replExecutor.now(), myOptime); + _rsConfig = newConfig; + log() << "New replica set config in use: " << _rsConfig.toBSON() << rsLog; + _selfIndex = myIndex; + if (_selfIndex >= 0) { + log() << "This node is " << _rsConfig.getMemberAt(_selfIndex).getHostAndPort() + << " in the config"; + } else { + log() << "This node is not a member of the config"; + } + + const PostMemberStateUpdateAction action = _updateMemberStateFromTopologyCoordinator_inlock(); + _updateSlaveInfoFromConfig_inlock(); + if (_selfIndex >= 0) { + // Don't send heartbeats if we're not in the config, if we get re-added one of the + // nodes in the set will contact us. + _startHeartbeats(); + } + _wakeReadyWaiters_inlock(); + return action; +} + +void ReplicationCoordinatorImpl::_wakeReadyWaiters_inlock() { + for (std::vector<WaiterInfo*>::iterator it = _replicationWaiterList.begin(); + it != _replicationWaiterList.end(); + ++it) { + WaiterInfo* info = *it; + if (_doneWaitingForReplication_inlock(*info->opTime, *info->writeConcern)) { + info->condVar->notify_all(); } - - _topCoord->prepareFreshResponse( - args, _replExecutor.now(), getMyLastOptime(), response, result); } +} - Status ReplicationCoordinatorImpl::processReplSetElect(const ReplSetElectArgs& args, - BSONObjBuilder* responseObj) { - Status result = Status(ErrorCodes::InternalError, "status not set by callback"); - CBHStatus cbh = _replExecutor.scheduleWork( - stdx::bind(&ReplicationCoordinatorImpl::_processReplSetElect_finish, - this, - stdx::placeholders::_1, - args, - responseObj, - &result)); - if (cbh.getStatus() == ErrorCodes::ShutdownInProgress) { - return Status(ErrorCodes::ShutdownInProgress, "replication shutdown in progress"); - } - fassert(18657, cbh.getStatus()); - _replExecutor.wait(cbh.getValue()); - return result; - } - - void ReplicationCoordinatorImpl::_processReplSetElect_finish( - const ReplicationExecutor::CallbackArgs& cbData, - const ReplSetElectArgs& args, - BSONObjBuilder* response, - Status* result) { - if (cbData.status == ErrorCodes::CallbackCanceled) { - *result = Status(ErrorCodes::ShutdownInProgress, "replication shutdown in progress"); - return; - } - - _topCoord->prepareElectResponse( - args, _replExecutor.now(), getMyLastOptime(), response, result); - } - - ReplicationCoordinatorImpl::PostMemberStateUpdateAction - ReplicationCoordinatorImpl::_setCurrentRSConfig_inlock( - const ReplicaSetConfig& newConfig, - int myIndex) { - invariant(_settings.usingReplSets()); - _cancelHeartbeats(); - _setConfigState_inlock(kConfigSteady); - // Must get this before changing our config. - OpTime myOptime = _getMyLastOptime_inlock(); - _topCoord->updateConfig( - newConfig, - myIndex, - _replExecutor.now(), - myOptime); - _rsConfig = newConfig; - log() << "New replica set config in use: " << _rsConfig.toBSON() << rsLog; - _selfIndex = myIndex; - if (_selfIndex >= 0) { - log() << "This node is " << - _rsConfig.getMemberAt(_selfIndex).getHostAndPort() << " in the config"; - } - else { - log() << "This node is not a member of the config"; - } - - const PostMemberStateUpdateAction action = - _updateMemberStateFromTopologyCoordinator_inlock(); - _updateSlaveInfoFromConfig_inlock(); - if (_selfIndex >= 0) { - // Don't send heartbeats if we're not in the config, if we get re-added one of the - // nodes in the set will contact us. - _startHeartbeats(); - } - _wakeReadyWaiters_inlock(); - return action; - } - - void ReplicationCoordinatorImpl::_wakeReadyWaiters_inlock(){ - for (std::vector<WaiterInfo*>::iterator it = _replicationWaiterList.begin(); - it != _replicationWaiterList.end(); ++it) { - WaiterInfo* info = *it; - if (_doneWaitingForReplication_inlock(*info->opTime, *info->writeConcern)) { - info->condVar->notify_all(); - } +Status ReplicationCoordinatorImpl::processReplSetUpdatePosition(const UpdatePositionArgs& updates, + long long* configVersion) { + stdx::unique_lock<stdx::mutex> lock(_mutex); + Status status = Status::OK(); + bool somethingChanged = false; + for (UpdatePositionArgs::UpdateIterator update = updates.updatesBegin(); + update != updates.updatesEnd(); + ++update) { + status = _setLastOptime_inlock(*update, configVersion); + if (!status.isOK()) { + break; } + somethingChanged = true; } - Status ReplicationCoordinatorImpl::processReplSetUpdatePosition( - const UpdatePositionArgs& updates, long long* configVersion) { - - stdx::unique_lock<stdx::mutex> lock(_mutex); - Status status = Status::OK(); - bool somethingChanged = false; - for (UpdatePositionArgs::UpdateIterator update = updates.updatesBegin(); - update != updates.updatesEnd(); - ++update) { - status = _setLastOptime_inlock(*update, configVersion); - if (!status.isOK()) { - break; - } - somethingChanged = true; - } - - if (somethingChanged && !_getMemberState_inlock().primary()) { - lock.unlock(); - // Must do this outside _mutex - // TODO: enable _dr, remove _externalState when DataReplicator is used excl. - //_dr.slavesHaveProgressed(); - _externalState->forwardSlaveProgress(); - } - return status; + if (somethingChanged && !_getMemberState_inlock().primary()) { + lock.unlock(); + // Must do this outside _mutex + // TODO: enable _dr, remove _externalState when DataReplicator is used excl. + //_dr.slavesHaveProgressed(); + _externalState->forwardSlaveProgress(); } + return status; +} - Status ReplicationCoordinatorImpl::processHandshake(OperationContext* txn, - const HandshakeArgs& handshake) { - LOG(2) << "Received handshake " << handshake.toBSON(); - - stdx::unique_lock<stdx::mutex> lock(_mutex); - - if (getReplicationMode() != modeMasterSlave) { - return Status(ErrorCodes::IllegalOperation, - "The handshake command is only used for master/slave replication"); - } - - SlaveInfo* slaveInfo = _findSlaveInfoByRID_inlock(handshake.getRid()); - if (slaveInfo) { - return Status::OK(); // nothing to do - } - - SlaveInfo newSlaveInfo; - newSlaveInfo.rid = handshake.getRid(); - newSlaveInfo.memberId = -1; - newSlaveInfo.hostAndPort = _externalState->getClientHostAndPort(txn); - // Don't call _addSlaveInfo_inlock as that would wake sleepers unnecessarily. - _slaveInfo.push_back(newSlaveInfo); +Status ReplicationCoordinatorImpl::processHandshake(OperationContext* txn, + const HandshakeArgs& handshake) { + LOG(2) << "Received handshake " << handshake.toBSON(); - return Status::OK(); - } + stdx::unique_lock<stdx::mutex> lock(_mutex); - bool ReplicationCoordinatorImpl::buildsIndexes() { - stdx::lock_guard<stdx::mutex> lk(_mutex); - if (_selfIndex == -1) { - return true; - } - const MemberConfig& self = _rsConfig.getMemberAt(_selfIndex); - return self.shouldBuildIndexes(); + if (getReplicationMode() != modeMasterSlave) { + return Status(ErrorCodes::IllegalOperation, + "The handshake command is only used for master/slave replication"); } - std::vector<HostAndPort> ReplicationCoordinatorImpl::getHostsWrittenTo(const OpTime& op) { - std::vector<HostAndPort> hosts; - stdx::lock_guard<stdx::mutex> lk(_mutex); - for (size_t i = 0; i < _slaveInfo.size(); ++i) { - const SlaveInfo& slaveInfo = _slaveInfo[i]; - if (slaveInfo.opTime < op) { - continue; - } - - if (getReplicationMode() == modeMasterSlave && slaveInfo.rid == _getMyRID_inlock()) { - // Master-slave doesn't know the HostAndPort for itself at this point. - continue; - } - hosts.push_back(slaveInfo.hostAndPort); - } - return hosts; + SlaveInfo* slaveInfo = _findSlaveInfoByRID_inlock(handshake.getRid()); + if (slaveInfo) { + return Status::OK(); // nothing to do } - std::vector<HostAndPort> ReplicationCoordinatorImpl::getOtherNodesInReplSet() const { - stdx::lock_guard<stdx::mutex> lk(_mutex); - invariant(_settings.usingReplSets()); + SlaveInfo newSlaveInfo; + newSlaveInfo.rid = handshake.getRid(); + newSlaveInfo.memberId = -1; + newSlaveInfo.hostAndPort = _externalState->getClientHostAndPort(txn); + // Don't call _addSlaveInfo_inlock as that would wake sleepers unnecessarily. + _slaveInfo.push_back(newSlaveInfo); - std::vector<HostAndPort> nodes; - if (_selfIndex == -1) { - return nodes; - } - - for (int i = 0; i < _rsConfig.getNumMembers(); ++i) { - if (i == _selfIndex) - continue; - - nodes.push_back(_rsConfig.getMemberAt(i).getHostAndPort()); - } - return nodes; - } + return Status::OK(); +} - Status ReplicationCoordinatorImpl::checkIfWriteConcernCanBeSatisfied( - const WriteConcernOptions& writeConcern) const { - stdx::lock_guard<stdx::mutex> lock(_mutex); - return _checkIfWriteConcernCanBeSatisfied_inlock(writeConcern); +bool ReplicationCoordinatorImpl::buildsIndexes() { + stdx::lock_guard<stdx::mutex> lk(_mutex); + if (_selfIndex == -1) { + return true; } + const MemberConfig& self = _rsConfig.getMemberAt(_selfIndex); + return self.shouldBuildIndexes(); +} - Status ReplicationCoordinatorImpl::_checkIfWriteConcernCanBeSatisfied_inlock( - const WriteConcernOptions& writeConcern) const { - if (getReplicationMode() == modeNone) { - return Status(ErrorCodes::NoReplicationEnabled, - "No replication enabled when checking if write concern can be satisfied"); +std::vector<HostAndPort> ReplicationCoordinatorImpl::getHostsWrittenTo(const OpTime& op) { + std::vector<HostAndPort> hosts; + stdx::lock_guard<stdx::mutex> lk(_mutex); + for (size_t i = 0; i < _slaveInfo.size(); ++i) { + const SlaveInfo& slaveInfo = _slaveInfo[i]; + if (slaveInfo.opTime < op) { + continue; } - if (getReplicationMode() == modeMasterSlave) { - if (!writeConcern.wMode.empty()) { - return Status(ErrorCodes::UnknownReplWriteConcern, - "Cannot use named write concern modes in master-slave"); - } - // No way to know how many slaves there are, so assume any numeric mode is possible. - return Status::OK(); + if (getReplicationMode() == modeMasterSlave && slaveInfo.rid == _getMyRID_inlock()) { + // Master-slave doesn't know the HostAndPort for itself at this point. + continue; } - - invariant(getReplicationMode() == modeReplSet); - return _rsConfig.checkIfWriteConcernCanBeSatisfied(writeConcern); + hosts.push_back(slaveInfo.hostAndPort); } + return hosts; +} - WriteConcernOptions ReplicationCoordinatorImpl::getGetLastErrorDefault() { - stdx::lock_guard<stdx::mutex> lock(_mutex); - if (_rsConfig.isInitialized()) { - return _rsConfig.getDefaultWriteConcern(); - } - return WriteConcernOptions(); - } - - Status ReplicationCoordinatorImpl::checkReplEnabledForCommand(BSONObjBuilder* result) { - if (!_settings.usingReplSets()) { - if (serverGlobalParams.configsvr) { - result->append("info", "configsvr"); // for shell prompt - } - return Status(ErrorCodes::NoReplicationEnabled, "not running with --replSet"); - } +std::vector<HostAndPort> ReplicationCoordinatorImpl::getOtherNodesInReplSet() const { + stdx::lock_guard<stdx::mutex> lk(_mutex); + invariant(_settings.usingReplSets()); - if (getMemberState().startup()) { - result->append("info", "run rs.initiate(...) if not yet done for the set"); - return Status(ErrorCodes::NotYetInitialized, "no replset config has been received"); - } - - return Status::OK(); - } - - bool ReplicationCoordinatorImpl::isReplEnabled() const { - return getReplicationMode() != modeNone; + std::vector<HostAndPort> nodes; + if (_selfIndex == -1) { + return nodes; } - void ReplicationCoordinatorImpl::_chooseNewSyncSource( - const ReplicationExecutor::CallbackArgs& cbData, - HostAndPort* newSyncSource) { - if (cbData.status == ErrorCodes::CallbackCanceled) { - return; - } - *newSyncSource = _topCoord->chooseNewSyncSource(_replExecutor.now(), - getMyLastOptime()); - } + for (int i = 0; i < _rsConfig.getNumMembers(); ++i) { + if (i == _selfIndex) + continue; - HostAndPort ReplicationCoordinatorImpl::chooseNewSyncSource() { - HostAndPort newSyncSource; - CBHStatus cbh = _replExecutor.scheduleWork( - stdx::bind(&ReplicationCoordinatorImpl::_chooseNewSyncSource, - this, - stdx::placeholders::_1, - &newSyncSource)); - if (cbh.getStatus() == ErrorCodes::ShutdownInProgress) { - return newSyncSource; // empty - } - fassert(18740, cbh.getStatus()); - _replExecutor.wait(cbh.getValue()); - return newSyncSource; + nodes.push_back(_rsConfig.getMemberAt(i).getHostAndPort()); } + return nodes; +} - void ReplicationCoordinatorImpl::_blacklistSyncSource( - const ReplicationExecutor::CallbackArgs& cbData, - const HostAndPort& host, - Date_t until) { - if (cbData.status == ErrorCodes::CallbackCanceled) { - return; - } - _topCoord->blacklistSyncSource(host, until); - - CBHStatus cbh = _replExecutor.scheduleWorkAt( - until, - stdx::bind(&ReplicationCoordinatorImpl::_unblacklistSyncSource, - this, - stdx::placeholders::_1, - host)); - if (cbh.getStatus() == ErrorCodes::ShutdownInProgress) { - return; - } - fassert(28610, cbh.getStatus()); - } +Status ReplicationCoordinatorImpl::checkIfWriteConcernCanBeSatisfied( + const WriteConcernOptions& writeConcern) const { + stdx::lock_guard<stdx::mutex> lock(_mutex); + return _checkIfWriteConcernCanBeSatisfied_inlock(writeConcern); +} - void ReplicationCoordinatorImpl::_unblacklistSyncSource( - const ReplicationExecutor::CallbackArgs& cbData, - const HostAndPort& host) { - if (cbData.status == ErrorCodes::CallbackCanceled) - return; - _topCoord->unblacklistSyncSource(host, _replExecutor.now()); +Status ReplicationCoordinatorImpl::_checkIfWriteConcernCanBeSatisfied_inlock( + const WriteConcernOptions& writeConcern) const { + if (getReplicationMode() == modeNone) { + return Status(ErrorCodes::NoReplicationEnabled, + "No replication enabled when checking if write concern can be satisfied"); } - void ReplicationCoordinatorImpl::blacklistSyncSource(const HostAndPort& host, Date_t until) { - CBHStatus cbh = _replExecutor.scheduleWork( - stdx::bind(&ReplicationCoordinatorImpl::_blacklistSyncSource, - this, - stdx::placeholders::_1, - host, - until)); - if (cbh.getStatus() == ErrorCodes::ShutdownInProgress) { - return; - } - fassert(18741, cbh.getStatus()); - _replExecutor.wait(cbh.getValue()); - } - - void ReplicationCoordinatorImpl::resetLastOpTimeFromOplog(OperationContext* txn) { - StatusWith<OpTime> lastOpTimeStatus = _externalState->loadLastOpTime(txn); - OpTime lastOpTime; - if (!lastOpTimeStatus.isOK()) { - warning() << "Failed to load timestamp of most recently applied operation; " << - lastOpTimeStatus.getStatus(); - } - else { - lastOpTime = lastOpTimeStatus.getValue(); + if (getReplicationMode() == modeMasterSlave) { + if (!writeConcern.wMode.empty()) { + return Status(ErrorCodes::UnknownReplWriteConcern, + "Cannot use named write concern modes in master-slave"); } - stdx::unique_lock<stdx::mutex> lk(_mutex); - _setMyLastOptime_inlock(&lk, lastOpTime, true); - _externalState->setGlobalTimestamp(lastOpTime.getTimestamp()); + // No way to know how many slaves there are, so assume any numeric mode is possible. + return Status::OK(); } - void ReplicationCoordinatorImpl::_shouldChangeSyncSource( - const ReplicationExecutor::CallbackArgs& cbData, - const HostAndPort& currentSource, - bool* shouldChange) { - if (cbData.status == ErrorCodes::CallbackCanceled) { - return; - } - - *shouldChange = _topCoord->shouldChangeSyncSource(currentSource, _replExecutor.now()); + invariant(getReplicationMode() == modeReplSet); + return _rsConfig.checkIfWriteConcernCanBeSatisfied(writeConcern); +} + +WriteConcernOptions ReplicationCoordinatorImpl::getGetLastErrorDefault() { + stdx::lock_guard<stdx::mutex> lock(_mutex); + if (_rsConfig.isInitialized()) { + return _rsConfig.getDefaultWriteConcern(); + } + return WriteConcernOptions(); +} + +Status ReplicationCoordinatorImpl::checkReplEnabledForCommand(BSONObjBuilder* result) { + if (!_settings.usingReplSets()) { + if (serverGlobalParams.configsvr) { + result->append("info", "configsvr"); // for shell prompt + } + return Status(ErrorCodes::NoReplicationEnabled, "not running with --replSet"); + } + + if (getMemberState().startup()) { + result->append("info", "run rs.initiate(...) if not yet done for the set"); + return Status(ErrorCodes::NotYetInitialized, "no replset config has been received"); + } + + return Status::OK(); +} + +bool ReplicationCoordinatorImpl::isReplEnabled() const { + return getReplicationMode() != modeNone; +} + +void ReplicationCoordinatorImpl::_chooseNewSyncSource( + const ReplicationExecutor::CallbackArgs& cbData, HostAndPort* newSyncSource) { + if (cbData.status == ErrorCodes::CallbackCanceled) { + return; + } + *newSyncSource = _topCoord->chooseNewSyncSource(_replExecutor.now(), getMyLastOptime()); +} + +HostAndPort ReplicationCoordinatorImpl::chooseNewSyncSource() { + HostAndPort newSyncSource; + CBHStatus cbh = + _replExecutor.scheduleWork(stdx::bind(&ReplicationCoordinatorImpl::_chooseNewSyncSource, + this, + stdx::placeholders::_1, + &newSyncSource)); + if (cbh.getStatus() == ErrorCodes::ShutdownInProgress) { + return newSyncSource; // empty + } + fassert(18740, cbh.getStatus()); + _replExecutor.wait(cbh.getValue()); + return newSyncSource; +} + +void ReplicationCoordinatorImpl::_blacklistSyncSource( + const ReplicationExecutor::CallbackArgs& cbData, const HostAndPort& host, Date_t until) { + if (cbData.status == ErrorCodes::CallbackCanceled) { + return; + } + _topCoord->blacklistSyncSource(host, until); + + CBHStatus cbh = + _replExecutor.scheduleWorkAt(until, + stdx::bind(&ReplicationCoordinatorImpl::_unblacklistSyncSource, + this, + stdx::placeholders::_1, + host)); + if (cbh.getStatus() == ErrorCodes::ShutdownInProgress) { + return; + } + fassert(28610, cbh.getStatus()); +} + +void ReplicationCoordinatorImpl::_unblacklistSyncSource( + const ReplicationExecutor::CallbackArgs& cbData, const HostAndPort& host) { + if (cbData.status == ErrorCodes::CallbackCanceled) + return; + _topCoord->unblacklistSyncSource(host, _replExecutor.now()); +} + +void ReplicationCoordinatorImpl::blacklistSyncSource(const HostAndPort& host, Date_t until) { + CBHStatus cbh = + _replExecutor.scheduleWork(stdx::bind(&ReplicationCoordinatorImpl::_blacklistSyncSource, + this, + stdx::placeholders::_1, + host, + until)); + if (cbh.getStatus() == ErrorCodes::ShutdownInProgress) { + return; + } + fassert(18741, cbh.getStatus()); + _replExecutor.wait(cbh.getValue()); +} + +void ReplicationCoordinatorImpl::resetLastOpTimeFromOplog(OperationContext* txn) { + StatusWith<OpTime> lastOpTimeStatus = _externalState->loadLastOpTime(txn); + OpTime lastOpTime; + if (!lastOpTimeStatus.isOK()) { + warning() << "Failed to load timestamp of most recently applied operation; " + << lastOpTimeStatus.getStatus(); + } else { + lastOpTime = lastOpTimeStatus.getValue(); + } + stdx::unique_lock<stdx::mutex> lk(_mutex); + _setMyLastOptime_inlock(&lk, lastOpTime, true); + _externalState->setGlobalTimestamp(lastOpTime.getTimestamp()); +} + +void ReplicationCoordinatorImpl::_shouldChangeSyncSource( + const ReplicationExecutor::CallbackArgs& cbData, + const HostAndPort& currentSource, + bool* shouldChange) { + if (cbData.status == ErrorCodes::CallbackCanceled) { + return; + } + + *shouldChange = _topCoord->shouldChangeSyncSource(currentSource, _replExecutor.now()); +} + +bool ReplicationCoordinatorImpl::shouldChangeSyncSource(const HostAndPort& currentSource) { + bool shouldChange(false); + CBHStatus cbh = + _replExecutor.scheduleWork(stdx::bind(&ReplicationCoordinatorImpl::_shouldChangeSyncSource, + this, + stdx::placeholders::_1, + currentSource, + &shouldChange)); + if (cbh.getStatus() == ErrorCodes::ShutdownInProgress) { + return false; } + fassert(18906, cbh.getStatus()); + _replExecutor.wait(cbh.getValue()); + return shouldChange; +} - bool ReplicationCoordinatorImpl::shouldChangeSyncSource(const HostAndPort& currentSource) { - bool shouldChange(false); - CBHStatus cbh = _replExecutor.scheduleWork( - stdx::bind(&ReplicationCoordinatorImpl::_shouldChangeSyncSource, - this, - stdx::placeholders::_1, - currentSource, - &shouldChange)); - if (cbh.getStatus() == ErrorCodes::ShutdownInProgress) { - return false; - } - fassert(18906, cbh.getStatus()); - _replExecutor.wait(cbh.getValue()); - return shouldChange; +void ReplicationCoordinatorImpl::_updateLastCommittedOpTime_inlock() { + if (!_getMemberState_inlock().primary()) { + return; } + StatusWith<ReplicaSetTagPattern> tagPattern = + _rsConfig.findCustomWriteMode(ReplicaSetConfig::kMajorityWriteConcernModeName); + invariant(tagPattern.isOK()); + ReplicaSetTagMatch matcher{tagPattern.getValue()}; - void ReplicationCoordinatorImpl::_updateLastCommittedOpTime_inlock() { - if (!_getMemberState_inlock().primary()) { - return; - } - StatusWith<ReplicaSetTagPattern> tagPattern = - _rsConfig.findCustomWriteMode(ReplicaSetConfig::kMajorityWriteConcernModeName); - invariant(tagPattern.isOK()); - ReplicaSetTagMatch matcher{tagPattern.getValue()}; - - std::vector<OpTime> votingNodesOpTimes; + std::vector<OpTime> votingNodesOpTimes; - for (const auto& sI : _slaveInfo) { - auto memberConfig = _rsConfig.findMemberByID(sI.memberId); - invariant(memberConfig); - for (auto tagIt = memberConfig->tagsBegin(); - tagIt != memberConfig->tagsEnd(); ++tagIt) { - if (matcher.update(*tagIt)) { - votingNodesOpTimes.push_back(sI.opTime); - break; - } + for (const auto& sI : _slaveInfo) { + auto memberConfig = _rsConfig.findMemberByID(sI.memberId); + invariant(memberConfig); + for (auto tagIt = memberConfig->tagsBegin(); tagIt != memberConfig->tagsEnd(); ++tagIt) { + if (matcher.update(*tagIt)) { + votingNodesOpTimes.push_back(sI.opTime); + break; } } - invariant(votingNodesOpTimes.size() > 0); - std::sort(votingNodesOpTimes.begin(), votingNodesOpTimes.end()); - - // Use the index of the minimum quorum in the vector of nodes. - _lastCommittedOpTime = votingNodesOpTimes[(votingNodesOpTimes.size() - 1) / 2]; } + invariant(votingNodesOpTimes.size() > 0); + std::sort(votingNodesOpTimes.begin(), votingNodesOpTimes.end()); - OpTime ReplicationCoordinatorImpl::getLastCommittedOpTime() const { - stdx::unique_lock<stdx::mutex> lk(_mutex); - return _lastCommittedOpTime; - } - - Status ReplicationCoordinatorImpl::processReplSetRequestVotes( - OperationContext* txn, - const ReplSetRequestVotesArgs& args, - ReplSetRequestVotesResponse* response) { - if (!isV1ElectionProtocol()) { - return {ErrorCodes::BadValue, "not using election protocol v1"}; - } - - updateTerm(args.getTerm()); - - Status result{ErrorCodes::InternalError, "didn't set status in processReplSetRequestVotes"}; - CBHStatus cbh = _replExecutor.scheduleWork( - stdx::bind(&ReplicationCoordinatorImpl::_processReplSetRequestVotes_finish, - this, - stdx::placeholders::_1, - args, - response, - &result)); - if (cbh.getStatus() == ErrorCodes::ShutdownInProgress) { - return cbh.getStatus(); - } - _replExecutor.wait(cbh.getValue()); - if (response->getVoteGranted()) { - LastVote lastVote; - lastVote.setTerm(args.getTerm()); - lastVote.setCandidateId(args.getCandidateId()); - - Status status = _externalState->storeLocalLastVoteDocument(txn, lastVote); - if (!status.isOK()) { - error() << "replSetRequestVotes failed to store LastVote document; " << status; - return status; - } - - } - return result; - } + // Use the index of the minimum quorum in the vector of nodes. + _lastCommittedOpTime = votingNodesOpTimes[(votingNodesOpTimes.size() - 1) / 2]; +} - void ReplicationCoordinatorImpl::_processReplSetRequestVotes_finish( - const ReplicationExecutor::CallbackArgs& cbData, - const ReplSetRequestVotesArgs& args, - ReplSetRequestVotesResponse* response, - Status* result) { - if (cbData.status == ErrorCodes::CallbackCanceled) { - *result = Status(ErrorCodes::ShutdownInProgress, "replication system is shutting down"); - return; - } +OpTime ReplicationCoordinatorImpl::getLastCommittedOpTime() const { + stdx::unique_lock<stdx::mutex> lk(_mutex); + return _lastCommittedOpTime; +} - stdx::unique_lock<stdx::mutex> lk(_mutex); - _topCoord->processReplSetRequestVotes(args, response, getMyLastOptime()); - *result = Status::OK(); +Status ReplicationCoordinatorImpl::processReplSetRequestVotes( + OperationContext* txn, + const ReplSetRequestVotesArgs& args, + ReplSetRequestVotesResponse* response) { + if (!isV1ElectionProtocol()) { + return {ErrorCodes::BadValue, "not using election protocol v1"}; } - Status ReplicationCoordinatorImpl::processReplSetDeclareElectionWinner( - const ReplSetDeclareElectionWinnerArgs& args, - long long* responseTerm) { - if (!isV1ElectionProtocol()) { - return {ErrorCodes::BadValue, "not using election protocol v1"}; - } + updateTerm(args.getTerm()); - updateTerm(args.getTerm()); - - Status result{ErrorCodes::InternalError, - "didn't set status in processReplSetDeclareElectionWinner"}; - CBHStatus cbh = _replExecutor.scheduleWork( - stdx::bind(&ReplicationCoordinatorImpl::_processReplSetDeclareElectionWinner_finish, - this, - stdx::placeholders::_1, - args, - responseTerm, - &result)); - if (cbh.getStatus() == ErrorCodes::ShutdownInProgress) { - return cbh.getStatus(); - } - _replExecutor.wait(cbh.getValue()); - return result; + Status result{ErrorCodes::InternalError, "didn't set status in processReplSetRequestVotes"}; + CBHStatus cbh = _replExecutor.scheduleWork( + stdx::bind(&ReplicationCoordinatorImpl::_processReplSetRequestVotes_finish, + this, + stdx::placeholders::_1, + args, + response, + &result)); + if (cbh.getStatus() == ErrorCodes::ShutdownInProgress) { + return cbh.getStatus(); } + _replExecutor.wait(cbh.getValue()); + if (response->getVoteGranted()) { + LastVote lastVote; + lastVote.setTerm(args.getTerm()); + lastVote.setCandidateId(args.getCandidateId()); - void ReplicationCoordinatorImpl::_processReplSetDeclareElectionWinner_finish( - const ReplicationExecutor::CallbackArgs& cbData, - const ReplSetDeclareElectionWinnerArgs& args, - long long* responseTerm, - Status* result) { - if (cbData.status == ErrorCodes::CallbackCanceled) { - *result = Status(ErrorCodes::ShutdownInProgress, "replication system is shutting down"); - return; - } - *result = _topCoord->processReplSetDeclareElectionWinner(args, responseTerm); - } - - void ReplicationCoordinatorImpl::prepareCursorResponseInfo(BSONObjBuilder* objBuilder) { - if (getReplicationMode() == modeReplSet && isV1ElectionProtocol()) { - BSONObjBuilder replObj(objBuilder->subobjStart("repl")); - _topCoord->prepareCursorResponseInfo(objBuilder, getLastCommittedOpTime()); - replObj.done(); + Status status = _externalState->storeLocalLastVoteDocument(txn, lastVote); + if (!status.isOK()) { + error() << "replSetRequestVotes failed to store LastVote document; " << status; + return status; } } - - bool ReplicationCoordinatorImpl::isV1ElectionProtocol() { - return getConfig().getProtocolVersion() == 1; + return result; +} + +void ReplicationCoordinatorImpl::_processReplSetRequestVotes_finish( + const ReplicationExecutor::CallbackArgs& cbData, + const ReplSetRequestVotesArgs& args, + ReplSetRequestVotesResponse* response, + Status* result) { + if (cbData.status == ErrorCodes::CallbackCanceled) { + *result = Status(ErrorCodes::ShutdownInProgress, "replication system is shutting down"); + return; + } + + stdx::unique_lock<stdx::mutex> lk(_mutex); + _topCoord->processReplSetRequestVotes(args, response, getMyLastOptime()); + *result = Status::OK(); +} + +Status ReplicationCoordinatorImpl::processReplSetDeclareElectionWinner( + const ReplSetDeclareElectionWinnerArgs& args, long long* responseTerm) { + if (!isV1ElectionProtocol()) { + return {ErrorCodes::BadValue, "not using election protocol v1"}; + } + + updateTerm(args.getTerm()); + + Status result{ErrorCodes::InternalError, + "didn't set status in processReplSetDeclareElectionWinner"}; + CBHStatus cbh = _replExecutor.scheduleWork( + stdx::bind(&ReplicationCoordinatorImpl::_processReplSetDeclareElectionWinner_finish, + this, + stdx::placeholders::_1, + args, + responseTerm, + &result)); + if (cbh.getStatus() == ErrorCodes::ShutdownInProgress) { + return cbh.getStatus(); + } + _replExecutor.wait(cbh.getValue()); + return result; +} + +void ReplicationCoordinatorImpl::_processReplSetDeclareElectionWinner_finish( + const ReplicationExecutor::CallbackArgs& cbData, + const ReplSetDeclareElectionWinnerArgs& args, + long long* responseTerm, + Status* result) { + if (cbData.status == ErrorCodes::CallbackCanceled) { + *result = Status(ErrorCodes::ShutdownInProgress, "replication system is shutting down"); + return; + } + *result = _topCoord->processReplSetDeclareElectionWinner(args, responseTerm); +} + +void ReplicationCoordinatorImpl::prepareCursorResponseInfo(BSONObjBuilder* objBuilder) { + if (getReplicationMode() == modeReplSet && isV1ElectionProtocol()) { + BSONObjBuilder replObj(objBuilder->subobjStart("repl")); + _topCoord->prepareCursorResponseInfo(objBuilder, getLastCommittedOpTime()); + replObj.done(); + } +} + +bool ReplicationCoordinatorImpl::isV1ElectionProtocol() { + return getConfig().getProtocolVersion() == 1; +} + +Status ReplicationCoordinatorImpl::processHeartbeatV1(const ReplSetHeartbeatArgsV1& args, + ReplSetHeartbeatResponse* response) { + { + stdx::lock_guard<stdx::mutex> lock(_mutex); + if (_rsConfigState == kConfigPreStart || _rsConfigState == kConfigStartingUp) { + return Status(ErrorCodes::NotYetInitialized, + "Received heartbeat while still initializing replication system"); + } + } + + Status result(ErrorCodes::InternalError, "didn't set status in prepareHeartbeatResponse"); + CBHStatus cbh = _replExecutor.scheduleWork( + stdx::bind(&ReplicationCoordinatorImpl::_processHeartbeatFinishV1, + this, + stdx::placeholders::_1, + args, + response, + &result)); + if (cbh.getStatus() == ErrorCodes::ShutdownInProgress) { + return {ErrorCodes::ShutdownInProgress, "replication shutdown in progress"}; + } + fassert(28645, cbh.getStatus()); + _replExecutor.wait(cbh.getValue()); + return result; +} + +void ReplicationCoordinatorImpl::_processHeartbeatFinishV1( + const ReplicationExecutor::CallbackArgs& cbData, + const ReplSetHeartbeatArgsV1& args, + ReplSetHeartbeatResponse* response, + Status* outStatus) { + if (cbData.status == ErrorCodes::CallbackCanceled) { + *outStatus = {ErrorCodes::ShutdownInProgress, "Replication shutdown in progress"}; + return; + } + fassert(28655, cbData.status); + const Date_t now = _replExecutor.now(); + *outStatus = _topCoord->prepareHeartbeatResponseV1( + now, args, _settings.ourSetName(), getMyLastOptime(), response); + if ((outStatus->isOK() || *outStatus == ErrorCodes::InvalidReplicaSetConfig) && + _selfIndex < 0) { + // If this node does not belong to the configuration it knows about, send heartbeats + // back to any node that sends us a heartbeat, in case one of those remote nodes has + // a configuration that contains us. Chances are excellent that it will, since that + // is the only reason for a remote node to send this node a heartbeat request. + if (!args.getSenderHost().empty() && _seedList.insert(args.getSenderHost()).second) { + _scheduleHeartbeatToTarget(args.getSenderHost(), -1, now); + } + } +} + +void ReplicationCoordinatorImpl::summarizeAsHtml(ReplSetHtmlSummary* output) { + CBHStatus cbh = + _replExecutor.scheduleWork(stdx::bind(&ReplicationCoordinatorImpl::_summarizeAsHtml_finish, + this, + stdx::placeholders::_1, + output)); + if (cbh.getStatus() == ErrorCodes::ShutdownInProgress) { + return; + } + fassert(28638, cbh.getStatus()); + _replExecutor.wait(cbh.getValue()); +} + +void ReplicationCoordinatorImpl::_summarizeAsHtml_finish(const CallbackArgs& cbData, + ReplSetHtmlSummary* output) { + if (cbData.status == ErrorCodes::CallbackCanceled) { + return; + } + + output->setSelfOptime(getMyLastOptime()); + output->setSelfUptime(time(0) - serverGlobalParams.started); + output->setNow(_replExecutor.now()); + + _topCoord->summarizeAsHtml(output); +} + +long long ReplicationCoordinatorImpl::getTerm() { + long long term = OpTime::kDefaultTerm; + CBHStatus cbh = _replExecutor.scheduleWork(stdx::bind( + &ReplicationCoordinatorImpl::_getTerm_helper, this, stdx::placeholders::_1, &term)); + if (cbh.getStatus() == ErrorCodes::ShutdownInProgress) { + return term; } - - Status ReplicationCoordinatorImpl::processHeartbeatV1(const ReplSetHeartbeatArgsV1& args, - ReplSetHeartbeatResponse* response) { - { - stdx::lock_guard<stdx::mutex> lock(_mutex); - if (_rsConfigState == kConfigPreStart || _rsConfigState == kConfigStartingUp) { - return Status(ErrorCodes::NotYetInitialized, - "Received heartbeat while still initializing replication system"); - } - } - - Status result(ErrorCodes::InternalError, "didn't set status in prepareHeartbeatResponse"); - CBHStatus cbh = _replExecutor.scheduleWork( - stdx::bind(&ReplicationCoordinatorImpl::_processHeartbeatFinishV1, - this, - stdx::placeholders::_1, - args, - response, - &result)); - if (cbh.getStatus() == ErrorCodes::ShutdownInProgress) { - return {ErrorCodes::ShutdownInProgress, "replication shutdown in progress"}; - } - fassert(28645, cbh.getStatus()); - _replExecutor.wait(cbh.getValue()); - return result; + fassert(28660, cbh.getStatus()); + _replExecutor.wait(cbh.getValue()); + return term; +} + +void ReplicationCoordinatorImpl::_getTerm_helper(const ReplicationExecutor::CallbackArgs& cbData, + long long* term) { + if (cbData.status == ErrorCodes::CallbackCanceled) { + return; + } + *term = _topCoord->getTerm(); +} + +bool ReplicationCoordinatorImpl::updateTerm(long long term) { + bool updated = false; + CBHStatus cbh = + _replExecutor.scheduleWork(stdx::bind(&ReplicationCoordinatorImpl::_updateTerm_helper, + this, + stdx::placeholders::_1, + term, + &updated, + nullptr)); + if (cbh.getStatus() == ErrorCodes::ShutdownInProgress) { + return false; } - - void ReplicationCoordinatorImpl::_processHeartbeatFinishV1( - const ReplicationExecutor::CallbackArgs& cbData, - const ReplSetHeartbeatArgsV1& args, - ReplSetHeartbeatResponse* response, - Status* outStatus) { - - if (cbData.status == ErrorCodes::CallbackCanceled) { - *outStatus = {ErrorCodes::ShutdownInProgress, "Replication shutdown in progress"}; - return; - } - fassert(28655, cbData.status); - const Date_t now = _replExecutor.now(); - *outStatus = _topCoord->prepareHeartbeatResponseV1( - now, - args, - _settings.ourSetName(), - getMyLastOptime(), - response); - if ((outStatus->isOK() || *outStatus == ErrorCodes::InvalidReplicaSetConfig) && - _selfIndex < 0) { - // If this node does not belong to the configuration it knows about, send heartbeats - // back to any node that sends us a heartbeat, in case one of those remote nodes has - // a configuration that contains us. Chances are excellent that it will, since that - // is the only reason for a remote node to send this node a heartbeat request. - if (!args.getSenderHost().empty() && _seedList.insert(args.getSenderHost()).second) { - _scheduleHeartbeatToTarget(args.getSenderHost(), -1, now); - } - } + fassert(28670, cbh.getStatus()); + _replExecutor.wait(cbh.getValue()); + return updated; +} + +bool ReplicationCoordinatorImpl::updateTerm_forTest(long long term) { + bool updated = false; + Handle cbHandle; + CBHStatus cbh = + _replExecutor.scheduleWork(stdx::bind(&ReplicationCoordinatorImpl::_updateTerm_helper, + this, + stdx::placeholders::_1, + term, + &updated, + &cbHandle)); + if (cbh.getStatus() == ErrorCodes::ShutdownInProgress) { + return false; } + fassert(28673, cbh.getStatus()); + _replExecutor.wait(cbh.getValue()); + _replExecutor.wait(cbHandle); + return updated; +} - void ReplicationCoordinatorImpl::summarizeAsHtml(ReplSetHtmlSummary* output) { - CBHStatus cbh = _replExecutor.scheduleWork( - stdx::bind(&ReplicationCoordinatorImpl::_summarizeAsHtml_finish, - this, - stdx::placeholders::_1, - output)); - if (cbh.getStatus() == ErrorCodes::ShutdownInProgress) { - return; - } - fassert(28638, cbh.getStatus()); - _replExecutor.wait(cbh.getValue()); +void ReplicationCoordinatorImpl::_updateTerm_helper(const ReplicationExecutor::CallbackArgs& cbData, + long long term, + bool* updated, + Handle* cbHandle) { + if (cbData.status == ErrorCodes::CallbackCanceled) { + return; } - void ReplicationCoordinatorImpl::_summarizeAsHtml_finish(const CallbackArgs& cbData, - ReplSetHtmlSummary* output) { - if (cbData.status == ErrorCodes::CallbackCanceled) { - return; - } - - output->setSelfOptime(getMyLastOptime()); - output->setSelfUptime(time(0) - serverGlobalParams.started); - output->setNow(_replExecutor.now()); + *updated = _updateTerm_incallback(term, cbHandle); +} - _topCoord->summarizeAsHtml(output); - } - - long long ReplicationCoordinatorImpl::getTerm() { - long long term = OpTime::kDefaultTerm; - CBHStatus cbh = _replExecutor.scheduleWork( - stdx::bind(&ReplicationCoordinatorImpl::_getTerm_helper, - this, - stdx::placeholders::_1, - &term)); - if (cbh.getStatus() == ErrorCodes::ShutdownInProgress) { - return term; - } - fassert(28660, cbh.getStatus()); - _replExecutor.wait(cbh.getValue()); - return term; - } +bool ReplicationCoordinatorImpl::_updateTerm_incallback(long long term, Handle* cbHandle) { + bool updated = _topCoord->updateTerm(term); - void ReplicationCoordinatorImpl::_getTerm_helper( - const ReplicationExecutor::CallbackArgs& cbData, - long long* term) { - if (cbData.status == ErrorCodes::CallbackCanceled) { - return; - } - *term = _topCoord->getTerm(); - } - - bool ReplicationCoordinatorImpl::updateTerm(long long term) { - bool updated = false; - CBHStatus cbh = _replExecutor.scheduleWork( - stdx::bind(&ReplicationCoordinatorImpl::_updateTerm_helper, - this, - stdx::placeholders::_1, - term, - &updated, - nullptr)); - if (cbh.getStatus() == ErrorCodes::ShutdownInProgress) { - return false; - } - fassert(28670, cbh.getStatus()); - _replExecutor.wait(cbh.getValue()); - return updated; - } - - bool ReplicationCoordinatorImpl::updateTerm_forTest(long long term) { - bool updated = false; - Handle cbHandle; - CBHStatus cbh = _replExecutor.scheduleWork( - stdx::bind(&ReplicationCoordinatorImpl::_updateTerm_helper, - this, - stdx::placeholders::_1, - term, - &updated, - &cbHandle)); + if (updated && getMemberState().primary()) { + log() << "stepping down from primary, because a new term has begun"; + _topCoord->prepareForStepDown(); + CBHStatus cbh = _replExecutor.scheduleWorkWithGlobalExclusiveLock( + stdx::bind(&ReplicationCoordinatorImpl::_stepDownFinish, this, stdx::placeholders::_1)); if (cbh.getStatus() == ErrorCodes::ShutdownInProgress) { - return false; - } - fassert(28673, cbh.getStatus()); - _replExecutor.wait(cbh.getValue()); - _replExecutor.wait(cbHandle); - return updated; - } - - void ReplicationCoordinatorImpl::_updateTerm_helper( - const ReplicationExecutor::CallbackArgs& cbData, - long long term, - bool* updated, - Handle* cbHandle) { - if (cbData.status == ErrorCodes::CallbackCanceled) { - return; + return true; } - - *updated = _updateTerm_incallback(term, cbHandle); - } - - bool ReplicationCoordinatorImpl::_updateTerm_incallback(long long term, Handle* cbHandle) { - bool updated = _topCoord->updateTerm(term); - - if (updated && getMemberState().primary()) { - log() << "stepping down from primary, because a new term has begun"; - _topCoord->prepareForStepDown(); - CBHStatus cbh = _replExecutor.scheduleWorkWithGlobalExclusiveLock( - stdx::bind(&ReplicationCoordinatorImpl::_stepDownFinish, - this, - stdx::placeholders::_1)); - if (cbh.getStatus() == ErrorCodes::ShutdownInProgress) { - return true; - } - fassert(28672, cbh.getStatus()); - if (cbHandle) { - *cbHandle = cbh.getValue(); - } + fassert(28672, cbh.getStatus()); + if (cbHandle) { + *cbHandle = cbh.getValue(); } - return updated; } + return updated; +} -} // namespace repl -} // namespace mongo +} // namespace repl +} // namespace mongo |