summary refs log tree commit diff
diff options
context:
space:
mode:
author Spencer T Brody <spencer@mongodb.com> 2014-08-25 18:17:22 -0400
committer Spencer T Brody <spencer@mongodb.com> 2014-09-04 21:52:14 -0400
commit f258721ea54fcd9746f7070ca1f467a8f92b8b83 (patch)
tree 7108a33c74cb2d8e874cb16262ff36281fcea53c
parent 521a6fffc689322ed2e5a0356d39da4301c93259 (diff)
download mongo-f258721ea54fcd9746f7070ca1f467a8f92b8b83.tar.gz
SERVER-15031 Switch slave tracking and write concern satisfaction over to the new ReplicationCoordinator
-rw-r--r-- src/mongo/db/repl/heartbeat.cpp 4
-rw-r--r-- src/mongo/db/repl/repl_coordinator_hybrid.cpp 75
-rw-r--r-- src/mongo/db/repl/repl_coordinator_impl.cpp 23
-rw-r--r-- src/mongo/db/repl/repl_coordinator_legacy.cpp 10
-rw-r--r-- src/mongo/db/repl/repl_set_impl.cpp 14
5 files changed, 88 insertions, 38 deletions
diff --git a/src/mongo/db/repl/heartbeat.cpp b/src/mongo/db/repl/heartbeat.cpp
index 64dc14ff256..54e05e82233 100644
--- a/src/mongo/db/repl/heartbeat.cpp
+++ b/src/mongo/db/repl/heartbeat.cpp
@@ -221,8 +221,8 @@ namespace {
boost::thread t(startSyncThread);
boost::thread producer(stdx::bind(&BackgroundSync::producerThread, sync));
- boost::thread feedback(stdx::bind(&SyncSourceFeedback::run,
- &theReplSet->syncSourceFeedback));
+ //boost::thread feedback(stdx::bind(&SyncSourceFeedback::run,
+ // &theReplSet->syncSourceFeedback));
// member heartbeats are started in ReplSetImpl::initFromConfig
}
diff --git a/src/mongo/db/repl/repl_coordinator_hybrid.cpp b/src/mongo/db/repl/repl_coordinator_hybrid.cpp
index 89d39dbea4d..0d5bbcdfb3c 100644
--- a/src/mongo/db/repl/repl_coordinator_hybrid.cpp
+++ b/src/mongo/db/repl/repl_coordinator_hybrid.cpp
@@ -93,16 +93,29 @@ namespace repl {
const OperationContext* txn,
const OpTime& ts,
const WriteConcernOptions& writeConcern) {
- StatusAndDuration legacyStatus = _legacy.awaitReplication(txn, ts, writeConcern);
- return legacyStatus;
+ StatusAndDuration implStatus = _impl.awaitReplication(txn, ts, writeConcern);
+ if (implStatus.status.isOK()) {
+ WriteConcernOptions legacyWriteConcern = writeConcern;
+ legacyWriteConcern.wTimeout = WriteConcernOptions::kNoWaiting;
+ StatusAndDuration legacyStatus = _legacy.awaitReplication(txn, ts, legacyWriteConcern);
+ fassert(18691, legacyStatus.status);
+ }
+ return implStatus;
}
ReplicationCoordinator::StatusAndDuration
HybridReplicationCoordinator::awaitReplicationOfLastOp(
const OperationContext* txn,
const WriteConcernOptions& writeConcern) {
- StatusAndDuration legacyStatus = _legacy.awaitReplicationOfLastOp(txn, writeConcern);
- return legacyStatus;
+ StatusAndDuration implStatus = _impl.awaitReplicationOfLastOp(txn, writeConcern);
+ if (implStatus.status.isOK()) {
+ WriteConcernOptions legacyWriteConcern = writeConcern;
+ legacyWriteConcern.wTimeout = WriteConcernOptions::kNoWaiting;
+ StatusAndDuration legacyStatus = _legacy.awaitReplicationOfLastOp(txn,
+ legacyWriteConcern);
+ fassert(18669, legacyStatus.status);
+ }
+ return implStatus;
}
Status HybridReplicationCoordinator::stepDown(OperationContext* txn,
@@ -167,12 +180,22 @@ namespace repl {
const OpTime& ts) {
Status legacyStatus = _legacy.setLastOptime(txn, rid, ts);
Status implStatus = _impl.setLastOptime(txn, rid, ts);
- return legacyStatus;
+ if (legacyStatus.code() != implStatus.code()) {
+ warning() << "Hybrid response difference in setLastOptime. Legacy response: "
+ << legacyStatus << ", impl response: " << implStatus;
+ }
+ fassert(18667, legacyStatus.code() == implStatus.code());
+ return implStatus;
}
Status HybridReplicationCoordinator::setMyLastOptime(OperationContext* txn, const OpTime& ts) {
Status legacyStatus = _legacy.setMyLastOptime(txn, ts);
Status implStatus = _impl.setMyLastOptime(txn, ts);
+ if (legacyStatus.code() != implStatus.code()) {
+ warning() << "Hybrid response difference in setMyLastOptime. Legacy response: "
+ << legacyStatus << ", impl response: " << implStatus;
+ }
+ fassert(18666, legacyStatus.code() == implStatus.code());
return legacyStatus;
}
@@ -190,17 +213,17 @@ namespace repl {
void HybridReplicationCoordinator::prepareReplSetUpdatePositionCommand(OperationContext* txn,
BSONObjBuilder* result) {
- _legacy.prepareReplSetUpdatePositionCommand(txn, result);
- BSONObjBuilder implResult;
- _impl.prepareReplSetUpdatePositionCommand(txn, &implResult);
+ _impl.prepareReplSetUpdatePositionCommand(txn, result);
+ BSONObjBuilder legacyResult;
+ _legacy.prepareReplSetUpdatePositionCommand(txn, &legacyResult);
}
void HybridReplicationCoordinator::prepareReplSetUpdatePositionCommandHandshakes(
OperationContext* txn,
std::vector<BSONObj>* handshakes) {
- _legacy.prepareReplSetUpdatePositionCommandHandshakes(txn, handshakes);
- std::vector<BSONObj> implResult;
- _impl.prepareReplSetUpdatePositionCommandHandshakes(txn, &implResult);
+ _impl.prepareReplSetUpdatePositionCommandHandshakes(txn, handshakes);
+ std::vector<BSONObj> legacyResult;
+ _legacy.prepareReplSetUpdatePositionCommandHandshakes(txn, &legacyResult);
}
Status HybridReplicationCoordinator::processReplSetGetStatus(BSONObjBuilder* result) {
@@ -306,15 +329,37 @@ namespace repl {
OperationContext* txn,
const UpdatePositionArgs& updates) {
Status legacyStatus = _legacy.processReplSetUpdatePosition(txn, updates);
- _impl.processReplSetUpdatePosition(txn, updates);
- return legacyStatus;
+ Status implStatus = _impl.processReplSetUpdatePosition(txn, updates);
+ if (legacyStatus.code() != implStatus.code()) {
+ warning() << "Hybrid response difference in processReplSetUpdatePosition. "
+ "Legacy response: " << legacyStatus << ", impl response: " << implStatus;
+ // Only valid way they can be different is legacy not finding the node and impl
+ // succeeding. This is valid b/c legacy clears its _members array on reconfigs
+ // and then rebuilds it in a non-atomic way.
+ fassert(18690, legacyStatus == ErrorCodes::NodeNotFound && implStatus.isOK());
+ }
+ return implStatus;
}
Status HybridReplicationCoordinator::processHandshake(const OperationContext* txn,
const HandshakeArgs& handshake) {
Status legacyResponse = _legacy.processHandshake(txn, handshake);
- _impl.processHandshake(txn, handshake);
- return legacyResponse;
+ Status implResponse = _impl.processHandshake(txn, handshake);
+ if (legacyResponse.code() != implResponse.code()) {
+ warning() << "Hybrid response difference in processHandshake. Legacy response: "
+ << legacyResponse << ", impl response: " << implResponse;
+ // Can't fassert that the codes match because when doing a replSetReconfig that adds or
+ // removes nodes there is always a race condition between the two coordinators switching
+ // to the new config.
+ if (implResponse.isOK()) {
+ // If either coordinator has a problem, we have to return whichever has the non-OK
+ // status so that the handshake will be retried by the sender; otherwise, whichever
+ // coordinator failed to process the handshake will fail later on when processing
+ // replSetUpdatePosition.
+ return legacyResponse;
+ }
+ }
+ return implResponse;
}
void HybridReplicationCoordinator::waitUpToOneSecondForOptimeChange(const OpTime& ot) {
diff --git a/src/mongo/db/repl/repl_coordinator_impl.cpp b/src/mongo/db/repl/repl_coordinator_impl.cpp
index bb60ca20be6..a1b48589da1 100644
--- a/src/mongo/db/repl/repl_coordinator_impl.cpp
+++ b/src/mongo/db/repl/repl_coordinator_impl.cpp
@@ -225,11 +225,9 @@ namespace {
_topCoordDriverThread.reset(new boost::thread(stdx::bind(&ReplicationExecutor::run,
&_replExecutor)));
- // TODO(spencer): Start this thread once we're no longer starting a SyncSourceFeedback
- // thread in the Legacy coordinator
- //_syncSourceFeedbackThread.reset(new boost::thread(
- // stdx::bind(&ReplicationCoordinatorExternalState::runSyncSourceFeedback,
- // _externalState.get())));
+ _syncSourceFeedbackThread.reset(new boost::thread(
+ stdx::bind(&ReplicationCoordinatorExternalState::runSyncSourceFeedback,
+ _externalState.get())));
bool doneLoadingConfig = _startLoadLocalConfig(txn);
if (doneLoadingConfig) {
@@ -265,7 +263,7 @@ namespace {
_replExecutor.shutdown();
_topCoordDriverThread->join(); // must happen outside _mutex
_externalState->shutdown();
- // _syncSourceFeedbackThread->join(); // TODO(spencer): put back once the thread is started
+ _syncSourceFeedbackThread->join();
}
ReplSettings& ReplicationCoordinatorImpl::getSettings() {
@@ -442,10 +440,9 @@ namespace {
const OperationContext* txn,
const OpTime& opTime,
const WriteConcernOptions& writeConcern) {
- // TODO(spencer): handle killop
-
-
if (writeConcern.wNumNodes <= 1 && writeConcern.wMode.empty()) {
+ // TODO(spencer): is this right? The map does contain entries for ourself, so it seems
+ // like checking the map even for w:1 writes makes some sense...
// no desired replication check
return StatusAndDuration(Status::OK(), Milliseconds(0));
}
@@ -511,8 +508,7 @@ namespace {
ReplicationCoordinator::StatusAndDuration ReplicationCoordinatorImpl::awaitReplicationOfLastOp(
const OperationContext* txn,
const WriteConcernOptions& writeConcern) {
- // TODO
- return StatusAndDuration(Status::OK(), Milliseconds(0));
+ return awaitReplication(txn, _getLastOpApplied(), writeConcern);
}
Status ReplicationCoordinatorImpl::stepDown(OperationContext* txn,
@@ -1059,7 +1055,10 @@ namespace {
if (!member) {
return Status(ErrorCodes::NodeNotFound,
str::stream() << "Node with replica set member ID " << memberID <<
- " could not be found in replica set config during handshake");
+ " could not be found in replica set config while attempting"
+ " to associate it with RID " << handshake.getRid() <<
+ " in replication handshake. ReplSet Config: " <<
+ _rsConfig.toBSON().toString());
}
SlaveInfo& slaveInfo = _slaveInfoMap[handshake.getRid()];
slaveInfo.memberID = memberID;
diff --git a/src/mongo/db/repl/repl_coordinator_legacy.cpp b/src/mongo/db/repl/repl_coordinator_legacy.cpp
index eab110ed6b2..055286d9757 100644
--- a/src/mongo/db/repl/repl_coordinator_legacy.cpp
+++ b/src/mongo/db/repl/repl_coordinator_legacy.cpp
@@ -414,8 +414,9 @@ namespace {
if (!updateSlaveTracking(BSON("_id" << rid), config, ts)) {
return Status(ErrorCodes::NodeNotFound,
str::stream() << "could not update node with _id: "
- << config["_id"].Int()
- << " because it cannot be found in current ReplSetConfig");
+ << config["_id"].Int() << " and RID " << rid
+ << " because it cannot be found in current ReplSetConfig "
+ << theReplSet->getConfig().toString());
}
}
@@ -936,7 +937,10 @@ namespace {
if (!member) {
return Status(ErrorCodes::NodeNotFound,
str::stream() << "Node with replica set member ID " << memberID <<
- " could not be found in replica set config during handshake");
+ " could not be found in replica set config while attempting to "
+ "associate it with RID " << handshake.getRid() <<
+ " in replication handshake. ReplSet config: " <<
+ theReplSet->getConfig().toString());
}
_ridMemberMap[handshake.getRid()] = member;
diff --git a/src/mongo/db/repl/repl_set_impl.cpp b/src/mongo/db/repl/repl_set_impl.cpp
index 8bd96bac732..b5a25356041 100644
--- a/src/mongo/db/repl/repl_set_impl.cpp
+++ b/src/mongo/db/repl/repl_set_impl.cpp
@@ -52,6 +52,7 @@
#include "mongo/util/background.h"
#include "mongo/util/exit.h"
#include "mongo/util/log.h"
+#include "mongo/util/scopeguard.h"
namespace mongo {
@@ -599,6 +600,13 @@ namespace {
_cfg = new ReplSetConfig(c);
+ // config() is same thing but const, so we use that when we can for clarity below
+ dassert(&config() == _cfg);
+ verify(config().ok());
+ verify(_name.empty() || _name == config()._id);
+ _name = config()._id;
+ verify(!_name.empty());
+
{
// Hack to force ReplicationCoordinatorImpl to have a config.
// TODO(spencer): rm this once the ReplicationCoordinatorImpl can load its own config.
@@ -608,12 +616,6 @@ namespace {
replCoord->setImplConfigHack(_cfg);
}
- // config() is same thing but const, so we use that when we can for clarity below
- dassert(&config() == _cfg);
- verify(config().ok());
- verify(_name.empty() || _name == config()._id);
- _name = config()._id;
- verify(!_name.empty());
// this is a shortcut for simple changes
if (additive) {
log() << "replSet info : additive change to configuration" << rsLog;