summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorSpencer T Brody <spencer@mongodb.com>2014-07-16 14:30:35 -0400
committerSpencer T Brody <spencer@mongodb.com>2014-07-18 14:47:11 -0400
commit7c9e21b95804fde5a4837c5aab47ab9c2e57a267 (patch)
treea5e712c0d5bc29f6f32f2f80e3f1833f115190b8 /src
parentf6cc85d06aaa39cea2baaf8771932bb8c5672cf7 (diff)
downloadmongo-7c9e21b95804fde5a4837c5aab47ab9c2e57a267.tar.gz
SERVER-14442 Make ReplicationCoordinator responsible for constructing ReplSetUpdatePosition commands
Diffstat (limited to 'src')
-rw-r--r--src/mongo/db/repl/bgsync.cpp4
-rw-r--r--src/mongo/db/repl/master_slave.cpp35
-rw-r--r--src/mongo/db/repl/master_slave.h9
-rw-r--r--src/mongo/db/repl/repl_coordinator.h12
-rw-r--r--src/mongo/db/repl/repl_coordinator_hybrid.cpp12
-rw-r--r--src/mongo/db/repl/repl_coordinator_hybrid.h4
-rw-r--r--src/mongo/db/repl/repl_coordinator_impl.cpp11
-rw-r--r--src/mongo/db/repl/repl_coordinator_impl.h4
-rw-r--r--src/mongo/db/repl/repl_coordinator_legacy.cpp91
-rw-r--r--src/mongo/db/repl/repl_coordinator_legacy.h13
-rw-r--r--src/mongo/db/repl/repl_coordinator_mock.cpp7
-rw-r--r--src/mongo/db/repl/repl_coordinator_mock.h4
-rw-r--r--src/mongo/db/repl/sync_source_feedback.cpp35
-rw-r--r--src/mongo/db/repl/sync_source_feedback.h17
-rw-r--r--src/mongo/db/repl/write_concern.cpp1
15 files changed, 186 insertions, 73 deletions
diff --git a/src/mongo/db/repl/bgsync.cpp b/src/mongo/db/repl/bgsync.cpp
index c5fd835c650..beb4904428b 100644
--- a/src/mongo/db/repl/bgsync.cpp
+++ b/src/mongo/db/repl/bgsync.cpp
@@ -33,6 +33,7 @@
#include "mongo/db/commands/server_status_metric.h"
#include "mongo/db/repl/bgsync.h"
#include "mongo/db/repl/oplog.h"
+#include "mongo/db/repl/repl_coordinator_global.h"
#include "mongo/db/repl/rs_sync.h"
#include "mongo/db/repl/rs.h"
#include "mongo/util/fail_point_service.h"
@@ -111,7 +112,8 @@ namespace repl {
}
void BackgroundSync::notify() {
- theReplSet->syncSourceFeedback.updateSelfInMap(theReplSet->lastOpTimeWritten);
+ ReplicationCoordinator* replCoord = getGlobalReplicationCoordinator();
+ replCoord->setLastOptime(replCoord->getMyRID(), theReplSet->lastOpTimeWritten);
{
boost::unique_lock<boost::mutex> lock(s_instance->_mutex);
diff --git a/src/mongo/db/repl/master_slave.cpp b/src/mongo/db/repl/master_slave.cpp
index 02ee6c11e41..ca6e2cc2e1d 100644
--- a/src/mongo/db/repl/master_slave.cpp
+++ b/src/mongo/db/repl/master_slave.cpp
@@ -168,24 +168,25 @@ namespace repl {
void ReplSource::ensureMe() {
string myname = getHostName();
+ bool exists = false;
{
OperationContextImpl txn;
- Client::WriteContext ctx(&txn, "local");
+ Client::ReadContext ctx(&txn, "local");
// local.me is an identifier for a server for getLastError w:2+
- if (!Helpers::getSingleton(&txn, "local.me", _me) ||
- !_me.hasField("host") ||
- _me["host"].String() != myname) {
-
- // clean out local.me
- Helpers::emptyCollection(&txn, "local.me");
-
- // repopulate
- BSONObjBuilder b;
- b.appendOID("_id", 0, true);
- b.append("host", myname);
- _me = b.obj();
- Helpers::putSingleton(&txn, "local.me", _me);
- }
+ exists = Helpers::getSingleton(&txn, "local.me", _me);
+ }
+ if (!exists || !_me.hasField("host") || _me["host"].String() != myname) {
+ OperationContextImpl txn;
+ Client::WriteContext ctx(&txn, "local");
+ // clean out local.me
+ Helpers::emptyCollection(&txn, "local.me");
+
+ // repopulate
+ BSONObjBuilder b;
+ b.appendOID("_id", 0, true);
+ b.append("host", myname);
+ _me = b.obj();
+ Helpers::putSingleton(&txn, "local.me", _me);
ctx.commit();
}
}
@@ -1256,6 +1257,10 @@ namespace repl {
replLocalAuth();
}
+ {
+ ReplSource temp; // Ensures local.me is populated
+ }
+
if ( replSettings.slave ) {
verify( replSettings.slave == SimpleSlave );
LOG(1) << "slave=true" << endl;
diff --git a/src/mongo/db/repl/master_slave.h b/src/mongo/db/repl/master_slave.h
index 65e9384732c..15fd74b2176 100644
--- a/src/mongo/db/repl/master_slave.h
+++ b/src/mongo/db/repl/master_slave.h
@@ -101,8 +101,6 @@ namespace repl {
BSONObj _me;
- ReplSource();
-
void resyncDrop( OperationContext* txn, const std::string& db );
// call without the db mutex
void syncToTailOfRemoteLog();
@@ -128,6 +126,9 @@ namespace repl {
public:
OplogReader oplogReader;
+ // Returns the RID for this process. ensureMe() must have been called before this can be.
+ OID getMyRID() const { return _me["_id"].OID(); }
+
void applyOperation(OperationContext* txn, Database* db, const BSONObj& op);
std::string hostName; // ip addr or hostname plus optionally, ":<port>"
std::string _sourceName; // a logical source name.
@@ -142,6 +143,10 @@ namespace repl {
typedef std::vector< shared_ptr< ReplSource > > SourceVector;
static void loadAll(OperationContext* txn, SourceVector&);
explicit ReplSource(BSONObj);
+ // This is not the constructor you are looking for. Always prefer the version that takes
+ // a BSONObj. This is public only as a hack so that the ReplicationCoordinator can find
+ // out the process's RID in master/slave setups.
+ ReplSource();
/* -1 = error */
int sync(int& nApplied);
diff --git a/src/mongo/db/repl/repl_coordinator.h b/src/mongo/db/repl/repl_coordinator.h
index 000476fd4e8..d24293c1973 100644
--- a/src/mongo/db/repl/repl_coordinator.h
+++ b/src/mongo/db/repl/repl_coordinator.h
@@ -244,6 +244,18 @@ namespace repl {
virtual OID getElectionId() = 0;
/**
+ * Returns the RID for this node. The RID is used to identify this node to our sync source
+ * when sending updates about our replication progress.
+ */
+ virtual OID getMyRID() = 0;
+
+ /**
+ * Prepares a BSONObj describing an invocation of the replSetUpdatePosition command that can
+ * be sent to this node's sync source to update it about our progress in replication.
+ */
+ virtual void prepareReplSetUpdatePositionCommand(BSONObjBuilder* cmdBuilder) = 0;
+
+ /**
* Handles an incoming replSetGetStatus command. Adds BSON to 'result'.
*/
virtual void processReplSetGetStatus(BSONObjBuilder* result) = 0;
diff --git a/src/mongo/db/repl/repl_coordinator_hybrid.cpp b/src/mongo/db/repl/repl_coordinator_hybrid.cpp
index 7ca34597f20..aff15a938ce 100644
--- a/src/mongo/db/repl/repl_coordinator_hybrid.cpp
+++ b/src/mongo/db/repl/repl_coordinator_hybrid.cpp
@@ -146,6 +146,18 @@ namespace repl {
return legacyOID;
}
+ OID HybridReplicationCoordinator::getMyRID() {
+ OID legacyRID = _legacy.getMyRID();
+ _impl.getMyRID();
+ return legacyRID;
+ }
+
+ void HybridReplicationCoordinator::prepareReplSetUpdatePositionCommand(BSONObjBuilder* result) {
+ _legacy.prepareReplSetUpdatePositionCommand(result);
+ BSONObjBuilder implResult;
+ _impl.prepareReplSetUpdatePositionCommand(&implResult);
+ }
+
void HybridReplicationCoordinator::processReplSetGetStatus(BSONObjBuilder* result) {
_legacy.processReplSetGetStatus(result);
BSONObjBuilder implResult;
diff --git a/src/mongo/db/repl/repl_coordinator_hybrid.h b/src/mongo/db/repl/repl_coordinator_hybrid.h
index 77ea87a4488..f359e5eb13f 100644
--- a/src/mongo/db/repl/repl_coordinator_hybrid.h
+++ b/src/mongo/db/repl/repl_coordinator_hybrid.h
@@ -94,6 +94,10 @@ namespace repl {
virtual OID getElectionId();
+ virtual OID getMyRID();
+
+ virtual void prepareReplSetUpdatePositionCommand(BSONObjBuilder* cmdBuilder);
+
virtual void processReplSetGetStatus(BSONObjBuilder* result);
virtual bool setMaintenanceMode(bool activate);
diff --git a/src/mongo/db/repl/repl_coordinator_impl.cpp b/src/mongo/db/repl/repl_coordinator_impl.cpp
index 35572c5c742..5e5a4e03e28 100644
--- a/src/mongo/db/repl/repl_coordinator_impl.cpp
+++ b/src/mongo/db/repl/repl_coordinator_impl.cpp
@@ -329,6 +329,17 @@ namespace repl {
return OID();
}
+
+ OID ReplicationCoordinatorImpl::getMyRID() {
+ // TODO
+ return OID();
+ }
+
+ void ReplicationCoordinatorImpl::prepareReplSetUpdatePositionCommand(
+ BSONObjBuilder* cmdBuilder) {
+ // TODO
+ }
+
void ReplicationCoordinatorImpl::processReplSetGetStatus(BSONObjBuilder* result) {
// TODO
}
diff --git a/src/mongo/db/repl/repl_coordinator_impl.h b/src/mongo/db/repl/repl_coordinator_impl.h
index fe778185ca7..e0585477f82 100644
--- a/src/mongo/db/repl/repl_coordinator_impl.h
+++ b/src/mongo/db/repl/repl_coordinator_impl.h
@@ -100,6 +100,10 @@ namespace repl {
virtual OID getElectionId();
+ virtual OID getMyRID();
+
+ virtual void prepareReplSetUpdatePositionCommand(BSONObjBuilder* cmdBuilder);
+
virtual void processReplSetGetStatus(BSONObjBuilder* result);
virtual bool setMaintenanceMode(bool activate);
diff --git a/src/mongo/db/repl/repl_coordinator_legacy.cpp b/src/mongo/db/repl/repl_coordinator_legacy.cpp
index bb82ef7458a..89f2eb667e3 100644
--- a/src/mongo/db/repl/repl_coordinator_legacy.cpp
+++ b/src/mongo/db/repl/repl_coordinator_legacy.cpp
@@ -51,6 +51,7 @@
#include "mongo/util/assert_util.h"
#include "mongo/util/fail_point_service.h"
#include "mongo/util/log.h"
+#include "mongo/util/map_util.h"
namespace mongo {
@@ -371,26 +372,43 @@ namespace {
Status LegacyReplicationCoordinator::setLastOptime(const OID& rid,
const OpTime& ts) {
- BSONObj config;
{
- boost::lock_guard<boost::mutex> lock(_ridConfigMapMutex);
- config = _ridConfigMap[rid];
- }
- LOG(2) << "received notification that node with RID " << rid << " and config " << config <<
- " has reached optime: " << ts.toStringPretty();
- invariant(!config.isEmpty());
- std::string oplogNs = getReplicationMode() == modeReplSet?
- "local.oplog.rs" : "local.oplog.$main";
- if (!updateSlaveTracking(BSON("_id" << rid), config, oplogNs, ts)) {
- return Status(ErrorCodes::NodeNotFound,
- str::stream() << "could not update node with _id: "
- << config["_id"].Int()
- << " beacuse it cannot be found in current ReplSetConfig");
+ boost::lock_guard<boost::mutex> lock(_mutex);
+ if (ts <= mapFindWithDefault(_slaveOpTimeMap, rid, OpTime())) {
+ // Only update if ts is newer than what we have already
+ return Status::OK();
+ }
+ BSONObj config = mapFindWithDefault(_ridConfigMap, rid, BSONObj());
+ LOG(2) << "received notification that node with RID " << rid << " and config " << config
+ << " has reached optime: " << ts.toStringPretty();
+
+ if (rid != getMyRID()) {
+ // TODO(spencer): Remove this invariant for backwards compatibility
+ invariant(!config.isEmpty());
+ std::string oplogNs = getReplicationMode() == modeReplSet?
+ "local.oplog.rs" : "local.oplog.$main";
+ // This is what updates the progress information used for satisfying write concern
+ // and wakes up threads waiting for replication. It also updates the tracking for
+ // maintaining local.slaves
+ if (!updateSlaveTracking(BSON("_id" << rid), config, oplogNs, ts)) {
+ return Status(ErrorCodes::NodeNotFound,
+ str::stream() << "could not update node with _id: "
+ << config["_id"].Int()
+ << " because it cannot be found in current ReplSetConfig");
+ }
+ }
+
+ // This updates the _slaveOpTimeMap which is used for forwarding slave progress
+ // upstream in chained replication.
+ LOG(2) << "Updating our knowledge of the replication progress for node with RID " <<
+ rid << " to be at optime " << ts;
+ _slaveOpTimeMap[rid] = ts;
+
}
if (getReplicationMode() == modeReplSet && !getCurrentMemberState().primary()) {
// pass along if we are not primary
- theReplSet->syncSourceFeedback.updateMap(rid, ts);
+ theReplSet->syncSourceFeedback.forwardSlaveProgress();
}
return Status::OK();
}
@@ -399,6 +417,47 @@ namespace {
return theReplSet->getElectionId();
}
+
+ OID LegacyReplicationCoordinator::getMyRID() {
+ Mode mode = getReplicationMode();
+ if (mode == modeReplSet) {
+ return theReplSet->syncSourceFeedback.getMyRID();
+ } else if (mode == modeMasterSlave) {
+ ReplSource source;
+ return source.getMyRID();
+ }
+ invariant(false); // Don't have an RID if no replication is enabled
+ }
+
+ void LegacyReplicationCoordinator::prepareReplSetUpdatePositionCommand(
+ BSONObjBuilder* cmdBuilder) {
+ invariant(getReplicationMode() == modeReplSet);
+ boost::lock_guard<boost::mutex> lock(_mutex);
+ cmdBuilder->append("replSetUpdatePosition", 1);
+ // create an array containing objects each member connected to us and for ourself
+ BSONArrayBuilder arrayBuilder(cmdBuilder->subarrayStart("optimes"));
+ OID myID = getMyRID();
+ {
+ for (SlaveOpTimeMap::const_iterator itr = _slaveOpTimeMap.begin();
+ itr != _slaveOpTimeMap.end(); ++itr) {
+ const OID& rid = itr->first;
+ const BSONObj& config = mapFindWithDefault(_ridConfigMap, rid, BSONObj());
+ BSONObjBuilder entry(arrayBuilder.subobjStart());
+ entry.append("_id", rid);
+ entry.append("optime", itr->second);
+ // SERVER-14550 Even though the "config" field isn't used on the other end in 2.8,
+ // we need to keep sending it for 2.6 compatibility.
+ // TODO(spencer): Remove this after 2.8 is released.
+ if (rid == myID) {
+ entry.append("config", theReplSet->myConfig().asBson());
+ }
+ else {
+ entry.append("config", config);
+ }
+ }
+ }
+ }
+
void LegacyReplicationCoordinator::processReplSetGetStatus(BSONObjBuilder* result) {
theReplSet->summarizeStatus(*result);
}
@@ -914,7 +973,7 @@ namespace {
LOG(2) << "Received handshake " << handshake << " from node with RID " << remoteID;
{
- boost::lock_guard<boost::mutex> lock(_ridConfigMapMutex);
+ boost::lock_guard<boost::mutex> lock(_mutex);
BSONObj configObj = handshake["config"].Obj().getOwned();
invariant(!configObj.isEmpty());
_ridConfigMap[remoteID] = configObj;
diff --git a/src/mongo/db/repl/repl_coordinator_legacy.h b/src/mongo/db/repl/repl_coordinator_legacy.h
index f9cf15be96c..64f69f1bee1 100644
--- a/src/mongo/db/repl/repl_coordinator_legacy.h
+++ b/src/mongo/db/repl/repl_coordinator_legacy.h
@@ -90,6 +90,10 @@ namespace repl {
virtual OID getElectionId();
+ virtual OID getMyRID();
+
+ virtual void prepareReplSetUpdatePositionCommand(BSONObjBuilder* cmdBuilder);
+
virtual void processReplSetGetStatus(BSONObjBuilder* result);
virtual bool setMaintenanceMode(bool activate);
@@ -143,12 +147,17 @@ namespace repl {
Status _checkReplEnabledForCommand(BSONObjBuilder* result);
- // Mutex that protects the _ridConfigMap
- boost::mutex _ridConfigMapMutex;
+ // Mutex that protects the _ridConfigMap and the _slaveOpTimeMap;
+ boost::mutex _mutex;
// Map from RID to member config object
std::map<OID, BSONObj> _ridConfigMap;
+ // Maps nodes in this replication group to the last oplog operation they have committed
+ // TODO(spencer): change to unordered_map
+ typedef std::map<OID, OpTime> SlaveOpTimeMap;
+ SlaveOpTimeMap _slaveOpTimeMap;
+
// Rollback id. used to check if a rollback happened during some interval of time
// TODO: ideally this should only change on rollbacks NOT on mongod restarts also.
int _rbid;
diff --git a/src/mongo/db/repl/repl_coordinator_mock.cpp b/src/mongo/db/repl/repl_coordinator_mock.cpp
index aab04be4938..ef16aff8f61 100644
--- a/src/mongo/db/repl/repl_coordinator_mock.cpp
+++ b/src/mongo/db/repl/repl_coordinator_mock.cpp
@@ -131,6 +131,13 @@ namespace repl {
return OID();
}
+ OID ReplicationCoordinatorMock::getMyRID() {
+ return OID();
+ }
+
+ void ReplicationCoordinatorMock::prepareReplSetUpdatePositionCommand(
+ BSONObjBuilder* cmdBuilder) {}
+
void ReplicationCoordinatorMock::processReplSetGetStatus(BSONObjBuilder* result) {
//TODO
}
diff --git a/src/mongo/db/repl/repl_coordinator_mock.h b/src/mongo/db/repl/repl_coordinator_mock.h
index 1ac42d4a248..a3b970b338a 100644
--- a/src/mongo/db/repl/repl_coordinator_mock.h
+++ b/src/mongo/db/repl/repl_coordinator_mock.h
@@ -93,6 +93,10 @@ namespace repl {
virtual OID getElectionId();
+ virtual OID getMyRID();
+
+ virtual void prepareReplSetUpdatePositionCommand(BSONObjBuilder* cmdBuilder);
+
virtual void processReplSetGetStatus(BSONObjBuilder* result);
virtual bool setMaintenanceMode(bool activate);
diff --git a/src/mongo/db/repl/sync_source_feedback.cpp b/src/mongo/db/repl/sync_source_feedback.cpp
index cfb6c312e0a..20df8d80397 100644
--- a/src/mongo/db/repl/sync_source_feedback.cpp
+++ b/src/mongo/db/repl/sync_source_feedback.cpp
@@ -38,6 +38,7 @@
#include "mongo/db/auth/security_key.h"
#include "mongo/db/dbhelpers.h"
#include "mongo/db/repl/bgsync.h"
+#include "mongo/db/repl/repl_coordinator_global.h"
#include "mongo/db/repl/rs.h" // theReplSet
#include "mongo/db/operation_context_impl.h"
#include "mongo/util/log.h"
@@ -179,49 +180,27 @@ namespace repl {
_cond.notify_all();
}
- void SyncSourceFeedback::updateMap(const mongo::OID& rid, const OpTime& ot) {
+ void SyncSourceFeedback::forwardSlaveProgress() {
boost::unique_lock<boost::mutex> lock(_mtx);
- // only update if ot is newer than what we have already
- if (ot > _slaveMap[rid]) {
- _slaveMap[rid] = ot;
- _positionChanged = true;
- LOG(2) << "Updating our knowledge of the replication progress for node with RID " <<
- rid << " to be at optime " << ot;
- _cond.notify_all();
- }
+ _positionChanged = true;
+ _cond.notify_all();
}
bool SyncSourceFeedback::updateUpstream() {
- if (theReplSet->isPrimary()) {
+ ReplicationCoordinator* replCoord = getGlobalReplicationCoordinator();
+ if (replCoord->getCurrentMemberState().primary()) {
// primary has no one to update to
return true;
}
BSONObjBuilder cmd;
- cmd.append("replSetUpdatePosition", 1);
- // create an array containing objects each member connected to us and for ourself
- BSONArrayBuilder array (cmd.subarrayStart("optimes"));
- OID myID = _me["_id"].OID();
{
boost::unique_lock<boost::mutex> lock(_mtx);
if (_handshakeNeeded) {
// Don't send updates if there are nodes that haven't yet been handshaked
return false;
}
- for (map<mongo::OID, OpTime>::const_iterator itr = _slaveMap.begin();
- itr != _slaveMap.end(); ++itr) {
- BSONObjBuilder entry(array.subobjStart());
- entry.append("_id", itr->first);
- entry.append("optime", itr->second);
- if (itr->first == myID) {
- entry.append("config", theReplSet->myConfig().asBson());
- }
- else {
- entry.append("config", _members[itr->first]->config().asBson());
- }
- entry.doneFast();
- }
+ replCoord->prepareReplSetUpdatePositionCommand(&cmd);
}
- array.done();
BSONObj res;
LOG(2) << "Sending slave oplog progress to upstream updater: " << cmd.done();
diff --git a/src/mongo/db/repl/sync_source_feedback.h b/src/mongo/db/repl/sync_source_feedback.h
index 2f368a03870..3225620afa9 100644
--- a/src/mongo/db/repl/sync_source_feedback.h
+++ b/src/mongo/db/repl/sync_source_feedback.h
@@ -55,18 +55,19 @@ namespace repl {
/// Ensures local.me is populated and populates it if not.
void ensureMe();
- /// Passes handshake up the replication chain, upon receiving a handshake.
+ /// Notifies the SyncSourceFeedbackThread to wake up and send a handshake up the replication
+ /// chain, upon receiving a handshake.
void forwardSlaveHandshake();
- void updateSelfInMap(const OpTime& ot) {
- updateMap(_me["_id"].OID(), ot);
- }
-
- /// Updates the _slaveMap to be forwarded to the sync target.
- void updateMap(const mongo::OID& rid, const OpTime& ot);
+ /// Notifies the SyncSourceFeedbackThread to wake up and send an update upstream of slave
+ /// replication progress.
+ void forwardSlaveProgress();
std::string name() const { return "SyncSourceFeedbackThread"; }
+ /// Returns the RID for this process. ensureMe() must have been called before this can be.
+ OID getMyRID() const { return _me["_id"].OID(); }
+
/// Loops forever, passing updates when they are present.
void run();
@@ -110,8 +111,6 @@ namespace repl {
boost::scoped_ptr<DBClientConnection> _connection;
// protects cond and maps and the indicator bools
boost::mutex _mtx;
- // contains the most recent optime of each member syncing to us
- std::map<mongo::OID, OpTime> _slaveMap;
typedef std::map<mongo::OID, Member*> OIDMemberMap;
// contains a pointer to each member, which we can look up by oid
OIDMemberMap _members;
diff --git a/src/mongo/db/repl/write_concern.cpp b/src/mongo/db/repl/write_concern.cpp
index 9816de13764..079a16fd281 100644
--- a/src/mongo/db/repl/write_concern.cpp
+++ b/src/mongo/db/repl/write_concern.cpp
@@ -140,6 +140,7 @@ namespace repl {
_dirty = true;
// update write concern tags if this node is primary
+ // TODO(spencer): Move this logic up into the ReplicationCoordinator
if (theReplSet && theReplSet->isPrimary()) {
const Member* mem = theReplSet->findById(ident.obj["config"]["_id"].Int());
if (!mem) {