diff options
author | Spencer T Brody <spencer@mongodb.com> | 2014-07-16 14:30:35 -0400 |
---|---|---|
committer | Spencer T Brody <spencer@mongodb.com> | 2014-07-18 14:47:11 -0400 |
commit | 7c9e21b95804fde5a4837c5aab47ab9c2e57a267 (patch) | |
tree | a5e712c0d5bc29f6f32f2f80e3f1833f115190b8 /src | |
parent | f6cc85d06aaa39cea2baaf8771932bb8c5672cf7 (diff) | |
download | mongo-7c9e21b95804fde5a4837c5aab47ab9c2e57a267.tar.gz |
SERVER-14442 Make ReplicationCoordinator responsible for constructing ReplSetUpdatePosition commands
Diffstat (limited to 'src')
-rw-r--r-- | src/mongo/db/repl/bgsync.cpp | 4 | ||||
-rw-r--r-- | src/mongo/db/repl/master_slave.cpp | 35 | ||||
-rw-r--r-- | src/mongo/db/repl/master_slave.h | 9 | ||||
-rw-r--r-- | src/mongo/db/repl/repl_coordinator.h | 12 | ||||
-rw-r--r-- | src/mongo/db/repl/repl_coordinator_hybrid.cpp | 12 | ||||
-rw-r--r-- | src/mongo/db/repl/repl_coordinator_hybrid.h | 4 | ||||
-rw-r--r-- | src/mongo/db/repl/repl_coordinator_impl.cpp | 11 | ||||
-rw-r--r-- | src/mongo/db/repl/repl_coordinator_impl.h | 4 | ||||
-rw-r--r-- | src/mongo/db/repl/repl_coordinator_legacy.cpp | 91 | ||||
-rw-r--r-- | src/mongo/db/repl/repl_coordinator_legacy.h | 13 | ||||
-rw-r--r-- | src/mongo/db/repl/repl_coordinator_mock.cpp | 7 | ||||
-rw-r--r-- | src/mongo/db/repl/repl_coordinator_mock.h | 4 | ||||
-rw-r--r-- | src/mongo/db/repl/sync_source_feedback.cpp | 35 | ||||
-rw-r--r-- | src/mongo/db/repl/sync_source_feedback.h | 17 | ||||
-rw-r--r-- | src/mongo/db/repl/write_concern.cpp | 1 |
15 files changed, 186 insertions, 73 deletions
diff --git a/src/mongo/db/repl/bgsync.cpp b/src/mongo/db/repl/bgsync.cpp index c5fd835c650..beb4904428b 100644 --- a/src/mongo/db/repl/bgsync.cpp +++ b/src/mongo/db/repl/bgsync.cpp @@ -33,6 +33,7 @@ #include "mongo/db/commands/server_status_metric.h" #include "mongo/db/repl/bgsync.h" #include "mongo/db/repl/oplog.h" +#include "mongo/db/repl/repl_coordinator_global.h" #include "mongo/db/repl/rs_sync.h" #include "mongo/db/repl/rs.h" #include "mongo/util/fail_point_service.h" @@ -111,7 +112,8 @@ namespace repl { } void BackgroundSync::notify() { - theReplSet->syncSourceFeedback.updateSelfInMap(theReplSet->lastOpTimeWritten); + ReplicationCoordinator* replCoord = getGlobalReplicationCoordinator(); + replCoord->setLastOptime(replCoord->getMyRID(), theReplSet->lastOpTimeWritten); { boost::unique_lock<boost::mutex> lock(s_instance->_mutex); diff --git a/src/mongo/db/repl/master_slave.cpp b/src/mongo/db/repl/master_slave.cpp index 02ee6c11e41..ca6e2cc2e1d 100644 --- a/src/mongo/db/repl/master_slave.cpp +++ b/src/mongo/db/repl/master_slave.cpp @@ -168,24 +168,25 @@ namespace repl { void ReplSource::ensureMe() { string myname = getHostName(); + bool exists = false; { OperationContextImpl txn; - Client::WriteContext ctx(&txn, "local"); + Client::ReadContext ctx(&txn, "local"); // local.me is an identifier for a server for getLastError w:2+ - if (!Helpers::getSingleton(&txn, "local.me", _me) || - !_me.hasField("host") || - _me["host"].String() != myname) { - - // clean out local.me - Helpers::emptyCollection(&txn, "local.me"); - - // repopulate - BSONObjBuilder b; - b.appendOID("_id", 0, true); - b.append("host", myname); - _me = b.obj(); - Helpers::putSingleton(&txn, "local.me", _me); - } + exists = Helpers::getSingleton(&txn, "local.me", _me); + } + if (!exists || !_me.hasField("host") || _me["host"].String() != myname) { + OperationContextImpl txn; + Client::WriteContext ctx(&txn, "local"); + // clean out local.me + Helpers::emptyCollection(&txn, "local.me"); + + // repopulate + BSONObjBuilder b; + b.appendOID("_id", 0, true); + b.append("host", myname); + _me = b.obj(); + Helpers::putSingleton(&txn, "local.me", _me); ctx.commit(); } } @@ -1256,6 +1257,10 @@ namespace repl { replLocalAuth(); } + { + ReplSource temp; // Ensures local.me is populated + } + if ( replSettings.slave ) { verify( replSettings.slave == SimpleSlave ); LOG(1) << "slave=true" << endl; diff --git a/src/mongo/db/repl/master_slave.h b/src/mongo/db/repl/master_slave.h index 65e9384732c..15fd74b2176 100644 --- a/src/mongo/db/repl/master_slave.h +++ b/src/mongo/db/repl/master_slave.h @@ -101,8 +101,6 @@ namespace repl { BSONObj _me; - ReplSource(); - void resyncDrop( OperationContext* txn, const std::string& db ); // call without the db mutex void syncToTailOfRemoteLog(); @@ -128,6 +126,9 @@ namespace repl { public: OplogReader oplogReader; + // Returns the RID for this process. ensureMe() must have been called before this can be. + OID getMyRID() const { return _me["_id"].OID(); } + void applyOperation(OperationContext* txn, Database* db, const BSONObj& op); std::string hostName; // ip addr or hostname plus optionally, ":<port>" std::string _sourceName; // a logical source name. @@ -142,6 +143,10 @@ namespace repl { typedef std::vector< shared_ptr< ReplSource > > SourceVector; static void loadAll(OperationContext* txn, SourceVector&); explicit ReplSource(BSONObj); + // This is not the constructor you are looking for. Always prefer the version that takes + // a BSONObj. This is public only as a hack so that the ReplicationCoordinator can find + // out the process's RID in master/slave setups. + ReplSource(); /* -1 = error */ int sync(int& nApplied); diff --git a/src/mongo/db/repl/repl_coordinator.h b/src/mongo/db/repl/repl_coordinator.h index 000476fd4e8..d24293c1973 100644 --- a/src/mongo/db/repl/repl_coordinator.h +++ b/src/mongo/db/repl/repl_coordinator.h @@ -244,6 +244,18 @@ namespace repl { virtual OID getElectionId() = 0; /** + * Returns the RID for this node. The RID is used to identify this node to our sync source + * when sending updates about our replication progress. + */ + virtual OID getMyRID() = 0; + + /** + * Prepares a BSONObj describing an invocation of the replSetUpdatePosition command that can + * be sent to this node's sync source to update it about our progress in replication. + */ + virtual void prepareReplSetUpdatePositionCommand(BSONObjBuilder* cmdBuilder) = 0; + + /** * Handles an incoming replSetGetStatus command. Adds BSON to 'result'. */ virtual void processReplSetGetStatus(BSONObjBuilder* result) = 0; diff --git a/src/mongo/db/repl/repl_coordinator_hybrid.cpp b/src/mongo/db/repl/repl_coordinator_hybrid.cpp index 7ca34597f20..aff15a938ce 100644 --- a/src/mongo/db/repl/repl_coordinator_hybrid.cpp +++ b/src/mongo/db/repl/repl_coordinator_hybrid.cpp @@ -146,6 +146,18 @@ namespace repl { return legacyOID; } + OID HybridReplicationCoordinator::getMyRID() { + OID legacyRID = _legacy.getMyRID(); + _impl.getMyRID(); + return legacyRID; + } + + void HybridReplicationCoordinator::prepareReplSetUpdatePositionCommand(BSONObjBuilder* result) { + _legacy.prepareReplSetUpdatePositionCommand(result); + BSONObjBuilder implResult; + _impl.prepareReplSetUpdatePositionCommand(&implResult); + } + void HybridReplicationCoordinator::processReplSetGetStatus(BSONObjBuilder* result) { _legacy.processReplSetGetStatus(result); BSONObjBuilder implResult; diff --git a/src/mongo/db/repl/repl_coordinator_hybrid.h b/src/mongo/db/repl/repl_coordinator_hybrid.h index 77ea87a4488..f359e5eb13f 100644 --- a/src/mongo/db/repl/repl_coordinator_hybrid.h +++ b/src/mongo/db/repl/repl_coordinator_hybrid.h @@ -94,6 +94,10 @@ namespace repl { virtual OID getElectionId(); + virtual OID getMyRID(); + + virtual void prepareReplSetUpdatePositionCommand(BSONObjBuilder* cmdBuilder); + virtual void processReplSetGetStatus(BSONObjBuilder* result); virtual bool setMaintenanceMode(bool activate); diff --git a/src/mongo/db/repl/repl_coordinator_impl.cpp b/src/mongo/db/repl/repl_coordinator_impl.cpp index 35572c5c742..5e5a4e03e28 100644 --- a/src/mongo/db/repl/repl_coordinator_impl.cpp +++ b/src/mongo/db/repl/repl_coordinator_impl.cpp @@ -329,6 +329,17 @@ namespace repl { return OID(); } + + OID ReplicationCoordinatorImpl::getMyRID() { + // TODO + return OID(); + } + + void ReplicationCoordinatorImpl::prepareReplSetUpdatePositionCommand( + BSONObjBuilder* cmdBuilder) { + // TODO + } + void ReplicationCoordinatorImpl::processReplSetGetStatus(BSONObjBuilder* result) { // TODO } diff --git a/src/mongo/db/repl/repl_coordinator_impl.h b/src/mongo/db/repl/repl_coordinator_impl.h index fe778185ca7..e0585477f82 100644 --- a/src/mongo/db/repl/repl_coordinator_impl.h +++ b/src/mongo/db/repl/repl_coordinator_impl.h @@ -100,6 +100,10 @@ namespace repl { virtual OID getElectionId(); + virtual OID getMyRID(); + + virtual void prepareReplSetUpdatePositionCommand(BSONObjBuilder* cmdBuilder); + virtual void processReplSetGetStatus(BSONObjBuilder* result); virtual bool setMaintenanceMode(bool activate); diff --git a/src/mongo/db/repl/repl_coordinator_legacy.cpp b/src/mongo/db/repl/repl_coordinator_legacy.cpp index bb82ef7458a..89f2eb667e3 100644 --- a/src/mongo/db/repl/repl_coordinator_legacy.cpp +++ b/src/mongo/db/repl/repl_coordinator_legacy.cpp @@ -51,6 +51,7 @@ #include "mongo/util/assert_util.h" #include "mongo/util/fail_point_service.h" #include "mongo/util/log.h" +#include "mongo/util/map_util.h" namespace mongo { @@ -371,26 +372,43 @@ namespace { Status LegacyReplicationCoordinator::setLastOptime(const OID& rid, const OpTime& ts) { - BSONObj config; { - boost::lock_guard<boost::mutex> lock(_ridConfigMapMutex); - config = _ridConfigMap[rid]; - } - LOG(2) << "received notification that node with RID " << rid << " and config " << config << - " has reached optime: " << ts.toStringPretty(); - invariant(!config.isEmpty()); - std::string oplogNs = getReplicationMode() == modeReplSet? - "local.oplog.rs" : "local.oplog.$main"; - if (!updateSlaveTracking(BSON("_id" << rid), config, oplogNs, ts)) { - return Status(ErrorCodes::NodeNotFound, - str::stream() << "could not update node with _id: " - << config["_id"].Int() - << " beacuse it cannot be found in current ReplSetConfig"); + boost::lock_guard<boost::mutex> lock(_mutex); + if (ts <= mapFindWithDefault(_slaveOpTimeMap, rid, OpTime())) { + // Only update if ts is newer than what we have already + return Status::OK(); + } + BSONObj config = mapFindWithDefault(_ridConfigMap, rid, BSONObj()); + LOG(2) << "received notification that node with RID " << rid << " and config " << config + << " has reached optime: " << ts.toStringPretty(); + + if (rid != getMyRID()) { + // TODO(spencer): Remove this invariant for backwards compatibility + invariant(!config.isEmpty()); + std::string oplogNs = getReplicationMode() == modeReplSet? + "local.oplog.rs" : "local.oplog.$main"; + // This is what updates the progress information used for satisfying write concern + // and wakes up threads waiting for replication. It also updates the tracking for + // maintaining local.slaves + if (!updateSlaveTracking(BSON("_id" << rid), config, oplogNs, ts)) { + return Status(ErrorCodes::NodeNotFound, + str::stream() << "could not update node with _id: " + << config["_id"].Int() + << " because it cannot be found in current ReplSetConfig"); + } + } + + // This updates the _slaveOpTimeMap which is used for forwarding slave progress + // upstream in chained replication. + LOG(2) << "Updating our knowledge of the replication progress for node with RID " << + rid << " to be at optime " << ts; + _slaveOpTimeMap[rid] = ts; + } if (getReplicationMode() == modeReplSet && !getCurrentMemberState().primary()) { // pass along if we are not primary - theReplSet->syncSourceFeedback.updateMap(rid, ts); + theReplSet->syncSourceFeedback.forwardSlaveProgress(); } return Status::OK(); } @@ -399,6 +417,47 @@ namespace { return theReplSet->getElectionId(); } + + OID LegacyReplicationCoordinator::getMyRID() { + Mode mode = getReplicationMode(); + if (mode == modeReplSet) { + return theReplSet->syncSourceFeedback.getMyRID(); + } else if (mode == modeMasterSlave) { + ReplSource source; + return source.getMyRID(); + } + invariant(false); // Don't have an RID if no replication is enabled + } + + void LegacyReplicationCoordinator::prepareReplSetUpdatePositionCommand( + BSONObjBuilder* cmdBuilder) { + invariant(getReplicationMode() == modeReplSet); + boost::lock_guard<boost::mutex> lock(_mutex); + cmdBuilder->append("replSetUpdatePosition", 1); + // create an array containing objects each member connected to us and for ourself + BSONArrayBuilder arrayBuilder(cmdBuilder->subarrayStart("optimes")); + OID myID = getMyRID(); + { + for (SlaveOpTimeMap::const_iterator itr = _slaveOpTimeMap.begin(); + itr != _slaveOpTimeMap.end(); ++itr) { + const OID& rid = itr->first; + const BSONObj& config = mapFindWithDefault(_ridConfigMap, rid, BSONObj()); + BSONObjBuilder entry(arrayBuilder.subobjStart()); + entry.append("_id", rid); + entry.append("optime", itr->second); + // SERVER-14550 Even though the "config" field isn't used on the other end in 2.8, + // we need to keep sending it for 2.6 compatibility. + // TODO(spencer): Remove this after 2.8 is released. + if (rid == myID) { + entry.append("config", theReplSet->myConfig().asBson()); + } + else { + entry.append("config", config); + } + } + } + } + void LegacyReplicationCoordinator::processReplSetGetStatus(BSONObjBuilder* result) { theReplSet->summarizeStatus(*result); } @@ -914,7 +973,7 @@ namespace { LOG(2) << "Received handshake " << handshake << " from node with RID " << remoteID; { - boost::lock_guard<boost::mutex> lock(_ridConfigMapMutex); + boost::lock_guard<boost::mutex> lock(_mutex); BSONObj configObj = handshake["config"].Obj().getOwned(); invariant(!configObj.isEmpty()); _ridConfigMap[remoteID] = configObj; diff --git a/src/mongo/db/repl/repl_coordinator_legacy.h b/src/mongo/db/repl/repl_coordinator_legacy.h index f9cf15be96c..64f69f1bee1 100644 --- a/src/mongo/db/repl/repl_coordinator_legacy.h +++ b/src/mongo/db/repl/repl_coordinator_legacy.h @@ -90,6 +90,10 @@ namespace repl { virtual OID getElectionId(); + virtual OID getMyRID(); + + virtual void prepareReplSetUpdatePositionCommand(BSONObjBuilder* cmdBuilder); + virtual void processReplSetGetStatus(BSONObjBuilder* result); virtual bool setMaintenanceMode(bool activate); @@ -143,12 +147,17 @@ namespace repl { Status _checkReplEnabledForCommand(BSONObjBuilder* result); - // Mutex that protects the _ridConfigMap - boost::mutex _ridConfigMapMutex; + // Mutex that protects the _ridConfigMap and the _slaveOpTimeMap; + boost::mutex _mutex; // Map from RID to member config object std::map<OID, BSONObj> _ridConfigMap; + // Maps nodes in this replication group to the last oplog operation they have committed + // TODO(spencer): change to unordered_map + typedef std::map<OID, OpTime> SlaveOpTimeMap; + SlaveOpTimeMap _slaveOpTimeMap; + // Rollback id. used to check if a rollback happened during some interval of time // TODO: ideally this should only change on rollbacks NOT on mongod restarts also. int _rbid; diff --git a/src/mongo/db/repl/repl_coordinator_mock.cpp b/src/mongo/db/repl/repl_coordinator_mock.cpp index aab04be4938..ef16aff8f61 100644 --- a/src/mongo/db/repl/repl_coordinator_mock.cpp +++ b/src/mongo/db/repl/repl_coordinator_mock.cpp @@ -131,6 +131,13 @@ namespace repl { return OID(); } + OID ReplicationCoordinatorMock::getMyRID() { + return OID(); + } + + void ReplicationCoordinatorMock::prepareReplSetUpdatePositionCommand( + BSONObjBuilder* cmdBuilder) {} + void ReplicationCoordinatorMock::processReplSetGetStatus(BSONObjBuilder* result) { //TODO } diff --git a/src/mongo/db/repl/repl_coordinator_mock.h b/src/mongo/db/repl/repl_coordinator_mock.h index 1ac42d4a248..a3b970b338a 100644 --- a/src/mongo/db/repl/repl_coordinator_mock.h +++ b/src/mongo/db/repl/repl_coordinator_mock.h @@ -93,6 +93,10 @@ namespace repl { virtual OID getElectionId(); + virtual OID getMyRID(); + + virtual void prepareReplSetUpdatePositionCommand(BSONObjBuilder* cmdBuilder); + virtual void processReplSetGetStatus(BSONObjBuilder* result); virtual bool setMaintenanceMode(bool activate); diff --git a/src/mongo/db/repl/sync_source_feedback.cpp b/src/mongo/db/repl/sync_source_feedback.cpp index cfb6c312e0a..20df8d80397 100644 --- a/src/mongo/db/repl/sync_source_feedback.cpp +++ b/src/mongo/db/repl/sync_source_feedback.cpp @@ -38,6 +38,7 @@ #include "mongo/db/auth/security_key.h" #include "mongo/db/dbhelpers.h" #include "mongo/db/repl/bgsync.h" +#include "mongo/db/repl/repl_coordinator_global.h" #include "mongo/db/repl/rs.h" // theReplSet #include "mongo/db/operation_context_impl.h" #include "mongo/util/log.h" @@ -179,49 +180,27 @@ namespace repl { _cond.notify_all(); } - void SyncSourceFeedback::updateMap(const mongo::OID& rid, const OpTime& ot) { + void SyncSourceFeedback::forwardSlaveProgress() { boost::unique_lock<boost::mutex> lock(_mtx); - // only update if ot is newer than what we have already - if (ot > _slaveMap[rid]) { - _slaveMap[rid] = ot; - _positionChanged = true; - LOG(2) << "Updating our knowledge of the replication progress for node with RID " << - rid << " to be at optime " << ot; - _cond.notify_all(); - } + _positionChanged = true; + _cond.notify_all(); } bool SyncSourceFeedback::updateUpstream() { - if (theReplSet->isPrimary()) { + ReplicationCoordinator* replCoord = getGlobalReplicationCoordinator(); + if (replCoord->getCurrentMemberState().primary()) { // primary has no one to update to return true; } BSONObjBuilder cmd; - cmd.append("replSetUpdatePosition", 1); - // create an array containing objects each member connected to us and for ourself - BSONArrayBuilder array (cmd.subarrayStart("optimes")); - OID myID = _me["_id"].OID(); { boost::unique_lock<boost::mutex> lock(_mtx); if (_handshakeNeeded) { // Don't send updates if there are nodes that haven't yet been handshaked return false; } - for (map<mongo::OID, OpTime>::const_iterator itr = _slaveMap.begin(); - itr != _slaveMap.end(); ++itr) { - BSONObjBuilder entry(array.subobjStart()); - entry.append("_id", itr->first); - entry.append("optime", itr->second); - if (itr->first == myID) { - entry.append("config", theReplSet->myConfig().asBson()); - } - else { - entry.append("config", _members[itr->first]->config().asBson()); - } - entry.doneFast(); - } + replCoord->prepareReplSetUpdatePositionCommand(&cmd); } - array.done(); BSONObj res; LOG(2) << "Sending slave oplog progress to upstream updater: " << cmd.done(); diff --git a/src/mongo/db/repl/sync_source_feedback.h b/src/mongo/db/repl/sync_source_feedback.h index 2f368a03870..3225620afa9 100644 --- a/src/mongo/db/repl/sync_source_feedback.h +++ b/src/mongo/db/repl/sync_source_feedback.h @@ -55,18 +55,19 @@ namespace repl { /// Ensures local.me is populated and populates it if not. void ensureMe(); - /// Passes handshake up the replication chain, upon receiving a handshake. + /// Notifies the SyncSourceFeedbackThread to wake up and send a handshake up the replication + /// chain, upon receiving a handshake. void forwardSlaveHandshake(); - void updateSelfInMap(const OpTime& ot) { - updateMap(_me["_id"].OID(), ot); - } - - /// Updates the _slaveMap to be forwarded to the sync target. - void updateMap(const mongo::OID& rid, const OpTime& ot); + /// Notifies the SyncSourceFeedbackThread to wake up and send an update upstream of slave + /// replication progress. + void forwardSlaveProgress(); std::string name() const { return "SyncSourceFeedbackThread"; } + /// Returns the RID for this process. ensureMe() must have been called before this can be. + OID getMyRID() const { return _me["_id"].OID(); } + /// Loops forever, passing updates when they are present. void run(); @@ -110,8 +111,6 @@ namespace repl { boost::scoped_ptr<DBClientConnection> _connection; // protects cond and maps and the indicator bools boost::mutex _mtx; - // contains the most recent optime of each member syncing to us - std::map<mongo::OID, OpTime> _slaveMap; typedef std::map<mongo::OID, Member*> OIDMemberMap; // contains a pointer to each member, which we can look up by oid OIDMemberMap _members; diff --git a/src/mongo/db/repl/write_concern.cpp b/src/mongo/db/repl/write_concern.cpp index 9816de13764..079a16fd281 100644 --- a/src/mongo/db/repl/write_concern.cpp +++ b/src/mongo/db/repl/write_concern.cpp @@ -140,6 +140,7 @@ namespace repl { _dirty = true; // update write concern tags if this node is primary + // TODO(spencer): Move this logic up into the ReplicationCoordinator if (theReplSet && theReplSet->isPrimary()) { const Member* mem = theReplSet->findById(ident.obj["config"]["_id"].Int()); if (!mem) { |