diff options
-rw-r--r-- | src/mongo/db/repl/repl_coordinator.h | 10 | ||||
-rw-r--r-- | src/mongo/db/repl/repl_coordinator_hybrid.cpp | 7 | ||||
-rw-r--r-- | src/mongo/db/repl/repl_coordinator_hybrid.h | 3 | ||||
-rw-r--r-- | src/mongo/db/repl/repl_coordinator_impl.cpp | 5 | ||||
-rw-r--r-- | src/mongo/db/repl/repl_coordinator_impl.h | 3 | ||||
-rw-r--r-- | src/mongo/db/repl/repl_coordinator_legacy.cpp | 60 | ||||
-rw-r--r-- | src/mongo/db/repl/repl_coordinator_legacy.h | 9 | ||||
-rw-r--r-- | src/mongo/db/repl/repl_coordinator_mock.cpp | 3 | ||||
-rw-r--r-- | src/mongo/db/repl/repl_coordinator_mock.h | 3 | ||||
-rw-r--r-- | src/mongo/db/repl/repl_set_impl.cpp | 16 | ||||
-rw-r--r-- | src/mongo/db/repl/repl_set_impl.h | 2 | ||||
-rw-r--r-- | src/mongo/db/repl/sync_source_feedback.cpp | 38 | ||||
-rw-r--r-- | src/mongo/db/repl/sync_source_feedback.h | 8 |
13 files changed, 96 insertions, 71 deletions
diff --git a/src/mongo/db/repl/repl_coordinator.h b/src/mongo/db/repl/repl_coordinator.h index a08ce851279..3de266b67b2 100644 --- a/src/mongo/db/repl/repl_coordinator.h +++ b/src/mongo/db/repl/repl_coordinator.h @@ -29,6 +29,7 @@ #pragma once #include <boost/date_time/posix_time/posix_time_types.hpp> +#include <vector> #include "mongo/base/disallow_copying.h" #include "mongo/base/status.h" @@ -258,6 +259,15 @@ namespace repl { virtual void prepareReplSetUpdatePositionCommand(BSONObjBuilder* cmdBuilder) = 0; /** + * For ourself and each secondary chaining off of us, adds a BSONObj to "handshakes" + * describing an invocation of the replSetUpdateCommand that can be sent to this node's + * sync source to handshake us and our chained secondaries, informing the sync source that + * we are replicating off of it. + */ + virtual void prepareReplSetUpdatePositionCommandHandshakes( + std::vector<BSONObj>* handshakes) = 0; + + /** * Handles an incoming replSetGetStatus command. Adds BSON to 'result'. */ virtual void processReplSetGetStatus(BSONObjBuilder* result) = 0; diff --git a/src/mongo/db/repl/repl_coordinator_hybrid.cpp b/src/mongo/db/repl/repl_coordinator_hybrid.cpp index 209fc03c036..51213625c24 100644 --- a/src/mongo/db/repl/repl_coordinator_hybrid.cpp +++ b/src/mongo/db/repl/repl_coordinator_hybrid.cpp @@ -163,6 +163,13 @@ namespace repl { _impl.prepareReplSetUpdatePositionCommand(&implResult); } + void HybridReplicationCoordinator::prepareReplSetUpdatePositionCommandHandshakes( + std::vector<BSONObj>* handshakes) { + _legacy.prepareReplSetUpdatePositionCommandHandshakes(handshakes); + std::vector<BSONObj> implResult; + _impl.prepareReplSetUpdatePositionCommandHandshakes(&implResult); + } + void HybridReplicationCoordinator::processReplSetGetStatus(BSONObjBuilder* result) { _legacy.processReplSetGetStatus(result); BSONObjBuilder implResult; diff --git a/src/mongo/db/repl/repl_coordinator_hybrid.h b/src/mongo/db/repl/repl_coordinator_hybrid.h index 46be836e2eb..0b6559c66f1 100644 --- a/src/mongo/db/repl/repl_coordinator_hybrid.h +++ b/src/mongo/db/repl/repl_coordinator_hybrid.h @@ -100,6 +100,9 @@ namespace repl { virtual void prepareReplSetUpdatePositionCommand(BSONObjBuilder* cmdBuilder); + virtual void prepareReplSetUpdatePositionCommandHandshakes( + std::vector<BSONObj>* handshakes); + virtual void processReplSetGetStatus(BSONObjBuilder* result); virtual bool setMaintenanceMode(OperationContext* txn, bool activate); diff --git a/src/mongo/db/repl/repl_coordinator_impl.cpp b/src/mongo/db/repl/repl_coordinator_impl.cpp index 3382e1585b2..dc53fa84fbf 100644 --- a/src/mongo/db/repl/repl_coordinator_impl.cpp +++ b/src/mongo/db/repl/repl_coordinator_impl.cpp @@ -356,6 +356,11 @@ namespace repl { // TODO } + void ReplicationCoordinatorImpl::prepareReplSetUpdatePositionCommandHandshakes( + std::vector<BSONObj>* handshakes) { + // TODO + } + void ReplicationCoordinatorImpl::processReplSetGetStatus(BSONObjBuilder* result) { // TODO } diff --git a/src/mongo/db/repl/repl_coordinator_impl.h b/src/mongo/db/repl/repl_coordinator_impl.h index 929d567f86c..96497016b44 100644 --- a/src/mongo/db/repl/repl_coordinator_impl.h +++ b/src/mongo/db/repl/repl_coordinator_impl.h @@ -110,6 +110,9 @@ namespace repl { virtual void prepareReplSetUpdatePositionCommand(BSONObjBuilder* cmdBuilder); + virtual void prepareReplSetUpdatePositionCommandHandshakes( + std::vector<BSONObj>* handshakes); + virtual void processReplSetGetStatus(BSONObjBuilder* result); virtual bool setMaintenanceMode(OperationContext* txn, bool activate); diff --git a/src/mongo/db/repl/repl_coordinator_legacy.cpp b/src/mongo/db/repl/repl_coordinator_legacy.cpp index 735accbeb23..2740a2cb68d 100644 --- a/src/mongo/db/repl/repl_coordinator_legacy.cpp +++ b/src/mongo/db/repl/repl_coordinator_legacy.cpp @@ -40,6 +40,7 @@ #include "mongo/db/repl/bgsync.h" #include "mongo/db/repl/connections.h" #include "mongo/db/repl/master_slave.h" +#include "mongo/db/repl/member.h" #include "mongo/db/repl/oplog.h" // for newRepl() #include "mongo/db/repl/repl_set_seed_list.h" #include "mongo/db/repl/repl_settings.h" @@ -458,6 +459,36 @@ namespace { } } + void LegacyReplicationCoordinator::prepareReplSetUpdatePositionCommandHandshakes( + std::vector<BSONObj>* handshakes) { + invariant(getReplicationMode() == modeReplSet); + boost::lock_guard<boost::mutex> lock(_mutex); + // handshake obj for us + BSONObjBuilder cmd; + cmd.append("replSetUpdatePosition", 1); + BSONObjBuilder sub (cmd.subobjStart("handshake")); + sub.append("handshake", getMyRID()); + sub.append("member", theReplSet->selfId()); + sub.append("config", theReplSet->myConfig().asBson()); + sub.doneFast(); + handshakes->push_back(cmd.obj()); + + // handshake objs for all chained members + for (OIDMemberMap::const_iterator itr = _ridMemberMap.begin(); + itr != _ridMemberMap.end(); ++itr) { + BSONObjBuilder cmd; + cmd.append("replSetUpdatePosition", 1); + // outer handshake indicates this is a handshake command + // inner is needed as part of the structure to be passed to gotHandshake + BSONObjBuilder subCmd (cmd.subobjStart("handshake")); + subCmd.append("handshake", itr->first); + subCmd.append("member", itr->second->id()); + subCmd.append("config", itr->second->config().asBson()); + subCmd.doneFast(); + handshakes->push_back(cmd.obj()); + } + } + void LegacyReplicationCoordinator::processReplSetGetStatus(BSONObjBuilder* result) { theReplSet->summarizeStatus(*result); } @@ -973,22 +1004,31 @@ namespace { const BSONObj& handshake) { LOG(2) << "Received handshake " << handshake << " from node with RID " << remoteID; - { - boost::lock_guard<boost::mutex> lock(_mutex); - BSONObj configObj; - if (handshake.hasField("config")) { - configObj = handshake["config"].Obj().getOwned(); - } else { - configObj = BSON("host" << cc().clientAddress(true) << "upgradeNeeded" << true); - } - _ridConfigMap[remoteID] = configObj; + boost::lock_guard<boost::mutex> lock(_mutex); + BSONObj configObj; + if (handshake.hasField("config")) { + configObj = handshake["config"].Obj().getOwned(); + } else { + configObj = BSON("host" << cc().clientAddress(true) << "upgradeNeeded" << true); } + _ridConfigMap[remoteID] = configObj; if (getReplicationMode() != modeReplSet || !handshake.hasField("member")) { return false; } - return theReplSet->registerSlave(remoteID, handshake["member"].Int()); + int memberID = handshake["member"].Int(); + Member* member = theReplSet->getMutableMember(memberID); + // it is possible that a node that was removed in a reconfig tried to handshake this node + // in that case, the Member will no longer be in theReplSet's _members List and member + // will be NULL + if (!member) { + return false; + } + + _ridMemberMap[remoteID] = member; + theReplSet->syncSourceFeedback.forwardSlaveHandshake(); + return true; } void LegacyReplicationCoordinator::waitUpToOneSecondForOptimeChange(const OpTime& ot) { diff --git a/src/mongo/db/repl/repl_coordinator_legacy.h b/src/mongo/db/repl/repl_coordinator_legacy.h index ec5b3025f42..00e42de5145 100644 --- a/src/mongo/db/repl/repl_coordinator_legacy.h +++ b/src/mongo/db/repl/repl_coordinator_legacy.h @@ -34,6 +34,8 @@ namespace mongo { namespace repl { + class Member; + /** * An implementation of ReplicationCoordinator that simply delegates to existing code. */ @@ -96,6 +98,9 @@ namespace repl { virtual void prepareReplSetUpdatePositionCommand(BSONObjBuilder* cmdBuilder); + virtual void prepareReplSetUpdatePositionCommandHandshakes( + std::vector<BSONObj>* handshakes); + virtual void processReplSetGetStatus(BSONObjBuilder* result); virtual bool setMaintenanceMode(OperationContext* txn, bool activate); @@ -160,6 +165,10 @@ namespace repl { // Map from RID to member config object std::map<OID, BSONObj> _ridConfigMap; + // Map from RID to Member pointer for replica set nodes + typedef std::map<OID, Member*> OIDMemberMap; + OIDMemberMap _ridMemberMap; + // Maps nodes in this replication group to the last oplog operation they have committed // TODO(spencer): change to unordered_map typedef std::map<OID, OpTime> SlaveOpTimeMap; diff --git a/src/mongo/db/repl/repl_coordinator_mock.cpp b/src/mongo/db/repl/repl_coordinator_mock.cpp index 18dae8fbc40..7796e57051f 100644 --- a/src/mongo/db/repl/repl_coordinator_mock.cpp +++ b/src/mongo/db/repl/repl_coordinator_mock.cpp @@ -140,6 +140,9 @@ namespace repl { void ReplicationCoordinatorMock::prepareReplSetUpdatePositionCommand( BSONObjBuilder* cmdBuilder) {} + void ReplicationCoordinatorMock::prepareReplSetUpdatePositionCommandHandshakes( + std::vector<BSONObj>* handshakes) {} + void ReplicationCoordinatorMock::processReplSetGetStatus(BSONObjBuilder* result) { //TODO } diff --git a/src/mongo/db/repl/repl_coordinator_mock.h b/src/mongo/db/repl/repl_coordinator_mock.h index 6553ebbf144..3a0fdaf8fa3 100644 --- a/src/mongo/db/repl/repl_coordinator_mock.h +++ b/src/mongo/db/repl/repl_coordinator_mock.h @@ -99,6 +99,9 @@ namespace repl { virtual void prepareReplSetUpdatePositionCommand(BSONObjBuilder* cmdBuilder); + virtual void prepareReplSetUpdatePositionCommandHandshakes( + std::vector<BSONObj>* handshakes); + virtual void processReplSetGetStatus(BSONObjBuilder* result); virtual bool setMaintenanceMode(OperationContext* txn, bool activate); diff --git a/src/mongo/db/repl/repl_set_impl.cpp b/src/mongo/db/repl/repl_set_impl.cpp index c537fef3c8e..73309f29b78 100644 --- a/src/mongo/db/repl/repl_set_impl.cpp +++ b/src/mongo/db/repl/repl_set_impl.cpp @@ -914,21 +914,5 @@ namespace { return OpTime(); } - bool ReplSetImpl::registerSlave(const OID& rid, const int memberId) { - Member* member = NULL; - { - lock lk(this); - member = getMutableMember(memberId); - } - - // it is possible that a node that was removed in a reconfig tried to handshake this node - // in that case, the Member will no longer be in the _members List and member will be NULL - if (!member) { - return false; - } - syncSourceFeedback.associateMember(rid, member); - return true; - } - } // namespace repl } // namespace mongo diff --git a/src/mongo/db/repl/repl_set_impl.h b/src/mongo/db/repl/repl_set_impl.h index 6920089228e..dbcd781e6e3 100644 --- a/src/mongo/db/repl/repl_set_impl.h +++ b/src/mongo/db/repl/repl_set_impl.h @@ -256,8 +256,6 @@ namespace repl { */ bool setMaintenanceMode(OperationContext* txn, const bool inc); - // Records a new slave's id in the GhostSlave map, at handshake time. - bool registerSlave(const OID& rid, const int memberId); private: Member* head() const { return _members.head(); } public: diff --git a/src/mongo/db/repl/sync_source_feedback.cpp b/src/mongo/db/repl/sync_source_feedback.cpp index d4b3e3994fc..788bb798bfb 100644 --- a/src/mongo/db/repl/sync_source_feedback.cpp +++ b/src/mongo/db/repl/sync_source_feedback.cpp @@ -52,15 +52,6 @@ namespace repl { // used in replAuthenticate static const BSONObj userReplQuery = fromjson("{\"user\":\"repl\"}"); - void SyncSourceFeedback::associateMember(const OID& rid, Member* member) { - invariant(member); - LOG(2) << "Associating RID " << rid << " with member: " << member->fullName(); - boost::unique_lock<boost::mutex> lock(_mtx); - _handshakeNeeded = true; - _members[rid] = member; - _cond.notify_all(); - } - bool SyncSourceFeedback::replAuthenticate() { if (!getGlobalAuthorizationManager()->isAuthEnabled()) return true; @@ -99,33 +90,8 @@ namespace repl { bool SyncSourceFeedback::replHandshake() { // construct a vector of handshake obj for us as well as all chained members std::vector<BSONObj> handshakeObjs; - { - boost::unique_lock<boost::mutex> lock(_mtx); - // handshake obj for us - BSONObjBuilder cmd; - cmd.append("replSetUpdatePosition", 1); - BSONObjBuilder sub (cmd.subobjStart("handshake")); - sub.append("handshake", getGlobalReplicationCoordinator()->getMyRID()); - sub.append("member", theReplSet->selfId()); - sub.append("config", theReplSet->myConfig().asBson()); - sub.doneFast(); - handshakeObjs.push_back(cmd.obj()); - - // handshake objs for all chained members - for (OIDMemberMap::iterator itr = _members.begin(); - itr != _members.end(); ++itr) { - BSONObjBuilder cmd; - cmd.append("replSetUpdatePosition", 1); - // outer handshake indicates this is a handshake command - // inner is needed as part of the structure to be passed to gotHandshake - BSONObjBuilder subCmd (cmd.subobjStart("handshake")); - subCmd.append("handshake", itr->first); - subCmd.append("member", itr->second->id()); - subCmd.append("config", itr->second->config().asBson()); - subCmd.doneFast(); - handshakeObjs.push_back(cmd.obj()); - } - } + getGlobalReplicationCoordinator()->prepareReplSetUpdatePositionCommandHandshakes( + &handshakeObjs); LOG(1) << "handshaking upstream updater"; for (std::vector<BSONObj>::iterator it = handshakeObjs.begin(); diff --git a/src/mongo/db/repl/sync_source_feedback.h b/src/mongo/db/repl/sync_source_feedback.h index c8cad777667..64c4d931b6b 100644 --- a/src/mongo/db/repl/sync_source_feedback.h +++ b/src/mongo/db/repl/sync_source_feedback.h @@ -52,9 +52,6 @@ namespace repl { ~SyncSourceFeedback() {} - /// Adds an entry to _members for a secondary that has connected to us. - void associateMember(const OID& rid, Member* member); - /// Ensures local.me is populated and populates it if not. /// TODO(spencer): Remove this function once the LegacyReplicationCoordinator is gone. void ensureMe(OperationContext* txn); @@ -115,11 +112,8 @@ namespace repl { const Member* _syncTarget; // our connection to our sync target boost::scoped_ptr<DBClientConnection> _connection; - // protects cond and maps and the indicator bools + // protects cond and the indicator bools boost::mutex _mtx; - typedef std::map<mongo::OID, Member*> OIDMemberMap; - // contains a pointer to each member, which we can look up by oid - OIDMemberMap _members; // used to alert our thread of changes which need to be passed up the chain boost::condition _cond; // used to indicate a position change which has not yet been pushed along |