diff options
author | Spencer T Brody <spencer@mongodb.com> | 2014-08-25 18:17:22 -0400 |
---|---|---|
committer | Spencer T Brody <spencer@mongodb.com> | 2014-09-04 21:52:14 -0400 |
commit | f258721ea54fcd9746f7070ca1f467a8f92b8b83 (patch) | |
tree | 7108a33c74cb2d8e874cb16262ff36281fcea53c | |
parent | 521a6fffc689322ed2e5a0356d39da4301c93259 (diff) | |
download | mongo-f258721ea54fcd9746f7070ca1f467a8f92b8b83.tar.gz |
SERVER-15031 Switch slave tracking and write concern satisfaction over to the new ReplicationCoordinator
-rw-r--r-- | src/mongo/db/repl/heartbeat.cpp | 4 | ||||
-rw-r--r-- | src/mongo/db/repl/repl_coordinator_hybrid.cpp | 75 | ||||
-rw-r--r-- | src/mongo/db/repl/repl_coordinator_impl.cpp | 23 | ||||
-rw-r--r-- | src/mongo/db/repl/repl_coordinator_legacy.cpp | 10 | ||||
-rw-r--r-- | src/mongo/db/repl/repl_set_impl.cpp | 14 |
5 files changed, 88 insertions, 38 deletions
diff --git a/src/mongo/db/repl/heartbeat.cpp b/src/mongo/db/repl/heartbeat.cpp index 64dc14ff256..54e05e82233 100644 --- a/src/mongo/db/repl/heartbeat.cpp +++ b/src/mongo/db/repl/heartbeat.cpp @@ -221,8 +221,8 @@ namespace { boost::thread t(startSyncThread); boost::thread producer(stdx::bind(&BackgroundSync::producerThread, sync)); - boost::thread feedback(stdx::bind(&SyncSourceFeedback::run, - &theReplSet->syncSourceFeedback)); + //boost::thread feedback(stdx::bind(&SyncSourceFeedback::run, + // &theReplSet->syncSourceFeedback)); // member heartbeats are started in ReplSetImpl::initFromConfig } diff --git a/src/mongo/db/repl/repl_coordinator_hybrid.cpp b/src/mongo/db/repl/repl_coordinator_hybrid.cpp index 89d39dbea4d..0d5bbcdfb3c 100644 --- a/src/mongo/db/repl/repl_coordinator_hybrid.cpp +++ b/src/mongo/db/repl/repl_coordinator_hybrid.cpp @@ -93,16 +93,29 @@ namespace repl { const OperationContext* txn, const OpTime& ts, const WriteConcernOptions& writeConcern) { - StatusAndDuration legacyStatus = _legacy.awaitReplication(txn, ts, writeConcern); - return legacyStatus; + StatusAndDuration implStatus = _impl.awaitReplication(txn, ts, writeConcern); + if (implStatus.status.isOK()) { + WriteConcernOptions legacyWriteConcern = writeConcern; + legacyWriteConcern.wTimeout = WriteConcernOptions::kNoWaiting; + StatusAndDuration legacyStatus = _legacy.awaitReplication(txn, ts, legacyWriteConcern); + fassert(18691, legacyStatus.status); + } + return implStatus; } ReplicationCoordinator::StatusAndDuration HybridReplicationCoordinator::awaitReplicationOfLastOp( const OperationContext* txn, const WriteConcernOptions& writeConcern) { - StatusAndDuration legacyStatus = _legacy.awaitReplicationOfLastOp(txn, writeConcern); - return legacyStatus; + StatusAndDuration implStatus = _impl.awaitReplicationOfLastOp(txn, writeConcern); + if (implStatus.status.isOK()) { + WriteConcernOptions legacyWriteConcern = writeConcern; + legacyWriteConcern.wTimeout = WriteConcernOptions::kNoWaiting; + StatusAndDuration legacyStatus = _legacy.awaitReplicationOfLastOp(txn, + legacyWriteConcern); + fassert(18669, legacyStatus.status); + } + return implStatus; } Status HybridReplicationCoordinator::stepDown(OperationContext* txn, @@ -167,12 +180,22 @@ namespace repl { const OpTime& ts) { Status legacyStatus = _legacy.setLastOptime(txn, rid, ts); Status implStatus = _impl.setLastOptime(txn, rid, ts); - return legacyStatus; + if (legacyStatus.code() != implStatus.code()) { + warning() << "Hybrid response difference in setLastOptime. Legacy response: " + << legacyStatus << ", impl response: " << implStatus; + } + fassert(18667, legacyStatus.code() == implStatus.code()); + return implStatus; } Status HybridReplicationCoordinator::setMyLastOptime(OperationContext* txn, const OpTime& ts) { Status legacyStatus = _legacy.setMyLastOptime(txn, ts); Status implStatus = _impl.setMyLastOptime(txn, ts); + if (legacyStatus.code() != implStatus.code()) { + warning() << "Hybrid response difference in setMyLastOptime. Legacy response: " + << legacyStatus << ", impl response: " << implStatus; + } + fassert(18666, legacyStatus.code() == implStatus.code()); return legacyStatus; } @@ -190,17 +213,17 @@ namespace repl { void HybridReplicationCoordinator::prepareReplSetUpdatePositionCommand(OperationContext* txn, BSONObjBuilder* result) { - _legacy.prepareReplSetUpdatePositionCommand(txn, result); - BSONObjBuilder implResult; - _impl.prepareReplSetUpdatePositionCommand(txn, &implResult); + _impl.prepareReplSetUpdatePositionCommand(txn, result); + BSONObjBuilder legacyResult; + _legacy.prepareReplSetUpdatePositionCommand(txn, &legacyResult); } void HybridReplicationCoordinator::prepareReplSetUpdatePositionCommandHandshakes( OperationContext* txn, std::vector<BSONObj>* handshakes) { - _legacy.prepareReplSetUpdatePositionCommandHandshakes(txn, handshakes); - std::vector<BSONObj> implResult; - _impl.prepareReplSetUpdatePositionCommandHandshakes(txn, &implResult); + _impl.prepareReplSetUpdatePositionCommandHandshakes(txn, handshakes); + std::vector<BSONObj> legacyResult; + _legacy.prepareReplSetUpdatePositionCommandHandshakes(txn, &legacyResult); } Status HybridReplicationCoordinator::processReplSetGetStatus(BSONObjBuilder* result) { @@ -306,15 +329,37 @@ namespace repl { OperationContext* txn, const UpdatePositionArgs& updates) { Status legacyStatus = _legacy.processReplSetUpdatePosition(txn, updates); - _impl.processReplSetUpdatePosition(txn, updates); - return legacyStatus; + Status implStatus = _impl.processReplSetUpdatePosition(txn, updates); + if (legacyStatus.code() != implStatus.code()) { + warning() << "Hybrid response difference in processReplSetUpdatePosition. " + "Legacy response: " << legacyStatus << ", impl response: " << implStatus; + // Only valid way they can be different is legacy not finding the node and impl + // succeeding. This is valid b/c legacy clears it's _members array on reconfigs + // and then rebuilds it in a non-atomic way. + fassert(18690, legacyStatus == ErrorCodes::NodeNotFound && implStatus.isOK()); + } + return implStatus; } Status HybridReplicationCoordinator::processHandshake(const OperationContext* txn, const HandshakeArgs& handshake) { Status legacyResponse = _legacy.processHandshake(txn, handshake); - _impl.processHandshake(txn, handshake); - return legacyResponse; + Status implResponse = _impl.processHandshake(txn, handshake); + if (legacyResponse.code() != implResponse.code()) { + warning() << "Hybrid response difference in processHandshake. Legacy response: " + << legacyResponse << ", impl response: " << implResponse; + // Can't fassert that the codes match because when doing a replSetReconfig that adds or + // removes nodes there is always a race condition between the two coordinators switching + // to the new config. + if (implResponse.isOK()) { + // If either coordinator has a problem have to return whichever has the non-OK + // status so that the handshake will be retried by the sender, otherwise whichever + // coordinator failed to process the handshake will fail later on when processing + // replSetUpdatePosition + return legacyResponse; + } + } + return implResponse; } void HybridReplicationCoordinator::waitUpToOneSecondForOptimeChange(const OpTime& ot) { diff --git a/src/mongo/db/repl/repl_coordinator_impl.cpp b/src/mongo/db/repl/repl_coordinator_impl.cpp index bb60ca20be6..a1b48589da1 100644 --- a/src/mongo/db/repl/repl_coordinator_impl.cpp +++ b/src/mongo/db/repl/repl_coordinator_impl.cpp @@ -225,11 +225,9 @@ namespace { _topCoordDriverThread.reset(new boost::thread(stdx::bind(&ReplicationExecutor::run, &_replExecutor))); - // TODO(spencer): Start this thread once we're no longer starting a SyncSourceFeedback - // thread in the Legacy coordinator - //_syncSourceFeedbackThread.reset(new boost::thread( - // stdx::bind(&ReplicationCoordinatorExternalState::runSyncSourceFeedback, - // _externalState.get()))); + _syncSourceFeedbackThread.reset(new boost::thread( + stdx::bind(&ReplicationCoordinatorExternalState::runSyncSourceFeedback, + _externalState.get()))); bool doneLoadingConfig = _startLoadLocalConfig(txn); if (doneLoadingConfig) { @@ -265,7 +263,7 @@ namespace { _replExecutor.shutdown(); _topCoordDriverThread->join(); // must happen outside _mutex _externalState->shutdown(); - // _syncSourceFeedbackThread->join(); // TODO(spencer): put back once the thread is started + _syncSourceFeedbackThread->join(); } ReplSettings& ReplicationCoordinatorImpl::getSettings() { @@ -442,10 +440,9 @@ namespace { const OperationContext* txn, const OpTime& opTime, const WriteConcernOptions& writeConcern) { - // TODO(spencer): handle killop - - if (writeConcern.wNumNodes <= 1 && writeConcern.wMode.empty()) { + // TODO(spencer): is this right? The map does contain entries for ourself, so it seems + // like checking the map even for w:1 writes makes some sense... // no desired replication check return StatusAndDuration(Status::OK(), Milliseconds(0)); } @@ -511,8 +508,7 @@ namespace { ReplicationCoordinator::StatusAndDuration ReplicationCoordinatorImpl::awaitReplicationOfLastOp( const OperationContext* txn, const WriteConcernOptions& writeConcern) { - // TODO - return StatusAndDuration(Status::OK(), Milliseconds(0)); + return awaitReplication(txn, _getLastOpApplied(), writeConcern); } Status ReplicationCoordinatorImpl::stepDown(OperationContext* txn, @@ -1059,7 +1055,10 @@ namespace { if (!member) { return Status(ErrorCodes::NodeNotFound, str::stream() << "Node with replica set member ID " << memberID << - " could not be found in replica set config during handshake"); + " could not be found in replica set config while attempting" + " to associate it with RID " << handshake.getRid() << + " in replication handshake. ReplSet Config: " << + _rsConfig.toBSON().toString()); } SlaveInfo& slaveInfo = _slaveInfoMap[handshake.getRid()]; slaveInfo.memberID = memberID; diff --git a/src/mongo/db/repl/repl_coordinator_legacy.cpp b/src/mongo/db/repl/repl_coordinator_legacy.cpp index eab110ed6b2..055286d9757 100644 --- a/src/mongo/db/repl/repl_coordinator_legacy.cpp +++ b/src/mongo/db/repl/repl_coordinator_legacy.cpp @@ -414,8 +414,9 @@ namespace { if (!updateSlaveTracking(BSON("_id" << rid), config, ts)) { return Status(ErrorCodes::NodeNotFound, str::stream() << "could not update node with _id: " - << config["_id"].Int() - << " because it cannot be found in current ReplSetConfig"); + << config["_id"].Int() << " and RID " << rid + << " because it cannot be found in current ReplSetConfig " + << theReplSet->getConfig().toString()); } } @@ -936,7 +937,10 @@ namespace { if (!member) { return Status(ErrorCodes::NodeNotFound, str::stream() << "Node with replica set member ID " << memberID << - " could not be found in replica set config during handshake"); + " could not be found in replica set config while attempting to " + "associate it with RID " << handshake.getRid() << + " in replication handshake. ReplSet config: " << + theReplSet->getConfig().toString()); } _ridMemberMap[handshake.getRid()] = member; diff --git a/src/mongo/db/repl/repl_set_impl.cpp b/src/mongo/db/repl/repl_set_impl.cpp index 8bd96bac732..b5a25356041 100644 --- a/src/mongo/db/repl/repl_set_impl.cpp +++ b/src/mongo/db/repl/repl_set_impl.cpp @@ -52,6 +52,7 @@ #include "mongo/util/background.h" #include "mongo/util/exit.h" #include "mongo/util/log.h" +#include "mongo/util/scopeguard.h" namespace mongo { @@ -599,6 +600,13 @@ namespace { _cfg = new ReplSetConfig(c); + // config() is same thing but const, so we use that when we can for clarity below + dassert(&config() == _cfg); + verify(config().ok()); + verify(_name.empty() || _name == config()._id); + _name = config()._id; + verify(!_name.empty()); + { // Hack to force ReplicationCoordinatorImpl to have a config. // TODO(spencer): rm this once the ReplicationCoordinatorImpl can load its own config. @@ -608,12 +616,6 @@ namespace { replCoord->setImplConfigHack(_cfg); } - // config() is same thing but const, so we use that when we can for clarity below - dassert(&config() == _cfg); - verify(config().ok()); - verify(_name.empty() || _name == config()._id); - _name = config()._id; - verify(!_name.empty()); // this is a shortcut for simple changes if (additive) { log() << "replSet info : additive change to configuration" << rsLog; |