diff options
-rw-r--r-- | src/mongo/db/repl/repl_coordinator_impl.cpp | 59 | ||||
-rw-r--r-- | src/mongo/db/repl/repl_coordinator_impl_test.cpp | 60 |
2 files changed, 68 insertions, 51 deletions
diff --git a/src/mongo/db/repl/repl_coordinator_impl.cpp b/src/mongo/db/repl/repl_coordinator_impl.cpp index e16deb9f26d..374e398ec1f 100644 --- a/src/mongo/db/repl/repl_coordinator_impl.cpp +++ b/src/mongo/db/repl/repl_coordinator_impl.cpp @@ -332,14 +332,13 @@ namespace { { boost::lock_guard<boost::mutex> lk(_mutex); - // TODO(spencer): Assert that we've received a handshake for this node once the unit - // test is doing that correctly - OpTime& slaveOpTime = _slaveInfoMap[rid].opTime; - LOG(3) << "Node with RID " << rid << " currently has optime " << slaveOpTime << + SlaveInfo& slaveInfo = _slaveInfoMap[rid]; + invariant(slaveInfo.memberID >= 0 || _getReplicationMode_inlock() == modeMasterSlave); + + LOG(3) << "Node with RID " << rid << " currently has optime " << slaveInfo.opTime << "; updating to " << ts; - if (slaveOpTime < ts) { - slaveOpTime = ts; - // TODO(spencer): update write concern tags if we're a replSet + if (slaveInfo.opTime < ts) { + slaveInfo.opTime = ts; // Wake up any threads waiting for replication that now have their replication // check satisfied @@ -439,13 +438,7 @@ namespace { if (slaveTime >= opTime) { // This node has reached the desired optime, now we need to check if it is a part // of the tagPattern. - const MemberConfig* memberConfig = NULL; - if (it->first == getMyRID(NULL)) { - memberConfig = &_rsConfig.getMemberAt(_thisMembersConfigIndex); - } - else { - memberConfig = _rsConfig.findMemberByID(it->second.memberID); - } + const MemberConfig* memberConfig = _rsConfig.findMemberByID(it->second.memberID); invariant(memberConfig); for (MemberConfig::TagIterator it = memberConfig->tagsBegin(); it != memberConfig->tagsEnd(); ++it) { @@ -693,16 +686,9 @@ namespace { // SERVER-14550 Even though the "config" field isn't used on the other end in 2.8, // we need to keep sending it for 2.6 compatibility. // TODO(spencer): Remove this after 2.8 is released. - if (rid == getMyRID(txn)) { - entry.append("config", - _rsConfig.getMemberAt(_thisMembersConfigIndex).toBSON( - _rsConfig.getTagConfig())); - } - else { - const MemberConfig* member = _rsConfig.findMemberByID(info.memberID); - fassert(18651, member); // We ensured the member existed in processHandshake. - entry.append("config", member->toBSON(_rsConfig.getTagConfig())); - } + const MemberConfig* member = _rsConfig.findMemberByID(info.memberID); + fassert(18651, member); // We ensured the member existed in processHandshake. + entry.append("config", member->toBSON(_rsConfig.getTagConfig())); } } } @@ -711,28 +697,10 @@ namespace { OperationContext* txn, std::vector<BSONObj>* handshakes) { boost::lock_guard<boost::mutex> lock(_mutex); - // handshake obj for us - BSONObjBuilder cmd; - cmd.append("replSetUpdatePosition", 1); - { - BSONObjBuilder sub (cmd.subobjStart("handshake")); - sub.append("handshake", getMyRID(txn)); - sub.append("member", _thisMembersConfigIndex); - // SERVER-14550 Even though the "config" field isn't used on the other end in 2.8, - // we need to keep sending it for 2.6 compatibility. - // TODO(spencer): Remove this after 2.8 is released. - sub.append("config", _rsConfig.getMemberAt(_thisMembersConfigIndex).toBSON( - _rsConfig.getTagConfig())); - } - handshakes->push_back(cmd.obj()); - - // handshake objs for all chained members + // handshake objs for ourself and all chained members for (SlaveInfoMap::const_iterator itr = _slaveInfoMap.begin(); itr != _slaveInfoMap.end(); ++itr) { const OID& oid = itr->first; - if (oid == getMyRID(txn)) { // Already generated handshake for ourself - continue; - } BSONObjBuilder cmd; cmd.append("replSetUpdatePosition", 1); { @@ -1027,6 +995,11 @@ namespace { myIndex, _replExecutor.now(), _getLastOpApplied_inlock()); + + // Ensure that there's an entry in the _slaveInfoMap for ourself + _slaveInfoMap[getMyRID(NULL)].memberID = _rsConfig.getMemberAt(myIndex).getId(); + _slaveInfoMap[getMyRID(NULL)].hostAndPort = + _rsConfig.getMemberAt(myIndex).getHostAndPort(); _startHeartbeats(); } diff --git a/src/mongo/db/repl/repl_coordinator_impl_test.cpp b/src/mongo/db/repl/repl_coordinator_impl_test.cpp index 672c4acc7fb..f4ee27fd51d 100644 --- a/src/mongo/db/repl/repl_coordinator_impl_test.cpp +++ b/src/mongo/db/repl/repl_coordinator_impl_test.cpp @@ -489,7 +489,10 @@ namespace { assertStartSuccess( BSON("_id" << "mySet" << "version" << 2 << - "members" << BSON_ARRAY(BSON("host" << "node1:12345" << "_id" << 0 ))), + "members" << BSON_ARRAY(BSON("host" << "node1:12345" << "_id" << 0) << + BSON("host" << "node2:12345" << "_id" << 1) << + BSON("host" << "node3:12345" << "_id" << 2) << + BSON("host" << "node4:12345" << "_id" << 3))), HostAndPort("node1", 12345)); OperationContextNoop txn; @@ -499,6 +502,16 @@ namespace { OpTime time1(1, 1); OpTime time2(1, 2); + HandshakeArgs handshake1; + ASSERT_OK(handshake1.initialize(BSON("handshake" << client1 << "member" << 1))); + ASSERT_OK(getReplCoord()->processHandshake(&txn, handshake1)); + HandshakeArgs handshake2; + ASSERT_OK(handshake2.initialize(BSON("handshake" << client2 << "member" << 2))); + ASSERT_OK(getReplCoord()->processHandshake(&txn, handshake2)); + HandshakeArgs handshake3; + ASSERT_OK(handshake3.initialize(BSON("handshake" << client3 << "member" << 3))); + ASSERT_OK(getReplCoord()->processHandshake(&txn, handshake3)); + WriteConcernOptions writeConcern; writeConcern.wTimeout = WriteConcernOptions::kNoWaiting; writeConcern.wNumNodes = 2; @@ -573,16 +586,16 @@ namespace { HandshakeArgs handshake1; ASSERT_OK(handshake1.initialize(BSON("handshake" << clientRID1 << "member" << 1))); - getReplCoord()->processHandshake(&txn, handshake1); + ASSERT_OK(getReplCoord()->processHandshake(&txn, handshake1)); HandshakeArgs handshake2; ASSERT_OK(handshake2.initialize(BSON("handshake" << clientRID2 << "member" << 2))); - getReplCoord()->processHandshake(&txn, handshake2); + ASSERT_OK(getReplCoord()->processHandshake(&txn, handshake2)); HandshakeArgs handshake3; ASSERT_OK(handshake3.initialize(BSON("handshake" << clientRID3 << "member" << 3))); - getReplCoord()->processHandshake(&txn, handshake3); + ASSERT_OK(getReplCoord()->processHandshake(&txn, handshake3)); HandshakeArgs handshake4; ASSERT_OK(handshake4.initialize(BSON("handshake" << clientRID4 << "member" << 4))); - getReplCoord()->processHandshake(&txn, handshake4); + ASSERT_OK(getReplCoord()->processHandshake(&txn, handshake4)); // Test invalid write concern WriteConcernOptions invalidWriteConcern; @@ -714,7 +727,10 @@ namespace { assertStartSuccess( BSON("_id" << "mySet" << "version" << 2 << - "members" << BSON_ARRAY(BSON("host" << "node1:12345" << "_id" << 0 ))), + "members" << BSON_ARRAY(BSON("host" << "node1:12345" << "_id" << 0) << + BSON("host" << "node2:12345" << "_id" << 1) << + BSON("host" << "node3:12345" << "_id" << 2) << + BSON("host" << "node4:12345" << "_id" << 3))), HostAndPort("node1", 12345)); OperationContextNoop txn; @@ -726,6 +742,16 @@ namespace { OpTime time1(1, 1); OpTime time2(1, 2); + HandshakeArgs handshake1; + ASSERT_OK(handshake1.initialize(BSON("handshake" << client1 << "member" << 1))); + ASSERT_OK(getReplCoord()->processHandshake(&txn, handshake1)); + HandshakeArgs handshake2; + ASSERT_OK(handshake2.initialize(BSON("handshake" << client2 << "member" << 2))); + ASSERT_OK(getReplCoord()->processHandshake(&txn, handshake2)); + HandshakeArgs handshake3; + ASSERT_OK(handshake3.initialize(BSON("handshake" << client3 << "member" << 3))); + ASSERT_OK(getReplCoord()->processHandshake(&txn, handshake3)); + WriteConcernOptions writeConcern; writeConcern.wTimeout = WriteConcernOptions::kNoTimeout; writeConcern.wNumNodes = 2; @@ -763,7 +789,9 @@ namespace { assertStartSuccess( BSON("_id" << "mySet" << "version" << 2 << - "members" << BSON_ARRAY(BSON("host" << "node1:12345" << "_id" << 0 ))), + "members" << BSON_ARRAY(BSON("host" << "node1:12345" << "_id" << 0) << + BSON("host" << "node2:12345" << "_id" << 1) << + BSON("host" << "node3:12345" << "_id" << 2))), HostAndPort("node1", 12345)); OperationContextNoop txn; ReplicationAwaiter awaiter(getReplCoord(), &txn); @@ -773,6 +801,13 @@ namespace { OpTime time1(1, 1); OpTime time2(1, 2); + HandshakeArgs handshake1; + ASSERT_OK(handshake1.initialize(BSON("handshake" << client1 << "member" << 1))); + ASSERT_OK(getReplCoord()->processHandshake(&txn, handshake1)); + HandshakeArgs handshake2; + ASSERT_OK(handshake2.initialize(BSON("handshake" << client2 << "member" << 2))); + ASSERT_OK(getReplCoord()->processHandshake(&txn, handshake2)); + WriteConcernOptions writeConcern; writeConcern.wTimeout = 50; writeConcern.wNumNodes = 2; @@ -792,7 +827,9 @@ namespace { assertStartSuccess( BSON("_id" << "mySet" << "version" << 2 << - "members" << BSON_ARRAY(BSON("host" << "node1:12345" << "_id" << 0 ))), + "members" << BSON_ARRAY(BSON("host" << "node1:12345" << "_id" << 0) << + BSON("host" << "node2:12345" << "_id" << 1) << + BSON("host" << "node3:12345" << "_id" << 2))), HostAndPort("node1", 12345)); OperationContextNoop txn; ReplicationAwaiter awaiter(getReplCoord(), &txn); @@ -802,6 +839,13 @@ namespace { OpTime time1(1, 1); OpTime time2(1, 2); + HandshakeArgs handshake1; + ASSERT_OK(handshake1.initialize(BSON("handshake" << client1 << "member" << 1))); + ASSERT_OK(getReplCoord()->processHandshake(&txn, handshake1)); + HandshakeArgs handshake2; + ASSERT_OK(handshake2.initialize(BSON("handshake" << client2 << "member" << 2))); + ASSERT_OK(getReplCoord()->processHandshake(&txn, handshake2)); + WriteConcernOptions writeConcern; writeConcern.wTimeout = WriteConcernOptions::kNoTimeout; writeConcern.wNumNodes = 2; |